Skip to content

Commit

Permalink
Merge pull request #83 from nevermined-io/feat/log_and_auth
Browse files Browse the repository at this point in the history
Logging tasks messages + new ws auth
  • Loading branch information
aaitor authored Nov 12, 2024
2 parents 29b8147 + 3c05540 commit a5caa9a
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 59 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file. Dates are d

Generated by [`auto-changelog`](https://github.com/CookPete/auto-changelog).

#### [v0.7.0](https://github.com/nevermined-io/payments/compare/v0.6.1...v0.7.0)

> 12 November 2024
- Automated docs update [`c3884b0`](https://github.com/nevermined-io/payments/commit/c3884b0d309b7350185e4f7b76e24e7c0ca8e6c3)
- feat: logging task messages [`0515b31`](https://github.com/nevermined-io/payments/commit/0515b316eda2c96ca6932a3d6b6f61efb914d9c7)
- fix: new authentication [`6256409`](https://github.com/nevermined-io/payments/commit/6256409392d630e6bf7b75ca4e86a47e6b4bc990)

#### [v0.6.1](https://github.com/nevermined-io/payments/compare/v0.6.0...v0.6.1)

> 29 October 2024
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nevermined-io/payments",
"version": "0.6.1",
"version": "0.7.1",
"description": "Typescript SDK to interact with the Nevermined Payments Protocol",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
Expand Down
100 changes: 65 additions & 35 deletions src/api/nvm-backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import axios from 'axios'
import { decodeJwt } from 'jose'
import { io } from 'socket.io-client'
import { sleep } from '../common/helper'
import { AgentExecutionStatus } from '../common/types'
import { AgentExecutionStatus, TaskLogMessage } from '../common/types'
import { isEthereumAddress } from '../utils'
import { PaymentsError } from '../common/payments.error'

export interface BackendApiOptions {
/**
Expand Down Expand Up @@ -41,14 +42,19 @@ export interface BackendApiOptions {

export interface BackendWebSocketOptions {
/**
* The path to connect to the websocket server
* The websocket transports to use
*/
path?: string
transports: string[]

/**
* The websocket transports to use
* Authentication parameters
*/
transports?: string[]
auth: { token: string }

/**
* The path to connect to the websocket server
*/
path?: string

/**
* The bearer token to use in the websocket connection
Expand Down Expand Up @@ -89,11 +95,7 @@ export class NVMBackendApi {
private _defaultSocketOptions: BackendWebSocketOptions = {
// path: '',
transports: ['websocket'],
transportOptions: {
websocket: {
extraHeaders: {},
},
},
auth: { token: '' },
}

constructor(opts: BackendApiOptions) {
Expand All @@ -106,22 +108,16 @@ export class NVMBackendApi {
if (opts.webSocketOptions?.bearerToken) {
// If the user pass a specific websocketoptions bearer token we use that one
opts.webSocketOptions = {
...this._defaultSocketOptions,
...opts.webSocketOptions,
transportOptions: {
websocket: {
extraHeaders: { Authorization: `Bearer ${opts.webSocketOptions!.bearerToken}` },
},
},
auth: { token: `Bearer ${opts.webSocketOptions!.bearerToken}` },
}
} else if (opts.apiKey) {
// If not use the api key
opts.webSocketOptions = {
...this._defaultSocketOptions,
...opts.webSocketOptions,
transportOptions: {
websocket: {
extraHeaders: { Authorization: `Bearer ${opts.apiKey}` },
},
},
auth: { token: `Bearer ${opts.apiKey}` },
}
}

Expand All @@ -137,7 +133,6 @@ export class NVMBackendApi {
try {
if (this.opts.apiKey && this.opts.apiKey.length > 0) {
const jwt = decodeJwt(this.opts.apiKey)
// if (jwt.sub && !jwt.sub.match(/^0x[a-fA-F0-9]{40}$/)) {
if (isEthereumAddress(jwt.sub)) {
this.userRoomId = `room:${jwt.sub}`
this.hasKey = true
Expand All @@ -157,32 +152,63 @@ export class NVMBackendApi {
}
}

public async connectSocket(_callback: (err?: any) => any, opts: SubscriptionOptions) {
private async _connectInternalSocketClient() {
if (!this.hasKey)
throw new Error('Unable to subscribe to the server becase a key was not provided')

if (this.socketClient && this.socketClient.connected) {
// nvm-backend:: Already connected to the websocket server
if (this.isWebSocketConnected()) {
// `_connectInternalSocketClient:: Already connected to the websocket server with id ${this.socketClient.id}`,
return
}

this.socketClient = io(this.opts.webSocketHost!, this.opts.webSocketOptions)
await this.socketClient.connect()
for (let i = 0; i < 10; i++) {
if (this.isWebSocketConnected()) return
await sleep(500)
}
if (!this.isWebSocketConnected()) {
throw new Error('Unable to connect to the websocket server')
}
}

protected async connectSocketSubscriber(
_callback: (err?: any) => any,
opts: SubscriptionOptions,
) {
try {
// nvm-backend:: Connecting to websocket server: ${this.opts.webSocketHost}
this.socketClient = io(this.opts.webSocketHost!, this.opts.webSocketOptions)
await this.socketClient.connect()
this._connectInternalSocketClient()

await this.socketClient.on('_connected', async () => {
this._subscribe(_callback, opts)
})
for (let i = 0; i < 5; i++) {
await sleep(1_000)
if (this.socketClient.connected) {
break
}
}
if (!this.socketClient.connected) {
throw new Error('Unable to connect to the websocket server')
} catch (error) {
throw new PaymentsError(
`Unable to initialize websocket client: ${this.opts.webSocketHost} - ${(error as Error).message}`,
)
}
}

protected async connectTasksSocket(_callback: (err?: any) => any, tasks: string[]) {
try {
if (tasks.length === 0) {
throw new Error('No task rooms to join in configuration')
}

this._connectInternalSocketClient()

// `connectTasksSocket:: Is connected? ${this.isWebSocketConnected()}`

await this.socketClient.on('_connected', async () => {
// `connectTasksSocket:: Joining tasks: ${JSON.stringify(tasks)}`
await this.socketClient.emit('_join-tasks', JSON.stringify({ tasks }))
await this.socketClient.on('task-log', (data: any) => {
_callback(data)
})
})
} catch (error) {
throw new Error(
throw new PaymentsError(
`Unable to initialize websocket client: ${this.opts.webSocketHost} - ${(error as Error).message}`,
)
}
Expand Down Expand Up @@ -231,6 +257,10 @@ export class NVMBackendApi {
this.socketClient.emit('_emit-steps', JSON.stringify(message))
}

protected async _emitTaskLog(logMessage: TaskLogMessage) {
this.socketClient.emit('_task-log', JSON.stringify(logMessage))
}

disconnect() {
this.disconnectSocket()
// nvm-backend:: Disconnected from the server
Expand Down
42 changes: 38 additions & 4 deletions src/api/query-api.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AgentExecutionStatus, Step } from '../common/types'
import { AgentExecutionStatus, CreateTaskDto, Step, TaskLogMessage } from '../common/types'
import { isStepIdValid } from '../utils'
import {
BackendApiOptions,
Expand Down Expand Up @@ -78,7 +78,20 @@ export class AIQueryApi extends NVMBackendApi {
_callback: (err?: any) => any,
opts: SubscriptionOptions = DefaultSubscriptionOptions,
) {
await super.connectSocket(_callback, opts)
await super.connectSocketSubscriber(_callback, opts)
}

/**
* It subscribes to receive the logs generated during the execution of a task/s
*
* @remarks
* This method is used by users/subscribers of AI agents after they create a task on them
*
* @param _callback - The callback to execute when a new task log event is received
* @param tasks - The list of tasks to subscribe to
*/
async subscribeTasksLogs(_callback: (err?: any) => any, tasks: string[]) {
await super.connectTasksSocket(_callback, tasks)
}

/**
Expand Down Expand Up @@ -116,9 +129,15 @@ export class AIQueryApi extends NVMBackendApi {
* @param did - Agent DID
* @param task - Task object. The task object should contain the query to execute and the name of the task. All the attributes here: @see https://docs.nevermined.io/docs/protocol/query-protocol#tasks-attributes
* @param queryOpts - The query options @see {@link Payments.getServiceAccessConfig}
* @param _callback - The callback to execute when a new task log event is received (optional)
* @returns The result of the operation
*/
async createTask(did: string, task: any, queryOpts: AIQueryOptions) {
async createTask(
did: string,
task: CreateTaskDto,
queryOpts: AIQueryOptions,
_callback?: (err?: any) => any,
) {
const endpoint = TASK_ENDPOINT.replace('{did}', did)
const reqOptions: HTTPRequestOptions = {
sendThroughProxy: true,
Expand All @@ -127,7 +146,11 @@ export class AIQueryApi extends NVMBackendApi {
headers: { Authorization: `Bearer ${queryOpts.accessToken}` },
}),
}
return this.post(endpoint, task, reqOptions)
const result = await this.post(endpoint, task, reqOptions)
if (result.status === 201 && _callback) {
await this.subscribeTasksLogs(_callback, [result.data.task.task_id])
}
return result
}

/**
Expand Down Expand Up @@ -334,4 +357,15 @@ export class AIQueryApi extends NVMBackendApi {
async getTasksFromAgents() {
return this.get(GET_AGENTS_ENDPOINT, { sendThroughProxy: false })
}

/**
* It emits a log message related to a task
*
* @remarks
* This method is used by the AI Agent to emit log messages
*
*/
async logTask(logMessage: TaskLogMessage) {
super._emitTaskLog(logMessage)
}
}
49 changes: 49 additions & 0 deletions src/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,52 @@ export interface Artifact {
*/
url: string
}

export interface TaskLogMessage {
/**
* Log level
*/
level: 'info' | 'error' | 'warning' | 'debug'

/**
* The log message
*/
message: string

/**
* Identifier of the task associated with the log
*/
task_id: string

/**
* The status of the task
*/
task_status?: AgentExecutionStatus

/**
* The step id associated with the log message if any
*/
step_id?: string
}

export interface CreateTaskDto {
/**
* The query parameter for the task
*/
query: string

/**
* The name of the task
*/
name?: string

/**
* Additional parameters required for the task
*/
additional_params?: { [name: string]: string }[]

/**
* Additional artifacts required for the task
*/
artifacts?: Artifact[]
}
Loading

0 comments on commit a5caa9a

Please sign in to comment.