Skip to content

Commit

Permalink
Merge pull request #31 from pinax-network/feature/plaintext
Browse files Browse the repository at this point in the history
improve CLI arguments & add plaintext
  • Loading branch information
DenisCarriere authored Mar 3, 2024
2 parents 6d2f9b5 + 59829ab commit da31d07
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 55 deletions.
26 changes: 19 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Usage: substreams-sink run [options]
Substreams Sink

Options:
-v, --version version for substreams-sink
-e --substreams-endpoint <string> Substreams gRPC endpoint to stream data from (env: SUBSTREAMS_ENDPOINT)
--manifest <string> URL of Substreams package (env: MANIFEST)
--module-name <string> Name of the output module (declared in the manifest) (env: MODULE_NAME)
Expand All @@ -52,15 +53,16 @@ Options:
--substreams-api-key <string> API key for the Substream endpoint (env: SUBSTREAMS_API_KEY)
--delay-before-start <int> Delay (ms) before starting Substreams (default: 0, env: DELAY_BEFORE_START)
--cursor <string> Cursor to stream from. Leave blank for no cursor
--production-mode <boolean> Enable production mode, allows cached Substreams data if available (default: "false", env: PRODUCTION_MODE)
--production-mode <boolean> Enable production mode, allows cached Substreams data if available (choices: "true", "false", default: false, env: PRODUCTION_MODE)
--final-blocks-only <boolean> Only process blocks that have pass finality, to prevent any reorg and undo signal by staying further away from the chain HEAD (choices: "true", "false", default: false, env: FINAL_BLOCKS_ONLY)
--inactivity-seconds <int> If set, the sink will stop when inactive for over a certain amount of seconds (default: 300, env: INACTIVITY_SECONDS)
--headers [string...] Set headers that will be sent on every requests (ex: --headers X-HEADER=headerA) (default: {}, env: HEADERS)
--plaintext <boolean> Establish GRPC connection in plaintext (choices: "true", "false", default: false, env: PLAIN_TEXT)
--verbose <boolean> Enable verbose logging (choices: "true", "false", default: false, env: VERBOSE)
--hostname <string> The process will listen on this hostname for any HTTP and Prometheus metrics requests (default: "localhost", env: HOSTNAME)
--port <int> The process will listen on this port for any HTTP and Prometheus metrics requests (default: 9102, env: PORT)
--metrics-labels [string...] To apply generic labels to all default metrics (ex: --labels foo=bar) (default: {}, env: METRICS_LABELS)
--collect-default-metrics <boolean> Collect default metrics (default: "false", env: COLLECT_DEFAULT_METRICS)
--headers [string...] Set headers that will be sent on every requests (ex: --headers X-HEADER=headerA) (default: {}, env: HEADERS)
--final-blocks-only <boolean> Only process blocks that have pass finality, to prevent any reorg and undo signal by staying further away from the chain HEAD (default: "false", env: FINAL_BLOCKS_ONLY)
--verbose <boolean> Enable verbose logging (default: "false", env: VERBOSE)
--collect-default-metrics <boolean> Collect default metrics (choices: "true", "false", default: false, env: COLLECT_DEFAULT_METRICS)
-h, --help display help for command
```
Expand All @@ -84,12 +86,22 @@ STOP_BLOCK=1000020
**example.js**
```js
import pkg from "./package.json" assert { type: "json" };
import { commander, setup, prometheus, http, logger, fileCursor } from "substreams-sink";

const pkg = {
name: "substreams-sink",
version: "0.0.1",
description: "Substreams Sink long description",
}

// Setup CLI using Commander
const program = commander.program(pkg);
const command = commander.addRunOptions(program);
logger.setName(pkg.name);

// Setup CLI using Commander
const program = commander.program(pkg);
const command = commander.run(program, pkg);
const command = commander.addRunOptions(program);
logger.setName(pkg.name);

// Custom Prometheus Counters
Expand Down
9 changes: 7 additions & 2 deletions example.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import pkg from "./package.json" assert { type: "json" };
import { commander, setup, prometheus, http, logger, fileCursor } from "./dist/index.js";

const pkg = {
name: "substreams-sink",
version: "0.0.1",
description: "Substreams Sink long description",
}

// Setup CLI using Commander
const program = commander.program(pkg);
const command = commander.run(program, pkg);
const command = commander.addRunOptions(program);
logger.setName(pkg.name);

// Custom Prometheus Counters
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "substreams-sink",
"version": "0.15.2",
"version": "0.16.0",
"description": "Substreams Sink",
"type": "module",
"exports": "./dist/index.js",
Expand Down
53 changes: 33 additions & 20 deletions src/commander.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,42 @@
import "dotenv/config";
import { Command, Option } from "commander";
import { DEFAULT_INACTIVITY_SECONDS, DEFAULT_PARAMS, DEFAULT_VERBOSE, DEFAULT_HOSTNAME, DEFAULT_PORT, DEFAULT_METRICS_LABELS, DEFAULT_COLLECT_DEFAULT_METRICS, DEFAULT_START_BLOCK, DEFAULT_DELAY_BEFORE_START, DEFAULT_HEADERS, DEFAULT_PRODUCTION_MODE, DEFAULT_FINAL_BLOCKS_ONLY } from "./config.js";
import { DEFAULT_INACTIVITY_SECONDS, DEFAULT_PARAMS, DEFAULT_HOSTNAME, DEFAULT_PORT, DEFAULT_METRICS_LABELS, DEFAULT_START_BLOCK, DEFAULT_DELAY_BEFORE_START, DEFAULT_HEADERS } from "./config.js";

export interface Package {
name: string;
version: string;
description: string;
name?: string;
version?: string;
description?: string;
}

export interface RunOptions {
substreamsEndpoint: string;
manifest: string;
moduleName: string;
params: string[];
startBlock: string;
stopBlock: string;
startBlock: number | bigint | undefined;
stopBlock: number | bigint | `+${number}` | undefined;
substreamsApiKey: string;
substreamsApiToken: string; // Deprecated
delayBeforeStart: number;
cursor: string;
productionMode: string;
productionMode: boolean;
inactivitySeconds: number;
hostname: string;
port: number;
metricsLabels: string[];
collectDefaultMetrics: string;
collectDefaultMetrics: boolean;
headers: Headers;
verbose: string;
finalBlocksOnly: string;
verbose: boolean;
finalBlocksOnly: boolean;
plaintext: boolean;
}

export function program(pkg: Package) {
export function program(pkg?: Package) {
const program = new Command();
program.name(pkg.name).version(pkg.version, "-v, --version", `version for ${pkg.name}`);
const name = pkg?.name ?? "substreams-sink";
program.name(name);
if ( pkg?.version ) program.version(pkg.version, "-v, --version", `version for ${name}`);
if ( pkg?.description ) program.description(pkg.description);
program.command("completion").description("Generate the autocompletion script for the specified shell");
program.command("help").description("Display help for command");
program.showHelpAfterError();
Expand Down Expand Up @@ -60,8 +64,8 @@ function handleHeaders(value: string, previous: Headers) {
return headers;
}

export function run(program: Command, pkg: Package, options: AddRunOptions = {}) {
return addRunOptions(program.command("run"), pkg, options);
export function run(program: Command, options: AddRunOptions = {}) {
return addRunOptions(program.command("run"), options);
}

export function list(program: Command) {
Expand All @@ -82,10 +86,18 @@ interface AddRunOptions {
metrics?: boolean;
}

export function addRunOptions(program: Command, pkg: Package, options: AddRunOptions = {}) {
function parseBoolean(value?: string) {
if ( value !== undefined ) return value.toLocaleLowerCase() === "true";
return false;
}

function addBoolean(flags: string, description: string, env: string) {
return new Option(flags, description).default(false).env(env).choices(["true", "false"]).argParser(parseBoolean);
}

export function addRunOptions(program: Command, options: AddRunOptions = {}) {
const command = program
.showHelpAfterError()
.description(pkg.description)
.addOption(new Option("-e --substreams-endpoint <string>", "Substreams gRPC endpoint to stream data from").makeOptionMandatory().env("SUBSTREAMS_ENDPOINT"))
.addOption(new Option("--manifest <string>", "URL of Substreams package").makeOptionMandatory().env("MANIFEST"))
.addOption(new Option("--module-name <string>", "Name of the output module (declared in the manifest)").makeOptionMandatory().env("MODULE_NAME"))
Expand All @@ -96,11 +108,12 @@ export function addRunOptions(program: Command, pkg: Package, options: AddRunOpt
.addOption(new Option("--substreams-api-token <string>", "(DEPRECATED) API token for the Substream endpoint").hideHelp(true).env("SUBSTREAMS_API_TOKEN"))
.addOption(new Option("--delay-before-start <int>", "Delay (ms) before starting Substreams").default(DEFAULT_DELAY_BEFORE_START).env("DELAY_BEFORE_START"))
.addOption(new Option("--cursor <string>", "Cursor to stream from. Leave blank for no cursor"))
.addOption(new Option("--production-mode <boolean>", "Enable production mode, allows cached Substreams data if available").default(DEFAULT_PRODUCTION_MODE).env("PRODUCTION_MODE"))
.addOption(addBoolean("--production-mode <boolean>", "Enable production mode, allows cached Substreams data if available", "PRODUCTION_MODE"))
.addOption(addBoolean("--final-blocks-only <boolean>", "Only process blocks that have pass finality, to prevent any reorg and undo signal by staying further away from the chain HEAD", "FINAL_BLOCKS_ONLY"))
.addOption(new Option("--inactivity-seconds <int>", "If set, the sink will stop when inactive for over a certain amount of seconds").default(DEFAULT_INACTIVITY_SECONDS).env("INACTIVITY_SECONDS"))
.addOption(new Option("--headers [string...]", "Set headers that will be sent on every requests (ex: --headers X-HEADER=headerA)").default(DEFAULT_HEADERS).env("HEADERS").argParser(handleHeaders))
.addOption(new Option("--final-blocks-only <boolean>", "Only process blocks that have pass finality, to prevent any reorg and undo signal by staying further away from the chain HEAD").default(DEFAULT_FINAL_BLOCKS_ONLY).env("FINAL_BLOCKS_ONLY"))
.addOption(new Option("--verbose <boolean>", "Enable verbose logging").default(DEFAULT_VERBOSE).env("VERBOSE"))
.addOption(addBoolean("--plaintext <boolean>", "Establish GRPC connection in plaintext", "PLAIN_TEXT"))
.addOption(addBoolean("--verbose <boolean>", "Enable verbose logging", "VERBOSE"));

// HTTP and Prometheus metrics options
if ( options.http !== false ) {
Expand All @@ -111,7 +124,7 @@ export function addRunOptions(program: Command, pkg: Package, options: AddRunOpt
if ( options.metrics !== false ) {
command
.addOption(new Option("--metrics-labels [string...]", "To apply generic labels to all default metrics (ex: --labels foo=bar)").default(DEFAULT_METRICS_LABELS).env("METRICS_LABELS").argParser(handleMetricsLabels))
.addOption(new Option("--collect-default-metrics <boolean>", "Collect default metrics").default(DEFAULT_COLLECT_DEFAULT_METRICS).env("COLLECT_DEFAULT_METRICS"))
.addOption(addBoolean("--collect-default-metrics <boolean>", "Collect default metrics", "COLLECT_DEFAULT_METRICS"));
}
return command;
}
4 changes: 0 additions & 4 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@
export const DEFAULT_HOSTNAME = "localhost";
export const DEFAULT_PORT = 9102;
export const DEFAULT_CURSOR_PATH = "cursor.lock";
export const DEFAULT_VERBOSE = "false";
export const DEFAULT_INACTIVITY_SECONDS = 300;
export const DEFAULT_PRODUCTION_MODE = "false";
export const DEFAULT_DELAY_BEFORE_START = 0;
export const DEFAULT_METRICS_LABELS = {};
export const DEFAULT_COLLECT_DEFAULT_METRICS = "false";
export const DEFAULT_START_BLOCK = "-1";
export const DEFAULT_PARAMS = [];
export const DEFAULT_HEADERS = new Headers();
export const DEFAULT_FINAL_BLOCKS_ONLY = "false";
5 changes: 2 additions & 3 deletions src/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ export const server = http.createServer(async (req, res) => {
}
});

export async function listen(options: RunOptions) {
export async function listen(options: RunOptions): Promise<http.Server> {
const hostname = options.hostname;
const port = options.port;
return new Promise(resolve => {
server.listen(port, hostname, () => {
logger.info("prometheus server", { hostname, port });
resolve(true);
resolve(server);
});
})
}
4 changes: 1 addition & 3 deletions src/list.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import { readPackage } from "@substreams/manifest";
import { getModules } from "@substreams/core";

import { logger } from "./logger.js";

export async function list(url: string) {
const spkg = await readPackage(url)
const compatible = []

for (const { name, output } of getModules(spkg)) {
if (!output) continue;
logger.info('module', { name, output })
console.log('module', { name, output })
compatible.push(name)
}

Expand Down
2 changes: 0 additions & 2 deletions src/prometheus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ export function onPrometheusMetrics(emitter: BlockEmitter) {
}

export function handleSession(session: SessionInit) {
logger.info("session", { traceId: String(session.traceId), resolvedStartBlock: String(session.resolvedStartBlock), linearHandoffBlock: String(session.linearHandoffBlock), maxParallelWorkers: String(session.maxParallelWorkers) });
const labelNames = ["trace_id", "resolved_start_block", "linear_handoff_block", "max_parallel_workers"];
const gauge = registerGauge("session", "Substreams Session", labelNames) as Gauge;
gauge.labels({
Expand All @@ -106,7 +105,6 @@ export function handleSession(session: SessionInit) {
}

export function handleManifest(emitter: BlockEmitter, moduleHash: string, options: RunOptions) {
logger.info("manifest", { moduleHash, manifest: options.manifest, substreamsEndpoint: options.substreamsEndpoint, finalBlocksOnly: options.finalBlocksOnly, productionMode: options.productionMode });
const labelNames = ["module_hash", "manifest", "output_module", "substreams_endpoint", "start_block_num", "stop_block_num", "production_mode", "final_blocks_only"];
const gauge = registerGauge("manifest", "Substreams manifest and sha256 hash of map module", labelNames) as Gauge;
gauge.labels({
Expand Down
25 changes: 12 additions & 13 deletions src/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { health } from "./health.js";

export async function setup(options: RunOptions) {
// Configure logging with TSLog
if (String(options.verbose) === "true") logger.enable();
if (options.verbose) logger.enable();

// Download Substream package
const manifest = options.manifest;
Expand All @@ -24,20 +24,19 @@ export async function setup(options: RunOptions) {
const token = options.substreamsApiKey ?? options.substreamsApiToken;
if ( token?.includes(".")) throw new Error("JWT token is not longer supported, please use Substreams API key instead");

// append https if not present
if (baseUrl.match(/http/) === null) {
baseUrl = `https://${baseUrl}`;
}

// User parameters
const outputModule = options.moduleName;
const startBlockNum = options.startBlock as any;
const stopBlockNum = options.stopBlock as any;
const params = options.params;
const headers = options.headers;
const startCursor = options.cursor;
const productionMode = String(options.productionMode) === "true";
const finalBlocksOnly = String(options.finalBlocksOnly) === "true";
const stopBlockNum = options.stopBlock;
const { moduleName: outputModule, cursor: startCursor } = options; // renamed otions
const { params, headers, productionMode, finalBlocksOnly, plaintext } = options;

// append https/http if not present
if (!baseUrl.startsWith("http")) {
baseUrl = `${plaintext ? "http" : "https"}://${baseUrl}`;
}
if ( plaintext && baseUrl.startsWith("https")) {
throw new Error("--plaintext mode is not supported with https");
}

// Adding default headers
headers.set("X-User-Agent", "substreams-sink");
Expand Down

0 comments on commit da31d07

Please sign in to comment.