Skip to content

Commit

Permalink
VSC-326 Split service manager class to allow several connections to s…
Browse files Browse the repository at this point in the history
…ame registry
  • Loading branch information
HampusAdolfsson committed Aug 27, 2024
1 parent d1acbd4 commit 0b35b63
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 60 deletions.
69 changes: 69 additions & 0 deletions src/thrift/thriftServiceRegistry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */

import * as Thrift from "thrift";
import * as CSpyServiceRegistry from "./bindings/CSpyServiceRegistry";
import { Protocol, ServiceLocation, Transport } from "./bindings/ServiceRegistry_types";
import { AddressInfo, Server } from "net";
import { ThriftClient } from "./thriftClient";

/**
* A wrapper around a remote thrift service registry launched by e.g. cspyserver
* or iarservicelauncher. Allows registering and looking up thrift services.
*/
export class ThriftServiceRegistry {
private static readonly SERVICE_LOOKUP_TIMEOUT = 1000;
private readonly activeServers: Server[] = [];

constructor(readonly registryLocation: ServiceLocation) { }

/**
* Connects to a service with the given name. The service must already be started
* (or in the process of starting), otherwise this method will reject.
* @param serviceId The name to give the service
* @param serviceType The type of the service with the given name (usually the top-level import of the service module)
*/
public async findService<T>(serviceId: string, serviceType: Thrift.TClientConstructor<T>): Promise<ThriftClient<T>> {
const registry = await ThriftClient.connect(this.registryLocation, CSpyServiceRegistry);

const location = await registry.service.waitForService(serviceId, ThriftServiceRegistry.SERVICE_LOOKUP_TIMEOUT);
const service = await ThriftClient.connect(location, serviceType);

registry.close();

return service;
}

/**
* Start and register a new service in this service registry.
* @param serviceId The name to give the service
* @param serviceType The type of service to register (usually given as the top-level import of the service module)
* @param handler The handler implementing the service
* @typeParam Pr The processor type for the service, usually serviceType.Processor
* @typeParam Ha The handler type for the service, usually object (thrift doesn't provide typescript types for service handlers)
*/
async startService<Pr, Ha>(serviceId: string, serviceType: Thrift.TProcessorConstructor<Pr, Ha>, handler: Ha): Promise<ServiceLocation> {
const serverOpt = {
transport: Thrift.TBufferedTransport,
protocol: Thrift.TBinaryProtocol,
};
const server = Thrift.createServer(serviceType, handler, serverOpt).
on("error", e => console.error(`Error in thrift server '${serviceId}': ${e.toString()}`)).
listen(0); // port 0 lets node figure out what to use

const port = (server.address() as AddressInfo).port; // this cast is safe since we know it's an IP socket
const location = new ServiceLocation({ host: "localhost", port: port, protocol: Protocol.Binary, transport: Transport.Socket });
const registry = await ThriftClient.connect(this.registryLocation, CSpyServiceRegistry);
await registry.service.registerService(serviceId, location);

this.activeServers.push(server);

registry.close();
return location;
}

public dispose() {
this.activeServers.forEach(server => server.close());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import * as Thrift from "thrift";
import * as Path from "path";
import * as Fs from "fs";
import * as Os from "os";
import { Protocol, ServiceLocation, Transport } from "./bindings/ServiceRegistry_types";
import { ServiceLocation } from "./bindings/ServiceRegistry_types";
import { ChildProcess, spawn } from "child_process";
import { ThriftClient } from "./thriftClient";
import { v4 as uuidv4 } from "uuid";
import * as CSpyServiceRegistry from "./bindings/CSpyServiceRegistry";
import { AddressInfo, Server } from "net";
import { ThriftServiceRegistry } from "./thriftServiceRegistry";

/** Callback for when the service manager crashes */
type CrashHandler = (exitCode: number | null) => void;
Expand All @@ -22,28 +22,34 @@ export interface ProcessMonitor {
}

/**
* Provides and manages thrift services for a workbench, by launching and connecting to a service registry.
* Since there are multiple ways of launching a service registry (i.e. iarservicelauncher and cspyserver),
* this class attempts to be generic enough to support all of them.
* This class helps launch and shut down proccesses (i.e. cspyserver or
* iarservicelauncher) that provide a thrift service registry. The service
* registry itself can be accessed via {@link serviceRegistry}.
*/
export class ThriftServiceManager {
private static readonly SERVICE_LOOKUP_TIMEOUT = 1000;
export class ThriftServiceRegistryProcess {
readonly serviceRegistry: ThriftServiceRegistry;

private static readonly PROCESS_LAUNCH_TIMEOUT = 20000;
private static readonly PROCESS_EXIT_TIMEOUT = 15000;
private crashHandlers: CrashHandler[] = [];
private readonly activeServers: Server[] = [];

/**
* Create a new service manager by launching the given process with the given arguments,
* and waiting for the given service to become available.
* @param execPath Path to the executable to launch.
* @param args Arguments to launch the executable with.
* @param stopProcess Function to call when shuttdownnn down the registry. This should cause the process to exit *eventually*,
* @param stopProcess Function to call when shutting down the registry. This should cause the process to exit *eventually*,
* but does not need to kill it right away.
* @param serviceToAwait The name of a service to wait for after starting, to make sure everying is started.
* @param procMon Receives information about the registry process that may be useful to log.
*/
public static launch(execPath: string, args: string[], stopProcess: (mngr: ThriftServiceManager) => Promise<void>, serviceToAwait?: string, procMon?: ProcessMonitor): Promise<ThriftServiceManager> {
public static launch(
execPath: string,
args: string[],
stopProcess: (mngr: ThriftServiceRegistry) => Promise<void>,
serviceToAwait?: string,
procMon?: ProcessMonitor): Promise<ThriftServiceRegistryProcess> {

const tmpDir = getTmpDir();
const expectedLocationFile = Path.join(tmpDir, "CSpyServer2-ServiceRegistry.txt");
const locationPromise = waitForServiceRegistryLocationFile(expectedLocationFile);
Expand All @@ -67,12 +73,17 @@ export class ThriftServiceManager {
// then optionally wait for some service to be registered in the registry
locationPromise.then(async location => {
if (serviceToAwait) await waitForServiceToBeOnline(location, serviceToAwait);
resolve(new ThriftServiceManager(process, location, stopProcess));
resolve(new ThriftServiceRegistryProcess(location, process, stopProcess));
});
});
}

private constructor(private readonly process: ChildProcess, private readonly registryLocation: ServiceLocation, private readonly stopProcess: (mngr: ThriftServiceManager) => Promise<void>) {
private constructor(
registryLocation: ServiceLocation,
private readonly process: ChildProcess,
private readonly stopProcess: (mngr: ThriftServiceRegistry) => Promise<void>) {
this.serviceRegistry = new ThriftServiceRegistry(registryLocation);

process.once("exit", code => {
this.crashHandlers.forEach(handler => handler(code));
});
Expand All @@ -82,8 +93,8 @@ export class ThriftServiceManager {
// Prevent crash handlers from being called from an expected exit.
this.crashHandlers = [];

await this.stopProcess(this);
this.activeServers.forEach(server => server.close());
await this.stopProcess(this.serviceRegistry);
this.serviceRegistry.dispose();

// Wait for service registry process to exit
if (this.process.exitCode === null) {
Expand All @@ -92,7 +103,7 @@ export class ThriftServiceManager {
setTimeout(() => {
resolve();
this.process.kill();
}, ThriftServiceManager.PROCESS_EXIT_TIMEOUT);
}, ThriftServiceRegistryProcess.PROCESS_EXIT_TIMEOUT);
});
}
}
Expand All @@ -103,51 +114,6 @@ export class ThriftServiceManager {
public addCrashHandler(handler: CrashHandler) {
this.crashHandlers.push(handler);
}

/**
* Connects to a service with the given name. The service must already be started
* (or in the process of starting), otherwise this method will reject.
* @param serviceId The name to give the service
* @param serviceType The type of the service with the given name (usually the top-level import of the service module)
*/
public async findService<T>(serviceId: string, serviceType: Thrift.TClientConstructor<T>): Promise<ThriftClient<T>> {
const registry = await ThriftClient.connect(this.registryLocation, CSpyServiceRegistry);

const location = await registry.service.waitForService(serviceId, ThriftServiceManager.SERVICE_LOOKUP_TIMEOUT);
const service = await ThriftClient.connect(location, serviceType);

registry.close();

return service;
}

/**
* Start and register a new service in this service registry.
* @param serviceId The name to give the service
* @param serviceType The type of service to register (usually given as the top-level import of the service module)
* @param handler The handler implementing the service
* @typeParam Pr The processor type for the service, usually serviceType.Processor
* @typeParam Ha The handler type for the service, usually object (thrift doesn't provide typescript types for service handlers)
*/
async startService<Pr, Ha>(serviceId: string, serviceType: Thrift.TProcessorConstructor<Pr, Ha>, handler: Ha): Promise<ServiceLocation> {
const serverOpt = {
transport: Thrift.TBufferedTransport,
protocol: Thrift.TBinaryProtocol,
};
const server = Thrift.createServer(serviceType, handler, serverOpt).
on("error", e => console.error(`Error in thrift server '${serviceId}': ${e.toString()}`)).
listen(0); // port 0 lets node figure out what to use

const port = (server.address() as AddressInfo).port; // this cast is safe since we know it's an IP socket
const location = new ServiceLocation({ host: "localhost", port: port, protocol: Protocol.Binary, transport: Transport.Socket });
const registry = await ThriftClient.connect(this.registryLocation, CSpyServiceRegistry);
await registry.service.registerService(serviceId, location);

this.activeServers.push(server);

registry.close();
return location;
}
}

// Waits for a service registry location file to appear, then reads it.
Expand Down

0 comments on commit 0b35b63

Please sign in to comment.