Skip to content

Commit

Permalink
way faster processing
Browse files Browse the repository at this point in the history
  • Loading branch information
catdevnull committed Dec 29, 2024
1 parent c63a237 commit bc57e1c
Showing 1 changed file with 76 additions and 28 deletions.
104 changes: 76 additions & 28 deletions trigger/process-latest-flight-data.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { logger, schedules } from "@trigger.dev/sdk/v3";
import { b2, B2_BUCKET, B2_PATH, B2_REGION, sql } from "../consts";
import { _Object, ListObjectsV2Command } from "@aws-sdk/client-s3";
import {
_Object,
GetObjectCommand,
ListObjectsV2Command,
PutObjectCommand,
} from "@aws-sdk/client-s3";
import PQueue from "p-queue";

export const processLatestFlightDataTask = schedules.task({
Expand All @@ -9,40 +14,80 @@ export const processLatestFlightDataTask = schedules.task({
maxDuration: 6000,
machine: { preset: "medium-1x" },
run: async (payload, { ctx }) => {
async function processJson(path: string) {
const { airport, direction, flightDate, fetchedAt } = parsePath(path);
const publicUrl = getPublicB2Url(path);
async function processJsonDir(path: string, fetchedAt: Date) {}

const list = await getAllObjectsFromS3Bucket(B2_BUCKET, B2_PATH);
const queue = new PQueue({ concurrency: 1 });
const tasks = Array.from(
list
.filter((item) => item.Size && item.Size > 2) // filter out empty JSON arrays
.filter(
(item): item is { Key: string } =>
(item.Key?.includes("webaa-api") &&
item.Key?.includes("all-flights")) ||
false
)
.filter(({ Key }) => {
const { fetchedAt } = parsePath(Key);
if (payload.lastTimestamp && fetchedAt <= payload.lastTimestamp)
return false;
return true;
})
.reduce((acc, { Key }) => {
const { fetchedAt } = parsePath(Key);
const timestamp = fetchedAt.getTime();
acc.set(timestamp, (acc.get(timestamp) || []).concat(Key));
return acc;
}, new Map<number, string[]>())
.entries()
).map(([_timestamp, keys]) => async () => {
const key = parsePath(keys[0]);
const path = `${B2_PATH}/${key.fetchedAt.toISOString()}/raw/`;
logger.info("Processing", { path, urls: keys.map(getPublicB2Url) });

const allJsonAlreadyExists = list.find(
(item) => item.Key === `${path}all.json`
);
if (!allJsonAlreadyExists) {
const allEntries = (
await Promise.all(
keys.map(async (key) => {
const { Body } = await b2.send(
new GetObjectCommand({
Bucket: B2_BUCKET,
Key: key,
})
);
if (!Body) throw new Error("No body");
return JSON.parse(await Body.transformToString());
})
)
).flat();
await b2.send(
new PutObjectCommand({
Bucket: B2_BUCKET,
Key: `${path}all.json`,
Body: JSON.stringify(allEntries),
})
);
logger.info("Stored all.json", {
path,
url: getPublicB2Url(`${path}all.json`),
});
}

const q = sql`
INSERT INTO aerolineas_latest_flight_status (aerolineas_flight_id, last_updated, json)
SELECT DISTINCT ON(aerolineas_flight_id) json->>'$.id' as aerolineas_flight_id, ${fetchedAt.toISOString()} as last_updated, json
FROM read_json_auto(${publicUrl})
SELECT DISTINCT ON(aerolineas_flight_id) json->>'$.id' as aerolineas_flight_id, ${key.fetchedAt.toISOString()} as last_updated, json
FROM read_json_auto(${`${getB2Uri(path)}all.json`})
ON CONFLICT (aerolineas_flight_id)
DO UPDATE SET last_updated = EXCLUDED.last_updated, json = EXCLUDED.json
`;
logger.info("Executing", { sql: q });
await q;
}
});

const list = await getAllObjectsFromS3Bucket(B2_BUCKET, B2_PATH);
const queue = new PQueue({ concurrency: 50 });
const tasks = list
.filter((item) => item.Size && item.Size > 2) // filter out empty JSON arrays
.filter(
(item): item is { Key: string } =>
(item.Key?.includes("webaa-api") &&
item.Key?.includes("all-flights")) ||
false
)
.filter(({ Key }) => {
const { fetchedAt } = parsePath(Key);
if (payload.lastTimestamp && fetchedAt <= payload.lastTimestamp)
return false;
return true;
})
.map(({ Key }) => async () => {
logger.info("Processing", { Key, url: getPublicB2Url(Key) });
await processJson(Key);
});
logger.info(`Processing ${tasks.length} files`);
logger.info(`Processing ${tasks.length} collections`);

await queue.addAll(tasks);
},
Expand Down Expand Up @@ -73,6 +118,9 @@ function getPublicB2Url(path: string) {
split[1] ? encodeURIComponent(`?${split[1]}`) : ""
}`;
}
function getB2Uri(path: string) {
return `s3://${B2_BUCKET}/${path}`;
}

async function getAllObjectsFromS3Bucket(bucket: string, prefix: string) {
let isTruncated = true;
Expand Down

0 comments on commit bc57e1c

Please sign in to comment.