Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add config to process messages concurrently #2026

Open
wants to merge 10 commits into
base: main
Choose a base branch
from

Conversation

sairanjit
Copy link
Contributor

  • Add support to process the messages concurrently

cc: @Zzocker : Taking it further from #1276

@TimoGlastra From the earlier conversations how can we handle the queue from inbound transport?

  • Can we use the event emitter ?

Copy link

changeset-bot bot commented Sep 9, 2024

⚠️ No Changeset found

Latest commit: fe7c5db

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@sairanjit sairanjit marked this pull request as ready for review September 13, 2024 08:15
@sairanjit sairanjit force-pushed the feat/concurrent-message-processing branch from da1b2a1 to 3281bc5 Compare September 13, 2024 08:16
Copy link
Contributor

@TimoGlastra TimoGlastra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good updates! Thanks

packages/core/src/agent/AgentConfig.ts Outdated Show resolved Hide resolved
packages/core/src/types.ts Outdated Show resolved Hide resolved
Comment on lines 88 to 89
filter((e) => e.payload.message.id === encryptedMessage.id),
filter((e) => e.payload.message.type === encryptedMessage.type),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an encrypted message does not have an id and type right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I missed this

Copy link
Contributor Author

@sairanjit sairanjit Sep 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • But here there are no common properties I can map to. Can you suggest me a way to add a guard here or filter specific encrypted message?

packages/node/src/transport/HttpInboundTransport.ts Outdated Show resolved Hide resolved
Zzocker and others added 5 commits September 13, 2024 16:35
…ncurrently

Signed-off-by: Pritam Singh <pkspritam16@gmail.com>
Signed-off-by: Sai Ranjit Tummalapalli <sairanjit.tummalapalli@ayanworks.com>
Signed-off-by: Sai Ranjit Tummalapalli <sairanjit.tummalapalli@ayanworks.com>
Signed-off-by: Sai Ranjit Tummalapalli <sairanjit.tummalapalli@ayanworks.com>
Signed-off-by: Sai Ranjit Tummalapalli <sairanjit.tummalapalli@ayanworks.com>
@sairanjit sairanjit force-pushed the feat/concurrent-message-processing branch from 05caec4 to 9a37419 Compare September 13, 2024 11:06
Signed-off-by: Sai Ranjit Tummalapalli <sairanjit.tummalapalli@ayanworks.com>
Signed-off-by: Sai Ranjit Tummalapalli <sairanjit.tummalapalli@ayanworks.com>
observable
.pipe(
filter((e) => e.type === AgentEventTypes.AgentMessageProcessed),
filter((e) => deepEquality(e.payload.encryptedMessage, encryptedMessage)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can just check for equality right (===)? AS the object will be passed around? Using deep object equality can be expensive

@@ -64,7 +65,7 @@ class Dispatcher {
await next()
}

public async dispatch(messageContext: InboundMessageContext): Promise<void> {
public async dispatch(messageContext: InboundMessageContext, encryptedMessage?: EncryptedMessage): Promise<void> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we add encryptedMessage to the InboundMessageContext ( as it's related to the incoming message)

filter((e) => e.type === AgentEventTypes.AgentMessageProcessed),
filter((e) => deepEquality(e.payload.encryptedMessage, encryptedMessage)),
timeout({
first: 10000, // timeout after 10 seconds
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's less relevant here to await the processing? As we don't do anything afterwards

filter((e) => e.type === AgentEventTypes.AgentMessageProcessed),
filter((e) => deepEquality(e.payload.encryptedMessage, encryptedMessage)),
timeout({
first: 10000, // timeout after 10 seconds
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add this as a param to the constructor of the HttpInboundTransport class?

Signed-off-by: Sai Ranjit Tummalapalli <sairanjit.tummalapalli@ayanworks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants