Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions packages/signalk-plugin/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,23 @@ export const ENV =
(isInstalledAsModule && "production") ||
"development";

export const {
BATHY_URL = ENV === "production"
/** The URL to report data to */
export const BATHY_URL =
process.env.BATHY_URL ||
(ENV === "production"
? "https://depth.openwaters.io"
: "http://localhost:3001",
BATHY_DEFAULT_SCHEDULE = "0 0 * * *", // every day at midnight
} = process.env;
: "http://localhost:3001");

// Earliest date for bathymetry data. signalk-to-influxdb was first released on 2017-06-28
/** Number of hours of data to report in each submission */
export const BATHY_WINDOW_SIZE = Temporal.Duration.from({
hours: Number(process.env.BATHY_WINDOW_SIZE ?? 6),
});

/** Cron schedule to report bathy */
export const BATHY_DEFAULT_SCHEDULE =
process.env.BATHY_DEFAULT_SCHEDULE ?? `0 0/${BATHY_WINDOW_SIZE.hours} * * *`;

/** Earliest date for bathymetry data. signalk-to-influxdb was first released on 2017-06-28 */
export const BATHY_EPOCH = Temporal.Instant.from(
process.env.BATHY_EPOCH ?? "2017-06-28T00:00:00Z",
);
59 changes: 34 additions & 25 deletions packages/signalk-plugin/src/reporters/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
BATHY_URL,
BATHY_DEFAULT_SCHEDULE,
BATHY_EPOCH,
BATHY_WINDOW_SIZE,
} from "../constants.js";
import type { Database } from "better-sqlite3";
import { Temporal } from "@js-temporal/polyfill";
Expand Down Expand Up @@ -37,15 +38,21 @@ export function createReporter({
const job = new CronJob(schedule, report);
signal.addEventListener("abort", stop, { once: true });

async function report({
from = reportLog.lastReport ?? BATHY_EPOCH,
to = Temporal.Now.instant(),
} = {}) {
app.debug(`Generating report from ${from} to ${to}`);
async function report(
timeframe = new Timeframe(
reportLog.lastReport ?? BATHY_EPOCH,
Temporal.Now.instant(),
),
) {
app.debug(`Generating report from ${timeframe.from} to ${timeframe.to}`);
try {
const data = await source.createReader({ from, to });
const data = await source.createReader(timeframe);
if (!data) {
app.debug("No data to report from %s to %s", from, to);
app.debug(
"No data to report from %s to %s",
timeframe.from,
timeframe.to,
);
return;
}

Expand All @@ -56,37 +63,38 @@ export function createReporter({

const submission = await submitGeoJSON(url, config, vessel, data);
app.debug("Submission response: %j", submission);
app.setPluginStatus(`Reported at ${to}`);
reportLog.logReport({ from, to });
app.setPluginStatus(`Reported at ${timeframe.to}`);
reportLog.logReport(timeframe);
} catch (err) {
console.error(err);
app.error(`Failed to generate or submit report: ${err}`);
app.setPluginStatus(
`Failed to report at ${to}: ${(err as Error).message}`,
`Failed to report at ${timeframe.to}: ${(err as Error).message}`,
);
throw err;
}
}

async function reportBackHistory({
from = reportLog.lastReport ?? BATHY_EPOCH,
to = Temporal.Now.instant(),
} = {}) {
if (!source.getAvailableDates) return;
async function reportInBatches(
timeframe = new Timeframe(
reportLog.lastReport ?? BATHY_EPOCH,
Temporal.Now.instant(),
),
windowSize = BATHY_WINDOW_SIZE,
) {
app.debug(
"Last reported %s, reporting back history",
"Last reported %s, reporting in batches",
reportLog.lastReport ?? "never",
);

for (const date of await source.getAvailableDates({ from, to })) {
for (const window of await source.getAvailableTimeframes(
timeframe,
windowSize,
)) {
// Stop if plugin is stopped
if (signal.aborted) return;

const from = date;
const to = date.toZonedDateTimeISO("UTC").add({ days: 1 }).toInstant();

app.debug(`Reporting back history date ${from} to ${to}`);
await report({ from, to });
await report(window.clamp(timeframe));
}

app.debug("Back history reporting complete");
Expand All @@ -99,6 +107,7 @@ export function createReporter({

function start() {
job.start();

app.debug(
`Last report at %s, next report at %s`,
reportLog.lastReport,
Expand All @@ -109,13 +118,13 @@ export function createReporter({

if (
reportLog.lastReport &&
// Last report was within 24 hours
// Last report was within window size
reportLog.lastReport.epochMilliseconds >
Temporal.Now.instant().subtract({ hours: 24 }).epochMilliseconds
Temporal.Now.instant().subtract(BATHY_WINDOW_SIZE).epochMilliseconds
) {
start();
} else {
reportBackHistory().then(start);
reportInBatches().then(start);
}
}

Expand Down
27 changes: 14 additions & 13 deletions packages/signalk-plugin/src/sources/history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,17 @@ export async function createHistorySource(
}

/**
* Get the list of dates that there is data for in the history.
*
* @param from - The start date of the range to get available dates for, defaults to epoch
* @param to - The end date of the range to get available dates for, defaults to now
* Get the list of timeframes that there is data for in the history.
*/
async function getAvailableDates({
from = BATHY_EPOCH,
to = Temporal.Now.instant(),
} = {}) {
async function getAvailableTimeframes(
timeframe: Timeframe,
windowSize: Temporal.Duration,
) {
// @ts-expect-error: https://github.com/SignalK/signalk-server/pull/2264
const res = await history.getValues({
from,
to,
resolution: 86400, // 1 day
from: timeframe.from,
to: timeframe.to,
resolution: windowSize.total("seconds"),
pathSpecs: [
{
path: ("environment.depth." + config.path) as Path,
Expand All @@ -112,14 +109,18 @@ export async function createHistorySource(
// Get days with depth data
return res.data
.filter(([, v]) => Number.isFinite(v))
.map((row) => Temporal.Instant.from(row[0]));
.map((row) => {
const from = Temporal.Instant.from(row[0]);
const to = from.add(windowSize);
return new Timeframe(from, to);
});
}

return {
// History providers handle the recording of data themselves
createWriter: undefined,
createReader,
getAvailableDates,
getAvailableTimeframes,
};
}

Expand Down
32 changes: 31 additions & 1 deletion packages/signalk-plugin/src/sources/sqlite.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Database from "better-sqlite3";
import { BathymetryData, BathymetrySource } from "../types.js";
import { BathymetryData, BathymetrySource, Timeframe } from "../types.js";
import { Readable, Writable } from "stream";
import { ServerAPI } from "@signalk/server-api";
import { Temporal } from "@js-temporal/polyfill";
Expand All @@ -22,9 +22,39 @@ export function createSqliteSource(

return {
createWriter: () => createSqliteWriter(db),

createReader(options) {
return createSqliteReader(db, options);
},

getAvailableTimeframes(timeframe, windowSize) {
const fromMs = timeframe.from.epochMilliseconds;
const toMs = timeframe.to.epochMilliseconds;
const bucketMs = windowSize.total("milliseconds");

const stmt = db.prepare<
{ from: number; to: number; bucket: number },
{ idx: number }
>(
`
SELECT CAST(((timestamp - :from) / :bucket) AS INTEGER) AS idx
FROM bathymetry
WHERE timestamp >= :from AND timestamp < :to
GROUP BY idx
ORDER BY idx
`,
);

const rows = stmt.all({ from: fromMs, to: toMs, bucket: bucketMs });

return rows.map(({ idx }) => {
const start = Temporal.Instant.fromEpochMilliseconds(
fromMs + idx * bucketMs,
);
const end = start.add(windowSize);
return new Timeframe(start, end);
});
},
};
}

Expand Down
28 changes: 24 additions & 4 deletions packages/signalk-plugin/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,26 @@ import { Readable, Writable } from "stream";

export type MaybePromise<T> = T | Promise<T>;

export type Timeframe = { from: Temporal.Instant; to: Temporal.Instant };
export class Timeframe {
constructor(
public from: Temporal.Instant,
public to: Temporal.Instant,
) {}

clamp(bounds: Timeframe): Timeframe {
const clampedFrom =
Temporal.Instant.compare(this.from, bounds.from) < 0
? bounds.from
: this.from;
const clampedTo =
Temporal.Instant.compare(this.to, bounds.to) > 0 ? bounds.to : this.to;
return new Timeframe(clampedFrom, clampedTo);
}

get duration(): Temporal.Duration {
return this.to.since(this.from);
}
}

export type BathymetryData = {
latitude: number;
Expand All @@ -16,7 +35,8 @@ export type BathymetryData = {
export interface BathymetrySource {
createWriter?: () => Writable;
createReader: (options: Timeframe) => MaybePromise<Readable | undefined>;
getAvailableDates?(
timeframe?: Partial<Timeframe>,
): Promise<Temporal.Instant[]>;
getAvailableTimeframes(
timeframe: Timeframe,
windowSize: Temporal.Duration,
): MaybePromise<Timeframe[]>;
}
34 changes: 26 additions & 8 deletions packages/signalk-plugin/test/sources/history.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import nock from "nock";
import { Temporal } from "@js-temporal/polyfill";
import { createHistorySource } from "../../src/sources/history.js";
import { app, config } from "../helper.js";
import { Timeframe } from "../../src/types.js";

afterEach(() => {
nock.cleanAll();
Expand Down Expand Up @@ -38,7 +39,7 @@ test("reads bathymetry from history http api", async () => {
});

const source = await createHistorySource(app, config, { host: `${host}/` });
const reader = await source?.createReader({ from, to });
const reader = await source?.createReader(new Timeframe(from, to));

expect(reader).toBeDefined();
const results = await reader!.toArray();
Expand Down Expand Up @@ -85,12 +86,29 @@ test("lists available history dates", async () => {
});

const source = await createHistorySource(app, config, { host: `${host}/` });
const dates = await source?.getAvailableDates?.({ from, to });
const dates = await source!.getAvailableTimeframes(
new Timeframe(from, to),
Temporal.Duration.from({ hours: 24 }),
);

expect(dates?.map((d) => d.toString())).toEqual([
"2025-01-01T00:00:00Z",
"2025-01-03T00:00:00Z",
"2025-01-04T00:00:00Z",
]);
expect(nock.isDone()).toBe(true);
expect(dates).toBeDefined();
expect(dates).toHaveLength(3);
expect(dates![0].from).toEqual(
Temporal.Instant.from("2025-01-01T00:00:00.000Z"),
);
expect(dates![0].to).toEqual(
Temporal.Instant.from("2025-01-02T00:00:00.000Z"),
);
expect(dates![1].from).toEqual(
Temporal.Instant.from("2025-01-03T00:00:00.000Z"),
);
expect(dates![1].to).toEqual(
Temporal.Instant.from("2025-01-04T00:00:00.000Z"),
);
expect(dates![2].from).toEqual(
Temporal.Instant.from("2025-01-04T00:00:00.000Z"),
);
expect(dates![2].to).toEqual(
Temporal.Instant.from("2025-01-05T00:00:00.000Z"),
);
});
Loading