From 7a0310994437681bea993784fa543cbc1683c2e3 Mon Sep 17 00:00:00 2001 From: Brandon Keepers Date: Fri, 23 Jan 2026 14:49:53 -0500 Subject: [PATCH] Report in 6 hour increments to limit file size --- packages/signalk-plugin/src/constants.ts | 21 +++- .../signalk-plugin/src/reporters/index.ts | 59 ++++++---- .../signalk-plugin/src/sources/history.ts | 27 ++--- packages/signalk-plugin/src/sources/sqlite.ts | 32 +++++- packages/signalk-plugin/src/types.ts | 28 ++++- .../test/sources/history.test.ts | 34 ++++-- .../test/sources/sqlite.test.ts | 107 +++++++++++++++--- 7 files changed, 237 insertions(+), 71 deletions(-) diff --git a/packages/signalk-plugin/src/constants.ts b/packages/signalk-plugin/src/constants.ts index 018cbcc..0788fb2 100644 --- a/packages/signalk-plugin/src/constants.ts +++ b/packages/signalk-plugin/src/constants.ts @@ -18,14 +18,23 @@ export const ENV = (isInstalledAsModule && "production") || "development"; -export const { - BATHY_URL = ENV === "production" +/** The URL to report data to */ +export const BATHY_URL = + process.env.BATHY_URL || + (ENV === "production" ? "https://depth.openwaters.io" - : "http://localhost:3001", - BATHY_DEFAULT_SCHEDULE = "0 0 * * *", // every day at midnight -} = process.env; + : "http://localhost:3001"); -// Earliest date for bathymetry data. signalk-to-influxdb was first released on 2017-06-28 +/** Number of hours of data to report in each submission */ +export const BATHY_WINDOW_SIZE = Temporal.Duration.from({ + hours: Number(process.env.BATHY_WINDOW_SIZE ?? 6), +}); + +/** Cron schedule to report bathy */ +export const BATHY_DEFAULT_SCHEDULE = + process.env.BATHY_DEFAULT_SCHEDULE ?? `0 0/${BATHY_WINDOW_SIZE.hours} * * *`; + +/** Earliest date for bathymetry data. signalk-to-influxdb was first released on 2017-06-28 */ export const BATHY_EPOCH = Temporal.Instant.from( process.env.BATHY_EPOCH ?? "2017-06-28T00:00:00Z", ); diff --git a/packages/signalk-plugin/src/reporters/index.ts b/packages/signalk-plugin/src/reporters/index.ts index 60cb083..42dfb00 100644 --- a/packages/signalk-plugin/src/reporters/index.ts +++ b/packages/signalk-plugin/src/reporters/index.ts @@ -8,6 +8,7 @@ import { BATHY_URL, BATHY_DEFAULT_SCHEDULE, BATHY_EPOCH, + BATHY_WINDOW_SIZE, } from "../constants.js"; import type { Database } from "better-sqlite3"; import { Temporal } from "@js-temporal/polyfill"; @@ -37,15 +38,21 @@ export function createReporter({ const job = new CronJob(schedule, report); signal.addEventListener("abort", stop, { once: true }); - async function report({ - from = reportLog.lastReport ?? BATHY_EPOCH, - to = Temporal.Now.instant(), - } = {}) { - app.debug(`Generating report from ${from} to ${to}`); + async function report( + timeframe = new Timeframe( + reportLog.lastReport ?? BATHY_EPOCH, + Temporal.Now.instant(), + ), + ) { + app.debug(`Generating report from ${timeframe.from} to ${timeframe.to}`); try { - const data = await source.createReader({ from, to }); + const data = await source.createReader(timeframe); if (!data) { - app.debug("No data to report from %s to %s", from, to); + app.debug( + "No data to report from %s to %s", + timeframe.from, + timeframe.to, + ); return; } @@ -56,37 +63,38 @@ export function createReporter({ const submission = await submitGeoJSON(url, config, vessel, data); app.debug("Submission response: %j", submission); - app.setPluginStatus(`Reported at ${to}`); - reportLog.logReport({ from, to }); + app.setPluginStatus(`Reported at ${timeframe.to}`); + reportLog.logReport(timeframe); } catch (err) { console.error(err); app.error(`Failed to generate or submit report: ${err}`); app.setPluginStatus( - `Failed to report at ${to}: ${(err as Error).message}`, + `Failed to report at ${timeframe.to}: ${(err as Error).message}`, ); throw err; } } - async function reportBackHistory({ - from = reportLog.lastReport ?? BATHY_EPOCH, - to = Temporal.Now.instant(), - } = {}) { - if (!source.getAvailableDates) return; + async function reportInBatches( + timeframe = new Timeframe( + reportLog.lastReport ?? BATHY_EPOCH, + Temporal.Now.instant(), + ), + windowSize = BATHY_WINDOW_SIZE, + ) { app.debug( - "Last reported %s, reporting back history", + "Last reported %s, reporting in batches", reportLog.lastReport ?? "never", ); - for (const date of await source.getAvailableDates({ from, to })) { + for (const window of await source.getAvailableTimeframes( + timeframe, + windowSize, + )) { // Stop if plugin is stopped if (signal.aborted) return; - const from = date; - const to = date.toZonedDateTimeISO("UTC").add({ days: 1 }).toInstant(); - - app.debug(`Reporting back history date ${from} to ${to}`); - await report({ from, to }); + await report(window.clamp(timeframe)); } app.debug("Back history reporting complete"); @@ -99,6 +107,7 @@ export function createReporter({ function start() { job.start(); + app.debug( `Last report at %s, next report at %s`, reportLog.lastReport, @@ -109,13 +118,13 @@ export function createReporter({ if ( reportLog.lastReport && - // Last report was within 24 hours + // Last report was within window size reportLog.lastReport.epochMilliseconds > - Temporal.Now.instant().subtract({ hours: 24 }).epochMilliseconds + Temporal.Now.instant().subtract(BATHY_WINDOW_SIZE).epochMilliseconds ) { start(); } else { - reportBackHistory().then(start); + reportInBatches().then(start); } } diff --git a/packages/signalk-plugin/src/sources/history.ts b/packages/signalk-plugin/src/sources/history.ts index 88d8be8..b2c23d1 100644 --- a/packages/signalk-plugin/src/sources/history.ts +++ b/packages/signalk-plugin/src/sources/history.ts @@ -86,20 +86,17 @@ export async function createHistorySource( } /** - * Get the list of dates that there is data for in the history. - * - * @param from - The start date of the range to get available dates for, defaults to epoch - * @param to - The end date of the range to get available dates for, defaults to now + * Get the list of timeframes that there is data for in the history. */ - async function getAvailableDates({ - from = BATHY_EPOCH, - to = Temporal.Now.instant(), - } = {}) { + async function getAvailableTimeframes( + timeframe: Timeframe, + windowSize: Temporal.Duration, + ) { // @ts-expect-error: https://github.com/SignalK/signalk-server/pull/2264 const res = await history.getValues({ - from, - to, - resolution: 86400, // 1 day + from: timeframe.from, + to: timeframe.to, + resolution: windowSize.total("seconds"), pathSpecs: [ { path: ("environment.depth." + config.path) as Path, @@ -112,14 +109,18 @@ export async function createHistorySource( // Get days with depth data return res.data .filter(([, v]) => Number.isFinite(v)) - .map((row) => Temporal.Instant.from(row[0])); + .map((row) => { + const from = Temporal.Instant.from(row[0]); + const to = from.add(windowSize); + return new Timeframe(from, to); + }); } return { // History providers handle the recording of data themselves createWriter: undefined, createReader, - getAvailableDates, + getAvailableTimeframes, }; } diff --git a/packages/signalk-plugin/src/sources/sqlite.ts b/packages/signalk-plugin/src/sources/sqlite.ts index be9619d..12cfa72 100644 --- a/packages/signalk-plugin/src/sources/sqlite.ts +++ b/packages/signalk-plugin/src/sources/sqlite.ts @@ -1,5 +1,5 @@ import Database from "better-sqlite3"; -import { BathymetryData, BathymetrySource } from "../types.js"; +import { BathymetryData, BathymetrySource, Timeframe } from "../types.js"; import { Readable, Writable } from "stream"; import { ServerAPI } from "@signalk/server-api"; import { Temporal } from "@js-temporal/polyfill"; @@ -22,9 +22,39 @@ export function createSqliteSource( return { createWriter: () => createSqliteWriter(db), + createReader(options) { return createSqliteReader(db, options); }, + + getAvailableTimeframes(timeframe, windowSize) { + const fromMs = timeframe.from.epochMilliseconds; + const toMs = timeframe.to.epochMilliseconds; + const bucketMs = windowSize.total("milliseconds"); + + const stmt = db.prepare< + { from: number; to: number; bucket: number }, + { idx: number } + >( + ` + SELECT CAST(((timestamp - :from) / :bucket) AS INTEGER) AS idx + FROM bathymetry + WHERE timestamp >= :from AND timestamp < :to + GROUP BY idx + ORDER BY idx + `, + ); + + const rows = stmt.all({ from: fromMs, to: toMs, bucket: bucketMs }); + + return rows.map(({ idx }) => { + const start = Temporal.Instant.fromEpochMilliseconds( + fromMs + idx * bucketMs, + ); + const end = start.add(windowSize); + return new Timeframe(start, end); + }); + }, }; } diff --git a/packages/signalk-plugin/src/types.ts b/packages/signalk-plugin/src/types.ts index 6911289..f34a297 100644 --- a/packages/signalk-plugin/src/types.ts +++ b/packages/signalk-plugin/src/types.ts @@ -3,7 +3,26 @@ import { Readable, Writable } from "stream"; export type MaybePromise = T | Promise; -export type Timeframe = { from: Temporal.Instant; to: Temporal.Instant }; +export class Timeframe { + constructor( + public from: Temporal.Instant, + public to: Temporal.Instant, + ) {} + + clamp(bounds: Timeframe): Timeframe { + const clampedFrom = + Temporal.Instant.compare(this.from, bounds.from) < 0 + ? bounds.from + : this.from; + const clampedTo = + Temporal.Instant.compare(this.to, bounds.to) > 0 ? bounds.to : this.to; + return new Timeframe(clampedFrom, clampedTo); + } + + get duration(): Temporal.Duration { + return this.to.since(this.from); + } +} export type BathymetryData = { latitude: number; @@ -16,7 +35,8 @@ export type BathymetryData = { export interface BathymetrySource { createWriter?: () => Writable; createReader: (options: Timeframe) => MaybePromise; - getAvailableDates?( - timeframe?: Partial, - ): Promise; + getAvailableTimeframes( + timeframe: Timeframe, + windowSize: Temporal.Duration, + ): MaybePromise; } diff --git a/packages/signalk-plugin/test/sources/history.test.ts b/packages/signalk-plugin/test/sources/history.test.ts index eb84369..e5e927e 100644 --- a/packages/signalk-plugin/test/sources/history.test.ts +++ b/packages/signalk-plugin/test/sources/history.test.ts @@ -3,6 +3,7 @@ import nock from "nock"; import { Temporal } from "@js-temporal/polyfill"; import { createHistorySource } from "../../src/sources/history.js"; import { app, config } from "../helper.js"; +import { Timeframe } from "../../src/types.js"; afterEach(() => { nock.cleanAll(); @@ -38,7 +39,7 @@ test("reads bathymetry from history http api", async () => { }); const source = await createHistorySource(app, config, { host: `${host}/` }); - const reader = await source?.createReader({ from, to }); + const reader = await source?.createReader(new Timeframe(from, to)); expect(reader).toBeDefined(); const results = await reader!.toArray(); @@ -85,12 +86,29 @@ test("lists available history dates", async () => { }); const source = await createHistorySource(app, config, { host: `${host}/` }); - const dates = await source?.getAvailableDates?.({ from, to }); + const dates = await source!.getAvailableTimeframes( + new Timeframe(from, to), + Temporal.Duration.from({ hours: 24 }), + ); - expect(dates?.map((d) => d.toString())).toEqual([ - "2025-01-01T00:00:00Z", - "2025-01-03T00:00:00Z", - "2025-01-04T00:00:00Z", - ]); - expect(nock.isDone()).toBe(true); + expect(dates).toBeDefined(); + expect(dates).toHaveLength(3); + expect(dates![0].from).toEqual( + Temporal.Instant.from("2025-01-01T00:00:00.000Z"), + ); + expect(dates![0].to).toEqual( + Temporal.Instant.from("2025-01-02T00:00:00.000Z"), + ); + expect(dates![1].from).toEqual( + Temporal.Instant.from("2025-01-03T00:00:00.000Z"), + ); + expect(dates![1].to).toEqual( + Temporal.Instant.from("2025-01-04T00:00:00.000Z"), + ); + expect(dates![2].from).toEqual( + Temporal.Instant.from("2025-01-04T00:00:00.000Z"), + ); + expect(dates![2].to).toEqual( + Temporal.Instant.from("2025-01-05T00:00:00.000Z"), + ); }); diff --git a/packages/signalk-plugin/test/sources/sqlite.test.ts b/packages/signalk-plugin/test/sources/sqlite.test.ts index ad897cb..e1c436a 100644 --- a/packages/signalk-plugin/test/sources/sqlite.test.ts +++ b/packages/signalk-plugin/test/sources/sqlite.test.ts @@ -5,6 +5,7 @@ import { pipeline } from "stream/promises"; import { app } from "../helper.js"; import { createDB } from "../../src/storage.js"; import { Temporal } from "@js-temporal/polyfill"; +import { Timeframe } from "../../src/types.js"; const data = [ { @@ -36,11 +37,13 @@ test("reading and writing to sqlite", async () => { const writer = source.createWriter!(); await pipeline(Readable.from(data), writer); - const reader = await source.createReader({ - from: Temporal.Instant.fromEpochMilliseconds(0), - to: Temporal.Now.instant(), - }); - const result = await reader.toArray(); + const reader = await source.createReader( + new Timeframe( + Temporal.Instant.fromEpochMilliseconds(0), + Temporal.Now.instant(), + ), + ); + const result = await reader!.toArray(); expect(result.length).toBe(data.length); expect(result[0]).toEqual(data[0]); expect(result[1]).toEqual({ ...data[1], heading: null }); @@ -52,12 +55,14 @@ test("reading with from and to", async () => { const writer = source.createWriter!(); await pipeline(Readable.from(data), writer); - const reader = await source.createReader({ - from: Temporal.Instant.from("2025-08-06T22:30:00.000Z"), - to: Temporal.Instant.from("2025-08-06T23:30:00.000Z"), - }); + const reader = await source.createReader( + new Timeframe( + Temporal.Instant.from("2025-08-06T22:30:00.000Z"), + Temporal.Instant.from("2025-08-06T23:30:00.000Z"), + ), + ); - const result = await reader.toArray(); + const result = await reader!.toArray(); expect(result.length).toBe(1); expect(result[0].timestamp).toEqual(data[1].timestamp); }); @@ -65,10 +70,84 @@ test("reading with from and to", async () => { test("reading with no data", async () => { const source = createSqliteSource(app, createDB(":memory:")); - const reader = await source.createReader({ - from: new Date("2025-08-06T22:30:00.000Z"), - to: new Date("2025-08-06T23:30:00.000Z"), - }); + const reader = await source.createReader( + new Timeframe( + Temporal.Instant.from("2025-08-06T22:30:00.000Z"), + Temporal.Instant.from("2025-08-06T23:30:00.000Z"), + ), + ); expect(reader).toBeUndefined(); }); + +function point( + ts: string, + extra: Partial<{ + latitude: number; + longitude: number; + depth: number; + heading: number | undefined; + }> = {}, +) { + return { + latitude: 1, + longitude: 2, + depth: 3, + timestamp: Temporal.Instant.from(ts), + ...extra, + }; +} + +test("getAvailableTimeframes returns 6-hour windows with data", async () => { + const db = createDB(":memory:"); + const source = createSqliteSource(app, db); + const writer = source.createWriter!(); + + const data = [ + point("2025-01-01T01:00:00Z"), // bucket [00:00, 06:00) + point("2025-01-01T07:30:00Z"), // bucket [06:00, 12:00) + point("2025-01-02T00:30:00Z"), // bucket [00:00, 06:00) next day + ]; + + await pipeline(Readable.from(data), writer); + + const from = Temporal.Instant.from("2025-01-01T00:00:00Z"); + const to = Temporal.Instant.from("2025-01-03T00:00:00Z"); + const windowSize = Temporal.Duration.from({ hours: 6 }); + + const windows = await source.getAvailableTimeframes!( + new Timeframe(from, to), + windowSize, + ); + expect(windows).toHaveLength(3); + + expect(windows[0].from).toEqual( + Temporal.Instant.from("2025-01-01T00:00:00Z"), + ); + expect(windows[0].to).toEqual(Temporal.Instant.from("2025-01-01T06:00:00Z")); + + expect(windows[1].from).toEqual( + Temporal.Instant.from("2025-01-01T06:00:00Z"), + ); + expect(windows[1].to).toEqual(Temporal.Instant.from("2025-01-01T12:00:00Z")); + + expect(windows[2].from).toEqual( + Temporal.Instant.from("2025-01-02T00:00:00Z"), + ); + expect(windows[2].to).toEqual(Temporal.Instant.from("2025-01-02T06:00:00Z")); +}); + +test("getAvailableTimeframes returns empty for no data", async () => { + const db = createDB(":memory:"); + const source = createSqliteSource(app, db); + + const from = Temporal.Instant.from("2025-01-01T00:00:00Z"); + const to = Temporal.Instant.from("2025-01-02T00:00:00Z"); + const windowSize = Temporal.Duration.from({ hours: 6 }); + + const windows = await source.getAvailableTimeframes!( + new Timeframe(from, to), + windowSize, + ); + expect(windows).toHaveLength(0); +});