Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/use ldes producer library #5

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,25 @@ This microservice allows you to publish additions and modifications to resources

The following environment variables can be provided:

- `LDES_ENDPOINT`: the backend endpoint on which the resources should be posted.
- `LDES_FOLDER`: the default folder being used.
- `LDES_FRAGMENTER` (optional): the fragmenter which should be applied when adding new resources. For time-based streams, this will typically be `time-fragmenter`.
- `LDES_STATUS_GRAPH`: EXPERIMENTAL optional the graph where the ldes-delta-pusher keeps its status. If empty, no status is kept and no catching up is done. If this is used, the pusher will write the last dct:modified timestamp that it processed into this graph. Upon restart, it will examine this graph and fetch all subjects from the database with a modified greater or equal to that value minus LDES_CATCH_UP_INITIAL_OFFSET in milliseconds. All of these will be sent (paginated) to the `handlePage` function in `catchUp.ts` in the config folder. Each page is a list of Catchup items {uri, type, modified}. The default config just writes all information about this subject to the LDES. This is naive, you probably want your own implementation of this.
- `LDES_CATCH_UP_PAGE_SIZE`: the page size used when catching up. Only used when `LDES_STATUS_GRAPH` is set
- `LDES_CATCH_UP_INITIAL_OFFSET`: the offset in ms used when catching up after restart. Only used when `LDES_STATUS_GRAPH` is set. Defaults to 1. Upon restart, the pusher will add all items on the LDES with a dct:modified >= the last sync date - the offset. This might add a little duplicate info on the LDES but this way, if two items had the same dct:modified and only the first one made it onto the LDES, now the second one is also there.

> [!CAUTION]
> The catching up after restart process is EXPERIMENTAL. It also has the drawback that IF you add a migration (not triggering deltas right now) with concepts that have modified dates earlier than the moment the LDES has caught up to, these instances will NOT be detected by the catch up process and will NOT appear on the LDES feed. You can get around that by setting the ext:lastSync for the LDES pusher to before these modified dates, but that will trigger some duplication on the feed.
### LDES PRODUCER CONFIG:

The following environment variables must be configured:

- `BASE_URL` (required): the base-url on which this service is hosted. This ensures the service can resolve relative urls.
- `BASE_FOLDER`: the parent folder to store the LDES streams in. (default: `./data`)
- `LDES_STREAM_PREFIX`: the stream prefix to use to identify the streams. This prefix is used in conjunction with the folder name of the stream. (default: `http://mu.semte.ch/streams/`)
- `TIME_TREE_RELATION_PATH`: the path on which the relations should be defined when fragmenting resources using the time-fragmenter. This is also the predicate which is used when adding a timestamp to a new version of a resource. (default: `http://www.w3.org/ns/prov#generatedAtTime`)
- `PREFIX_TREE_RELATION_PATH`: the path on which the relations should be defined when fragmenting resources using the prefix-tree-fragmenter. (default: `https://example.org/name`)
- `CACHE_SIZE`: the maximum number of pages the cache should keep in memory. (default: `10`)
- `FOLDER_DEPTH`: the number of levels the data folder structure should contain. (default: `1`, a flat folder structure)
- `PAGE_RESOURCES_COUNT`: the number of resources (members) one page should contain. (default: `10`)
- `SUBFOLDER_NODE_COUNT`: the maximum number of nodes (pages) a subfolder should contain. (default: `10`)
u
> [!CAUTION]
> The catching up after restart process is EXPERIMENTAL. It also has the drawback that IF you add a migration (not triggering deltas right now) with concepts that have modified dates earlier than the moment the LDES has caught up to, these instances will NOT be detected by the catch up process and will NOT appear on the LDES feed. You can get around that by setting the ext:lastSync for the LDES pusher to before these modified dates, but that will trigger some duplication on the feed.
12 changes: 6 additions & 6 deletions app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ app.use(
bodyParser.json({
limit: "500mb",
// @ts-ignore
type: function (req: Request) {
type: function(req: Request) {
return /^application\/json/.test(req.get("content-type") as string);
},
})
}),
);

async function updateLastModifiedSeen(quads: Quad[]) {
Expand All @@ -36,13 +36,13 @@ async function updateLastModifiedSeen(quads: Quad[]) {
}
}

app.post("/publish", async function (req: Request, res: Response) {
app.post("/publish", async function(req: Request, res: Response) {
if (!status.hasCaughtUpSinceRestart) {
// we haven't finished catching up after restarting. We'll see this change by querying the db
res
.status(202)
.send(
"Still catching up after restart, we assume this request will be handled during catching up."
"Still catching up after restart, we assume this request will be handled during catching up.",
);
return;
}
Expand All @@ -52,8 +52,8 @@ app.post("/publish", async function (req: Request, res: Response) {
await updateLastModifiedSeen(
// deletes happen before inserts, so try inserts first, then deletes
changeSets[changeSets.length - 1]?.inserts ||
changeSets[changeSets.length - 1]?.deletes ||
[]
changeSets[changeSets.length - 1]?.deletes ||
[],
);
res.send("Resource added to LDES");
} catch (e) {
Expand Down
6 changes: 4 additions & 2 deletions config.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
export const LDES_FRAGMENTER = process.env.LDES_FRAGMENTER as string | undefined;
export const LDES_ENDPOINT = process.env.LDES_ENDPOINT as string;
export const LDES_FRAGMENTER = process.env.LDES_FRAGMENTER as
| string
| undefined;
export const LDES_FOLDER = process.env.LDES_FOLDER as string;
Loading