diff --git a/apps/server/src/Services/GeoEvent/GeoEventHandler.ts b/apps/server/src/Services/GeoEvent/GeoEventHandler.ts index e0d63cf1..7e0d1738 100644 --- a/apps/server/src/Services/GeoEvent/GeoEventHandler.ts +++ b/apps/server/src/Services/GeoEvent/GeoEventHandler.ts @@ -33,7 +33,6 @@ const processGeoEvents = async (geoEventProviderClientId: GeoEventProviderClient newGeoEvents.push(fetchedEvents[i]); } } - return newGeoEvents; }; diff --git a/apps/server/src/Services/SiteAlert/CreateSiteAlert.ts b/apps/server/src/Services/SiteAlert/CreateSiteAlert.ts index 4a2abea2..848db838 100644 --- a/apps/server/src/Services/SiteAlert/CreateSiteAlert.ts +++ b/apps/server/src/Services/SiteAlert/CreateSiteAlert.ts @@ -8,190 +8,247 @@ const createSiteAlerts = async ( geoEventProviderClientId: GeoEventProviderClientId, slice: string, ) => { - let siteAlertsCreatedCount = 0; + let totalSiteAlertsCreatedCount = 0; + let geoEventBatch: number = 1000; + let moreToProcess = true; + // Automatically process any unprocessed geoEvents older than 24 hours ago. + await prisma.$executeRaw` + UPDATE "GeoEvent" + SET "isProcessed" = true + WHERE "isProcessed" = false + AND "eventDate" < NOW() - INTERVAL '24 HOURS'; + `; + // Use a different SQL for GEOSTATIONARY satellite if(geoEventProviderClientId === 'GEOSTATIONARY'){ - try { - const siteAlertCreationQuery = Prisma.sql`INSERT INTO "SiteAlert" (id, TYPE, "isProcessed", "eventDate", "detectedBy", confidence, latitude, longitude, "siteId", "data", "distance") - - SELECT - gen_random_uuid(), - e.type, - FALSE, - e."eventDate", - ${geoEventProviderClientId}, - e.confidence, - e.latitude, - e.longitude, - s.id AS SiteId, - e.data, - ST_Distance(ST_SetSRID(ST_MakePoint(e.longitude, e.latitude), 4326), ST_GeomFromEWKB(decode(dg_elem, 'hex'))) AS distance - FROM - "GeoEvent" e - CROSS JOIN - "Site" s, - jsonb_array_elements_text(s."geometry"->'properties'->'detection_geometry') AS dg_elem - WHERE - s."type" = 'MultiPolygon' - AND s."deletedAt" IS NULL - AND s."isMonitored" = TRUE - AND (s."stopAlertUntil" IS NULL OR s."stopAlertUntil" < CURRENT_TIMESTAMP) - AND e."isProcessed" = FALSE - AND e. "geoEventProviderId" = ${geoEventProviderId} - AND EXISTS ( - SELECT 1 - FROM jsonb_array_elements_text(s.slices) AS slice_element - WHERE slice_element = ANY(string_to_array(${Prisma.raw(`'${slice}'`)}, ',')::text[]) - ) - AND ST_Within(ST_SetSRID(ST_MakePoint(e.longitude, e.latitude), 4326), ST_GeomFromEWKB(decode(dg_elem, 'hex'))) - AND NOT EXISTS ( + geoEventBatch = 500; + while(moreToProcess){ + const unprocessedGeoEvents = await prisma.geoEvent.findMany({ + where:{ + isProcessed: false, + geoEventProviderId: geoEventProviderId + }, + select:{ + id: true + }, + take: geoEventBatch + }) + if(unprocessedGeoEvents.length === 0){ + moreToProcess = false; + continue; // Skip the current iteration of the loop + // Since moreToProcess is false, continue will end the while clause here + } + const unprocessedGeoEventIds = unprocessedGeoEvents.map(geoEvent => geoEvent.id) + try { + // TODO: only take the first ${geoEventBatch} of geoEvents and join it with site and createSiteAlerts, keep doing so until there is no unprocessed geoEvents left. + const siteAlertCreationQuery = Prisma.sql`INSERT INTO "SiteAlert" (id, TYPE, "isProcessed", "eventDate", "detectedBy", confidence, latitude, longitude, "siteId", "data", "distance") + SELECT + gen_random_uuid(), + e.type, + FALSE, + e."eventDate", + ${geoEventProviderClientId}, + e.confidence, + e.latitude, + e.longitude, + s.id AS SiteId, + e.data, + ST_Distance(ST_SetSRID(ST_MakePoint(e.longitude, e.latitude), 4326), ST_GeomFromEWKB(decode(dg_elem, 'hex'))) AS distance + FROM + "GeoEvent" e + CROSS JOIN + "Site" s, + jsonb_array_elements_text(s."geometry"->'properties'->'detection_geometry') AS dg_elem + WHERE + e.id IN (${Prisma.join(unprocessedGeoEventIds)}) + AND s."type" = 'MultiPolygon' + AND s."deletedAt" IS NULL + AND s."isMonitored" = TRUE + AND (s."stopAlertUntil" IS NULL OR s."stopAlertUntil" < CURRENT_TIMESTAMP) + AND e."isProcessed" = FALSE + AND e. "geoEventProviderId" = ${geoEventProviderId} + AND EXISTS ( SELECT 1 - FROM "SiteAlert" - WHERE "SiteAlert".longitude = e.longitude - AND "SiteAlert".latitude = e.latitude - AND "SiteAlert"."eventDate" = e."eventDate" - AND "SiteAlert"."siteId" = s.id - ) - - UNION - - SELECT - gen_random_uuid(), - e.type, - FALSE, - e."eventDate", - ${geoEventProviderClientId}, - e.confidence, - e.latitude, - e.longitude, - s.id AS SiteId, - e.data, - ST_Distance(ST_SetSRID(e.geometry, 4326), s."detectionGeometry") AS distance - FROM - "GeoEvent" e - INNER JOIN "Site" s ON ST_Within(ST_SetSRID(e.geometry, 4326), s."detectionGeometry") - AND s."deletedAt" IS NULL - AND s."isMonitored" = TRUE - AND (s."stopAlertUntil" IS NULL OR s."stopAlertUntil" < CURRENT_TIMESTAMP) - AND e."isProcessed" = FALSE - AND e. "geoEventProviderId" = ${geoEventProviderId} - AND (s.type = 'Polygon' OR s.type = 'Point') - AND EXISTS ( - SELECT 1 - FROM jsonb_array_elements_text(s.slices) AS slice_element - WHERE slice_element = ANY(string_to_array(${Prisma.raw(`'${slice}'`)}, ',')::text[]) - ) - AND NOT EXISTS ( + FROM jsonb_array_elements_text(s.slices) AS slice_element + WHERE slice_element = ANY(string_to_array(${Prisma.raw(`'${slice}'`)}, ',')::text[]) + ) + AND ST_Within(ST_SetSRID(ST_MakePoint(e.longitude, e.latitude), 4326), ST_GeomFromEWKB(decode(dg_elem, 'hex'))) + AND NOT EXISTS ( + SELECT 1 + FROM "SiteAlert" + WHERE "SiteAlert".longitude = e.longitude + AND "SiteAlert".latitude = e.latitude + AND "SiteAlert"."eventDate" = e."eventDate" + AND "SiteAlert"."siteId" = s.id + ) + + UNION + + SELECT + gen_random_uuid(), + e.type, + FALSE, + e."eventDate", + ${geoEventProviderClientId}, + e.confidence, + e.latitude, + e.longitude, + s.id AS SiteId, + e.data, + ST_Distance(ST_SetSRID(e.geometry, 4326), s."detectionGeometry") AS distance + FROM + "GeoEvent" e + INNER JOIN "Site" s ON ST_Within(ST_SetSRID(e.geometry, 4326), s."detectionGeometry") + AND e.id IN (${Prisma.join(unprocessedGeoEventIds)}) + AND s."deletedAt" IS NULL + AND s."isMonitored" = TRUE + AND (s."stopAlertUntil" IS NULL OR s."stopAlertUntil" < CURRENT_TIMESTAMP) + AND e."isProcessed" = FALSE + AND e. "geoEventProviderId" = ${geoEventProviderId} + AND (s.type = 'Polygon' OR s.type = 'Point') + AND EXISTS ( SELECT 1 - FROM "SiteAlert" - WHERE "SiteAlert".longitude = e.longitude - AND "SiteAlert".latitude = e.latitude - AND "SiteAlert"."eventDate" = e."eventDate" - AND "SiteAlert"."siteId" = s.id - ); - `; - - const updateGeoEventIsProcessedToTrue = Prisma.sql`UPDATE "GeoEvent" SET "isProcessed" = true WHERE "isProcessed" = false AND "geoEventProviderId" = ${geoEventProviderId}`; - // REMOVE after 2nd release - const updateGeostationarySiteAlertIsProcessedToTrue = Prisma.sql`UPDATE "SiteAlert" SET "isProcessed" = true WHERE "isProcessed" = false AND "detectedBy" = ${geoEventProviderClientId}`; - // Create SiteAlerts by joining New GeoEvents and Sites that have the event's location in their proximity - // And, Set all GeoEvents as processed - const results = await prisma.$transaction([ - prisma.$executeRaw(siteAlertCreationQuery), - prisma.$executeRaw(updateGeoEventIsProcessedToTrue), - ]); - await prisma.$executeRaw(updateGeostationarySiteAlertIsProcessedToTrue) - siteAlertsCreatedCount = results[0]; - } catch (error) { - logger(`Failed to create SiteAlerts. Error: ${error}`, 'error'); + FROM jsonb_array_elements_text(s.slices) AS slice_element + WHERE slice_element = ANY(string_to_array(${Prisma.raw(`'${slice}'`)}, ',')::text[]) + ) + AND NOT EXISTS ( + SELECT 1 + FROM "SiteAlert" + WHERE "SiteAlert".longitude = e.longitude + AND "SiteAlert".latitude = e.latitude + AND "SiteAlert"."eventDate" = e."eventDate" + AND "SiteAlert"."siteId" = s.id + ); + `; + + const updateGeoEventIsProcessedToTrue = Prisma.sql` + UPDATE "GeoEvent" + SET "isProcessed" = true + WHERE id IN (${Prisma.join(unprocessedGeoEventIds)})`; + // REMOVE after 2nd release + const updateGeostationarySiteAlertIsProcessedToTrue = Prisma.sql`UPDATE "SiteAlert" SET "isProcessed" = true WHERE "isProcessed" = false AND "detectedBy" = ${geoEventProviderClientId}`; + // Create SiteAlerts by joining New GeoEvents and Sites that have the event's location in their proximity + // And, Set all GeoEvents as processed + const results = await prisma.$transaction([ + prisma.$executeRaw(siteAlertCreationQuery), + prisma.$executeRaw(updateGeoEventIsProcessedToTrue), + ]); + await prisma.$executeRaw(updateGeostationarySiteAlertIsProcessedToTrue) + let siteAlertsCreatedInBatchCount = results[0]; + totalSiteAlertsCreatedCount += siteAlertsCreatedInBatchCount + } catch (error) { + logger(`Failed to create SiteAlerts. Error: ${error}`, 'error'); + } } }else { // Non-geostationary satellites do not belong to a specific site - try { - const siteAlertCreationQuery = Prisma.sql`INSERT INTO "SiteAlert" (id, TYPE, "isProcessed", "eventDate", "detectedBy", confidence, latitude, longitude, "siteId", "data", "distance") - - SELECT - gen_random_uuid(), - e.type, - FALSE, - e."eventDate", - ${geoEventProviderClientId}, - e.confidence, - e.latitude, - e.longitude, - s.id AS SiteId, - e.data, - ST_Distance(ST_SetSRID(ST_MakePoint(e.longitude, e.latitude), 4326), ST_GeomFromEWKB(decode(dg_elem, 'hex'))) AS distance - FROM - "GeoEvent" e - CROSS JOIN - "Site" s, - jsonb_array_elements_text(s."geometry"->'properties'->'detection_geometry') AS dg_elem - WHERE - s."type" = 'MultiPolygon' - AND s."deletedAt" IS NULL - AND s."isMonitored" = TRUE - AND (s."stopAlertUntil" IS NULL OR s."stopAlertUntil" < CURRENT_TIMESTAMP) - AND e."isProcessed" = FALSE - AND e. "geoEventProviderId" = ${geoEventProviderId} - AND s.slices @> ('["' || ${slice} || '"]')::jsonb - AND ST_Within(ST_SetSRID(ST_MakePoint(e.longitude, e.latitude), 4326), ST_GeomFromEWKB(decode(dg_elem, 'hex'))) - AND NOT EXISTS ( - SELECT 1 - FROM "SiteAlert" - WHERE "SiteAlert".longitude = e.longitude - AND "SiteAlert".latitude = e.latitude - AND "SiteAlert"."eventDate" = e."eventDate" - AND "SiteAlert"."siteId" = s.id - ) - - UNION - - SELECT - gen_random_uuid(), - e.type, - FALSE, - e."eventDate", - ${geoEventProviderClientId}, - e.confidence, - e.latitude, - e.longitude, - s.id AS SiteId, - e.data, - ST_Distance(ST_SetSRID(e.geometry, 4326), s."detectionGeometry") AS distance - FROM - "GeoEvent" e - INNER JOIN "Site" s ON ST_Within(ST_SetSRID(e.geometry, 4326), s."detectionGeometry") - AND s."deletedAt" IS NULL - AND s."isMonitored" = TRUE - AND (s."stopAlertUntil" IS NULL OR s."stopAlertUntil" < CURRENT_TIMESTAMP) - AND e."isProcessed" = FALSE - AND e. "geoEventProviderId" = ${geoEventProviderId} - AND s.slices @> ('["' || ${slice} || '"]')::jsonb - AND (s.type = 'Polygon' OR s.type = 'Point') - AND NOT EXISTS ( - SELECT 1 - FROM "SiteAlert" - WHERE "SiteAlert".longitude = e.longitude - AND "SiteAlert".latitude = e.latitude - AND "SiteAlert"."eventDate" = e."eventDate" - AND "SiteAlert"."siteId" = s.id - ); - `; - - const updateGeoEventIsProcessedToTrue = Prisma.sql`UPDATE "GeoEvent" SET "isProcessed" = true WHERE "isProcessed" = false AND "geoEventProviderId" = ${geoEventProviderId} AND "slice" = ${slice}`; - - // Create SiteAlerts by joining New GeoEvents and Sites that have the event's location in their proximity - // And, Set all GeoEvents as processed - const results = await prisma.$transaction([ - prisma.$executeRaw(siteAlertCreationQuery), - prisma.$executeRaw(updateGeoEventIsProcessedToTrue), - ]); - siteAlertsCreatedCount = results[0]; - } catch (error) { - logger(`Failed to create SiteAlerts. Error: ${error}`, 'error'); + geoEventBatch = 1000; + while(moreToProcess){ + const unprocessedGeoEvents = await prisma.geoEvent.findMany({ + where:{ + isProcessed: false, + geoEventProviderId: geoEventProviderId + }, + select:{ + id: true + }, + take: geoEventBatch + }) + if(unprocessedGeoEvents.length === 0){ + moreToProcess = false; + continue; // Skip the current iteration of the loop + // Since moreToProcess is false, continue will end the while clause here + } + const unprocessedGeoEventIds = unprocessedGeoEvents.map(geoEvent => geoEvent.id) + try { + const siteAlertCreationQuery = Prisma.sql`INSERT INTO "SiteAlert" (id, TYPE, "isProcessed", "eventDate", "detectedBy", confidence, latitude, longitude, "siteId", "data", "distance") + + SELECT + gen_random_uuid(), + e.type, + FALSE, + e."eventDate", + ${geoEventProviderClientId}, + e.confidence, + e.latitude, + e.longitude, + s.id AS SiteId, + e.data, + ST_Distance(ST_SetSRID(ST_MakePoint(e.longitude, e.latitude), 4326), ST_GeomFromEWKB(decode(dg_elem, 'hex'))) AS distance + FROM + "GeoEvent" e + CROSS JOIN + "Site" s, + jsonb_array_elements_text(s."geometry"->'properties'->'detection_geometry') AS dg_elem + WHERE + e.id IN (${Prisma.join(unprocessedGeoEventIds)}) + AND s."type" = 'MultiPolygon' + AND s."deletedAt" IS NULL + AND s."isMonitored" = TRUE + AND (s."stopAlertUntil" IS NULL OR s."stopAlertUntil" < CURRENT_TIMESTAMP) + AND e."isProcessed" = FALSE + AND e. "geoEventProviderId" = ${geoEventProviderId} + AND s.slices @> ('["' || ${slice} || '"]')::jsonb + AND ST_Within(ST_SetSRID(ST_MakePoint(e.longitude, e.latitude), 4326), ST_GeomFromEWKB(decode(dg_elem, 'hex'))) + AND NOT EXISTS ( + SELECT 1 + FROM "SiteAlert" + WHERE "SiteAlert".longitude = e.longitude + AND "SiteAlert".latitude = e.latitude + AND "SiteAlert"."eventDate" = e."eventDate" + AND "SiteAlert"."siteId" = s.id + ) + + UNION + + SELECT + gen_random_uuid(), + e.type, + FALSE, + e."eventDate", + ${geoEventProviderClientId}, + e.confidence, + e.latitude, + e.longitude, + s.id AS SiteId, + e.data, + ST_Distance(ST_SetSRID(e.geometry, 4326), s."detectionGeometry") AS distance + FROM + "GeoEvent" e + INNER JOIN "Site" s ON ST_Within(ST_SetSRID(e.geometry, 4326), s."detectionGeometry") + AND e.id IN (${Prisma.join(unprocessedGeoEventIds)}) + AND s."deletedAt" IS NULL + AND s."isMonitored" = TRUE + AND (s."stopAlertUntil" IS NULL OR s."stopAlertUntil" < CURRENT_TIMESTAMP) + AND e."isProcessed" = FALSE + AND e. "geoEventProviderId" = ${geoEventProviderId} + AND s.slices @> ('["' || ${slice} || '"]')::jsonb + AND (s.type = 'Polygon' OR s.type = 'Point') + AND NOT EXISTS ( + SELECT 1 + FROM "SiteAlert" + WHERE "SiteAlert".longitude = e.longitude + AND "SiteAlert".latitude = e.latitude + AND "SiteAlert"."eventDate" = e."eventDate" + AND "SiteAlert"."siteId" = s.id + ); + `; + + const updateGeoEventIsProcessedToTrue = Prisma.sql`UPDATE "GeoEvent" SET "isProcessed" = true WHERE "isProcessed" = false AND "geoEventProviderId" = ${geoEventProviderId} AND "slice" = ${slice}`; + + // Create SiteAlerts by joining New GeoEvents and Sites that have the event's location in their proximity + // And, Set all GeoEvents as processed + const results = await prisma.$transaction([ + prisma.$executeRaw(siteAlertCreationQuery), + prisma.$executeRaw(updateGeoEventIsProcessedToTrue), + ]); + let siteAlertsCreatedInBatchCount = results[0]; + totalSiteAlertsCreatedCount += siteAlertsCreatedInBatchCount + } catch (error) { + logger(`Failed to create SiteAlerts. Error: ${error}`, 'error'); + } } } - return siteAlertsCreatedCount; + return totalSiteAlertsCreatedCount; }; export default createSiteAlerts; diff --git a/apps/server/src/pages/api/cron/geo-event-fetcher.ts b/apps/server/src/pages/api/cron/geo-event-fetcher.ts index 28b975b4..cb519c5c 100644 --- a/apps/server/src/pages/api/cron/geo-event-fetcher.ts +++ b/apps/server/src/pages/api/cron/geo-event-fetcher.ts @@ -55,6 +55,7 @@ export default async function alertFetcher(req: NextApiRequest, res: NextApiResp let newSiteAlertCount = 0; let processedProviders = 0; + const fetchCount = Math.max(5, limit * 2); // while (processedProviders <= limit) { const activeProviders: GeoEventProvider[] = await prisma.$queryRaw` SELECT * @@ -63,8 +64,18 @@ export default async function alertFetcher(req: NextApiRequest, res: NextApiResp AND "fetchFrequency" IS NOT NULL AND ("lastRun" + ("fetchFrequency" || ' minutes')::INTERVAL) < (current_timestamp AT TIME ZONE 'UTC') ORDER BY (current_timestamp AT TIME ZONE 'UTC' - "lastRun") DESC - LIMIT ${limit}; + LIMIT ${fetchCount}; `; + function shuffleArray(array: GeoEventProvider[]) { + for (let i = array.length - 1; i > 0; i--) { + const j = Math.floor(Math.random() * (i + 1)); + [array[i], array[j]] = [array[j], array[i]]; // swap elements + } + return array; + } + const shuffledProviders = shuffleArray([...activeProviders]); + const selectedProviders = shuffledProviders.slice(0, limit); + // Filter out those active providers whose last (run date + fetchFrequency (in minutes) > current time // Break the loop if there are no active providers @@ -73,7 +84,7 @@ export default async function alertFetcher(req: NextApiRequest, res: NextApiResp // break; // } - logger(`Running Geo Event Fetcher. Taking ${activeProviders.length} eligible providers.`, "info"); + logger(`Running Geo Event Fetcher. Taking ${selectedProviders.length} eligible providers.`, "info"); // Define Chunk Size for processGeoEvents const chunkSize = 2000; @@ -88,7 +99,7 @@ export default async function alertFetcher(req: NextApiRequest, res: NextApiResp } // Loop for each active provider and fetch geoEvents - const promises = activeProviders.map(async (provider) => { + const promises = selectedProviders.map(async (provider) => { const { config, id: geoEventProviderId, clientId: geoEventProviderClientId, clientApiKey, lastRun } = provider // For GOES-16, geoEventProviderId is 55, geoEventProviderClientId is GEOSTATIONARY, and clientApiKey is GOES-16 const parsedConfig: GeoEventProviderConfig = JSON.parse(JSON.stringify(config))