Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request response browser3 #576

Closed
wants to merge 13 commits into from
Prev Previous commit
Next Next commit
Checkpoint
  • Loading branch information
bretambrose committed Aug 7, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 8d8cdda9fd9ebb82600f3b0e55d2f03a218df389
71 changes: 52 additions & 19 deletions lib/browser/mqtt_request_response.ts
Original file line number Diff line number Diff line change
@@ -74,6 +74,11 @@ interface ResponsePathEntry {
correlationTokenPath: string,
}

interface ServiceTaskWrapper {
serviceTask : ReturnType<typeof setTimeout>;
nextserviceTime : number;
}

/**
* Native implementation of an MQTT-based request-response client tuned for AWS MQTT services.
*
@@ -83,14 +88,15 @@ interface ResponsePathEntry {
*/
export class RequestResponseClient extends BufferedEventEmitter implements mqtt_request_response.IRequestResponseClient {

private operationTimeoutInSeconds: number,
private nextOperationId: number = 1;
private operationTimeoutInSeconds: number;
private nextOperationId : number = 1;
private protocolClientAdapter : protocol_client_adapter.ProtocolClientAdapter;
private subscriptionManager : subscription_manager.SubscriptionManager;
private state: mqtt_request_response_internal.RequestResponseClientState = mqtt_request_response_internal.RequestResponseClientState.Ready;
private state : mqtt_request_response_internal.RequestResponseClientState = mqtt_request_response_internal.RequestResponseClientState.Ready;
private serviceTask? : ServiceTaskWrapper;

private operations : Map<number, Operation> = new Map<number, Operation>();
private operationsByTopicFilter : Map<string, Set<number>> = new Map<string, Set<number>>(); // topic filter -> set of operation ids
private streamingOperationsByTopicFilter : Map<string, Set<number>> = new Map<string, Set<number>>(); // topic filter -> set of operation ids
private correlationTokenPathsByResponsePaths : Map<string, ResponsePathEntry> = new Map<string, ResponsePathEntry>(); // response topic -> response path entry
private operationsByCorrelationToken : Map<string, number> = new Map<string, number>(); // correlation token -> operation id

@@ -187,18 +193,22 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
let operation : RequestResponseOperation = {
id: id,
type: OperationType.RequestResponse,
options: requestOptions,
state: OperationState.Queued,
resultPromise: resultPromise
}
pendingSubscriptionCount: requestOptions.subscriptionTopicFilters.length,
inClientTables: false,
options: requestOptions,
resultPromise: resultPromise,
};

this.operations.set(id, operation);
this.operationQueue.push(id);

setTimeout(() => {
this.cancelOperation(id);
this.completeRequestResponseOperationWithError(id, new CrtError("Operation Timeout"));
}, this.operationTimeoutInSeconds * 1000)

this.wakeServiceTask();

return resultPromise.promise;
}

@@ -216,16 +226,43 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
throw new CrtError("NYI");
}

private closeAllOperations() : void {
// NYI
private service() {

}

private clearServiceTask() {
if (this.serviceTask) {
clearTimeout(this.serviceTask.serviceTask);
this.serviceTask = undefined;
}
}

private cancelOperation(id: number) {
private tryScheduleServiceTask(serviceTime: number) {
if (this.serviceTask) {
if (serviceTime >= this.serviceTask.nextserviceTime) {
return;
}

this.clearServiceTask();
}

let futureMs = Math.max(0, Date.now() - serviceTime);
this.serviceTask = {
serviceTask: setTimeout(() => { this.service(); }, futureMs),
nextserviceTime: serviceTime,
}
}

private wakeServiceTask() : void {
this.tryScheduleServiceTask(Date.now());
}

private closeAllOperations() : void {
// NYI
}

private removeOperationFromTopicFilterSet(topicFilter: string, id: number) {
let operationSet = this.operationsByTopicFilter.get(topicFilter);
private removeStreamingOperationFromTopicFilterSet(topicFilter: string, id: number) {
let operationSet = this.streamingOperationsByTopicFilter.get(topicFilter);
if (!operationSet) {
return;
}
@@ -235,7 +272,7 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
return;
}

??;
this.streamingOperationsByTopicFilter.delete(topicFilter);
}

private decRefResponsePaths(topic: string) {
@@ -254,10 +291,6 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
this.operations.delete(operation.id);

if (operation.inClientTables) {
for (let topicFilter of operation.options.subscriptionTopicFilters) {
this.removeOperationFromTopicFilterSet(topicFilter, operation.id);
}

for (let responsePath of operation.options.responsePaths) {
this.decRefResponsePaths(responsePath.topic);
}
@@ -271,7 +304,7 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
this.operations.delete(operation.id);

if (operation.inClientTables) {
this.removeOperationFromTopicFilterSet(operation.options.subscriptionTopicFilter, operation.id);
this.removeStreamingOperationFromTopicFilterSet(operation.options.subscriptionTopicFilter, operation.id);
}
}

Loading