Skip to content

Commit

Permalink
refactor: add in index.ts for default handler
Browse files Browse the repository at this point in the history
  • Loading branch information
blacha committed Dec 16, 2024
1 parent f163ff0 commit 8ba3e75
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Client } from '@elastic/elasticsearch';

import { getYesterday } from '../date.js';
import { Elastic } from '../elastic.js';
import { handler } from '../handler.js';
import { main } from '../handler.js';
import { LogStats } from '../log.stats.js';
import { LogData } from './log.data.js';

Expand Down Expand Up @@ -59,7 +59,7 @@ describe('analytic lambda', () => {

await fsa.write(new URL(`mem://source/cfid.${shortDate}/data.txt.gz`), gzipSync(LogData));

await handler();
await main();

// One call to insert
assert.equal(operations.length, 1);
Expand Down Expand Up @@ -105,7 +105,7 @@ describe('analytic lambda', () => {

await fsa.write(new URL(`mem://source/cfid.${shortDate}/data.txt.gz`), gzipSync(LogData));

const ret = await handler().catch((e: Error) => e);
const ret = await main().catch((e: Error) => e);

assert.equal(String(ret), 'Error: Failed to index');

Expand Down
24 changes: 12 additions & 12 deletions packages/lambda-analytic-cloudfront/src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { promisify } from 'node:util';
import { gzip } from 'node:zlib';

import { Env, fsa, LogConfig } from '@basemaps/shared';
import { LambdaRequest } from '@linzjs/lambda';
import pLimit from 'p-limit';
import { basename } from 'path';

Expand Down Expand Up @@ -31,15 +32,14 @@ function getEnvUrl(env: string): URL {
}
}

export async function handler(): Promise<void> {
export async function main(req: LambdaRequest): Promise<void> {
const SourceLocation = getEnvUrl(Env.Analytics.CloudFrontSourceBucket);
const CacheLocation = getEnvUrl(Env.Analytics.CacheBucket);
const CloudFrontId = Env.get(Env.Analytics.CloudFrontId);

const MaxToProcess = Env.getNumber(Env.Analytics.MaxRecords, 24 * 7 * 4); // Process 4 weeks of logs by default
const logger = LogConfig.get();

logger.info(
req.log.info(
{ source: SourceLocation.href, cacheLocation: CacheLocation.href, cloudFrontId: CloudFrontId },
'log:index:start',
);
Expand Down Expand Up @@ -70,19 +70,19 @@ export async function handler(): Promise<void> {
const promise = hourQ(async () => {
// Cache file exists skip processing
if (await fsa.exists(cacheUrl)) {
logger.debug({ prefix }, 'log:prefix:skip');
req.log.debug({ prefix }, 'log:prefix:skip');
return;
}

const startTime = performance.now();
logger.trace({ prefix }, 'log:prefix:start');
req.log.trace({ prefix }, 'log:prefix:start');
const logPrefix = new URL(`${CloudFrontId}.${prefix}`, SourceLocation);

const stats = new Map<string, LogStats>();

const logFiles = await fsa.toArray(fsa.list(logPrefix));
if (logFiles.length === 0) {
logger.info({ prefix }, 'log:prefix:no-files');
req.log.info({ prefix }, 'log:prefix:no-files');
return;
}

Expand All @@ -93,7 +93,7 @@ export async function handler(): Promise<void> {
const fileStartTime = performance.now();

const fileLines = await FileProcess.process(lf, stats);
logger.trace(
req.log.trace(
{
prefix: prefix,
file: basename(lf.pathname),
Expand All @@ -113,11 +113,11 @@ export async function handler(): Promise<void> {

// Extract the values
const allStats = [...stats.values()];
await Elastic.insert(prefix, allStats, logger);
await Elastic.insert(prefix, allStats, req.log);
// Ensure everything is indexed into elasticsearch before writing the cache to disk
await fsa.write(cacheUrl, await gzipPromise(JSON.stringify(allStats)));

logger.info(
req.log.info(
{
prefix: prefix,
files: logFiles.length,
Expand All @@ -134,17 +134,17 @@ export async function handler(): Promise<void> {

const rets = await Promise.allSettled(todo);

// If anythign fails to index write the errors out to a log file at the cache location
// If anything fails to index write the errors out to a log file at the cache location
if (Elastic.errors.length > 0) {
const errorLocation = new URL(`./errors-${new Date().toISOString()}.json`, CacheLocation);
logger.fatal({ errorLocation: errorLocation.href }, 'log:index:failed');
req.log.fatal({ errorLocation: errorLocation.href }, 'log:index:failed');
await fsa.write(errorLocation, JSON.stringify(Elastic.errors));
}

let failed = false;
for (const ret of rets) {
if (ret.status !== 'rejected') continue;
logger.fatal({ err: ret.reason }, 'log:index:failed');
req.log.fatal({ err: ret.reason }, 'log:index:failed');
failed = true;
}
if (failed) throw new Error('Failed to index');
Expand Down
6 changes: 6 additions & 0 deletions packages/lambda-analytic-cloudfront/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Lambda entrypoint module: wires the analytics log-indexing job (`main` from
// handler.js) into the @linzjs/lambda framework, which supplies the
// LambdaRequest (and its `req.log` logger) that `main` expects.
import { LogConfig } from '@basemaps/shared';
import { lf } from '@linzjs/lambda';

import { main } from './handler.js';

// NOTE(review): `tracePercent: 0` presumably disables request tracing, and
// `rejectOnError: true` appears to make the invocation reject when `main`
// throws (e.g. its 'Failed to index' error) — confirm against @linzjs/lambda.
// The shared LogConfig logger is passed so framework logs match the rest of basemaps.
export const handler = lf.handler(main, { tracePercent: 0, rejectOnError: true }, LogConfig.get());

0 comments on commit 8ba3e75

Please sign in to comment.