Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jribbink committed Nov 22, 2024
1 parent 2f3dcea commit ced629a
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 122 deletions.
23 changes: 22 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion packages/transport-http/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
"@onflow/rlp": "1.2.3-alpha.0",
"@onflow/sdk": "1.5.4-alpha.1",
"@onflow/types": "1.4.1-alpha.0",
"jest": "^29.7.0"
"jest": "^29.7.0",
"jest-websocket-mock": "^2.5.0",
"mock-socket": "^9.3.1"
},
"source": "src/sdk-send-http.ts",
"main": "dist/sdk-send-http.js",
Expand Down
47 changes: 0 additions & 47 deletions packages/transport-http/src/subscriptions/mocks/websocket.ts

This file was deleted.

141 changes: 86 additions & 55 deletions packages/transport-http/src/subscriptions/stream-controller.test.ts
Original file line number Diff line number Diff line change
@@ -1,69 +1,100 @@
import { createMockWebSocket } from "./mocks/websocket"
import { SubscribeMessageResponse } from "./models"
import { StreamController } from "./stream-controller"
import { SubscriptionResponse, SubscriptionTopic } from "./types"

let mockWs: ReturnType<typeof createMockWebSocket>
import WS from "jest-websocket-mock"
import {WebSocket as mockSocket} from "mock-socket"
import {
SubscribeMessageRequest,
SubscribeMessageResponse,
SubscriptionDataMessage,
UnsubscribeMessageRequest,
} from "./models"
import {StreamController} from "./stream-controller"
import {SubscriptionTopic} from "./types"

jest.mock("../websocket", () => ({
WebSocket: jest.fn().mockImplementation(() => mockWs.WebSocket),
WebSocket: mockSocket,
}))

describe("StreamController", () => {
beforeEach(() => {
mockWs = createMockWebSocket()
})
test("constructor", () => {
const config = {
hostname: "hostname",
reconnectInterval: 1000,
reconnectAttempts: 10,
}
const streamController = new StreamController(config)
expect(streamController).toBeDefined()
})

test("subscribes, receives data, and unsubscribes", async () => {
const config = {
hostname: "wss://localhost:8080",
reconnectInterval: 1000,
reconnectAttempts: 10,
}
const streamController = new StreamController(config)
const topic = "topic" as SubscriptionTopic
const args = {key: "value"} as any
const onData = jest.fn()
const onError = jest.fn()

const mockWs = new WS("wss://localhost:8080")

let serverPromise = (async () => {
await mockWs.connected

const msg = (await mockWs.nextMessage) as string
const data = JSON.parse(msg) as SubscribeMessageRequest
expect(data).toEqual({
action: "subscribe",
topic,
arguments: args,
})

test("constructor", () => {
const config = {
hostname: "hostname",
reconnectInterval: 1000,
reconnectAttempts: 10,
}
const streamController = new StreamController(config)
expect(streamController).toBeDefined()
})
const response: SubscribeMessageResponse = {
id: "id",
action: "subscribe",
success: true,
topic,
}
mockWs.send(JSON.stringify(response))
})()

test("subscribe sends subscribe message", async () => {
const config = {
hostname: "hostname",
reconnectInterval: 1000,
reconnectAttempts: 10,
}
const streamController = new StreamController(config)
const topic = "topic" as SubscriptionTopic
const args = { key: "value" } as any
const onData = jest.fn()
const onError = jest.fn()
const [subscription] = await Promise.all([
streamController.subscribe({
topic,
args,
onData,
onError,
}),
serverPromise,
])

console.log("YO")
expect(subscription).toBeDefined()
expect(subscription.unsubscribe).toBeInstanceOf(Function)

mockWs.mockConnection.onmessage.mockImplementation((event) => {
console.log(event)
const data = JSON.parse(event.data)
expect(data).toEqual({
topic,
arguments: args,
})
serverPromise = (async () => {
const data = {
id: "id",
data: {key: "value"},
} as SubscriptionDataMessage
mockWs.send(JSON.stringify(data))
})()

const response: SubscribeMessageResponse = {
id: "id",
action: "subscribe",
success: true,
topic,
}
mockWs.mockConnection.send(response)
})
await serverPromise

const subscription = await streamController.subscribe({
topic,
args,
onData,
onError,
})
expect(onData).toHaveBeenCalledTimes(1)
expect(onData).toHaveBeenCalledWith({key: "value"})
expect(onError).toHaveBeenCalledTimes(0)

expect(mockWs.mockConnection.onmessage).toHaveBeenCalledTimes(1)
serverPromise = (async () => {
const msg = (await mockWs.nextMessage) as string
const data = JSON.parse(msg) as UnsubscribeMessageRequest
expect(data).toEqual({
action: "unsubscribe",
id: "id",
})
})()


})
subscription.unsubscribe()
await serverPromise
})
})
37 changes: 19 additions & 18 deletions packages/transport-http/src/subscriptions/stream-controller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import {MessageResponse, SubscriptionDataMessage, UnsubscribeMessageResponse} from "./models"
import {
MessageResponse,
SubscriptionDataMessage,
UnsubscribeMessageResponse,
} from "./models"
import {
SubscribeMessageRequest,
SubscribeMessageResponse,
Expand Down Expand Up @@ -53,16 +57,12 @@ export class StreamController implements SubscriptionTransport {

// Lazy connect to the socket when the first subscription is made
private async connect() {
return new Promise<void>((resolve) => {
return new Promise<void>(resolve => {
// If the socket is already open, do nothing
if (this.socket?.readyState === WS_OPEN) {
return
}

console.log("CONNECTING")

console.log("WS", WebSocket)

this.socket = new WebSocket(this.config.hostname)
this.socket.onmessage = event => {
const data = JSON.parse(event.data) as
Expand Down Expand Up @@ -91,12 +91,13 @@ export class StreamController implements SubscriptionTransport {
}

this.socket.onopen = () => {
console.log("WebSocket connection established")
// Restore subscriptions
Promise.all(this.subscriptions.map(async sub => {
Promise.all(
this.subscriptions.map(async sub => {
const response = await this.sendSubscribe(sub)
sub.remoteId = response.id
})).then(() => {
})
).then(() => {
resolve()
})
}
Expand All @@ -120,7 +121,11 @@ export class StreamController implements SubscriptionTransport {
// Validate the number of reconnection attempts
if (this.reconnectAttempts >= this.config.reconnectAttempts) {
this.subscriptions.forEach(sub => {
sub.onError(new Error(`Failed to reconnect to the server after ${this.reconnectAttempts} attempts`))
sub.onError(
new Error(
`Failed to reconnect to the server after ${this.reconnectAttempts} attempts`
)
)
})
this.subscriptions = []
this.reconnectAttempts = 0
Expand All @@ -144,12 +149,9 @@ export class StreamController implements SubscriptionTransport {
onData: (data: SubscriptionResponse<T>) => void
onError: (error: Error) => void
}): Promise<Subscription> {
console.log("TRY CONN")
// Connect the socket if it's not already open
await this.connect()

console.log("CONN?")

// Track the subscription locally
const sub: SubscriptionInfo<T> = {
id: this.counter++,
Expand Down Expand Up @@ -241,7 +243,7 @@ export class StreamController implements SubscriptionTransport {
private async waitForResponse<T extends MessageResponse>(): Promise<T> {
// TODO: NOOP, waiting for AN team to decide what to do here, this is a placeholder
return new Promise(resolve => {
this.socket?.addEventListener("message", event => {
this.socket?.addEventListener("message", event => {
const data = JSON.parse(event.data) as T
if (data.action) {
resolve(data)
Expand All @@ -252,10 +254,9 @@ export class StreamController implements SubscriptionTransport {

// Update the subscription checkpoint when a message is received
// These checkpoints are used to resume subscriptions after disconnects
private updateSubscriptionCheckpoint<T extends SubscriptionTopic = SubscriptionTopic>(
sub: SubscriptionInfo<T>,
message: SubscriptionDataMessage
) {
private updateSubscriptionCheckpoint<
T extends SubscriptionTopic = SubscriptionTopic,
>(sub: SubscriptionInfo<T>, message: SubscriptionDataMessage) {
// TODO: Will be implemented with each subscription topic
}
}

0 comments on commit ced629a

Please sign in to comment.