diff --git a/README.md b/README.md index 37ae93a..b7fa0eb 100644 --- a/README.md +++ b/README.md @@ -1,99 +1,36 @@ > **Note** > The AMRC Connectivity Stack is an open-source implementation of the AMRC's [Factory+ Framework](https://factoryplus.app.amrc.co.uk/). -This is a NodeJS library for writing clients for Factory+. It was used extensively when building the AMRC Connectivity Stack. +This is a NodeJS library for writing clients for Factory+. It was used +extensively when building the AMRC Connectivity Stack. This library is +now being cut down into a compatibility shim for existing code; new code +trying to consume Factory+ services should use +`@amrc-factoryplus/service-client` instead. -## Getting Started - -Because this library has native code dependencies, the easiest way to use it from your code is to base your container image on the Docker images that we provide. - -Start a new project by running - -```bash -npm init -y -``` - -then follow the instructions, creating a new `Dockerfile` instead of updating an existing one. - -### Updating your Dockerfile - -If your code currently has a basic Node.js Dockerfile which looks like this: - -```dockerfile -FROM node:lts-alpine - -RUN mkdir -p /home/node/app/node_modules && chown -R node:node /home/node/app -WORKDIR /home/node/app - -COPY package*.json ./ -USER node -RUN npm install --save=false - -COPY --chown=node . . - -CMD npm start -``` - -then you need to replace it with one which looks like this, setting `utility_ver` to the desired version: - -```dockerfile -ARG utility_prefix=ghcr.io/amrc-factoryplus/utilities -ARG utility_ver=v1.0.6 +## Compatibility -FROM ${utility_prefix}-build:${utility_ver} AS build +Version 2.0.0 of this package has broken backwards compatibility as +follows: -# Install the node application on the build container where we can -# compile the native modules. -RUN install -d -o node -g node /home/node/app -WORKDIR /home/node/app -USER node -COPY package*.json ./ -RUN npm install --save=false -COPY . . +* The method `basic_sparkplug_node` on the MQTT interface now returns a + Promise and must be awaited. +* The Debug class no longer reads `VERBOSE` from the environment, the + option must be supplied explicitly. +* The WebAPI class now requires a `verbose` option to enable logging. -FROM ${utility_prefix}-run:${utility_ver} +Further versions in the 2.x series are intended only to support the +existing ACS codebase and may break backwards compat further as needed +to migrate the code out into other packages. If you need functionality +from this package which is not provided by +`@amrc-factoryplus/service-client`, please speak to us first. -# Copy across from the build container. -WORKDIR /home/node/app -COPY --from=build --chown=root:root /home/node/app ./ - -USER node -CMD npm start -``` - -This makes the following changes from the original: - -* The build is now multi-stage, because we need compilers and so on for the build stage which we don't need at runtime. - This means that we do most of the building in one container, and then just copy the results across into a fresh - container at the end. - -* Both stages of the build are based on the Docker images provided for this library. These images include the tools - needed to build the library and the native libraries needed to run it. They also set up npm to reference the NPM - registry for `@amrc-factoryplus` packages. - -* The build stage runs the build as user `node`, but the code is copied across to the run stage owned by `root`. 
This - improves security but may cause problems if your app assumes it can write to its working directory. This is, in - general, a bad Idea for a Docker container (you should be writing to a volume probably), but if necessary the commands - can be adjusted to change the permissions. - -If you have a more complicated Dockerfile you will need to adjust this appropriately. Try to do as much work as possible -in the build container, and then just copy the results across into the runtime container. This will make the final -images smaller. - -### Adding to `package.json` - -You now need to add the following entry to your `package.json`: - -``` -{ - "dependencies": { - "@amrc-factoryplus/utilities": "^1.0.0" - } -} -``` +## Getting Started -The library will install on Windows; however we do not have access to the GSSAPI libraries on Windows so most of the -functionality will not work. However this allows `npm update` to work at least. +This library has native code dependencies: it requires GSSAPI libraries +and (for ACS use) a Postgres library built with GSSAPI support. The most +straightforward way to meet these is to build a Docker image using the +base images from the ACS repository. See that repository for example +Dockerfiles. ## Using the package @@ -124,10 +61,12 @@ If you are using Typescript then the ESM import should work fine. There are curr ### Third-party libraries ```js -import { MQTT, GSS, Pg, SpB, fetch } from "@amrc-factoryplus/utilities"; +import { MQTT, GSS, Pg, SpB } from "@amrc-factoryplus/utilities"; ``` -These are re-exports of third party modules. They are re-exported here partly to provide protection from future changes to the third-party modules, and partly to work around bugs or problems with importing. +These are re-exports of third party modules. They are re-exported here +partly to provide protection from future changes to the third-party +modules, and partly to work around bugs or problems with importing. - [Full Third-Party Library Documentation](./docs/deps.md) @@ -205,17 +144,18 @@ Classes useful in implementing an HTTP service confirming to the Factory+ spec. - [Full Web API Documentation](./docs/webapi.md) -### Deprecated APIs - -```js -import { debug, secrets, gss_mqtt } from "@amrc-factoryplus/utilities"; -``` +### Removed APIs -These are deprecated APIs. +As of version 2.0.0 these exports have been removed: * `debug` has been replaced by the Debug object. -* `secrets` provides support for reading from Docker secrets; since moving to Kubernetes this has been redundant. -* `gss_mqtt` connects to an MQTT server with GSSAPI authentication. It is better to use a ServiceClient instead, as this will discover the MQTT server via the Directory. +* `secrets` provided support for reading from Docker secrets; since + moving to Kubernetes this has been redundant. +* `gss_mqtt` connected to an MQTT server with GSSAPI authentication. It + is better to use a ServiceClient instead, as this will discover the + MQTT server via the Directory. If it is really necessary the MQTT URL + can be overridden in the ServiceClient configuration. +* `fetch` has been removed as an export. ### Coding Style @@ -225,4 +165,4 @@ These are deprecated APIs. ## Contributing -Development practice follows [GitHub flow](https://guides.github.com/introduction/flow/). \ No newline at end of file +Development practice follows [GitHub flow](https://guides.github.com/introduction/flow/). 
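In practice the 2.0.0 changes listed in the Compatibility section look like this from consuming code. This is only a sketch under stated assumptions: the `ServiceClient` construction (`env` option, `init()`) and the placeholder option objects follow the old 1.x conventions and may differ in `@amrc-factoryplus/service-client`; only the explicit `verbose` handling is taken directly from this patch.

```js
import { Debug, ServiceClient, WebAPI } from "@amrc-factoryplus/utilities";

/* Placeholders standing in for whatever the application already
 * configures; they are not part of this library's API. */
const existing_webapi_opts = { http_port: 8080, routes: app => {} };
const existing_node_opts   = { /* address, publishDeath, ... */ };

/* Debug no longer reads VERBOSE from the environment itself; pass it
 * in explicitly, as lib/db.js and lib/webapi.js now do in this patch. */
const debug = new Debug({ verbose: process.env.VERBOSE });

/* WebAPI now only logs when a `verbose` option is supplied. */
const api = new WebAPI({
    ...existing_webapi_opts,
    verbose: process.env.VERBOSE,
});

/* basic_sparkplug_node on the MQTT interface now returns a Promise and
 * must be awaited. Client construction here assumes the new
 * ServiceClient behaves like the old bundled one. */
const fplus = await new ServiceClient({ env: process.env }).init();
const node = await fplus.MQTT.basic_sparkplug_node(existing_node_opts);
```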
diff --git a/lib/db.js b/lib/db.js index 9233704..b0b5c1f 100644 --- a/lib/db.js +++ b/lib/db.js @@ -3,19 +3,19 @@ * Copyright 2021 AMRC */ -import timers from "node:timers/promises"; +import timers from "timers/promises"; + +import { Debug } from "@amrc-factoryplus/service-client"; import { Pg } from "./deps.js"; -import { Debug } from "./debug.js"; -const debug = new Debug(); +const debug = new Debug({ verbose: process.env.VERBOSE }); export class DB { constructor (opts) { this.isolation = opts.isolation ?? "read committed"; this.readonly = opts.readonly; this.deferrable = opts.deferrable; - this.verbose = opts.verbose; this.version = opts.version; this.pool = new Pg.Pool(); @@ -69,7 +69,6 @@ export class DB { const isolation = opts.isolation ?? this.isolation; const readonly = opts.readonly ?? this.readonly; const deferrable = opts.deferrable ?? this.deferrable; - const verbose = opts.verbose ?? this.verbose; const begin = [ "begin", diff --git a/lib/debug.js b/lib/debug.js index 079a1c6..a26cbef 100644 --- a/lib/debug.js +++ b/lib/debug.js @@ -4,7 +4,7 @@ * Copyright 2022 AMRC. */ -import util from "node:util"; +import util from "util"; export class Debug { constructor (opts) { diff --git a/lib/deps.js b/lib/deps.js index ada8bb8..dbf1871 100644 --- a/lib/deps.js +++ b/lib/deps.js @@ -11,17 +11,6 @@ import { createRequire } from "module"; -/* Annoying re-export syntax... If you find yourself having to document - * 'you can't do `export Foo from "foo"`' then maybe you should design - * the syntax so you can... ? */ -export { default as MQTT } from "mqtt"; - -/* No GSS on Windows. */ -export const GSS = await - import("gssapi.js") - .then(mod => mod.default) - .catch(e => undefined); - /* The 'pg' module is not properly ESM-compatible. While pure-JS use can * be accomplished with `import Pg from "pg"` this does not provide * access to the native (libpq) bindings. They are only available via @@ -30,19 +19,3 @@ const require = createRequire(import.meta.url); const pg_cjs = require("pg"); export const Pg = pg_cjs.native ?? pg_cjs; -import sparkplug_payload from "sparkplug-payload"; -export const SpB = sparkplug_payload.get("spBv1.0"); - -/* We have to go round the houses a bit here... */ -import got from "got"; -import { createFetch } from "got-fetch"; - -const configured_got = got.extend({ - cacheOptions: { shared: true }, -}); -const got_fetch = createFetch(configured_got); - -/* Bug fix */ -export function fetch (url, opts) { - return got_fetch(`${url}`, opts); -} diff --git a/lib/index.js b/lib/index.js index 209aed3..5f58136 100644 --- a/lib/index.js +++ b/lib/index.js @@ -4,19 +4,13 @@ * Copyright 2022 AMRC. */ +export * from "@amrc-factoryplus/service-client"; + export * from "./db.js"; -export * from "./debug.js"; export * from "./deps.js"; -export * as secrets from "./secrets.js"; -export * from "./service-client.js"; export * from "./sparkplug/util.js"; export * from "./util.js"; -export * as UUIDs from "./uuids.js"; export * from "./webapi.js"; -export * from "./service/service-interface.js" - -/* Compat export; better is to go via ServiceClient. */ -export { gss_mqtt } from "./service/mqtt.js"; import { pkgVersion } from "./util.js"; diff --git a/lib/secrets.js b/lib/secrets.js deleted file mode 100644 index 7141567..0000000 --- a/lib/secrets.js +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Factory+ NodeJS Utilities - * Read Docker secrets from node. - * Copyright 2021 AMRC. 
- */ - -import fs from "node:fs"; - -export const read = function (name) { - try { - return fs.readFileSync(`/run/secrets/${name}`); - } - catch (err) { - if (err.code != "ENOENT") { - console.error("Error reading Docker secret [%s]: %o", name, err); - } - return null; - } -}; - -export const readUTF8 = function (name) { - const buf = read(name); - if (buf == null) return null; - - return buf.toString("utf8").replace(/\r?\n$/, ""); -}; - -/* Read a value from the environment if it exists. Otherwise append - * _SECRET and read the value as a secret. - */ -export const env = function (key) { - if (key in process.env) - return process.env[key]; - - const secret = `${key}_SECRET`; - if (secret in process.env) - return readUTF8(process.env[secret]); - - throw `No value available for ${key}!`; -} diff --git a/lib/service-client.js b/lib/service-client.js deleted file mode 100644 index c99acc5..0000000 --- a/lib/service-client.js +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Factory+ NodeJS Utilities - * Service client library. - * Copyright 2022 AMRC - */ - -import { Debug } from "./debug.js"; -import { Service } from "./uuids.js"; - -import Auth from "./service/auth.js"; -import CmdEsc from "./service/cmdesc.js"; -import ConfigDB from "./service/configdb.js"; -import Directory from "./service/directory.js"; -import Discovery from "./service/discovery.js"; -import Fetch from "./service/fetch.js"; -import Git from "./service/git.js"; -import MQTTInterface from "./service/mqtt.js"; - -function opts_from_env (env) { - const opts = ` - AUTHN_URL - CONFIGDB_URL - DIRECTORY_URL - MQTT_URL - ROOT_PRINCIPAL - SERVICE_USERNAME:username - SERVICE_PASSWORD:password - VERBOSE - ` .split(/\s+/) - .map(v => v.includes(":") ? v.split(":") : [v, v.toLowerCase()]) - .filter(v => v[0] in env) - .map(v => [v[1], env[v[0]]]); - return Object.fromEntries(opts); -} - -export class ServiceClient { - constructor (opts) { - opts ??= {}; - this.opts = "env" in opts - ? { ...opts_from_env(opts.env), ...opts } - : opts; - delete this.opts.env; - - this.debug = new Debug(opts); - } - - async init () { - return this; - } - - static define_interfaces (...interfaces) { - for (const [name, klass, methlist] of interfaces) { - Object.defineProperty(this.prototype, name, { - configurable: true, - get () { return this[`_${name}`] ??= new klass(this); }, - }); - - const meths = methlist.split(/\s+/).filter(s => s.length); - for (const meth of meths) { - const [mine, theirs] = meth.split(":"); - Object.defineProperty(this.prototype, mine, { - configurable: true, - writable: true, - value (...args) { - return this[name][theirs ?? mine](...args); - }, - }); - } - } - } -} - -/* The methods delegeted here from the ServiceClient should be - * considered backwards-compatible shims. Future service methods will - * mostly be defined only on the service interface. */ -ServiceClient.define_interfaces( - ["Auth", Auth, `check_acl fetch_acl resolve_principal`], - ["CmdEsc", CmdEsc, ``], - ["ConfigDB", ConfigDB, `fetch_configdb:get_config`], - ["Directory", Directory, ``], - ["Discovery", Discovery, - `set_service_url set_service_discovery service_url service_urls`], - ["Fetch", Fetch, `fetch`], - ["Git", Git, ``], - ["MQTT", MQTTInterface, `mqtt_client`], -); diff --git a/lib/service/auth.js b/lib/service/auth.js deleted file mode 100644 index 9722f03..0000000 --- a/lib/service/auth.js +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Factory+ NodeJS Utilities - * Auth service interface. - * Copyright 2022 AMRC. 
- */ - -import semver from "semver"; - -import { Address } from "../sparkplug/util.js"; -import { App, Service, Null as Null_UUID } from "../uuids.js"; - -import { ServiceInterface } from "./service-interface.js"; - -export default class Auth extends ServiceInterface { - constructor (fplus) { - super(fplus); - - this.service = Service.Authentication; - this.root_principal = fplus.opts.root_principal; - this.permission_group = fplus.opts.permission_group; - } - - /* Verifies if principal has permission on target. If 'wild' is true - * then the null UUID in an ACE will match any target. */ - async check_acl (principal, permission, target, wild) { - const acl = await this.fetch_acl(principal, this.permission_group); - return acl(permission, target, wild); - } - - /* Takes a principal and a permission group. Returns a function - * which checks a particular permission and target against the - * returned ACL. */ - async fetch_acl (princ_req, group) { - const [type, principal] = - typeof(princ_req) == "string" ? ["kerberos", princ_req] - : "kerberos" in princ_req ? ["kerberos", princ_req.kerberos] - : "uuid" in princ_req ? ["uuid", princ_req.uuid] - : [null, null]; - if (type == null) { - this.debug.log("acl", - "Unrecognised principal request: %o", princ_req); - return () => false; - } - const by_uuid = type == "uuid"; - - if (this.root_principal - && type == "kerberos" - && principal == this.root_principal) - return () => true; - - const res = await this.fplus.fetch({ - service: Service.Authentication, - url: "/authz/acl", - query: { principal, permission: group, "by-uuid": by_uuid }, - }); - if (!res.ok) { - this.debug.log("acl", `Failed to read ACL for ${principal}: ${res.status}`); - return () => false; - } - const acl = await res.json(); - this.debug.log("acl", "Got ACL for %s: %o", principal, acl); - - return (permission, target, wild) => - acl.some(ace => - ace.permission == permission - && (ace.target == target - || (wild && ace.target == Null_UUID))); - } - - /* Resolve a principal to a UUID. Query is an object with a single - * key; currently this must be 'kerberos' to search for principals - * by Kerberos principal name. */ - async resolve_principal (query) { - const res = await this.fplus.fetch({ - service: Service.Authentication, - url: "/authz/principal/find", - query, - }); - if (!res.ok) { - this.debug.log("princ", - "Failed to resolve %o: %s", query, res.status); - return null; - } - const uuid = await res.json(); - this.debug.log("princ", "Resolved %o to %s", query, uuid); - return uuid; - } - - async _resolve_by_addr (address) { - const cdb = this.fplus.ConfigDB; - - /* Check for a version of the ConfigDB that can search for - * not-existing keys. Otherwise we will get false results. */ - const ping = await cdb.ping(); - if (!ping || !semver.satisfies(ping.version, ">=1.7 || =1.7.0-bmz")) { - this.debug.log("princ", - `ConfigDB is too old to search for addresses (${ping.version})`); - return; - } - - const uuids = await cdb.search(App.SparkplugAddress, { - group_id: address.group, - node_id: address.node, - device_id: undefined, - }); - if (uuids.length == 1) return uuids; - if (uuids.length) - this.debug.log("princ", - "Multiple results resolving Sparkplug address %s", - address); - return; - } - - /** Fetch the different identities for a principal. - * - * With the current version of the auth service Sparkplug identities - * are stored in the ConfigDB and must be resolved from there. This - * means more queries than are strictly necessary. 
- * - * @param kind Specifies the type of identifier we have already: - * "kerberos", "uuid" or "sparkplug". If omitted this will look up - * our client identities. - * @param identifier The identifier we have. Kerberos and UUID - * identifiers must be supplied as a string. Sparkplug identifiers - * can be either a string or an Address. - * @return An object with one or more of those keys giving all the - * principal's identities we have access to, or null if we cannot - * resolve the principal (no permission or doesn't exist). - */ - async find_principal (kind, identifier) { - const uuid = - kind == undefined ? await this.resolve_principal() - : kind == "uuid" ? identifier - : kind == "kerberos" - ? await this.resolve_principal({kerberos: identifier}) - : kind == "sparkplug" - ? await this._resolve_by_addr(identifier) - : undefined; - - if (uuid == undefined) return; - - const [st, ids] = await this.fetch(`/authz/principal/${uuid}`); - if (st != 200) { - this.debug.log("princ", "Failed to fetch principal %s: %s", - uuid, st); - return; - } - - const sp = await this.fplus.ConfigDB - .get_config(App.SparkplugAddress, uuid); - if (sp) - ids.sparkplug = new Address(sp.group_id, sp.node_id); - - return ids; - } - - async add_principal (uuid, kerberos) { - const [st] = await this.fetch({ - method: "POST", - url: "authz/principal", - body: { uuid, kerberos }, - }); - if (st != 204) - this.throw(`Can't create principal ${kerberos}`, st); - } - - /* Returns true if we deleted a principal, false if the principal - * didn't exist. */ - async delete_principal (uuid) { - const [st] = await this.fetch({ - method: "DELETE", - url: `authz/principal/${uuid}`, - }); - switch (st) { - case 204: return true; - case 404: return false; - default: - this.throw(`Can't delete principal ${uuid}`, st); - } - } - - async create_principal (klass, kerberos, name) { - const cdb = this.fplus.ConfigDB; - const uuid = await cdb.create_object(klass); - try { - await this.add_principal(uuid, kerberos); - } - catch (e) { - await cdb.put_config(App.Info, uuid, { - name, deleted: true, - }); - throw e; - } - if (name) - await cdb.put_config(App.Info, uuid, { name }); - return uuid; - } - - /* This is of limited use; you can only call it if you have wildcard - * Manage_ACL permission. So a root-equivalent administrator. */ - async get_all_ace () { - const [st, aces] = await this.fetch("authz/ace"); - if (st != 200) - this.throw(`Can't read ACEs`, st); - return aces; - } - - async _edit_ace (spec) { - const [st] = await this.fetch({ - method: "POST", - url: "authz/ace", - body: spec, - }); - if (st != 204) - this.throw(`Can't ${action} ACE`, st); - } - - /* XXX This is a bad API. These should be HTTP methods rather than a - * generic POST request. 
*/ - async add_ace (principal, permission, target) { - await this._edit_ace({ - action: "add", - principal, permission, target, - }); - } - - async delete_ace (principal, permission, target) { - await this._edit_ace({ - action: "delete", - principal, permission, target, - }); - } - - async add_to_group (group, member) { - const [st] = await this.fetch({ - method: "PUT", - url: `authz/group/${group}/${member}`, - }); - if (st != 204) - this.throw(`Can't add ${member} to group ${group}`, st); - } - - async remove_from_group (group, member) { - const [st] = await this.fetch({ - method: "DELETE", - url: `authz/group/${group}/${member}`, - }); - if (st != 204) - this.throw(`Can't remove ${member} from group ${group}`, st); - } -} diff --git a/lib/service/cmdesc.js b/lib/service/cmdesc.js deleted file mode 100644 index e85ddf5..0000000 --- a/lib/service/cmdesc.js +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Factory+ NodeJS Utilities - * Command Escalation service interface. - * Copyright 2023 AMRC. - */ - -import { Address } from "../sparkplug/util.js"; -import { Service } from "../uuids.js"; - -import { ServiceInterface } from "./service-interface.js"; - -export default class CmdEsc extends ServiceInterface { - constructor (fplus) { - super(fplus); - this.service = Service.Command_Escalation; - this.log = this.debug.log.bind(this.debug, "cmdesc"); - } - - async request_cmd ({ address, name, type, value }) { - const [st] = await this.fetch({ - method: "POST", - url: `v1/address/${address}`, - body: { name, type, value }, - }); - if (st != 200) - this.throw(`Can't set metric ${name} of ${address}`, st); - } - - async rebirth (address) { - const ctrl = address.isDevice() ? "Device Control" : "Node Control"; - await this.request_cmd({ - address: address, - name: `${ctrl}/Rebirth`, - type: "Boolean", - value: true, - }); - } -} diff --git a/lib/service/configdb-watcher.js b/lib/service/configdb-watcher.js deleted file mode 100644 index 39558ae..0000000 --- a/lib/service/configdb-watcher.js +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Factory+ NodeJS Utilities - * ConfigDB service interface. - * Copyright 2022 AMRC. - * - * This class is loaded dynamically by service/configdb.js to avoid a - * required dependency on rxjs. - */ - -import rx from "rxjs"; - -export class ConfigDBWatcher { - constructor (configdb, device) { - this.configdb = configdb; - this.device = device; - } - - /* For now this returns an Observable. It would be much - * more useful to return an Observable identifying which - * object has had its config entry changed but the ConfigDB MQTT - * interface doesn't currently expose that information. */ - application (uuid) { - this._app ??= this.device - .metric("Last_Changed/Application") - .pipe(rx.share()); - - return this._app.pipe( - rx.filter(u => u == uuid), - rx.map(u => undefined)); - } - - watch_config (app, obj) { - return this.application(app).pipe( - rx.startWith(undefined), - rx.switchMap(() => this.configdb.get_config(app, obj)), - ); - } - - watch_search (app, query) { - return this.application(app).pipe( - rx.startWith(undefined), - rx.switchMap(() => this.configdb.search({ app, query })), - ); - } -} diff --git a/lib/service/configdb.js b/lib/service/configdb.js deleted file mode 100644 index 301a0de..0000000 --- a/lib/service/configdb.js +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Factory+ NodeJS Utilities - * ConfigDB service interface. - * Copyright 2022 AMRC. 
- */ - -import format from "util"; - -import { App, Service, Null as Null_UUID } from "../uuids.js"; - -import { ServiceInterface } from "./service-interface.js"; - -export default class ConfigDB extends ServiceInterface { - constructor (fplus) { - super(fplus); - this.service = Service.ConfigDB; - this.log = this.debug.log.bind(this.debug, "configdb"); - } - - async get_config_with_etag (app, obj) { - const [st, json, etag] = await this.fetch(`v1/app/${app}/object/${obj}`); - if (st == 404) return []; - if (st != 200) - this.throw(`Can't get ${app} for ${obj}`, st); - return [json, etag]; - } - - async get_config (app, obj) { - const [json] = await this.get_config_with_etag(app, obj); - return json; - } - - async get_config_etag (app, obj) { - const [st, , etag] = await this.fetch({ - method: "HEAD", - url: `v1/app/${app}/object/${obj}`, - }); - if (st == 404) return; - if (st != 200) - this.throw(`Can't get ${app} for ${obj}`, st); - return etag; - } - - async put_config (app, obj, json) { - const [st] = await this.fetch({ - method: "PUT", - url: `/v1/app/${app}/object/${obj}`, - body: json, - }); - if (st == 204 || st == 201) return; - this.throw(`Can't set ${app} for ${obj}`, st); - } - - async delete_config (app, obj) { - const [st] = await this.fetch({ - method: "DELETE", - url: `/v1/app/${app}/object/${obj}`, - }); - if (st == 204) return; - this.throw(`Can't remove ${app} for ${obj}`, st); - } - - async patch_config (app, obj, type, patch) { - if (type != "merge") - this.throw(`Only merge-patch supported`); - - const [st] = await this.fetch({ - method: "PATCH", - url: `/v1/app/${app}/object/${obj}`, - body: patch, - content_type: "application/merge-patch+json", - }); - if (st == 204) return; - this.throw(`Can't patch ${app} for ${obj}`, st); - } - - async list_configs (app) { - const [st, json] = await this.fetch(`v1/app/${app}/object`); - if (st == 404) return; - if (st != 200) - this.throw(`Can't list objects for ${app}`, st); - return json; - } - - /** Create a new object. - * - * @param klass The class of the new object. Required. - * @param obj The UUID of the new object. Optional. - * @param excl Fail if the object already existed. - * @return The UUID of the new object. - */ - async create_object (klass, obj, excl) { - const [st, json] = await this.fetch({ - method: "POST", - url: `/v1/object`, - body: { - "class": klass, - uuid: obj, - }, - }); - if (st == 200 && excl) - this.throw(`Exclusive create of ${obj} failed`); - if (st == 201 || st == 200) - return json.uuid; - if (obj) - this.throw(`Creating ${obj} failed`, st); - else - this.throw(`Creating new ${klass} failed`, st); - } - - /* This should not normally be used by automated processes */ - async delete_object (obj) { - const [st] = await this.fetch({ - method: "DELETE", - url: `/v1/object/${obj}`, - }); - if (st == 204) return; - this.throw(`Deleting ${obj} failed`, st); - } - - /* This is currently the recommended way to 'delete' an object. This - * will NOT delete any config entries, the caller will have to - * handle that. - * - * This flag is advisory and for human consumption only. Don't - * attempt to read it from services. */ - mark_object_deleted (obj) { - return this.patch_config(App.Info, obj, "merge", - { deleted: true }); - } - - async search (...args) { - const opts = args.length == 3 - ? { app: args[0], query: args[1], results: args[2] } - : args[0]; - - const klass = opts.klass ? 
`/class/${opts.klass}` : ""; - const qs = Object.fromEntries([ - ...Object.entries(opts.query) - .map(([k, v]) => [k, JSON.stringify(v)]), - ...Object.entries(opts.results ?? {}) - .map(([k, v]) => [`@${k}`, v]), - ]); - - const res = await this.fplus.fetch({ - service: Service.ConfigDB, - url: `/v1/app/${opts.app}${klass}/search`, - query: qs, - }); - if (!res.ok) - this.throw("search failed", res.status); - return await res.json(); - } - - async resolve (opts) { - if ("results" in opts) - this.throw("resolve doesn't return results"); - - const uuids = await this.search(opts); - - if (!Array.isArray(uuids)) - this.throw("search didn't return an array"); - if (uuids.length > 1) - this.throw(format("More than one result resolving %s with %o", - opts.app, opts.query)); - - return uuids[0]; - } - - /* Returns a Promise to a watcher object. */ - async watcher () { - if (!this._watcher) { - const fplus = this.fplus; - - const [watcher, spapp, info] = await Promise.all([ - import("./configdb-watcher.js"), - fplus.MQTT.sparkplug_app(), - fplus.Directory.get_service_info(this.service), - ]); - - const dev = await spapp.device({ device: info.device }); - this._watcher = new watcher.ConfigDBWatcher(this, dev); - } - return this._watcher; - } -} diff --git a/lib/service/directory.js b/lib/service/directory.js deleted file mode 100644 index 6009b55..0000000 --- a/lib/service/directory.js +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Factory+ NodeJS Utilities - * Directory service interface. - * Copyright 2023 AMRC. - */ - -import { Address } from "../sparkplug/util.js"; -import { Service, Null as Null_UUID } from "../uuids.js"; - -import { ServiceInterface } from "./service-interface.js"; - -export default class Directory extends ServiceInterface { - constructor (fplus) { - super(fplus); - this.service = Service.Directory; - this.log = this.debug.log.bind(this.debug, "directory"); - } - - async get_service_info (service) { - const [st, specs] = await this.fetch(`/v1/service/${service}`); - if (st == 404) { - this.log("Can't find service %s: %s", service, st); - return; - } - if (st != 200) - this.throw(`Can't get service records for ${service}`, st); - if (!Array.isArray(specs)) - this.throw(`Invalid service response for ${service}`); - if (specs.length > 1) - this.throw(`More than one service record for ${service}`); - - return specs[0]; - } - - async get_service_url (service) { - const spec = await this.get_service_info(service); - return spec?.url; - } - - /* XXX Endpoint for compatibility. This will only ever return a - * single result. */ - async get_service_urls (service) { - const url = await this.get_service_url(service); - return url ? 
[url] : undefined; - } - - async register_service_url (service, url) { - const [st] = await this.fetch({ - method: "PUT", - url: `v1/service/${service}/advertisment`, - body: { url }, - }); - if (st != 204) - this.throw(`Can't register service ${service}`, st); - this.log("Registered %s for %s", url, service); - } - - async get_device_info (device) { - const [st, info] = await this.fetch(`v1/device/${device}`); - if (st == 404) return; - if (st != 200) - this.throw(`Can't find device ${device}`, st); - return info; - } - - async get_device_address (device) { - const info = await this.get_device_info(device); - if (!info) return; - return new Address(info.group_id, info.node_id, info.device_id); - } -} diff --git a/lib/service/discovery.js b/lib/service/discovery.js deleted file mode 100644 index 3f41eff..0000000 --- a/lib/service/discovery.js +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Factory+ NodeJS Utilities - * Service discovery. - * Copyright 2022 AMRC. - */ - -import { Service } from "../uuids.js"; - -import { ServiceInterface } from "./service-interface.js"; - -export default class Discovery extends ServiceInterface { - constructor (fplus) { - super(fplus); - this.urls = new Map(); - - const opts = fplus.opts; - const presets = [ - [opts.authn_url, Service.Authentication], - [opts.configdb_url, Service.ConfigDB], - [opts.directory_url, Service.Directory], - [opts.mqtt_url, Service.MQTT], - ]; - for (const [pr, srv] of presets) { - if (pr == null) continue; - this.debug.log("discovery", "Preset URL for %s: %s", srv, pr); - this.set_service_url(srv, pr); - } - } - - /* We have a service URL from somewhere... */ - set_service_url (service, url) { - this.urls.set(service, url); - } - - /* We know how to find service URLs (hook for the Directory) */ - set_service_discovery (locator) { - this.find_service_urls = locator; - } - - find_service_urls (service) { - return this.fplus.Directory.get_service_urls(service); - } - - /* This interface is deprecated. The concept of 'multiple providers - * for a service' was never properly implemented. */ - async service_urls (service) { - this.debug.log("service", `[${service}] Looking for URL...`); - if (this.urls.has(service)) { - const url = this.urls.get(service); - this.debug.log("service", `[${service}] Found ${url} preconfigured.`); - return [url]; - } - - const urls = await this.find_service_urls(service); - - if (urls) { - this.debug.log("service", "[%s] Discovery returned %s", - service, urls.join(", ")); - return urls; - } else { - return []; - } - } - - async service_url (service) { - const urls = await this.service_urls(service); - return urls?.[0]; - } -} diff --git a/lib/service/fetch.js b/lib/service/fetch.js deleted file mode 100644 index 24d9022..0000000 --- a/lib/service/fetch.js +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Factory+ NodeJS Utilities - * HTTP client code. - * Copyright 2022 AMRC. 
- */ - -import {fetch, GSS} from "../deps.js"; - -import { ServiceInterface } from "./service-interface.js"; - -const Auth_rx = /^([A-Za-z]+) +([A-Za-z0-9._~+/=-]+)$/; - -class Warning extends Error { - constructor (msg, opts) { - super(msg, opts); - this.name = "Warning"; - } -} - -class ErrorHeaders { - append () { throw new TypeError("Immutable headers"); } - delete () { throw new TypeError("Immutable headers"); } - *entries () { } - forEach () { } - get () { return null; } - has () { return false; } - *keys () { } - set () { throw new TypeError("Immutable headers"); } - *values () { } -} - -class ErrorResponse { - constructor () { - this.body = null; - this.headers = new ErrorHeaders(); - this.ok = false; - this.redirected = false; - this.status = 0; - this.statusText = "Network error"; - this.type = "error"; - this.url = null; - } - - arrayBuffer () { return Promise.reject("Network error"); } - blob () { return Promise.reject("Network error"); } - formData () { return Promise.reject("Network error"); } - json () { return Promise.reject("Network error"); } - text () { return Promise.reject("Network error"); } -} - -function _with_auth (opts, scheme, creds) { - opts = {...opts}; - opts.headers = {...opts.headers}; - opts.headers.Authorization = `${scheme} ${creds}`; - return opts; -} - -function _idempotent (opts) { - if (opts.method && opts.method !== "GET") - return false; - if (opts.headers) return false; - return !opts.body; -} - -export default class Fetch extends ServiceInterface { - constructor (fplus) { - super(fplus); - this.tokens = new Map(); - this.inflight = new Map(); - this.cache = "default"; - } - - async fetch (opts) { - try { - /* XXX Call service_urls instead and find one that works. */ - opts.service_url = await this.fplus.service_url(opts.service); - const url = new URL(opts.url, opts.service_url); - if (opts.query) - url.search = new URLSearchParams(Object.entries(opts.query)); - - opts.cache ??= this.cache; - - /* Don't mess with stateful requests. */ - if (!_idempotent(opts)) - return await this.do_fetch(url, opts); - - /* If we have a request to this URL in-flight, don't make - * another, wait for the first to complete. 
*/ - const inflight = this.inflight.get(url); - if (inflight) return await inflight; - - const res_pr = this.do_fetch(url, opts) - /* Make sure we clear in-flight on failure too */ - .finally(() => this.inflight.delete(url)); - this.inflight.set(url, res_pr); - return await res_pr; - } - catch (e) { - this.debug.log("fetch", `Fetch error: ${e}\n${e.stack}`); - return new ErrorResponse(); - } - } - - async do_fetch (url, opts) { - let token; - const try_fetch = async () => { - token = await this.service_token(opts.service_url, token); - if (!token) return null; - return await fetch(url, _with_auth(opts, "Bearer", token)); - }; - - let res = await try_fetch(); - if (res?.status === 401) - res = await try_fetch(); - - return res; - } - - async service_token (base_url, bad) { - /* This might be a token or a promise for one */ - let token = this.tokens.get(base_url); - - const is_bad = bad !== undefined && - typeof token == "string" && - token === bad; - - if (!token || is_bad) { - token = this._fetch_token(base_url); - /* We push a promise to the cache here */ - this.tokens.set(base_url, token); - } - - try { - token = await token; - this.debug.log("token", `Using token ${token} for ${base_url}`); - return token; - } - catch (e) { - this.tokens.delete(base_url); - throw e; - } - } - - /* Some code was consuming the private interface */ - _service_token (base_url, bad) { - this.debug.log("warning", new Warning("Private method called")); - return this.service_token(base_url, bad); - } - - async _fetch_token (base_url) { - const tok_url = new URL("/token", base_url); - - this.debug.log("token", `Requesting new token from ${tok_url}`); - const res = await this._gss_fetch(tok_url, { - method: "POST", - body: "", - }); - - if (!res?.ok) { - this.debug.log("token", `Token request failed (${res.status})`); - /* XXX How should we handle failure here? This indicates, - * perhaps, that our Kerberos creds are bad, or the service - * is down. Probably the only strategy that makes sense is - * to log and exit; let K8s handle restart/backoff. */ - throw `Token fetch fetch failed for ${base_url}`; - } - - const json = await res.json(); - /* There will be a promise in the cache already; overwrite it - * with the real token. */ - this.tokens.set(base_url, json.token); - return json.token; - } - - - async _gss_fetch(url, opts) { - if (this.fplus.opts.username && this.fplus.opts.password) { - this.debug.log("fetch", `Basic auth with ${this.fplus.opts.username}`); - return await fetch(url, _with_auth(opts, - "Basic", Buffer.from(`${this.fplus.opts.username}:${this.fplus.opts.password}`) - .toString('base64'))); - } else { - const ctx = GSS.createClientContext({ server: `HTTP@${url.hostname}` }); - const cli_tok = await GSS.initSecContext(ctx); - const cli_b64 = cli_tok.toString("base64"); - const res = await fetch(url, _with_auth(opts, "Negotiate", cli_b64)); - if (res.status === 401) - throw `Server denied GSS auth for ${url}`; - - /* Grr crummy M$ protocol... */ - const [, scheme, creds] = res.headers - .get("WWW-Authenticate") - .match(Auth_rx) ?? []; - if (scheme !== "Negotiate") - throw `Bad WWW-A scheme '${scheme}' from ${url}`; - - await GSS.initSecContext(ctx, Buffer.from(creds, "base64")); - if (!ctx.isComplete()) - throw `GSS mutual auth failed for ${url}`; - - return res; - } - } -} diff --git a/lib/service/git.js b/lib/service/git.js deleted file mode 100644 index b1c77d8..0000000 --- a/lib/service/git.js +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Factory+ NodeJS Utilities - * Git service interface. 
- * Copyright 2023 AMRC. - */ - -/* This interface is a bit minimal, as I don't want to depend on a git - * library. */ - -import { App, Service } from "../uuids.js"; - -import { ServiceInterface } from "./service-interface.js"; - -export default class Git extends ServiceInterface { - constructor (fplus) { - super(fplus); - - this.service = Service.Git; - } - - base_url () { - return this.fplus.Discovery.service_url(this.service); - } - - /* This is suitable for isomorphic-git */ - async git_auth (url, auth) { - const bad = auth?.headers?.Authorization - ?.match(/^Bearer\s+(\S+)$/)?.[1]; - - const base = await this.base_url(); - const tok = await this.fplus.Fetch.service_token(base, bad); - - return { - headers: { - "Authorization": `Bearer ${tok}`, - }, - }; - } - - async repo_by_uuid (uuid) { - const base = await this.base_url(); - return new URL(`uuid/${uuid}`, base).toString(); - } - - async repo_by_path (path) { - const base = await this.base_url(); - return new URL(`git/${path}`, base).toString(); - } - - /* Options to splat into iso-git's options where a remote is - * required. This does not include the http client as we don't know - * which to use. */ - async remote (opts) { - const url = opts.uuid - ? await this.repo_by_uuid(opts.uuid) - : await this.repo_by_path(opts.path); - const auth = this.git_auth.bind(this); - return { - url, - onAuth: auth, - onAuthFailure: auth, - }; - } -} diff --git a/lib/service/mqtt.js b/lib/service/mqtt.js deleted file mode 100644 index 5fa6c96..0000000 --- a/lib/service/mqtt.js +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Factory+ NodeJS Utilities - * GSSAPI MQTT connection. - * Copyright 2022 AMRC - */ - -import timers from "timers/promises"; - -import { GSS, MQTT } from "../deps.js"; -import { Service } from "../uuids.js"; -import { BasicSparkplugNode } from "../sparkplug/basic-node.js"; - -import { ServiceInterface } from "./service-interface.js"; - -async function gss_init (host) { - const ctx = GSS.createClientContext({ - server: `mqtt@${host}`, - }); - const buf = await GSS.initSecContext(ctx); - - return [ctx, buf]; -} - -function get_verb_from (opts) { - return opts.verbose ? console.log - : opts.log ?? (m => {}); -} - -/* Although MQTT.js supports MQTT5 extended auth, the support has not - * really been thought through. The initial auth-data must be supplied - * before a connection attempt is made, meaning we must build an AP-REQ - * before we even know if the MQTT server is online. Then, we can't use - * the auto-reconnect functionality, as this gives no opportunity to - * create a new AP-REQ, as that might perform network activity and must - * be awaited. So that has to be reimplemented manually based on - * watching for the 'close' event. If we wanted to support a GSSAPI - * exchange that took more than one step (e.g. SPNEGO) this would be - * even more convoluted. */ - -/* Don't use .on("connect") on the object returned from here; we need - * that event to complete the GSS auth. Use .on("authenticated") instead, - * which doesn't fire until after we have finished authenticating the - * server properly. */ - -/* This export is deprecated. Go via ServiceClient instead, as this - * will allow fallback to username/password if this is all we have. */ -export async function gss_mqtt (url, opts) { - opts ??= {}; - const host = new URL(url).hostname; - const verb = get_verb_from(opts); - - /* Has the connection deliberately been closed? 
*/ - let ending = false; - - /* These are renewed every time we reconnect as Kerberos won't let - * us replay AP-REQ packets. */ - let [ctx, buf] = await gss_init(host); - - const mqtt = MQTT.connect(url, { - reconnectPeriod: 3000, - ...opts, - protocolVersion: 5, - properties: { - ...opts.properties, - authenticationMethod: "GSSAPI", - authenticationData: buf, - }, - }); - mqtt.on("packetreceive", pkt => { - if (pkt.cmd != "connack") - return; - verb("MQTT clearing GSS creds"); - mqtt.options.properties.authenticationData = ""; - }); - mqtt.on("connect", ack => { - //verb("Got CONNACK: %o", ack); - const srv_buf = ack.properties.authenticationData; - GSS.initSecContext(ctx, srv_buf).then(next => { - if (next.length) - throw "GSS auth took more than one step"; - /* XXX I'm not sure this will properly abort if mutual auth - * fails. We may need to be more drastic. */ - if (!ctx.isComplete()) - throw "MQTT server failed to authenticate itself!"; - verb("MQTT connected"); - mqtt.emit("gssconnect", ack); - mqtt.emit("authenticated", ack); - }); - }); - mqtt.on("close", () => { - if (ending) return; - - /* We clear the authData when we get a CONNACK (successful or - * not). We don't need to do anything if our AP-REQ hasn't been - * used yet. */ - if (mqtt.options.properties.authenticationData != "") - return; - - /* XXX This will not necessarily complete before the client - * sends the CONNECT packet. There is nothing we can do about - * this with the existing MQTT.js API. The connect attempt will - * fail (GSS replay) and we will try again. - * - * Previously this code bypassed the auto-reconnect logic and - * explicitly reconnected after fetching the AP-REQ. But this - * also disabled the auto-resubscribe logic, which is not - * helpful. - */ - verb("MQTT fetching new GSS creds"); - gss_init(host) - .then(newgss => { - verb("MQTT setting new GSS creds"); - [ctx, buf] = newgss; - mqtt.options.properties.authenticationData = buf; - }); - }); - mqtt.on("end", () => { - verb("MQTT end"); - ending = true; - }); - - return mqtt; -} - -async function basic_mqtt(url, opts) { - const verb = get_verb_from(opts); - - verb(`Basic auth with ${opts.username}`); - const mqtt = MQTT.connect(url,{ - ...opts - }); - mqtt.on("connect", ack => { - verb("MQTT connected"); - mqtt.emit("authenticated", ack); - }); - mqtt.on("close", () => { - verb("MQTT connection closed"); - }); - return mqtt; -} - -export default class MQTTInterface extends ServiceInterface { - async _mqtt_client(opts) { - opts ??= {}; - - this.debug.log("mqtt", "Looking up MQTT broker URL"); - /* XXX Call service_urls instead and find one that works. */ - let url = opts.host; - url ??= await this.fplus.service_url(Service.MQTT); - - if (url == null) - return null; - - this.debug.log("mqtt", `Connecting to broker ${url}`); - - const { username, password } = this.fplus.opts; - const mqopts = { - ...opts, - username, password, - /* I had this here, but as of mqttjs v5 it is producing a - * lot of rather verbose logging, so I've removed it. */ - //log: (...a) => this.debug.log("mqtt", ...a), - }; - return username && password - ? 
await basic_mqtt(url, mqopts) - : await gss_mqtt(url, mqopts); - } - - async mqtt_client (opts) { - if (this.client && opts && Object.keys(opts).length) - this.throw("MQTT options must be provided on the first connection"); - - this.client ??= await this._mqtt_client(opts); - return this.client; - } - - basic_sparkplug_node (opts) { - return new BasicSparkplugNode({ - ...opts, - mqttFactory: will => this.mqtt_client({ ...opts, will }), - }); - } - - async sparkplug_app () { - if (!this._sparkplug_app) { - const spa = await import("@amrc-factoryplus/sparkplug-app"); - const app = new spa.SparkplugApp({ fplus: this.fplus }); - this._sparkplug_app = await app.init(); - } - return this._sparkplug_app; - } -} diff --git a/lib/service/service-interface.js b/lib/service/service-interface.js deleted file mode 100644 index 6b49423..0000000 --- a/lib/service/service-interface.js +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Factory+ NodeJS Utilities - * Service interface base class. - * Copyright 2022 AMRC. - */ - -import content_type from "content-type"; -import Optional from "optional-js"; - -import { Service } from "../uuids.js"; - -const Names = Object.fromEntries( - Object.entries(Service) - .filter(([k, v]) => k != "Registry") - .map(([n, u]) => [u, n])); - -export class ServiceError extends Error { - constructor (service, message, status) { - super(message); - this.service = service; - this.status = status; - } - - toString () { - const srv = Names[this.service] ?? this.service; - const st = this.status == undefined ? "" : `: ${this.status}`; - return `${srv}: ${this.message}${st}`; - } -} - -export class ServiceInterface { - constructor (fplus) { - this.fplus = fplus; - this.debug = fplus.debug; - } - - throw (message, status) { - throw new ServiceError(this.service, message, status); - } - - async fetch (opts) { - if (typeof opts == "string") - opts = { url: opts }; - - const body = "body" in opts ? JSON.stringify(opts.body) : undefined; - - const headers = { ...opts.headers }; - headers["Accept"] = opts.accept ?? "application/json"; - if (body) - headers["Content-Type"] = opts.content_type ?? "application/json"; - - opts = { - ...opts, - service: this.service, - headers, - body, - }; - const res = await this.fplus.fetch(opts); - /* ?? here because Optional.or doesn't work properly */ - const json = Optional.of(opts.method ?? "GET") - .filter(m => m != "HEAD") - .map(_ => res.headers.get("Content-Type")) - .map(content_type.parse) - .filter(ct => ct.type == "application/json") - .map(_ => res.json()) - .orElse(undefined); - - /* We are only interested in strong etags in the form of a UUID */ - const etag = Optional - .ofNullable(res.headers.get("ETag")) - .map(et => - /^"([0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12})"$/.exec(et)) - .map(m => m[1]) - .orElse(undefined); - - return [res.status, await json, etag]; - } - - async ping () { - const [st, ping] = await this.fetch("/ping"); - if (st != 200) return; - return ping; - } -} diff --git a/lib/sparkplug/basic-node.js b/lib/sparkplug/basic-node.js deleted file mode 100644 index 914546f..0000000 --- a/lib/sparkplug/basic-node.js +++ /dev/null @@ -1,152 +0,0 @@ -import { EventEmitter } from "node:events"; - -import { Debug } from "../debug.js"; -import { SpB } from "../deps.js"; -import { Address, Topic } from "./util.js"; - -export class BasicSparkplugNode extends EventEmitter { - constructor (opts) { - super(); - - this.address = opts.address; - this.publishDeath = opts.publishDeath; - this.mqttFactory = opts.mqttFactory; - this.debug = opts.debug ?? 
new Debug(); - - this.log = this.debug.log.bind(this.debug, "sparkplug"); - - this.bdSeq = 0; - } - - will () { - return { - topic: this.address.topic("DEATH"), - payload: SpB.encodePayload(this._death()), - qos: 0, - retain: false, - }; - } - - async connect (mqtt) { - mqtt ??= await this.mqttFactory(this.will()); - this.mqtt = mqtt; - - /* XXX Tahu exits if we get an error before we have connected... - * I'm not sure why. */ - for (const ev of ["error", "close", "reconnect", "offline"]) - mqtt.on(ev, this.emit.bind(this, ev)); - - mqtt.on("authenticated", this.on_authenticated.bind(this)); - mqtt.on("message", this.on_message.bind(this)); - } - - on_authenticated () { - const {mqtt, address} = this; - - this.log("MQTT connected"); - this.emit("connect"); - - /* Tahu subscribes much too broadly here. This causes problems - * with the HiveMQ ACLs, which are quite strict about the - * subscription topics requested. */ - mqtt.subscribe(address.topic("CMD")); - mqtt.subscribe(address.child_device("+").topic("CMD")); - - this.emit("birth"); - } - - /* XXX We don't handle Tahu compressed payloads. There is no spec; - * we could potentially write one up and use it. */ - on_message (topicstr, message) { - const topic = Topic.parse(topicstr); - if (!topic) { - this.log(`Message on unknown topic ${topicstr}`); - return; - } - - let payload; - try { - payload = SpB.decodePayload(message); - } - catch { - this.log(`Invalid payload on ${topic}`); - return; - } - - if (topic.type == "CMD") { - if (topic.address.isDevice()) - this.emit("dcmd", topic.address.device, payload); - else - this.emit("ncmd", payload); - } - else { - /* What are you doing listening to other messages? */ - this.emit("message", payload); - } - } - - _bdSeqMetric () { - return { - name: "bdSeq", - type: "UInt64", - value: this.bdSeq, - }; - } - - _death () { - return { - timestamp: Date.now(), - metrics: [this._bdSeqMetric()], - }; - } - - _publish (kind, device, payload, opts) { - const addr = device == null - ? this.address - : this.address.child_device(device); - - payload.seq = this.seq; - this.seq = (this.seq < 255) ? this.seq + 1 : 0; - - /* XXX compress? */ - - const message = SpB.encodePayload(payload); - this.mqtt.publish(addr.topic(kind), message); - this.log(`Published ${kind} for ${device ?? "Edge Node"}`); - } - - /* All these methods may modify the payload parameter, to any depth. - */ - - publishNodeBirth (payload, opts) { - this.seq = 0; - payload.metrics ??= []; - /* This will always sit at 0, and is therefore pointless. But - * it's in the spec, and this is what Tahu does. If we wanted to - * require stable storage we could do this properly. 
*/ - payload.metrics.push(this._bdSeqMetric()); - this._publish("BIRTH", null, payload, opts); - } - - publishDeviceBirth (device, payload, opts) { - this._publish("BIRTH", device, payload, opts); - } - - publishNodeData (payload, opts) { - this._publish("DATA", null, payload, opts); - } - - publishDeviceData (device, payload, opts) { - this._publish("DATA", device, payload, opts); - } - - publishDeviceDeath (device, payload, opts) { - this._publish("DEATH", device, payload, opts); - } - - stop () { - if (this.publishDeath) - this._publish("DEATH", null, this._death()); - this.mqtt.end(); - } -} diff --git a/lib/sparkplug/util.js b/lib/sparkplug/util.js index 574fcba..0e677a5 100644 --- a/lib/sparkplug/util.js +++ b/lib/sparkplug/util.js @@ -2,99 +2,10 @@ * Sparkplug utilities * Copyright 2021 AMRC */ -export class Address { - constructor (group, node, device) { - this.group = group; - this.node = node; - if (device == null || device == "") - device = undefined; - this.device = device; - } - - static parse (addr) { - return new Address(...addr.split("/")); - } - - equals (other) { - return this.group === other.group && - this.node === other.node && - this.device === other.device; - } - - /* Checks for a match, allowing this address to be a wildcard. The - * other address must be literal. The only wildcard is '+', which - * allows any string (but not no string at all). */ - matches (other) { - const wild = (p, a) => - p === a || - (p == "+" && a != undefined); - - return wild(this.group, other.group) && - wild(this.node, other.node) && - wild(this.device, other.device); - } - - toString () { - let node = `${this.group}/${this.node}`; - return this.isDevice() ? `${node}/${this.device}` : node; - } - - isDevice () { return this.device != undefined; } - topicKind () { return this.isDevice() ? "D" : "N" } - - topic (type) { - return new Topic(this, type).toString(); - } - - parent_node () { - return new Address(this.group, this.node); - } - - child_device (device) { - return new Address(this.group, this.node, device); - } - - is_child_of (node) { - return this.parent_node().equals(node); - } -} - -export class Topic { - static prefix = "spBv1.0"; - - constructor (address, type) { - this.address = address; - this.type = type; - } - - static parse (topic) { - const parts = topic.split("/"); - - if (parts.length != 4 && parts.length != 5) - return null; - if (parts[0] != Topic.prefix) - return null; - - const addr = new Address(parts[1], parts[3], parts[4]); - const [, kind, type] = parts[2].match(/(.)(.*)/); - - if (kind != addr.topicKind()) - return null; - - return new Topic(addr, type); - } - - toString () { - const a = this.address; - const t = this.type; - const typ = - t == "+" ? t : a.topicKind() + t; - - const node = `${Topic.prefix}/${a.group}/${typ}/${a.node}`; - return a.isDevice() ? `${node}/${a.device}` : node; - } -} +/* These functions are a compatibility interface for existing code; I + * think acs-directory, acs-configdb and acs-cmdesc. Do not use in new + * code. 
*/ const make_builder = (new_metrics) => { return (metrics) => { @@ -273,27 +184,3 @@ export const MetricBuilder = { }, }, }; - -export class MetricBranch { -} - -export class MetricTree { - constructor (payload) { - for (const m of payload.metrics) { - const path = m.name.split("/"); - const name = path.pop(); - - let obj = this; - for (const p of path) { - obj = obj[p] ||= new MetricBranch(); - - if (!(obj instanceof MetricBranch)) { - const branch = new MetricBranch(); - branch.$metric = obj; - obj = branch; - } - } - obj[name] = m; - } - } -} diff --git a/lib/util.js b/lib/util.js index 9015696..bb63f59 100644 --- a/lib/util.js +++ b/lib/util.js @@ -4,9 +4,9 @@ * Copyright 2022 AMRC. */ -import fs from "node:fs"; -import path from "node:path"; -import url from "node:url"; +import fs from "fs"; +import path from "path"; +import url from "url"; import find_pkg from "find-package-json"; diff --git a/lib/webapi.js b/lib/webapi.js index 70ae1ca..c3e9eb6 100644 --- a/lib/webapi.js +++ b/lib/webapi.js @@ -4,16 +4,15 @@ * Copyright 2022 AMRC */ -import http from "node:http"; -import crypto from "node:crypto"; +import http from "http"; +import crypto from "crypto"; import express from "express"; import createError from "http-errors"; import cors from "cors"; import { pathToRegexp } from "path-to-regexp"; -import { GSS } from "./deps.js"; -import { Debug } from "./debug.js"; +import { Debug, GSS } from "@amrc-factoryplus/service-client"; const Auth_rx = /^([A-Za-z]+) +([A-Za-z0-9._~+/=-]+)$/; const SESSION_LENGTH = 3*3600*1000; @@ -173,7 +172,7 @@ export class WebAPI { this.port = opts.http_port || 80; this.routes = opts.routes; - this.debug = new Debug(); + this.debug = new Debug(opts); this.auth = new FplusHttpAuth({ ...opts, log: (req, res, ...args) => this.debug.log("http", ...args), diff --git a/package.json b/package.json index 0d956a5..d623e52 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@amrc-factoryplus/utilities", - "version": "1.3.2", + "version": "2.0.0", "description": "A collection of NodeJS utilities for developing components for Factory+", "author": "AMRC", "license": "MIT", @@ -20,22 +20,16 @@ }, "scripts": {}, "dependencies": { - "content-type": "^1.0.5", + "@amrc-factoryplus/service-client": "^1.3.4", "cors": "^2.8.5", "express": "^4.18.2", "find-package-json": "^1.2.0", - "got-fetch": "^5.1.7", "http-errors": "^2.0.0", - "mqtt": "^5.1.4", - "optional-js": "^2.3.0", "path-to-regexp": "^6.2.1", "pg": "^8.11.3", - "protobufjs": "^6.11.4", - "semver": "^7.5.4", - "sparkplug-payload": "^1.0.3" + "semver": "^7.5.4" }, "optionalDependencies": { - "gssapi.js": "^2.0.1", "pg-native": "^3.0.1" }, "keywords": [