Skip to content

Commit

Permalink
optimize process latest flight data
Browse files Browse the repository at this point in the history
  • Loading branch information
catdevnull committed Dec 28, 2024
1 parent c8bd152 commit 2bef981
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 41 deletions.
5 changes: 3 additions & 2 deletions consts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ export const sql = waddler({
url: `md:flight-statii${PROD ? "" : "-dev"}`,
});

export const B2_REGION = "us-west-004";
export const b2 = new S3Client({
endpoint: "https://s3.us-west-004.backblazeb2.com",
region: "us-west-004",
endpoint: `https://s3.${B2_REGION}.backblazeb2.com`,
region: B2_REGION,
credentials: {
accessKeyId: process.env.B2_KEY_ID || "",
secretAccessKey: process.env.B2_APP_KEY || "",
Expand Down
89 changes: 50 additions & 39 deletions trigger/process-latest-flight-data.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { logger, schedules } from "@trigger.dev/sdk/v3";
import { b2, B2_BUCKET, B2_PATH, sql } from "../consts";
import { GetObjectCommand, ListObjectsV2Command } from "@aws-sdk/client-s3";
import { AerolineasFlightData } from "../misc/aerolineas";
import { z } from "zod";
import { b2, B2_BUCKET, B2_PATH, B2_REGION, sql } from "../consts";
import { _Object, ListObjectsV2Command } from "@aws-sdk/client-s3";
import PQueue from "p-queue";

export const processLatestFlightDataTask = schedules.task({
Expand All @@ -12,44 +10,22 @@ export const processLatestFlightDataTask = schedules.task({
machine: { preset: "medium-1x" },
run: async (payload, { ctx }) => {
async function processJson(path: string) {
const response = await b2.send(
new GetObjectCommand({
Bucket: B2_BUCKET,
Key: path,
})
);
const json = await response.Body?.transformToString();
if (!json) throw new Error("No data");
const obj = JSON.parse(json);
const result = z.array(AerolineasFlightData).safeParse(obj);
if (!result.success) {
logger.warn("Failed to parse FlightData", {
error: result.error,
path,
});
return;
}
const { airport, direction, flightDate, fetchedAt } = parsePath(path);
for (const status of result.data) {
await sql`
INSERT INTO aerolineas_latest_flight_status (aerolineas_flight_id, last_updated, json)
VALUES (${status.id}, ${fetchedAt.toISOString()}, ${JSON.stringify(status)})
ON CONFLICT (aerolineas_flight_id)
DO UPDATE SET last_updated = ${fetchedAt.toISOString()}, json = ${JSON.stringify(
status
)}
`;
}
const publicUrl = getPublicB2Url(path);
const q = sql`
INSERT INTO aerolineas_latest_flight_status (aerolineas_flight_id, last_updated, json)
SELECT DISTINCT ON(aerolineas_flight_id) json->>'$.id' as aerolineas_flight_id, ${fetchedAt.toISOString()} as last_updated, json
FROM read_json_auto(${publicUrl})
ON CONFLICT (aerolineas_flight_id)
DO UPDATE SET last_updated = EXCLUDED.last_updated, json = EXCLUDED.json
`;
await q;
}

const list = await b2.send(
new ListObjectsV2Command({
Bucket: B2_BUCKET,
Prefix: B2_PATH,
})
);
const list = await getAllObjectsFromS3Bucket(B2_BUCKET, B2_PATH);
const queue = new PQueue({ concurrency: 50 });
const tasks = (list.Contents || [])
const tasks = list
.filter((item) => item.Size && item.Size > 2) // filter out empty JSON arrays
.filter(
(item): item is { Key: string } =>
(item.Key?.includes("webaa-api") &&
Expand All @@ -63,7 +39,7 @@ export const processLatestFlightDataTask = schedules.task({
return true;
})
.map(({ Key }) => async () => {
logger.info("Processing", { Key });
logger.info("Processing", { Key, url: getPublicB2Url(Key) });
await processJson(Key);
});
logger.info(`Processing ${tasks.length} files`);
Expand Down Expand Up @@ -91,3 +67,38 @@ function parsePath(path: string) {
fetchedAt,
};
}
function getPublicB2Url(path: string) {
const split = path.split("?");
return `https://${B2_BUCKET}.s3.${B2_REGION}.backblazeb2.com/${split[0]}${
split[1] ? encodeURIComponent(`?${split[1]}`) : ""
}`;
}

async function getAllObjectsFromS3Bucket(bucket: string, prefix: string) {
let isTruncated = true;
let continuationToken: string | undefined;
const objects: { Key: string; Size?: number }[] = [];

while (isTruncated) {
const response = await b2.send(
new ListObjectsV2Command({
Bucket: bucket,
Prefix: prefix,
ContinuationToken: continuationToken,
})
);

if (response.Contents) {
objects.push(
...response.Contents.filter(
(item): item is { Key: string } => item.Key !== undefined
)
);
}

isTruncated = response.IsTruncated ?? false;
continuationToken = response.NextContinuationToken;
}

return objects;
}

0 comments on commit 2bef981

Please sign in to comment.