Skip to content

Commit 0d7864e

Browse files
authored
Merge pull request #811 from techmatters/CHI-3182-fix_scrubber_sqs
CHI-3182: Fix transcript scrubber SQS handling
2 parents dc535e6 + 75040aa commit 0d7864e

File tree

1 file changed

+55
-43
lines changed
  • hrm-domain/scheduled-tasks/transcript-scrubber

1 file changed

+55
-43
lines changed

hrm-domain/scheduled-tasks/transcript-scrubber/index.ts

Lines changed: 55 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,12 @@
1717
// eslint-disable-next-line import/no-extraneous-dependencies
1818
import { fetch } from 'undici';
1919
import { getS3Object, putS3Object } from '@tech-matters/s3-client';
20-
import { receiveSqsMessage, sendSqsMessage } from '@tech-matters/sqs-client';
21-
import { ContactJobAttemptResult } from '@tech-matters/types/ContactJob';
20+
import {
21+
deleteSqsMessage,
22+
receiveSqsMessage,
23+
sendSqsMessage,
24+
} from '@tech-matters/sqs-client';
25+
import { ContactJobAttemptResult } from '@tech-matters/types';
2226

2327
declare global {
2428
var fetch: typeof import('undici').fetch;
@@ -98,52 +102,60 @@ const pollQueue = async (): Promise<boolean> => {
98102
const messagesPayload = await receiveSqsMessage({
99103
queueUrl: PENDING_TRANSCRIPT_SQS_QUEUE_URL,
100104
});
101-
const [message] = Array.isArray(messagesPayload?.Messages)
105+
const messages = Array.isArray(messagesPayload?.Messages)
102106
? messagesPayload?.Messages
103107
: [];
104-
if (!message) {
105-
return false;
106-
}
107-
let parsedPendingMessage;
108-
try {
109-
parsedPendingMessage = JSON.parse(message.Body);
110-
const {
111-
jobId,
112-
contactId,
113-
accountSid,
114-
attemptNumber,
115-
originalLocation: { bucket, key },
116-
} = parsedPendingMessage;
117-
console.debug(
118-
`Scrubbing transcript: ${key} jobId: ${jobId} (attempt ${attemptNumber}), contact ${accountSid}/${contactId}`,
119-
);
120108

121-
const scrubbedKey = await scrubS3Transcript(bucket, key);
122-
await sendSqsMessage({
123-
queueUrl: COMPLETED_TRANSCRIPT_SQS_QUEUE_URL,
124-
message: JSON.stringify({
125-
...parsedPendingMessage,
126-
attemptPayload: {
127-
scrubbedLocation: { key: scrubbedKey, bucket },
128-
},
129-
attemptResult: ContactJobAttemptResult.SUCCESS,
130-
}),
131-
});
132-
console.log(
133-
`Successfully scrubbed transcript: ${key}, scrubbed version at ${scrubbedKey}${key}, jobId: ${jobId} (attempt ${attemptNumber}), contact ${accountSid}/${contactId}`,
134-
);
135-
} catch (error) {
136-
console.error(`Failed to scrub transcript`, error);
137-
const errorMessage = error instanceof Error ? error.message : String(error);
138-
await sendSqsMessage({
139-
queueUrl: COMPLETED_TRANSCRIPT_SQS_QUEUE_URL,
140-
message: JSON.stringify({
141-
attemptPayload: errorMessage,
142-
attemptResult: ContactJobAttemptResult.FAILURE,
143-
}),
109+
let moreToProcess = false;
110+
for (const message of messages) {
111+
moreToProcess = true;
112+
let parsedPendingMessage;
113+
// ECS tasks need to manually delete the message, unlike lambdas where deletion is automically handled for SQS inputs
114+
// Delete it first because polling handles retries
115+
await deleteSqsMessage({
116+
queueUrl: PENDING_TRANSCRIPT_SQS_QUEUE_URL,
117+
receiptHandle: message.ReceiptHandle,
144118
});
119+
try {
120+
parsedPendingMessage = JSON.parse(message.Body);
121+
const {
122+
jobId,
123+
contactId,
124+
accountSid,
125+
attemptNumber,
126+
originalLocation: { bucket, key },
127+
} = parsedPendingMessage;
128+
console.debug(
129+
`Scrubbing transcript: ${key} jobId: ${jobId} (attempt ${attemptNumber}), contact ${accountSid}/${contactId}`,
130+
);
131+
132+
const scrubbedKey = await scrubS3Transcript(bucket, key);
133+
await sendSqsMessage({
134+
queueUrl: COMPLETED_TRANSCRIPT_SQS_QUEUE_URL,
135+
message: JSON.stringify({
136+
...parsedPendingMessage,
137+
attemptPayload: {
138+
scrubbedLocation: { key: scrubbedKey, bucket },
139+
},
140+
attemptResult: ContactJobAttemptResult.SUCCESS,
141+
}),
142+
});
143+
console.log(
144+
`Successfully scrubbed transcript: ${key}, scrubbed version at ${scrubbedKey}${key}, jobId: ${jobId} (attempt ${attemptNumber}), contact ${accountSid}/${contactId}`,
145+
);
146+
} catch (error) {
147+
console.error(`Failed to scrub transcript`, error);
148+
const errorMessage = error instanceof Error ? error.message : String(error);
149+
await sendSqsMessage({
150+
queueUrl: COMPLETED_TRANSCRIPT_SQS_QUEUE_URL,
151+
message: JSON.stringify({
152+
attemptPayload: errorMessage,
153+
attemptResult: ContactJobAttemptResult.FAILURE,
154+
}),
155+
});
156+
}
145157
}
146-
return true;
158+
return moreToProcess;
147159
};
148160

149161
export const executeTask = async () => {

0 commit comments

Comments
 (0)