bellboy

Highly performant JavaScript data stream ETL engine.

How does it work?

Bellboy streams input data row by row. Each row, in turn, passes through a user-defined function where it can be transformed. Once enough rows have been collected into a batch, the batch is loaded to the destination.
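The flow described above can be pictured with a small conceptual sketch. This is not bellboy's implementation; `runPipeline`, `numbers` and `double` are hypothetical names standing in for a job, a processor and a `recordGenerator`.

```javascript
// Conceptual sketch only: rows are pulled one at a time, transformed,
// and flushed to the destination in batches of `batchSize`.
async function runPipeline(source, transform, load, batchSize) {
  let batch = [];
  for await (const row of source) {
    // A single input row may produce zero or more output records.
    for await (const record of transform(row)) {
      batch.push(record);
      if (batch.length >= batchSize) {
        await load(batch);
        batch = [];
      }
    }
  }
  // Flush whatever is left once the stream ends.
  if (batch.length > 0) {
    await load(batch);
  }
}

// Hypothetical stand-ins for a processor and a recordGenerator.
async function* numbers() {
  for (let i = 1; i <= 5; i++) yield i;
}
async function* double(row) {
  yield row * 2;
}

const loaded = [];
const done = runPipeline(
  numbers(),
  double,
  async (batch) => {
    loaded.push(batch);
  },
  2
);
```

With five input rows and a batch size of 2, the `load` callback is called three times: twice with a full batch and once with the remainder.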

Installation

Before installing, make sure you are using the latest version of Node.js.

npm install bellboy

If you will be using bellboy with the native msnodesqlv8 driver, add it as a dependency.

npm install msnodesqlv8

Example

This example shows how bellboy can extract rows from an Excel file, modify them on the fly, load them into a Postgres database, move the processed file to another folder and then process the remaining files.

All in six simple steps.

const bellboy = require("bellboy");
const fs = require("fs");
const path = require("path");

(async () => {
  const srcPath = `C:/source`;

  // 1. create a processor which will process
  // Excel files in the folder one by one
  const processor = new bellboy.ExcelProcessor({
    path: srcPath,
    hasHeader: true,
  });

  // 2. create a destination which will add a new 'status'
  // field to each row and load processed data into a Postgres database
  const destination = new bellboy.PostgresDestination({
    connection: {
      user: "user",
      password: "password",
      host: "localhost",
      database: "bellboy",
    },
    table: "stats",
    recordGenerator: async function* (record) {
      yield {
        ...record.raw.obj,
        status: "done",
      };
    },
  });

  // 3. create a job which will glue the processor and the destination together
  const job = new bellboy.Job(processor, [destination]);

  // 4. tell bellboy to move the file away as soon as it was processed
  job.on("endProcessingStream", async (file) => {
    const filePath = path.join(srcPath, file);
    const newFilePath = path.join(`./destination`, file);
    await fs.promises.rename(filePath, newFilePath);
  });

  // 5. Log all error events
  job.onAny(async (eventName, ...args) => {
    if (eventName.includes("Error")) {
      console.log(args);
    }
  });

  // 6. run your job
  await job.run();
})();

Jobs

A job in bellboy links a processor to one or more destinations. Running the job starts data processing and loading.

Initialization

To initialize a Job instance, pass a processor and one or more destinations.

const job = new bellboy.Job(
  processor_instance,
  [destination_instance],
  (job_options = {})
);

Options

  • reporters Reporter[]
    Array of reporters.
  • jobName string
    Optional user-defined name of the job. Can be handy in combination with extended events to distinguish events from different jobs.

Instance methods

  • run async function()
    Starts processing data.
  • on function(event, async function listener)
    Adds a listener for a specific event.
  • onAny function(async function listener)
    Adds a listener for any event.
  • stop function(errorMessage?)
    Stops job execution. If errorMessage is passed, job will throw an error with this message.

Events and event listeners

Event listeners, which can be registered with job.on or job.onAny methods, allow you to listen to specific events in the job lifecycle and to interact with them.

  • When multiple listeners are registered for the same event, those added using .on will always be executed first, regardless of the order in which they were added compared to .onAny. This ensures that specific event listeners have priority over generic ones.
  • When multiple listeners are registered for a single event, those added by reporters will be executed first, followed by the order of registration for the remaining listeners.
  • Job always waits for the code inside a listener to complete.
  • Any error thrown inside a listener is ignored, and a warning message is printed.
  • job.stop() method can be used inside a listener to stop job execution and throw an error if needed.
job.on(
  "startProcessing",
  async (processor: IProcessor, destinations: IDestination[]) => {
    // Job has started execution.
  }
);
job.on("startProcessingStream", async (...args: any) => {
  // Stream processing has been started.
  // Passed parameters may vary based on specific processor.
});
job.on("startProcessingRow", async (row: any) => {
  // Row has been received and is about to be processed inside `recordGenerator` method.
});
job.on("rowGenerated", async (destinationIndex: number, generatedRow: any) => {
  // Row has been generated using `recordGenerator` method.
});
job.on(
  "rowGenerationError",
  async (destinationIndex: number, row: any, error: any) => {
    // Record generation (`recordGenerator` method) has thrown an error.
  }
);
job.on("endProcessingRow", async () => {
  // Row has been processed.
});
job.on("transformingBatch", async (destinationIndex: number, rows: any[]) => {
  // Batch is about to be transformed inside `batchTransformer` method.
});
job.on(
  "transformedBatch",
  async (destinationIndex: number, transformedRows: any) => {
    // Batch has been transformed using `batchTransformer` method.
  }
);
job.on(
  "transformingBatchError",
  async (destinationIndex: number, rows: any[], error: any) => {
    // Batch transformation (`batchTransformer` method) has thrown an error.
  }
);
job.on("endTransformingBatch", async (destinationIndex: number) => {
  // Batch has been transformed.
});
job.on("loadingBatch", async (destinationIndex: number, data: any[]) => {
  // Batch is about to be loaded into destination.
});
job.on(
  "loadedBatch",
  async (destinationIndex: number, data: any[], result: any) => {
    // Batch has been loaded into destination.
  }
);
job.on(
  "loadingBatchError",
  async (destinationIndex: number, data: any[], error: any) => {
    // Batch load has failed.
  }
);
job.on("endLoadingBatch", async (destinationIndex: number) => {
  // Batch load has finished.
});
job.on("endProcessingStream", async (...args: any) => {
  // Stream processing has finished.
  // Passed parameters may vary based on specific processor.
});
job.on("processingError", async (error: any) => {
  // Unexpected error has occurred.
});
job.on("endProcessing", async () => {
  // Job has finished execution.
});
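
As a minimal sketch of calling job.stop() from inside a listener, the helper below stops a job after a given number of failed batch loads. `stopAfterLoadErrors` and `limit` are hypothetical names; the helper only assumes that `job` exposes the `on` and `stop` methods described above.

```javascript
// Registers a listener that stops the job after `limit` failed batch loads.
function stopAfterLoadErrors(job, limit) {
  let failures = 0;
  job.on("loadingBatchError", async (destinationIndex, data, error) => {
    failures += 1;
    if (failures >= limit) {
      // Passing a message makes the job throw an error with this text.
      job.stop(`Stopped after ${failures} failed batch loads`);
    }
  });
}

// Usage with a real job instance would look like:
// stopAfterLoadErrors(job, 3);
```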
Listening for any event

A special listener that fires for every event listed above can be registered with the job.onAny method.

job.onAny(async (eventName: string, ...args: any) => {
  // An event has been fired.
});
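
One possible use of such a listener is counting how often each event fires. The sketch below is an illustration only; `createEventCounter` is a hypothetical helper, not part of bellboy.

```javascript
// Builds a listener suitable for job.onAny together with the counts it fills.
function createEventCounter() {
  const counts = new Map();
  const listener = async (eventName, ...args) => {
    counts.set(eventName, (counts.get(eventName) || 0) + 1);
  };
  return { counts, listener };
}

// Usage with a real job instance would look like:
// const counter = createEventCounter();
// job.onAny(counter.listener);
// await job.run();
// console.log(Object.fromEntries(counter.counts));
```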
Extended information from event

Sometimes more information about an event is needed, especially if you are building a custom reporter to log or trace fired events.

This information can be obtained by registering an async function as a third parameter with job.on method or as a second parameter with job.onAny method.

For example,

job.on("rowGenerated", undefined, async (event: IBellboyEvent) => {
  // Row has been generated using `recordGenerator` method.
  console.log(
    `${event.jobName} has generated row for #${event.eventArguments.destinationIndex} destination`
  );
});

or

job.onAny(undefined, async (event: IBellboyEvent) => {
  console.log(`${event.jobName} has fired ${event.eventName}`);
});

Extended event (IBellboyEvent) fields

  • eventName string
    Name of the event.
  • eventArguments any
    Arguments of the event.
  • jobName string?
    User-defined name of the job.
  • jobId string
    Unique ID of the job.
  • eventId string
    Unique ID of the event.
  • timestamp number
    High resolution timestamp of the event.
  • jobStopped boolean
    Whether the job is stopped or not.
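
As a small illustration of these fields, an extended event could be turned into a log line as sketched below. `formatEvent` is a hypothetical helper; it relies only on the fields listed above and falls back to `jobId` when no `jobName` was set.

```javascript
// Formats an extended event object into a single log line.
function formatEvent(event) {
  const job = event.jobName || event.jobId;
  const state = event.jobStopped ? " (job stopped)" : "";
  return `[${event.timestamp}] ${job}: ${event.eventName}${state}`;
}

// Usage with a real job instance would look like:
// job.onAny(undefined, async (event) => console.log(formatEvent(event)));
```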

Processors

Each processor in bellboy is a class with a single responsibility: processing data of a specific type.

Options

  • rowLimit number
    Number of records to be processed before stopping processor. If not specified or 0 is passed, all records will be processed.

MqttProcessor

Usage examples

Listens for messages and processes them one by one. It also handles backpressure by queuing messages, so all messages can be eventually processed.

Options

HttpProcessor

Usage examples

Processes data received from an HTTP call. Can process JSON, XML and delimited data. Can handle pagination by using the nextRequest function.

For delimited data, produces rows as described in the "Produced row" part of the DelimitedProcessor section.

Options

  • Processor options
  • connection object required
    Options from axios library.
  • dataFormat delimited | json | xml required
  • rowSeparator string required for delimited
  • delimiter string only for delimited
    A symbol separating fields of the row.
  • hasHeader boolean only for delimited
    If true, first row will be processed as a header.
  • qualifier string only for delimited
    Symbol placed around a field to signify that it is the same field.
  • encoding string only for delimited
  • jsonPath RegExp | string
    Path to the array to be streamed. This option is described in detail inside JsonProcessor section.
  • saxOptions object only for xml
    Options for XML streaming as described in sax-stream library.
  • authorizationRequest object
    • connection
      Options from axios library.
    • applyTo
      Where the extracted field should be applied: either header or query.
    • sourceField
      Name of the field from which value of authorization token will be extracted.
    • destinationField
      Name of the field which will be applied to header or query using applyTo option.
    • prefix
      Custom prefix to apply to the token.
  • nextRequest async function(header)
    Function which must return connection for the next request or null if the next request is not needed.
const processor = new bellboy.HttpProcessor({
  nextRequest: async function () {
    if (currentPage < pageCount) {
      return {
        ...connection,
        url: `${url}&current_page=${currentPage + 1}`,
      };
    }
    return null;
  },
  // ...
});
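
The snippet above leaves `currentPage`, `pageCount`, `connection` and `url` undefined. One way to keep that state together is a small factory, sketched below. `createNextRequest`, `pageCount` and the `current_page` query parameter are assumptions for illustration; the real parameter names depend on the API being called.

```javascript
// Returns a nextRequest function that walks through pages 2..pageCount.
// `baseConnection` is an axios-style request config.
function createNextRequest(baseConnection, pageCount) {
  let currentPage = 1; // the first page is fetched by the initial request
  return async function nextRequest() {
    if (currentPage >= pageCount) {
      return null; // no more pages to request
    }
    currentPage += 1;
    return {
      ...baseConnection,
      params: { ...baseConnection.params, current_page: currentPage },
    };
  };
}

// Usage with a processor would look like:
// const connection = { method: "GET", url: "https://example.com/items" };
// new bellboy.HttpProcessor({
//   connection,
//   dataFormat: "json",
//   nextRequest: createNextRequest(connection, 5),
// });
```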

Directory processors

Used for streaming text data from files in a directory. There are currently four types of directory processors: ExcelProcessor, JsonProcessor, DelimitedProcessor and TailProcessor. These processors search for files in the source directory and process them one by one.

File name (file) and full file path (filePath) parameters will be passed to startProcessingStream event.

Options

  • Processor options
  • path string
    Path to the directory where files are located. Current directory by default.
  • filePattern RegExp
    Regex pattern for the files to be processed. If not specified, all files in the directory will be matched.
  • files string[]
    Array of file names. If not specified, all files in the directory will be matched against filePattern regex and processed in alphabetical order.
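
The interplay of `files` and `filePattern` can be pictured with the sketch below. It is an illustration of the rules described above, not bellboy's actual code; `selectFiles` is a hypothetical helper and the default sort only approximates alphabetical order.

```javascript
// Picks the files to process from a directory listing.
// An explicit `files` array wins; otherwise the listing is filtered
// by `filePattern` and sorted.
function selectFiles(allFiles, { filePattern = /.*/, files } = {}) {
  if (files) {
    return files;
  }
  // Avoid the `g` flag on filePattern: RegExp#test is stateful with it.
  return allFiles.filter((name) => filePattern.test(name)).sort();
}

const listing = ["b.xlsx", "a.xlsx", "notes.txt"];
const byPattern = selectFiles(listing, { filePattern: /\.xlsx$/ });
const explicit = selectFiles(listing, { files: ["b.xlsx"] });
```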

ExcelProcessor

Usage examples

Processes XLSX files in the directory.

Options

  • Directory processor options
  • hasHeader boolean | number
    Whether the worksheet has a header or not, false by default. 0-based row location can be passed to this option if header is not located on the first row.
  • fillMergedCells boolean
    If true, merged cells will have the same value (by default, only the first cell of merged cells is filled with value).
    Warning! Enabling this feature may increase streaming time because file must be processed to detect merged cells before actual stream. false by default.
  • ignoreEmpty boolean
    Whether to ignore empty rows or not, true by default.
  • sheets (string | number)[] | async function(sheets)
    Array of sheet names and/or sheet indexes, or an async function which accepts an array of all sheets and must return an array of the sheet names that need to be processed. If not specified, the first sheet will be processed.
  • encoding string
    XLSX file encoding.
const processor = new bellboy.ExcelProcessor({
  // process last sheet
  sheets: async (sheets) => {
    const sheet = sheets[sheets.length - 1];
    return [sheet.name];
  },
  // ...
});

Produced row

To see what a processed row looks like, refer to the documentation of the xlstream library, which is used for Excel processing.

JsonProcessor

Processes JSON files in the directory.

Options

  • Directory processor options
  • jsonPath RegExp | string
    Path to the array to be streamed. Internally when JSON is streamed, current path is joined together using . as separator and then tested against provided regular expression. If not specified, a root array will be streamed. As an example, if you have this JSON object:
    { "animals": { "dogs": [ "pug", "bulldog", "poodle" ] } }
    And want to stream dogs array, path you will need to use is /animals.dogs.(\d+)/ if using RegExp as jsonPath and animals.dogs.(\\d+) if a string is used.
    (\d+) is used here because each index of the array is a number.
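
Both forms can be checked against a few dot-joined paths with plain JavaScript. The sketch below only demonstrates the matching; the `paths` array is a hypothetical sample of what the joined paths for the JSON above might look like.

```javascript
// The same pattern written as a RegExp literal and as a string.
const asRegExp = /animals.dogs.(\d+)/;
const asString = "animals.dogs.(\\d+)";

// Hypothetical dot-joined paths for the sample JSON.
const paths = ["animals.dogs.0", "animals.dogs.2", "animals.cats.0", "animals.dogs"];

// new RegExp accepts both a RegExp and a string.
const matching = (pattern) => paths.filter((p) => new RegExp(pattern).test(p));

const regExpMatches = matching(asRegExp);
const stringMatches = matching(asString);
```

Both variants select only the paths of the dogs array elements. Note that an unescaped . matches any character, so a stricter pattern would escape the dots as \. in the RegExp form.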

DelimitedProcessor

Usage examples

Processes files with delimited data in the directory.

Options

  • Directory processor options
  • rowSeparator string required
  • delimiter string
    A symbol separating fields of the row.
  • hasHeader boolean
    If true, first row will be processed as a header.
  • qualifier string
    Symbol placed around a field to signify that it is the same field.
  • encoding string

Produced row

  • header string[]
    If hasHeader is true, first row will appear here.
  • arr string[]
    Row split by delimiter and qualifier.
  • obj object
    If hasHeader is true, object with header elements as keys will appear here.
  • row string
    Received raw row.

TailProcessor

Usage examples

Watches for file changes and outputs last part of file as soon as new lines are added to the file.

Options

  • Directory processor options
  • fromBeginning boolean
    In addition to emitting new lines, emits lines from the beginning of file, false by default.

Produced row

  • file string
    Name of the file the data came from.
  • data string

PostgresProcessor

Processes a PostgreSQL SELECT query row by row.

Options

  • Processor options
  • query string required
    Query to execute.
  • connection object required
    • user
    • password
    • host
    • port
    • database
    • schema

MySqlProcessor

Processes a MySQL SELECT query row by row.

Options

  • Processor options
  • query string required
    Query to execute.
  • connection object required
    • user
    • password
    • host
    • port
    • database

FirebirdProcessor

Processes a Firebird SELECT query row by row.

Options

  • Processor options
  • query string required
    Query to execute.
  • connection object required
    • user
    • password
    • host
    • database

MssqlProcessor

Processes an MSSQL SELECT query row by row.

Options

  • Processor options
  • query string required
    Query to execute.
  • connection object required
    • user
    • password
    • server
    • port
    • database
    • driver
      Optional mssql TDS driver; defaults to the pure JavaScript Tedious driver.

Usage

Here is an example of how to configure MssqlProcessor with a native TDS driver instead of the default pure JavaScript Tedious driver.

const nativeDriver: ITdsDriver = await import("mssql/msnodesqlv8");
const connection: IMssqlDbConnection = {
  user: "user",
  password: "password",
  server: "server",
  database: "database",
  driver: nativeDriver,
};
const source = new MssqlProcessor({
  connection,
  query: "select * from orders",
});

In previous versions of bellboy, connection.driver was a string parameter.

More usage examples

DynamicProcessor

Processor which generates records on the fly. Can be used to define custom data processors.

Options

// processor which generates 10 records dynamically
const processor = new bellboy.DynamicProcessor({
  generator: async function* () {
    for (let i = 0; i < 10; i++) {
      yield i;
    }
  },
});

Destinations

Every job can have as many destinations (outputs) as needed. For example, one job can load processed data into a database, log this data to stdout and post it by HTTP simultaneously.

Options

  • disableLoad boolean
    If true, no data will be loaded to the destination. In combination with reporters, this option can become handy during testing process.
  • batchSize number
    Number of records to be processed before loading them to the destination. If not specified or 0 is passed, all records will be processed.
  • recordGenerator async generator function(row)
    Function which receives a row produced by the processor and can apply transformations to it.
  • batchTransformer async function(rows)
    Function which receives whole batch of rows. This function is being called after row count reaches batchSize. Data is being loaded to destination immediately after this function has been executed.
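
A minimal sketch of both functions follows. It assumes the row shape used in the example at the top of this document (`record.raw.obj`), a hypothetical `id` field, and that the array returned by `batchTransformer` is what gets loaded; adjust these to your processor and data.

```javascript
// Drops rows without an id and tags the rest.
async function* recordGenerator(record) {
  const row = record.raw.obj;
  if (row.id === undefined || row.id === "") {
    return; // yielding nothing drops the row
  }
  yield { ...row, status: "done" };
}

// Removes duplicates within one batch, keeping the last row seen for each id.
async function batchTransformer(rows) {
  const byId = new Map(rows.map((row) => [row.id, row]));
  return [...byId.values()];
}

// Usage with a destination would look like:
// new bellboy.PostgresDestination({ /* ... */ batchSize: 100, recordGenerator, batchTransformer });
```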

StdoutDestination

Logs out all data to stdout (console).

Options

HttpDestination

Usage examples

Puts processed data one by one in body and executes specified HTTP request.

Options

  • General destination options
  • request required
    Options from axios library.
  • authorizationRequest object
    • connection
      Options from axios library.
    • applyTo
      Where the extracted field should be applied: either header or query.
    • sourceField
      Name of the field from which value of authorization token will be extracted.
    • destinationField
      Name of the field which will be applied to header or query using applyTo option.
    • prefix
      Custom prefix to apply to the token.

PostgresDestination

Usage examples

Inserts data to PostgreSQL.

Options

  • General destination options
  • table string required
    Table name.
  • upsertConstraints string[]
    If specified, UPSERT command will be executed based on provided constraints.
  • connection object required
    • user
    • password
    • host
    • database
    • schema

MySqlDestination

Usage examples

Inserts data to MySQL.

Options

  • General destination options
  • table string required
    Table name.
  • connection object required
    • user
    • password
    • host
    • database
  • useSourceColumns boolean
    If true, only the columns in the source data will be used for data load. Default is false, using all destination table columns.
  • postLoadQuery string
    A query which will be executed after the data load and before connection is closed. Result will be available in the result object of the loadedBatch event.

MssqlDestination

Usage examples

Inserts data to MSSQL.

Options

  • General destination options
  • table string required
    Table name.
  • connection object required
    • user
    • password
    • server
    • database
    • driver
      Optional mssql TDS driver; defaults to the pure JavaScript Tedious driver.

Usage

Here is an example of how to configure MssqlDestination with a native TDS driver instead of the default pure JavaScript Tedious driver.

const nativeDriver: ITdsDriver = await import("mssql/msnodesqlv8");
const connection: IMssqlDbConnection = {
  user: "user",
  password: "password",
  server: "server",
  database: "database",
  driver: nativeDriver,
};
const sink = new MssqlDestination({
  connection,
  table: "orders",
  batchSize: 1000,
});

More usage examples

Extendability

New processors and destinations can be made by extending existing ones. Feel free to make a pull request if you create something interesting.

Creating a new processor

Processor class examples

To create a new processor, you must extend Processor class and implement async process function. This function accepts one parameter:

  • processStream async function(readStream, ...args) required
    Callback function which accepts Readable stream. After calling this function, job instance will handle passed stream internally. Passed parameters (args) will be emitted with startProcessingStream event during job execution.
class CustomProcessor extends bellboy.Processor {
  async process(processStream) {
    // await processStream(readStream, 'hello', 'world');
  }
}
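
As a sketch of what the body of process might do, the helper below wraps an in-memory array in a Readable stream and hands it to the callback. `processItems` and its `label` argument are hypothetical; whether a given job setup expects an object-mode stream like this one is an assumption to verify.

```javascript
const { Readable } = require("stream");

// Feeds an in-memory array to the job through the `processStream` callback.
// `label` is passed along and would be emitted with startProcessingStream.
async function processItems(processStream, items, label) {
  const readStream = Readable.from(items); // object-mode Readable
  await processStream(readStream, label);
}

// Inside a custom processor this could be used as:
// class ArrayProcessor extends bellboy.Processor {
//   async process(processStream) {
//     await processItems(processStream, [{ id: 1 }, { id: 2 }], "in-memory");
//   }
// }
```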

Creating a new destination

Destination class examples

To create a new destination, you must extend Destination class and implement async loadBatch function. This function accepts one parameter:

  • data any[] required
    Array of some processed data that needs to be loaded.
class CustomDestination extends bellboy.Destination {
  async loadBatch(data) {
    console.log(data);
  }
}
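
For tests, a destination that simply remembers what it was asked to load can be useful. The sketch below takes the base class as a parameter so it can be tried without bellboy installed; `createMemoryDestinationClass` is a hypothetical helper, and forwarding constructor arguments unchanged to the base class is an assumption.

```javascript
// Builds a destination class that keeps every loaded batch in memory.
// In real use, `Base` would be bellboy.Destination.
function createMemoryDestinationClass(Base) {
  return class MemoryDestination extends Base {
    constructor(...args) {
      super(...args);
      this.batches = [];
    }

    async loadBatch(data) {
      // Store a copy so later mutation of `data` does not affect it.
      this.batches.push([...data]);
    }
  };
}

// Usage would look like:
// const MemoryDestination = createMemoryDestinationClass(bellboy.Destination);
// const destination = new MemoryDestination({ batchSize: 10 });
```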

Creating a new reporter

Official stdout reporter

A reporter is a job wrapper which can operate on a job instance (for example, listen to events using the job's on method). To create a new reporter, you must extend the Reporter class and implement the report function, which will be executed during job instance initialization. Reporter event listeners (on, onAny) are added before any other user-defined listeners. This function accepts one parameter:

  • job Job required
    Job instance
class CustomReporter extends bellboy.Reporter {
  report(job) {
    job.on("startProcessing", undefined, async ({ jobName }) => {
      console.log(`Job ${jobName} has been started.`);
    });
  }
}

Testing

Tests can be run by using docker compose up --abort-on-container-exit --exit-code-from test --build test command.