Skip to content

Commit

Permalink
console: support for loading data to CH through S3 for ingested messages
Browse files Browse the repository at this point in the history
console: syncs: tweaks for the TIME_EXCEEDED status display
  • Loading branch information
absorbb committed Jan 30, 2025
1 parent 851762d commit e608bf2
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 16 deletions.
20 changes: 13 additions & 7 deletions webapps/console/pages/[workspaceId]/syncs/tasks.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
CalendarIcon,
CheckCircle2,
ChevronLeft,
ClockAlert,
Edit3,
ListMinusIcon,
PlayCircle,
Expand Down Expand Up @@ -162,11 +163,13 @@ function TaskStatus0({ task, loading }: { task: TasksDbModel & TaskStats; loadin
<CheckCircle2 style={{ color: "green" }} />
) : props.status === "PARTIAL" ? (
<AlertCircle style={{ color: "orange" }} />
) : props.status === "TIME_EXCEEDED" ? (
<ClockAlert style={{ color: "orange" }} />
) : props.status === "RUNNING" ? (
<PlayCircle style={{ color: "blue" }} />
) : props.status === "SKIPPED" ? (
<XCircle style={{ color: "orange" }} />
) : props.status === "CANCELLED" || props.status === "TIME_EXCEEDED" ? (
<XCircle style={{ color: "gray" }} />
) : props.status === "CANCELLED" ? (
<XCircle style={{ color: "gray" }} />
) : (
<XCircle style={{ color: "red" }} />
Expand Down Expand Up @@ -224,7 +227,7 @@ function TaskStatus0({ task, loading }: { task: TasksDbModel & TaskStats; loadin
case "TIME_EXCEEDED":
return (
<SyncStatus status={task.status}>
<Tag style={{ marginRight: 0 }}>
<Tag color={"orange"} style={{ marginRight: 0 }}>
{task.status} <FaExternalLinkAlt className={"inline ml-0.5 w-2.5 h-2.5"} />
</Tag>
<span className={"text-xxs text-gray-500"}>show stats</span>
Expand Down Expand Up @@ -280,9 +283,9 @@ function TaskStatusResultTable({ stats, error }: { stats: any[]; error?: string
render: (text, record) => {
if (record.status === "SUCCESS") {
return <Tag color="green">SUCCESS</Tag>;
} else if (record.status === "PARTIAL") {
return <Tag color="orange">PARTIAL</Tag>;
} else if (record.status === "CANCELLED" || record.status === "TIME_EXCEEDED") {
} else if (record.status === "PARTIAL" || record.status === "TIME_EXCEEDED") {
return <Tag color="orange">{record.status}</Tag>;
} else if (record.status === "CANCELLED") {
return <Tag>{record.status}</Tag>;
} else if (record.status === "PENDING") {
return <Tag>PENDING</Tag>;
Expand Down Expand Up @@ -504,7 +507,10 @@ function TasksTable({ tasks, loading, linksMap, servicesMap, destinationsMap, re
if (task.status === "SUCCESS") {
return <div className={"whitespace-nowrap"}>{task.successStreams}</div>;
} else if (
(task.status === "PARTIAL" || task.status === "RUNNING" || task.status === "CANCELLED") &&
(task.status === "PARTIAL" ||
task.status === "TIME_EXCEEDED" ||
task.status === "RUNNING" ||
task.status === "CANCELLED") &&
task.totalStreams
) {
return (
Expand Down
32 changes: 23 additions & 9 deletions webapps/console/pages/api/admin/export/[name]/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ export type Export = {
};

const batchSize = 1000;
const clickhouseUploadS3Bucket = process.env.CLICKHOUSE_UPLOAD_S3_BUCKET;
const s3Region = process.env.S3_REGION;
const s3AccessKeyId = process.env.S3_ACCESS_KEY_ID;
const s3SecretAccessKey = process.env.S3_SECRET_ACCESS_KEY;
const clickhouseS3Configured = clickhouseUploadS3Bucket && s3Region && s3AccessKeyId && s3SecretAccessKey;

const safeLastModified = new Date(2024, 0, 1, 0, 0, 0, 0);

Expand Down Expand Up @@ -78,15 +83,24 @@ const exports: Export[] = [
writer.write(",");
}
const credentials = omit(to.config, "destinationType", "type", "name");
if (destinationType === "clickhouse" && data.clickhouseSettings) {
const extraParams = Object.fromEntries(
(data.clickhouseSettings as string)
.split("\n")
.filter(s => s.includes("="))
.map(s => s.split("="))
.map(([k, v]) => [k.trim(), v.trim()])
);
credentials.parameters = { ...(credentials.parameters || {}), ...extraParams };
if (destinationType === "clickhouse") {
if (data.clickhouseSettings) {
const extraParams = Object.fromEntries(
(data.clickhouseSettings as string)
.split("\n")
.filter(s => s.includes("="))
.map(s => s.split("="))
.map(([k, v]) => [k.trim(), v.trim()])
);
credentials.parameters = { ...(credentials.parameters || {}), ...extraParams };
}
if (credentials.loadAsJson && !credentials.provisioned && clickhouseS3Configured) {
credentials.s3Region = s3Region;
credentials.s3AccessKeyId = s3AccessKeyId;
credentials.s3SecretAccessKey = s3SecretAccessKey;
credentials.s3Bucket = clickhouseUploadS3Bucket;
credentials.s3UsePresignedURL = true;
}
}
// if (data.timestampColumn) {
// // use timestampColumn field as discriminator field when doing local deduplication
Expand Down

0 comments on commit e608bf2

Please sign in to comment.