Skip to content

Commit

Permalink
Merge pull request #87 from nevermined-io/fix/task-log-socket
Browse files Browse the repository at this point in the history
Fix/task log socket
  • Loading branch information
clriesco authored Dec 4, 2024
2 parents 9f7758c + 5a6d11d commit 03330f7
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 9 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nevermined-io/payments",
"version": "0.7.3",
"version": "0.7.4",
"description": "Typescript SDK to interact with the Nevermined Payments Protocol",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
Expand Down
13 changes: 10 additions & 3 deletions src/api/nvm-backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ export class NVMBackendApi {
})

await this.socketClient.emit('_join-tasks', JSON.stringify({ tasks, history }))
await this.socketClient.on('task-log', this.handleTaskLog.bind(this))
await this.socketClient.on('task-log', this.handleTaskLog.bind(this, tasks))
} catch (error) {
throw new PaymentsError(
`Unable to initialize websocket client: ${this.opts.webSocketHost} - ${(error as Error).message}`,
Expand All @@ -223,15 +223,22 @@ export class NVMBackendApi {
}

/**
* Handles the 'task-log' event from the websocket.
* Handles the '_task-log' event from the websocket.
* Parses the incoming data, retrieves the corresponding callback,
* executes it, and removes the callback if the task is completed or failed.
*
* @param boundTasks - The list of task IDs that the callback is bound to.
* @param data - The data received from the websocket event.
*/
private handleTaskLog(data: any): void {
private handleTaskLog(boundTasks: string[], data: any): void {
const parsedData = JSON.parse(data)
const { task_id: taskId } = parsedData

// If the task ID is not in the list of bound tasks, ignore the event
if (!boundTasks.includes(taskId)) {
return
}

const callback = this.taskCallbacks.get(taskId)
if (callback) {
// Execute the stored callback
Expand Down
29 changes: 24 additions & 5 deletions tests/e2e/payments.e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ describe('Payments API (e2e)', () => {
let completedTaskDID: string
let failedTaskId: string
let failedTaskDID: string
let logsReceived = 0

describe('Payments Setup', () => {
it('The Payments client can be initialized correctly', () => {
Expand Down Expand Up @@ -238,7 +237,6 @@ describe('Payments API (e2e)', () => {
subscriberQueryOpts,
async (data) => {
console.log('Task Log received', data)
logsReceived++
}
)

Expand Down Expand Up @@ -353,17 +351,38 @@ describe('Payments API (e2e)', () => {
)

it('Subscriber should be able to receive logs', async () => {


const aiTask: CreateTaskDto = {
query: 'https://www.youtube.com/watch?v=0tZFQs7qBfQ',
name: 'transcribe',
additional_params: [],
artifacts: [],
}
const accessConfig = await paymentsSubscriber.getServiceAccessConfig(agentDID)
const queryOpts = {
accessToken: accessConfig.accessToken,
proxyHost: accessConfig.neverminedProxyUri,
}

let logsReceived = 0
const taskResult = await paymentsSubscriber.query.createTask(agentDID, aiTask, queryOpts, async (data) => {
console.log('New Task Log received', data)
logsReceived++
})
const taskId = taskResult.data.task.task_id
const logMessage: TaskLogMessage = {
level: 'info',
task_status: AgentExecutionStatus.Completed,
task_id: createdTaskId,
task_id: taskId,
message: 'This is a log message',
}

console.log(`Sending log message for task ${logMessage.task_id}`)

await paymentsBuilder.query.logTask(logMessage)

await sleep(2_000)
await sleep(5_000)
console.log(`# Logs received: ${logsReceived}`)
expect(logsReceived).toBeGreaterThan(0)

}, TEST_TIMEOUT)
Expand Down

0 comments on commit 03330f7

Please sign in to comment.