value = queue.take();
+ if (value == POISON_PILL) {
+ break;
+ }
+ ByteBuffer valueBuffer = value.get();
+ metricsFacade.recordDebeziumEvent();
+ switch (protoType) {
+ case ROW -> handleRowChangeSourceRecord(0, valueBuffer, 0);
+ case TRANS -> handleTransactionSourceRecord(valueBuffer, 0);
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ LOGGER.error("Error in async processing", e);
+ }
+ }
+
+ /**
+  * Derives the routing key of a record. For ROW, two ints are read from the key buffer and
+  * passed to readString as the lengths of the schema and table names; every TRANS record
+  * shares {@code transactionSchemaTableName}. Other proto types raise IllegalArgumentException.
+  */
+ private SchemaTableName computeQueueKey(ByteBuffer keyBuffer, ProtoType protoType) {
+     return switch (protoType) {
+         case TRANS -> transactionSchemaTableName;
+         case ROW -> {
+             final int schemaBytes = keyBuffer.getInt();
+             final int tableBytes = keyBuffer.getInt();
+             final String schema = readString(keyBuffer, schemaBytes);
+             yield new SchemaTableName(schema, readString(keyBuffer, tableBytes));
+         }
+         default -> throw new IllegalArgumentException("Proto type " + protoType.toString());
+     };
+ }
+
+ private void handleRowChangeSourceRecord(SchemaTableName schemaTableName, ByteBuffer dataBuffer) {
+ tableProvidersManagerImpl.routeRecord(schemaTableName, dataBuffer); // forward the payload, keyed by its table, to the provider manager
+ }
+
+ private void handleRowChangeSourceRecord(ByteBuffer keyBuffer, ByteBuffer dataBuffer) {
+ {
+ // CODE BLOCK VERSION 2 (disabled): resolve the table from a long table id read from keyBuffer.
+// long tableId = keyBuffer.getLong();
+// try
+// {
+// schemaTableName = tableMetadataRegistry.getSchemaTableName(tableId);
+// } catch (SinkException e)
+// {
+// throw new RuntimeException(e);
+// }
+ }
+ // NOTE(review): with the routing call below commented out, this overload currently does nothing.
+// tableProvidersManagerImpl.routeRecord(schemaTableName, dataBuffer);
+ }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/util/BlockingBoundedMap.java b/src/main/java/io/pixelsdb/pixels/sink/util/BlockingBoundedMap.java
new file mode 100644
index 0000000..a48348c
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/util/BlockingBoundedMap.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.util;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Semaphore;
+import java.util.function.BiFunction;
+
+/**
+ * A thread-safe bounded map that blocks when full.
+ *
+ * Similar to ConcurrentHashMap, but with a capacity limit.
+ * When the map reaches its maximum size, any new insertion or compute
+ * for a new key will block until space becomes available.
+ */
+public class BlockingBoundedMap<K, V> {
+    private final int maxSize;
+    private final Semaphore semaphore;
+    private final ConcurrentMap<K, V> map;
+
+    public BlockingBoundedMap(int maxSize) {
+        this.maxSize = maxSize;
+        this.map = new ConcurrentHashMap<>();
+        this.semaphore = new Semaphore(maxSize); // one permit per key slot
+    }
+
+    /** Inserts or replaces a mapping; waits for a permit first, even when the key exists. */
+    private void put(K key, V value) throws InterruptedException {
+        semaphore.acquire(); // block if full
+        V prev = map.put(key, value);
+        if (prev != null) {
+            // replaced existing value — no new space consumed
+            semaphore.release();
+        }
+    }
+
+    public V get(K key) {
+        return map.get(key);
+    }
+
+    /** Removes a key from the map and releases one permit if a value was present. */
+    public V remove(K key) {
+        V val = map.remove(key);
+        if (val != null) {
+            semaphore.release();
+        }
+        return val;
+    }
+
+    public int size() {
+        return map.size();
+    }
+
+    /**
+     * Atomically remaps {@code key}; blocks only while a free slot is needed for a new key. The
+     * function runs inside the backing map's {@code compute}, so it must not modify this map.
+     * When it returns null the key is removed and its slot freed. Also returns null (with the
+     * interrupt flag restored) if the thread is interrupted while waiting for a slot.
+     */
+    public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+        final boolean[] absent = new boolean[2]; // [0] before the call, [1] after it
+        if (map.containsKey(key)) { // fast path: no slot needed unless the key vanishes
+            V updated = map.compute(key, (k, old) -> {
+                absent[0] = old == null;
+                V next = absent[0] ? null : remappingFunction.apply(k, old);
+                absent[1] = next == null;
+                return next;
+            });
+            if (!absent[0]) {
+                semaphore.release(absent[1] ? 1 : 0); // a removal frees the key's slot
+                return updated;
+            }
+        }
+        try {
+            semaphore.acquire(); // slow path: reserve a slot before a possible insert
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return null;
+        }
+        int surplus = 1; // if the function throws, only the reservation is handed back
+        try {
+            V result = map.compute(key, (k, old) -> {
+                absent[0] = old == null;
+                V next = remappingFunction.apply(k, old);
+                absent[1] = next == null;
+                return next;
+            });
+            // Held: the reservation. An insert consumes it; a removal frees one more slot.
+            surplus = 1 - (absent[0] ? 1 : 0) + (absent[1] ? 1 : 0);
+            return result;
+        } finally {
+            semaphore.release(surplus);
+        }
+    }
+
+    public Set<K> keySet() {
+        return map.keySet();
+    }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/util/DataTransform.java b/src/main/java/io/pixelsdb/pixels/sink/util/DataTransform.java
new file mode 100644
index 0000000..fd3c872
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/util/DataTransform.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.util;
+
+import com.google.protobuf.ByteString;
+import io.pixelsdb.pixels.retina.RetinaProto;
+import io.pixelsdb.pixels.sink.SinkProto;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DataTransform {
+    /** Encodes {@code value} as the eight bytes produced by {@code ByteBuffer.putLong}. */
+    private static ByteString longToByteString(long value) {
+        byte[] bytes = ByteBuffer.allocate(Long.BYTES).putLong(value).array();
+        return ByteString.copyFrom(bytes);
+    }
+
+    /**
+     * Overwrites the last column value of every insert and update entry with the encoded
+     * {@code txStartTime}. Entries that carry no column values are left untouched.
+     */
+    @Deprecated
+    public static void updateTimeStamp(List<RetinaProto.TableUpdateData.Builder> updateData, long txStartTime) {
+        ByteString timestampBytes = longToByteString(txStartTime);
+
+        for (RetinaProto.TableUpdateData.Builder tableUpdateDataBuilder : updateData) {
+            int insertDataCount = tableUpdateDataBuilder.getInsertDataCount();
+            for (int i = 0; i < insertDataCount; i++) {
+                RetinaProto.InsertData.Builder insertBuilder = tableUpdateDataBuilder.getInsertDataBuilder(i);
+                int colValueCount = insertBuilder.getColValuesCount();
+                if (colValueCount > 0) {
+                    insertBuilder.setColValues(colValueCount - 1, timestampBytes);
+                }
+            }
+
+            int updateDataCount = tableUpdateDataBuilder.getUpdateDataCount();
+            for (int i = 0; i < updateDataCount; i++) {
+                RetinaProto.UpdateData.Builder updateBuilder = tableUpdateDataBuilder.getUpdateDataBuilder(i);
+                int colValueCount = updateBuilder.getColValuesCount();
+                if (colValueCount > 0) {
+                    updateBuilder.setColValues(colValueCount - 1, timestampBytes);
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns stamped copies of {@code records}, in the same order; the inputs are not modified.
+     * A null or empty list is returned as is.
+     */
+    public static List<SinkProto.RowRecord> updateRecordTimestamp(List<SinkProto.RowRecord> records, long timestamp) {
+        if (records == null || records.isEmpty()) {
+            return records;
+        }
+        SinkProto.ColumnValue timestampColumn = getTimestampColumn(timestamp);
+        List<SinkProto.RowRecord> updatedRecords = new ArrayList<>(records.size());
+        for (SinkProto.RowRecord record : records) {
+            updatedRecords.add(updateRecordTimestamp(record, timestampColumn));
+        }
+        return updatedRecords;
+    }
+
+    /** Wraps the encoded timestamp in a {@code ColumnValue} on which only the value is set. */
+    private static SinkProto.ColumnValue getTimestampColumn(long timestamp) {
+        ByteString timestampBytes = longToByteString(timestamp);
+        return SinkProto.ColumnValue.newBuilder().setValue(timestampBytes).build();
+    }
+
+    /** Returns a stamped copy of {@code record}, or {@code null} when {@code record} is null. */
+    public static SinkProto.RowRecord updateRecordTimestamp(SinkProto.RowRecord record, long timestamp) {
+        if (record == null) {
+            return null;
+        }
+        SinkProto.ColumnValue timestampColumn = getTimestampColumn(timestamp);
+        return updateRecordTimestamp(record, timestampColumn);
+    }
+
+    /** Stamps {@code recordBuilder} in place, following the rule of {@code stampAfterImage}. */
+    public static void updateRecordTimestamp(SinkProto.RowRecord.Builder recordBuilder, long timestamp) {
+        stampAfterImage(recordBuilder, getTimestampColumn(timestamp));
+    }
+
+    /**
+     * Single implementation of the stamping rule shared by all overloads: for INSERT, UPDATE
+     * and SNAPSHOT records that have an after image with at least one column, the last column
+     * is replaced by {@code timestampColumn}. DELETE and any other operation stay unchanged.
+     */
+    private static void stampAfterImage(SinkProto.RowRecord.Builder recordBuilder, SinkProto.ColumnValue timestampColumn) {
+        switch (recordBuilder.getOp()) {
+            case INSERT:
+            case UPDATE:
+            case SNAPSHOT:
+                if (recordBuilder.hasAfter()) {
+                    SinkProto.RowValue.Builder afterBuilder = recordBuilder.getAfterBuilder();
+                    int colCount = afterBuilder.getValuesCount();
+                    if (colCount > 0) {
+                        afterBuilder.setValues(colCount - 1, timestampColumn);
+                    }
+                }
+                break;
+            case DELETE:
+            default:
+                break;
+        }
+    }
+
+    private static SinkProto.RowRecord updateRecordTimestamp(SinkProto.RowRecord record, SinkProto.ColumnValue timestampColumn) {
+        SinkProto.RowRecord.Builder recordBuilder = record.toBuilder();
+        stampAfterImage(recordBuilder, timestampColumn);
+        return recordBuilder.build();
+    }
+
+    /** Returns the last element of {@code topic.split("\\.")}. */
+    public static String extractTableName(String topic) {
+        String[] parts = topic.split("\\.");
+        return parts[parts.length - 1];
+    }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/util/DateUtil.java b/src/main/java/io/pixelsdb/pixels/sink/util/DateUtil.java
new file mode 100644
index 0000000..41b2d6f
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/util/DateUtil.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.util;
+
+
+import io.pixelsdb.pixels.core.utils.DatetimeUtils;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Calendar;
+import java.util.Date;
+
+/**
+ * @package: io.pixelsdb.pixels.sink.util
+ * @className: DateUtil
+ * @author: AntiO2
+ * @date: 2025/8/21 17:31
+ */
+public class DateUtil {
+
+    /** Returns the date {@code epochDay} days after 1970-01-01 on a default {@link Calendar}. */
+    public static Date fromDebeziumDate(int epochDay) {
+        Calendar calendar = Calendar.getInstance();
+        calendar.clear();
+        calendar.set(1970, Calendar.JANUARY, 1); // epoch origin
+        calendar.add(Calendar.DAY_OF_MONTH, epochDay); // add the day offset
+        return calendar.getTime();
+    }
+
+    /**
+     * Builds a Date from {@code epochTs / 1000}; labelled for TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3).
+     * NOTE(review): the division implies {@code epochTs} is in microseconds, whereas
+     * convertDebeziumTimestampToString reads its argument as milliseconds — confirm the unit.
+     */
+    public static Date fromDebeziumTimestamp(long epochTs) {
+        long epochMillis = epochTs / 1000;
+        return new Date(epochMillis);
+    }
+
+    /** Formats {@code date} with the pattern "yyyy-MM-dd". */
+    public static String convertDateToDayString(Date date) {
+        return new SimpleDateFormat("yyyy-MM-dd").format(date);
+    }
+
+    /** Formats {@code date} with the pattern "yyyy-MM-dd HH:mm:ss.SSS". */
+    public static String convertDateToString(Date date) {
+        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(date);
+    }
+
+    /** Formats only the time of day ("HH:mm:ss.SSS"); a null {@code date} yields null. */
+    public static String convertTimestampToString(Date date) {
+        return date == null ? null : new SimpleDateFormat("HH:mm:ss.SSS").format(date);
+    }
+
+    /** Formats {@code epochTs}, read as epoch milliseconds in the system zone, with SQL_LOCAL_DATE_TIME. */
+    public static String convertDebeziumTimestampToString(long epochTs) {
+        Instant instant = Instant.ofEpochMilli(epochTs);
+        return LocalDateTime.ofInstant(instant, ZoneId.systemDefault())
+                .format(DatetimeUtils.SQL_LOCAL_DATE_TIME);
+    }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/util/EtcdFileRegistry.java b/src/main/java/io/pixelsdb/pixels/sink/util/EtcdFileRegistry.java
new file mode 100644
index 0000000..999fa3e
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/util/EtcdFileRegistry.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.util;
+
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.etcd.jetcd.KeyValue;
+import io.pixelsdb.pixels.common.utils.EtcdUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * @package: io.pixelsdb.pixels.sink.util
+ * @className: EtcdFileRegistry
+ * @author: AntiO2
+ * @date: 2025/10/5 08:24
+ */
+public class EtcdFileRegistry {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EtcdFileRegistry.class);
+
+    private static final String REGISTRY_PREFIX = "/sink/proto/registry/";
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private final String topic;
+    private final String baseDir;
+    private final EtcdUtil etcd = EtcdUtil.Instance();
+    private final AtomicInteger nextFileId = new AtomicInteger(0);
+    private String currentFileKey;
+
+    /** Binds the registry to a topic and resumes file numbering from the ids found in etcd. */
+    public EtcdFileRegistry(String topic, String baseDir) {
+        this.topic = topic;
+        this.baseDir = baseDir;
+        initRegistry();
+    }
+
+    /** Returns the "path" entry of a JSON metadata value, or null (after logging) if unparsable. */
+    public static String extractPath(String etcdValue) {
+        try {
+            Map<?, ?> meta = OBJECT_MAPPER.readValue(etcdValue, Map.class);
+            return (String) meta.get("path");
+        } catch (IOException e) {
+            LOGGER.error("Failed to parse etcd value: {}", etcdValue, e);
+            return null;
+        }
+    }
+
+    /** Sets {@code nextFileId} to one past the largest id registered under the file prefix. */
+    private void initRegistry() {
+        List<KeyValue> files = etcd.getKeyValuesByPrefix(filePrefix());
+        if (!files.isEmpty()) {
+            int maxId = files.stream()
+                    .mapToInt(kv -> extractFileId(kv.getKey().toString()))
+                    .max()
+                    .orElse(0);
+            nextFileId.set(maxId + 1);
+            LOGGER.info("Initialized registry for topic {} with nextFileId={}", topic, nextFileId.get());
+        } else {
+            LOGGER.info("No existing files found for topic {}, starting fresh", topic);
+        }
+    }
+
+    private String topicPrefix() {
+        return REGISTRY_PREFIX + topic;
+    }
+
+    private String filePrefix() {
+        return topicPrefix() + "/files/";
+    }
+
+    /** Parses the numeric id out of a ".../NNNNN.proto" key; unparsable keys count as 0. */
+    private int extractFileId(String key) {
+        try {
+            String fileName = key.substring(key.lastIndexOf('/') + 1);
+            String id = fileName.replace(".proto", "");
+            return Integer.parseInt(id);
+        } catch (Exception e) {
+            return 0;
+        }
+    }
+
+    /** Create a new file and register it in etcd; returns {@code baseDir/topic/NNNNN.proto}. */
+    public synchronized String createNewFile() {
+        String fileName = String.format("%05d.proto", nextFileId.getAndIncrement());
+        String fullPath = baseDir + "/" + topic + "/" + fileName;
+
+        Map<String, String> fileMeta = new HashMap<>();
+        fileMeta.put("path", fullPath);
+        fileMeta.put("created_at", String.valueOf(System.currentTimeMillis()));
+        fileMeta.put("status", "active");
+
+        String jsonValue;
+        try {
+            jsonValue = OBJECT_MAPPER.writeValueAsString(fileMeta);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+
+        // Publish the key only once serialization has succeeded (it used to be assigned twice).
+        currentFileKey = filePrefix() + fileName;
+        etcd.putKeyValue(currentFileKey, jsonValue);
+        etcd.putKeyValue(topicPrefix() + "/current", fileName);
+        LOGGER.info("Created new file [{}] for topic [{}]", fileName, topic);
+        return fullPath;
+    }
+
+    public synchronized String getCurrentFileKey() {
+        return currentFileKey;
+    }
+
+    /**
+     * List all files (for readers), sorted by path. Entries without a usable path are
+     * skipped; a null element would otherwise make {@code sorted()} throw.
+     */
+    public List<String> listAllFiles() {
+        List<KeyValue> files = etcd.getKeyValuesByPrefix(filePrefix());
+        return files.stream()
+                .map(kv -> extractPath(kv.getValue().toString()))
+                .filter(path -> path != null)
+                .sorted()
+                .collect(Collectors.toList());
+    }
+
+    /** Marks the entry stored under etcd key {@code fileName} as completed (for writer rotation). */
+    public void markFileCompleted(String fileName) {
+        KeyValue kv = etcd.getKeyValue(fileName);
+        if (kv == null) return;
+
+        try {
+            @SuppressWarnings("unchecked") // Jackson yields a raw Map for Map.class
+            Map<String, Object> meta = OBJECT_MAPPER.readValue(kv.getValue().toString(), Map.class);
+            meta.put("completed_at", String.valueOf(System.currentTimeMillis()));
+            meta.put("status", "completed");
+            String jsonValue = OBJECT_MAPPER.writeValueAsString(meta);
+            etcd.putKeyValue(fileName, jsonValue);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+
+        LOGGER.info("Marked file [{}] as completed", fileName);
+    }
+
+    /** Deletes this topic's keys. The prefix ends with "/" so that topics which merely share
+     *  a name prefix (e.g. "orders" and "orders_v2") are not wiped as well. */
+    public void cleanData() {
+        etcd.deleteByPrefix(topicPrefix() + "/");
+    }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/util/FlushRateLimiter.java b/src/main/java/io/pixelsdb/pixels/sink/util/FlushRateLimiter.java
new file mode 100644
index 0000000..8fa0276
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/util/FlushRateLimiter.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.util;
+
+import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+public class FlushRateLimiter {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FlushRateLimiter.class);
+    private static final long REFRESH_PERIOD_MS = 10;
+    private static volatile FlushRateLimiter instance;
+    private final Semaphore semaphore;
+    private final boolean enableRateLimiter;
+    private final ScheduledExecutorService scheduler;
+    private final int replenishmentAmount;
+    // Most permits that may sit unused (about one second of budget). Without a cap an idle
+    // limiter banks permits until Semaphore.release overflows and the replenish task dies.
+    private final int maxStoredPermits;
+
+    private FlushRateLimiter() {
+        PixelsSinkConfig pixelsSinkConfig = PixelsSinkConfigFactory.getInstance();
+        int sourceRateLimit = pixelsSinkConfig.getSourceRateLimit();
+        this.enableRateLimiter = pixelsSinkConfig.isEnableSourceRateLimit();
+        if (sourceRateLimit <= 0 || !enableRateLimiter) { // disabled: acquire() is a no-op
+            this.semaphore = null;
+            this.replenishmentAmount = 0;
+            this.maxStoredPermits = 0;
+            this.scheduler = null;
+            return;
+        }
+        double replenishmentPerMillisecond = (double) sourceRateLimit / 1000.0;
+        this.replenishmentAmount = (int) Math.max(1, Math.round(replenishmentPerMillisecond * REFRESH_PERIOD_MS));
+        this.maxStoredPermits = Math.max(this.replenishmentAmount, sourceRateLimit);
+        this.semaphore = new Semaphore(this.replenishmentAmount);
+        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
+            Thread t = Executors.defaultThreadFactory().newThread(r);
+            t.setName("Rate-Limiter-Replenish");
+            t.setDaemon(true);
+            return t;
+        });
+        this.scheduler.scheduleAtFixedRate(this::replenishTokens, REFRESH_PERIOD_MS, REFRESH_PERIOD_MS, TimeUnit.MILLISECONDS);
+        LOGGER.info("FlushRateLimiter initialized. Rate: {}/s, Replenishment: {} tokens every {}ms.",
+                sourceRateLimit, this.replenishmentAmount, REFRESH_PERIOD_MS);
+    }
+
+    public static FlushRateLimiter getInstance() {
+        if (instance == null) {
+            synchronized (FlushRateLimiter.class) {
+                if (instance == null) {
+                    instance = new FlushRateLimiter();
+                }
+            }
+        }
+        return instance;
+    }
+
+    public static FlushRateLimiter getNewInstance() {
+        return new FlushRateLimiter();
+    }
+
+    /** Adds one refill of permits without letting the bank exceed {@code maxStoredPermits}. */
+    private void replenishTokens() {
+        if (semaphore != null) {
+            int room = maxStoredPermits - semaphore.availablePermits();
+            semaphore.release(Math.max(0, Math.min(replenishmentAmount, room)));
+        }
+    }
+
+    /** Blocks until {@code num} permits are taken; on interrupt restores the flag and returns early. */
+    public void acquire(int num) {
+        if (enableRateLimiter && semaphore != null) {
+            try {
+                int remaining = num;
+                while (remaining > maxStoredPermits) { // bigger than the bank: take slices
+                    semaphore.acquire(maxStoredPermits);
+                    remaining -= maxStoredPermits;
+                }
+                semaphore.acquire(remaining);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                LOGGER.warn("FlushRateLimiter acquire interrupted.", e);
+            }
+        }
+    }
+
+    public void shutdown() {
+        if (scheduler != null) {
+            scheduler.shutdownNow();
+            LOGGER.info("FlushRateLimiter scheduler stopped.");
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/io/pixelsdb/pixels/sink/util/LatencySimulator.java b/src/main/java/io/pixelsdb/pixels/sink/util/LatencySimulator.java
index 1a4913d..a89d571 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/util/LatencySimulator.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/util/LatencySimulator.java
@@ -1,18 +1,21 @@
/*
* Copyright 2025 PixelsDB.
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * This file is part of Pixels.
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
*
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.sink.util;
diff --git a/src/main/java/io/pixelsdb/pixels/sink/util/MetricsFacade.java b/src/main/java/io/pixelsdb/pixels/sink/util/MetricsFacade.java
new file mode 100644
index 0000000..9001433
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/util/MetricsFacade.java
@@ -0,0 +1,567 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.util;
+
+import com.google.protobuf.ByteString;
+import io.pixelsdb.pixels.sink.SinkProto;
+import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.freshness.FreshnessHistory;
+import io.pixelsdb.pixels.sink.freshness.OneSecondAverage;
+import io.pixelsdb.pixels.sink.writer.retina.SinkContextManager;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
+import io.prometheus.client.Summary;
+import lombok.Setter;
+import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class MetricsFacade {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MetricsFacade.class);
+ private static final PixelsSinkConfig config = PixelsSinkConfigFactory.getInstance();
+ private static MetricsFacade instance;
+ private final boolean enabled;
+ private final Counter tableChangeCounter;
+ private final Counter rowChangeCounter;
+ private final Counter transactionCounter;
+ private final Counter serdRowRecordCounter;
+ private final Counter serdTxRecordCounter;
+ private final Summary processingLatency;
+ private final Counter rawDataThroughputCounter;
+ private final Counter debeziumEventCounter;
+ private final Counter rowEventCounter;
+ private final Summary transServiceLatency;
+ private final Summary indexServiceLatency;
+ private final Summary retinaServiceLatency;
+ private final Summary writerLatency;
+ private final Summary totalLatency;
+ private final Summary tableFreshness;
+ private final Histogram transactionRowCountHistogram;
+ private final Histogram primaryKeyUpdateDistribution;
+
+ private final boolean monitorReportEnabled;
+ private final int monitorReportInterval;
+ private final int freshnessReportInterval;
+
+ private final SynchronizedDescriptiveStatistics freshness;
+ private final SynchronizedDescriptiveStatistics rowChangeSpeed;
+ private final OneSecondAverage freshnessAvg;
+ private final Boolean freshnessVerbose;
+ private final FreshnessHistory freshnessHistory;
+
+ private final String monitorReportPath;
+ private final String freshnessReportPath;
+
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private final Thread reportThread;
+ private final Thread freshnessThread;
+ @Setter
+ private SinkContextManager sinkContextManager;
+ private long lastRowChangeCount = 0;
+ private long lastTransactionCount = 0;
+ private long lastDebeziumCount = 0;
+ private long lastSerdRowRecordCount = 0;
+ private long lastSerdTxRecordCount = 0;
+
+ private MetricsFacade(boolean enabled) { // Builds and registers every collector (regardless of `enabled`), loads report settings, and starts the report threads when reporting is on.
+ this.enabled = enabled;
+ this.debeziumEventCounter = Counter.build()
+ .name("debezium_event_total")
+ .help("Debezium Event Total")
+ .register();
+
+ this.rowEventCounter = Counter.build()
+ .name("row_event_total")
+ .help("Debezium Row Event Total")
+ .register();
+
+ this.serdRowRecordCounter = Counter.build()
+ .name("serd_row_record")
+ .help("Serialized Row Record Total")
+ .register();
+
+ this.serdTxRecordCounter = Counter.build()
+ .name("serd_tx_record")
+ .help("Serialized Transaction Record Total")
+ .register();
+
+ this.tableChangeCounter = Counter.build()
+ .name("sink_table_changes_total")
+ .help("Total processed table changes")
+ .labelNames("table")
+ .register();
+
+ this.rowChangeCounter = Counter.build()
+ .name("sink_row_changes_total")
+ .help("Total processed row changes")
+ .labelNames("table", "operation")
+ .register();
+
+ this.transactionCounter = Counter.build()
+ .name("sink_transactions_total")
+ .help("Total committed transactions")
+ .register();
+
+ this.processingLatency = Summary.build()
+ .name("sink_processing_latency_seconds")
+ .help("End-to-end processing latency")
+ .quantile(0.5, 0.05)
+ .quantile(0.75, 0.01)
+ .quantile(0.95, 0.005)
+ .quantile(0.99, 0.001)
+ .register();
+
+ this.rawDataThroughputCounter = Counter.build()
+ .name("sink_data_throughput_counter")
+ .help("Data throughput")
+ .register();
+ // NOTE(review): the trans/index/retina summaries below all reuse the help text "End-to-end processing latency" — confirm intended.
+ this.transServiceLatency = Summary.build()
+ .name("trans_service_latency_seconds")
+ .help("End-to-end processing latency")
+ .quantile(0.5, 0.05)
+ .quantile(0.75, 0.01)
+ .quantile(0.95, 0.005)
+ .quantile(0.99, 0.001)
+ .register();
+
+ this.indexServiceLatency = Summary.build()
+ .name("index_service_latency_seconds")
+ .help("End-to-end processing latency")
+ .quantile(0.5, 0.05)
+ .quantile(0.75, 0.01)
+ .quantile(0.95, 0.005)
+ .quantile(0.99, 0.001)
+ .register();
+
+ this.retinaServiceLatency = Summary.build()
+ .name("retina_service_latency_seconds")
+ .help("End-to-end processing latency")
+ .quantile(0.5, 0.05)
+ .quantile(0.75, 0.01)
+ .quantile(0.95, 0.005)
+ .quantile(0.99, 0.001)
+ .register();
+
+ this.writerLatency = Summary.build()
+ .name("write_latency_seconds")
+ .help("Write latency")
+ .labelNames("table")
+ .quantile(0.5, 0.05)
+ .quantile(0.75, 0.01)
+ .quantile(0.95, 0.005)
+ .quantile(0.99, 0.001)
+ .register();
+
+ this.totalLatency = Summary.build()
+ .name("total_latency_seconds")
+ .help("total latency to ETL a row change event")
+ .labelNames("table", "operation")
+ .quantile(0.5, 0.05)
+ .quantile(0.75, 0.01)
+ .quantile(0.95, 0.005)
+ .quantile(0.99, 0.001)
+ .register();
+
+ this.tableFreshness = Summary.build()
+ .name("data_freshness_latency_ms")
+ .help("Data freshness latency in milliseconds per table")
+ .labelNames("table")
+ .quantile(0.5, 0.01)
+ .quantile(0.9, 0.01)
+ .quantile(0.99, 0.001)
+ .register();
+
+ this.transactionRowCountHistogram = Histogram.build()
+ .name("transaction_row_count_histogram")
+ .help("Distribution of row counts within a single transaction")
+ .buckets(1, 5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 200)
+ .register();
+ this.primaryKeyUpdateDistribution = Histogram.build()
+ .name("primary_key_update_distribution")
+ .help("Distribution of primary key updates by logical bucket/hash for hot spot analysis")
+ .labelNames("table") // Table name tag
+ .buckets(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) // 10 buckets for distribution
+ .register();
+ this.freshness = new SynchronizedDescriptiveStatistics();
+ this.rowChangeSpeed = new SynchronizedDescriptiveStatistics();
+ // Freshness reporting settings come from the static config; the history object exists only in verbose mode.
+ freshnessReportInterval = config.getFreshnessReportInterval();
+ freshnessReportPath = config.getMonitorFreshnessReportFile();
+ freshnessAvg = new OneSecondAverage(freshnessReportInterval);
+ freshnessVerbose = config.isSinkMonitorFreshnessVerbose();
+ if (freshnessVerbose) {
+ freshnessHistory = new FreshnessHistory();
+ } else {
+ freshnessHistory = null;
+ }
+ // Both background threads are created and started only when monitor reporting is enabled.
+
+ monitorReportEnabled = config.isMonitorReportEnabled();
+ monitorReportInterval = config.getMonitorReportInterval();
+ monitorReportPath = config.getMonitorReportFile();
+ if (monitorReportEnabled) {
+ running.set(true);
+ reportThread = new Thread(this::run, "Metrics Report Thread");
+ LOGGER.info("Metrics Report Thread Started");
+ reportThread.start();
+ freshnessThread = new Thread(this::runFreshness, "Freshness Thread");
+ freshnessThread.start();
+ } else {
+ reportThread = null;
+ freshnessThread = null;
+ }
+ }
+
+ /** Creates the singleton if none exists yet; synchronized so racing first callers build one. */
+ private static synchronized void initialize() {
+     if (instance != null) { return; }
+     instance = new MetricsFacade(config.isMonitorEnabled());
+     LOGGER.info("Init Metrics Facade");
+ }
+
+ /**
+  * Returns the shared {@link MetricsFacade}, building it lazily on first access.
+  *
+  * @return the singleton instance
+  */
+ public static MetricsFacade getInstance() {
+     // NOTE(review): this read is unsynchronized; it is only safe if `instance` is declared
+     // volatile — confirm against the field declaration.
+     final boolean missing = (instance == null);
+     if (missing) {
+         initialize();
+     }
+     return instance;
+ }
+
+ /**
+  * Asks the reporting loops to end: clears the running flag, then interrupts the report
+  * thread and the freshness thread if they were created.
+  */
+ public void stop() {
+     running.set(false);
+     // Same order as before: report thread first, freshness thread second.
+     for (Thread worker : new Thread[]{reportThread, freshnessThread}) {
+         if (worker != null) {
+             worker.interrupt();
+         }
+     }
+     LOGGER.info("Monitor report thread stopped.");
+ }
+
+ /**
+  * Counts one Debezium event. Does nothing when metrics are disabled or the counter is absent.
+  */
+ public void recordDebeziumEvent() {
+     if (!enabled || debeziumEventCounter == null) {
+         return;
+     }
+     debeziumEventCounter.inc();
+ }
+
+ /**
+  * Records one changed row for {@code table}; shorthand for the three-argument overload.
+  *
+  * @param table     table label attached to the counters
+  * @param operation operation type of the change
+  */
+ public void recordRowChange(String table, SinkProto.OperationType operation) {
+     recordRowChange(table, operation, 1);
+ }
+
+ /**
+  * Adds {@code rows} to the per-table counter and to the per-table, per-operation counter.
+  * Does nothing when metrics are disabled or {@code rowChangeCounter} is absent.
+  *
+  * @param table     table label attached to both counters
+  * @param operation operation type; its {@code toString()} is the second label value
+  * @param rows      amount to add to each counter
+  */
+ public void recordRowChange(String table, SinkProto.OperationType operation, int rows) {
+     if (!enabled || rowChangeCounter == null) {
+         return;
+     }
+     // NOTE(review): only rowChangeCounter is null-checked; tableChangeCounter is assumed
+     // to be initialised together with it — confirm in the constructor.
+     tableChangeCounter.labels(table).inc(rows);
+     rowChangeCounter.labels(table, operation.toString()).inc(rows);
+ }
+
+ /** Counts a single row record on {@code serdRowRecordCounter}. */
+ public void recordSerdRowChange() {
+     recordSerdRowChange(1);
+ }
+
+ /**
+  * Adds {@code i} to {@code serdRowRecordCounter}; skipped when metrics are disabled or
+  * the counter is absent.
+  *
+  * @param i amount to add
+  */
+ public void recordSerdRowChange(int i) {
+     if (!enabled || serdRowRecordCounter == null) {
+         return;
+     }
+     serdRowRecordCounter.inc(i);
+ }
+
+
+ /** Counts a single transaction record on {@code serdTxRecordCounter}. */
+ public void recordSerdTxChange() {
+     recordSerdTxChange(1);
+ }
+
+ /**
+  * Adds {@code i} to {@code serdTxRecordCounter}; skipped when metrics are disabled or
+  * the counter is absent.
+  *
+  * @param i amount to add
+  */
+ public void recordSerdTxChange(int i) {
+     if (!enabled || serdTxRecordCounter == null) {
+         return;
+     }
+     serdTxRecordCounter.inc(i);
+ }
+
+
+ /**
+  * Adds {@code i} to the transaction counter; skipped when metrics are disabled or the
+  * counter is absent.
+  *
+  * @param i number of transactions to add
+  */
+ public void recordTransaction(int i) {
+     if (!enabled || transactionCounter == null) {
+         return;
+     }
+     transactionCounter.inc(i);
+ }
+
+ /** Counts a single transaction. */
+ public void recordTransaction() {
+     recordTransaction(1);
+ }
+
+ /**
+  * Starts a timer on the processing-latency summary.
+  *
+  * @return the running timer, or {@code null} when metrics are disabled
+  */
+ public Summary.Timer startProcessLatencyTimer() {
+     if (!enabled) {
+         return null;
+     }
+     return processingLatency.startTimer();
+ }
+
+ /**
+  * Starts a timer on the index-service latency summary.
+  *
+  * @return the running timer, or {@code null} when metrics are disabled
+  */
+ public Summary.Timer startIndexLatencyTimer() {
+     if (!enabled) {
+         return null;
+     }
+     return indexServiceLatency.startTimer();
+ }
+
+ /**
+  * Starts a timer on the transaction-service latency summary.
+  *
+  * @return the running timer, or {@code null} when metrics are disabled
+  */
+ public Summary.Timer startTransLatencyTimer() {
+     if (!enabled) {
+         return null;
+     }
+     return transServiceLatency.startTimer();
+ }
+
+ /**
+  * Starts a timer on the Retina-service latency summary.
+  *
+  * @return the running timer, or {@code null} when metrics are disabled
+  */
+ public Summary.Timer startRetinaLatencyTimer() {
+     if (!enabled) {
+         return null;
+     }
+     return retinaServiceLatency.startTimer();
+ }
+
+ /**
+  * Starts a timer on the writer-latency summary for one table.
+  *
+  * @param tableName label value selecting the per-table child of the summary
+  * @return the running timer, or {@code null} when metrics are disabled
+  */
+ public Summary.Timer startWriteLatencyTimer(String tableName) {
+     if (!enabled) {
+         return null;
+     }
+     return writerLatency.labels(tableName).startTimer();
+ }
+
+ /**
+  * Adds {@code data} to the raw-data throughput counter.
+  * Guarded like the other recorders: skipped when metrics are disabled or the counter is
+  * absent, instead of counting while disabled or failing on a missing counter.
+  *
+  * @param data amount to add to the counter
+  */
+ public void addRawData(double data) {
+     if (enabled && rawDataThroughputCounter != null) {
+         rawDataThroughputCounter.inc(data);
+     }
+ }
+
+ /**
+  * Observes the end-to-end latency of {@code event}: the current wall-clock time minus the
+  * event's timestamp, labelled with the event's full table name and operation.
+  * Skipped when metrics are disabled, when the summary is absent, or when the event carries
+  * no timestamp (value 0).
+  *
+  * @param event row change whose timestamp, table name and operation are read
+  */
+ public void recordTotalLatency(RowChangeEvent event) {
+     // Same guard as the sibling recorders, so a disabled facade neither records nor throws.
+     if (!enabled || totalLatency == null) {
+         return;
+     }
+     if (event.getTimeStamp() != 0) {
+         long recordLatency = System.currentTimeMillis() - event.getTimeStamp();
+         totalLatency.labels(event.getFullTableName(), event.getOp().toString()).observe(recordLatency);
+     }
+ }
+
+ /** Counts a single row event. */
+ public void recordRowEvent() {
+     recordRowEvent(1);
+ }
+
+ /**
+  * Adds {@code i} to the row-event counter; skipped when metrics are disabled or the
+  * counter is absent.
+  *
+  * @param i number of row events to add
+  */
+ public void recordRowEvent(int i) {
+     if (!enabled || rowEventCounter == null) {
+         return;
+     }
+     rowEventCounter.inc(i);
+ }
+
+ /**
+  * Reads the current value of the row-event counter.
+  *
+  * @return the counter value narrowed to {@code int}
+  */
+ public int getRecordRowEvent() {
+     final double total = rowEventCounter.get();
+     return (int) total;
+ }
+
+ /**
+  * Reads the current value of the transaction counter.
+  *
+  * @return the counter value narrowed to {@code int}
+  */
+ public int getTransactionEvent() {
+     final double total = transactionCounter.get();
+     return (int) total;
+ }
+
+ /**
+  * Observes a freshness sample on the per-table summary and forwards it to
+  * {@link #recordFreshness(double)}. Does nothing when metrics are disabled.
+  *
+  * @param table         label value selecting the per-table child of the summary
+  * @param freshnessMill freshness sample, in milliseconds per the summary's help text
+  */
+ public void recordTableFreshness(String table, double freshnessMill) {
+     if (enabled) {
+         tableFreshness.labels(table).observe(freshnessMill);
+         recordFreshness(freshnessMill);
+     }
+ }
+
+ /**
+  * Feeds one freshness sample to the in-process trackers: the descriptive statistics
+  * (only when metrics are enabled), the windowed average (when present) and, in verbose
+  * mode, the freshness history.
+  *
+  * @param freshnessMill freshness sample to record
+  */
+ public void recordFreshness(double freshnessMill) {
+     final boolean trackStatistics = enabled && freshness != null;
+     if (trackStatistics) {
+         freshness.addValue(freshnessMill);
+     }
+
+     final boolean trackAverage = freshnessAvg != null;
+     if (trackAverage) {
+         freshnessAvg.record(freshnessMill);
+     }
+
+     // The constructor creates freshnessHistory exactly when freshnessVerbose is set.
+     if (freshnessVerbose) {
+         freshnessHistory.record(freshnessMill);
+     }
+ }
+
+ public void recordPrimaryKeyUpdateDistribution(String table, ByteString pkValue) {
+ if (!enabled || primaryKeyUpdateDistribution == null) {
+ return;
+ }
+ if (pkValue == null || pkValue.isEmpty()) {
+ LOGGER.debug("Skipping PK distribution recording: pkValue is null or empty for table {}.", table);
+ return;
+ }
+
+ long numericPK;
+ int length = pkValue.size();
+
+ try {
+ ByteBuffer buffer = pkValue.asReadOnlyByteBuffer();
+
+ if (length == Integer.BYTES) {
+ numericPK = Integer.toUnsignedLong(buffer.getInt());
+ } else if (length == Long.BYTES) {
+ numericPK = buffer.getLong();
+ } else {
+ LOGGER.warn("Unsupported PK ByteString length {} for table {}. Expected 4 or 8.", length, table);
+ return;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to convert ByteString to numeric type for table {}: {}", table, e.getMessage());
+ return;
+ }
+ int hash = Long.hashCode(numericPK);
+ double bucketIndex = (Math.abs(hash % 10)) + 1;
+
+ // 3. 记录到 Histogram
+ primaryKeyUpdateDistribution.labels(table).observe(bucketIndex);
+
+ LOGGER.debug("Table {}: PK {} mapped to bucket index {}", table, numericPK, bucketIndex);
+ }
+
+ /**
+  * Observes the row count of one transaction on the row-count histogram; skipped when
+  * metrics are disabled or the histogram is absent.
+  *
+  * @param rowCount number of rows in the transaction
+  */
+ public void recordTransactionRowCount(int rowCount) {
+     if (!enabled || transactionRowCountHistogram == null) {
+         return;
+     }
+     // observe() places the value into the histogram's configured buckets.
+     transactionRowCountHistogram.observe(rowCount);
+ }
+
+ public void run() {
+ while (running.get()) {
+ try {
+ Thread.sleep(monitorReportInterval);
+ logPerformance();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ } catch (Throwable t) {
+ LOGGER.warn("Error while reporting performance.", t);
+ }
+ }
+ }
+
+ /**
+  * Body of the freshness thread. After an initial delay of one monitor-report interval it
+  * wakes up every freshness-report interval and appends to the freshness report file:
+  * in verbose mode every record drained from the freshness history (one per line),
+  * otherwise a single {@code timestamp,average} line with the current window average
+  * (skipped while the average is NaN).
+  * An interrupt restores the interrupt flag and ends the thread; I/O failures and other
+  * errors are logged and the loop continues.
+  */
+ public void runFreshness() {
+     try {
+         Thread.sleep(monitorReportInterval);
+     } catch (InterruptedException e) {
+         Thread.currentThread().interrupt();
+         // The next sleep would throw immediately anyway, so stop here.
+         return;
+     }
+     while (running.get()) {
+         try {
+             Thread.sleep(freshnessReportInterval);
+             try (FileWriter fw = new FileWriter(freshnessReportPath, true)) {
+                 if (freshnessVerbose) {
+                     // Typed list: the raw List could not be iterated as FreshnessHistory.Record.
+                     List<FreshnessHistory.Record> detailedRecords = freshnessHistory.pollAll();
+                     if (!detailedRecords.isEmpty()) {
+                         for (FreshnessHistory.Record record : detailedRecords) {
+                             fw.write(record.toString() + "\n");
+                         }
+                         fw.flush();
+                     }
+                 } else {
+                     long now = System.currentTimeMillis();
+                     double avg = freshnessAvg.getWindowAverage();
+                     if (!Double.isNaN(avg)) {
+                         fw.write(now + "," + avg + "\n");
+                         fw.flush();
+                     }
+                 }
+             } catch (IOException e) {
+                 LOGGER.warn("Failed to write perf metrics: " + e.getMessage());
+             }
+         } catch (InterruptedException e) {
+             Thread.currentThread().interrupt();
+             break;
+         } catch (Throwable t) {
+             LOGGER.warn("Error while reporting performance.", t);
+         }
+     }
+ }
+
+ /**
+  * Produces one performance report. It reads the cumulative counters, computes the change
+  * since the previous call and the per-second rates over the monitor-report interval,
+  * logs the throughput, row-rate and freshness summaries, and appends one CSV line with
+  * the rates to the monitor report file.
+  */
+ public void logPerformance() {
+     // Per counter: read the cumulative value, take the delta, remember the new baseline.
+     final long rowsNow = (long) rowEventCounter.get();
+     final long txnsNow = (long) transactionCounter.get();
+     final long debeziumNow = (long) debeziumEventCounter.get();
+     final long serdRowsNow = (long) serdRowRecordCounter.get();
+     final long serdTxsNow = (long) serdTxRecordCounter.get();
+
+     final long deltaRows = rowsNow - lastRowChangeCount;
+     lastRowChangeCount = rowsNow;
+     final long deltaTxns = txnsNow - lastTransactionCount;
+     lastTransactionCount = txnsNow;
+     final long deltaDebezium = debeziumNow - lastDebeziumCount;
+     lastDebeziumCount = debeziumNow;
+     final long deltaSerdRows = serdRowsNow - lastSerdRowRecordCount;
+     lastSerdRowRecordCount = serdRowsNow;
+     final long deltaSerdTxs = serdTxsNow - lastSerdTxRecordCount;
+     lastSerdTxRecordCount = serdTxsNow;
+
+     // Rates assume exactly one report interval elapsed since the previous call.
+     final double seconds = monitorReportInterval / 1000.0;
+     final double rowOips = deltaRows / seconds;
+     final double txnOips = deltaTxns / seconds;
+     final double dbOips = deltaDebezium / seconds;
+     final double serdRowsOips = deltaSerdRows / seconds;
+     final double serdTxsOips = deltaSerdTxs / seconds;
+
+     rowChangeSpeed.addValue(rowOips);
+
+     final String rowRate = String.format("%.2f", rowOips);
+     final String txnRate = String.format("%.2f", txnOips);
+     final String dbRate = String.format("%.2f", dbOips);
+     final String serdRowRate = String.format("%.2f", serdRowsOips);
+     final String serdTxRate = String.format("%.2f", serdTxsOips);
+     LOGGER.info(
+             "Performance report: +{} rows (+{}/s), +{} transactions (+{}/s), +{} debezium (+{}/s)" +
+                     ", +{} serdRows (+{}/s), +{} serdTxs (+{}/s)" +
+                     " in {} ms\t activeTxNum: {} min Tx: {}",
+             deltaRows, rowRate,
+             deltaTxns, txnRate,
+             deltaDebezium, dbRate,
+             deltaSerdRows, serdRowRate,
+             deltaSerdTxs, serdTxRate,
+             monitorReportInterval,
+             sinkContextManager.getActiveTxnsNum(),
+             sinkContextManager.findMinActiveTx()
+     );
+
+     final String rowSummary = String.format(
+             "Row Per/Second Summary: Max=%.2f, Min=%.2f, Mean=%.2f, P10=%.2f, P50=%.2f, P90=%.2f, P95=%.2f, P99=%.2f",
+             rowChangeSpeed.getMax(),
+             rowChangeSpeed.getMin(),
+             rowChangeSpeed.getMean(),
+             rowChangeSpeed.getPercentile(10),
+             rowChangeSpeed.getPercentile(50),
+             rowChangeSpeed.getPercentile(90),
+             rowChangeSpeed.getPercentile(95),
+             rowChangeSpeed.getPercentile(99)
+     );
+     LOGGER.info(rowSummary);
+
+     final String freshnessSummary = String.format(
+             "Freshness Report: Count=%d, Max=%.2f, Min=%.2f, Mean=%.2f, P50=%.2f, P90=%.2f, P95=%.2f, P99=%.2f",
+             freshness.getN(),
+             freshness.getMax(),
+             freshness.getMin(),
+             freshness.getMean(),
+             freshness.getPercentile(50),
+             freshness.getPercentile(90),
+             freshness.getPercentile(95),
+             freshness.getPercentile(99)
+     );
+     LOGGER.info(freshnessSummary);
+
+     // Append one line per report to the CSV file used for plotting.
+     final String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
+     try (FileWriter fw = new FileWriter(monitorReportPath, true)) {
+         final String csvLine = String.format("%s,%.2f,%.2f,%.2f,%.2f,%.2f%n",
+                 time, rowOips, txnOips, dbOips, serdRowsOips, serdTxsOips);
+         fw.write(csvLine);
+     } catch (IOException e) {
+         LOGGER.warn("Failed to write perf metrics: " + e.getMessage());
+     }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/io/pixelsdb/pixels/sink/util/TableCounters.java b/src/main/java/io/pixelsdb/pixels/sink/util/TableCounters.java
new file mode 100644
index 0000000..2adac60
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/util/TableCounters.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.util;
+
+/**
+ * Holds the expected row total for a table and the number of rows counted so far,
+ * and reports whether the count has reached the total.
+ */
+public class TableCounters {
+    /** The expected total number of rows. */
+    private final int totalCount;
+    /** Rows counted so far; volatile so unsynchronized readers observe the latest value. */
+    private volatile int currentCount = 0;
+
+    /**
+     * @param totalCount the expected total number of rows
+     */
+    public TableCounters(int totalCount) {
+        this.totalCount = totalCount;
+    }
+
+    /**
+     * Adds one to the current count.
+     * Synchronized because {@code ++} on a volatile field is a separate read and write,
+     * so concurrent callers could otherwise lose increments.
+     */
+    public synchronized void increment() {
+        currentCount++;
+    }
+
+    /**
+     * @return {@code true} once the current count meets or exceeds the expected total
+     */
+    public boolean isComplete() {
+        return currentCount >= totalCount;
+    }
+
+    /**
+     * @return the number of rows counted so far
+     */
+    public int getCurrentCount() {
+        return currentCount;
+    }
+
+    /**
+     * @return the expected total number of rows
+     */
+    public int getTotalCount() {
+        return totalCount;
+    }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/AbstractBucketedWriter.java b/src/main/java/io/pixelsdb/pixels/sink/writer/AbstractBucketedWriter.java
new file mode 100644
index 0000000..6a49495
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/AbstractBucketedWriter.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.writer;
+
+import io.pixelsdb.pixels.sink.SinkProto;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.exception.SinkException;
+
+ /**
+  * Base class for writers that route each row change to a bucket.
+  * {@link #writeRowChangeEvent} picks the bucket from the event's before- or after-image
+  * depending on the operation and hands the event to {@link #emit}. An update that changes
+  * the primary key is split into a DELETE of the old row and an INSERT of the new row.
+  *
+  * @param <C> type of the caller-supplied context passed through unchanged to {@link #emit}
+  */
+ public abstract class AbstractBucketedWriter<C> {
+     /**
+      * Routes one row change event to its bucket(s).
+      *
+      * @param event   event to route; {@code null} is ignored
+      * @param context context forwarded to {@link #emit}
+      * @throws SinkException declared by {@code initIndexKey()} and the split-event builders
+      */
+     public void writeRowChangeEvent(RowChangeEvent event, C context) throws SinkException {
+         if (event == null) {
+             return;
+         }
+
+         event.initIndexKey();
+
+         switch (event.getOp()) {
+             case UPDATE -> {
+                 if (!event.isPkChanged()) {
+                     emitBefore(event, context);
+                 } else {
+                     emitPkChangedUpdate(event, context);
+                 }
+             }
+
+             case DELETE -> emitBefore(event, context);
+
+             case INSERT, SNAPSHOT -> emitAfter(event, context);
+
+             case UNRECOGNIZED -> {
+                 return;
+             }
+         }
+     }
+
+     /* ================= hook points ================= */
+
+     /** Emits {@code event} to the bucket derived from its before-image index key. */
+     protected void emitBefore(RowChangeEvent event, C context) {
+         int bucketId = event.getBeforeBucketFromIndex();
+         emit(event, bucketId, context);
+     }
+
+     /** Emits {@code event} to the bucket derived from its after-image index key. */
+     protected void emitAfter(RowChangeEvent event, C context) {
+         int bucketId = event.getAfterBucketFromIndex();
+         emit(event, bucketId, context);
+     }
+
+     /**
+      * Handles an update whose primary key changed: emits a DELETE built from the event
+      * to the before-bucket, then an INSERT built from the event to the after-bucket.
+      */
+     protected void emitPkChangedUpdate(RowChangeEvent event, C context) throws SinkException {
+         // DELETE (before)
+         RowChangeEvent deleteEvent = buildDeleteEvent(event);
+         emitBefore(deleteEvent, context);
+
+         // INSERT (after)
+         RowChangeEvent insertEvent = buildInsertEvent(event);
+         emitAfter(insertEvent, context);
+     }
+
+     /**
+      * Delivers {@code event} to the given bucket; implemented by concrete writers.
+      *
+      * @param event    event to deliver
+      * @param bucketId bucket chosen from the event's index key
+      * @param context  context supplied by the caller of {@link #writeRowChangeEvent}
+      */
+     protected abstract void emit(RowChangeEvent event, int bucketId, C context);
+
+     /* ================= helpers ================= */
+
+     /** Copies the event's row record without its after-image, marked as DELETE. */
+     private RowChangeEvent buildDeleteEvent(RowChangeEvent event) throws SinkException {
+         SinkProto.RowRecord.Builder builder =
+                 event.getRowRecord().toBuilder()
+                         .clearAfter()
+                         .setOp(SinkProto.OperationType.DELETE);
+
+         RowChangeEvent deleteEvent =
+                 new RowChangeEvent(builder.build(), event.getSchema());
+         deleteEvent.initIndexKey();
+         return deleteEvent;
+     }
+
+     /** Copies the event's row record without its before-image, marked as INSERT. */
+     private RowChangeEvent buildInsertEvent(RowChangeEvent event) throws SinkException {
+         SinkProto.RowRecord.Builder builder =
+                 event.getRowRecord().toBuilder()
+                         .clearBefore()
+                         .setOp(SinkProto.OperationType.INSERT);
+
+         RowChangeEvent insertEvent =
+                 new RowChangeEvent(builder.build(), event.getSchema());
+         insertEvent.initIndexKey();
+         return insertEvent;
+     }
+ }
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/NoneWriter.java b/src/main/java/io/pixelsdb/pixels/sink/writer/NoneWriter.java
new file mode 100644
index 0000000..ccf5b60
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/NoneWriter.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.writer;
+
+import io.pixelsdb.pixels.sink.SinkProto;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.exception.SinkException;
+import io.pixelsdb.pixels.sink.util.MetricsFacade;
+import io.pixelsdb.pixels.sink.writer.retina.SinkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * NoneWriter implementation used for testing and metrics collection.
+ * It tracks transaction completeness based on row counts provided in the TXEND metadata,
+ * ensuring robust handling of out-of-order and concurrent TX BEGIN, TX END, and ROWChange events.
+ */
+public class NoneWriter implements PixelsSinkWriter {
+ private static final Logger LOGGER = LoggerFactory.getLogger(NoneWriter.class);
+
+ private final MetricsFacade metricsFacade = MetricsFacade.getInstance();
+
+ /**
+ * Data structure to track transaction progress:
+ * Map
+ */
+ private final Map transTracker = new ConcurrentHashMap<>();
+
+ /**
+ * Checks if all tables within a transaction have reached their expected row count.
+ * If complete, the transaction is removed from the tracker and final metrics are recorded.
+ *
+ * @param transId The ID of the transaction to check.
+ */
+ private void checkAndCleanupTransaction(String transId) {
+ TransactionContext context = transTracker.get(transId);
+
+ if (context == null) {
+ return;
+ }
+
+ boolean allComplete = context.sinkContext.isCompleted();
+ int actualProcessedRows = context.sinkContext.getProcessedRowsNum();
+
+ if (allComplete) {
+ // All rows expected have been processed. Remove and record metrics.
+ transTracker.remove(transId);
+ LOGGER.trace("Transaction {} successfully completed and removed from tracker. Total rows: {}.", transId, actualProcessedRows);
+
+ // Record final transaction metrics only upon completion
+ metricsFacade.recordTransaction();
+ metricsFacade.recordTransactionRowCount(actualProcessedRows);
+ } else {
+ // Not complete, keep tracking
+ LOGGER.debug("Transaction {} is partially complete ({} rows processed). Keeping tracker entry.", transId, actualProcessedRows);
+ }
+ }
+
+ @Override
+ public void flush() {
+ // No-op for NoneWriter
+ }
+
+ // --- Interface Methods ---
+
+ @Override
+ public boolean writeRow(RowChangeEvent rowChangeEvent) {
+ metricsFacade.recordRowEvent();
+ metricsFacade.recordRowChange(rowChangeEvent.getTable(), rowChangeEvent.getOp());
+ try {
+ rowChangeEvent.initIndexKey();
+ metricsFacade.recordPrimaryKeyUpdateDistribution(rowChangeEvent.getTable(), rowChangeEvent.getAfterKey().getKey());
+
+ // Get transaction ID and table name
+ String transId = rowChangeEvent.getTransaction().getId();
+ String fullTable = rowChangeEvent.getFullTableName();
+
+ // 1. Get or create the transaction context
+ TransactionContext context = transTracker.computeIfAbsent(transId, k -> new TransactionContext(transId));
+
+ context.sinkContext.getTableCounterLock().lock();
+ context.incrementEndCount(fullTable);
+ checkAndCleanupTransaction(transId);
+ context.sinkContext.getTableCounterLock().unlock();
+ } catch (SinkException e) {
+ throw new RuntimeException("Error processing row key or metrics.", e);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean writeTrans(SinkProto.TransactionMetadata transactionMetadata) {
+ String transId = transactionMetadata.getId();
+
+ if (transactionMetadata.getStatus() == SinkProto.TransactionStatus.BEGIN) {
+ // 1. BEGIN: Create context if not exists (in case ROWChange arrived first).
+ transTracker.computeIfAbsent(transId, k -> new TransactionContext(transId));
+ LOGGER.debug("Transaction {} BEGIN received.", transId);
+
+ } else if (transactionMetadata.getStatus() == SinkProto.TransactionStatus.END) {
+ // 2. END: Finalize tracker state, merge pre-counts, and trigger cleanup.
+
+ // Get existing context or create a new one (in case BEGIN was missed).
+ TransactionContext context = transTracker.computeIfAbsent(transId, k -> new TransactionContext(transId));
+ context.sinkContext.getTableCounterLock().lock();
+ context.sinkContext.setEndTx(transactionMetadata);
+ checkAndCleanupTransaction(transId);
+ context.sinkContext.getTableCounterLock().unlock();
+ }
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // No-op for NoneWriter
+ LOGGER.info("Remaining unfinished transactions on close: {}", transTracker.size());
+
+ // Log details of transactions that were never completed
+ if (!transTracker.isEmpty()) {
+ transTracker.forEach((transId, context) ->
+ {
+ LOGGER.warn("Unfinished transaction {}", transId);
+ });
+ }
+ }
+
+ /**
+ * Helper class to manage the state of a single transaction, decoupling the row accumulation
+ * from the final TableCounters initialization (which requires total counts from TX END).
+ */
+ public static class TransactionContext {
+ // Key: Full Table Name, Value: Row Count
+ private SinkContext sinkContext = null;
+
+
+ TransactionContext(String txId) {
+ this.sinkContext = new SinkContext(txId);
+ }
+
+
+ /**
+ * @param table Full table name
+ */
+ public void incrementEndCount(String table) {
+ sinkContext.updateCounter(table, 1);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/PixelsSinkMode.java b/src/main/java/io/pixelsdb/pixels/sink/writer/PixelsSinkMode.java
new file mode 100644
index 0000000..42bea45
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/PixelsSinkMode.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.writer;
+
+
+/**
+ * The sink modes a writer can be created for.
+ */
+public enum PixelsSinkMode {
+    CSV,
+    RETINA,
+    PROTO,
+    FLINK,
+    NONE;
+
+    /**
+     * Resolves a mode from its name, ignoring case.
+     *
+     * @param value name of the mode
+     * @return the matching mode
+     * @throws RuntimeException when no mode has that name
+     */
+    public static PixelsSinkMode fromValue(String value) {
+        final PixelsSinkMode[] candidates = values();
+        for (int i = 0; i < candidates.length; i++) {
+            final PixelsSinkMode candidate = candidates[i];
+            if (candidate.name().equalsIgnoreCase(value)) {
+                return candidate;
+            }
+        }
+        final String message = String.format("Can't convert %s to writer type", value);
+        throw new RuntimeException(message);
+    }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/PixelsSinkWriter.java b/src/main/java/io/pixelsdb/pixels/sink/writer/PixelsSinkWriter.java
new file mode 100644
index 0000000..d933a94
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/PixelsSinkWriter.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.writer;
+
+import io.pixelsdb.pixels.sink.SinkProto;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+
+import java.io.Closeable;
+
+/**
+ * Contract implemented by every sink writer: it accepts row change events and
+ * transaction metadata, can be flushed, and is closed when no longer needed.
+ */
+public interface PixelsSinkWriter extends Closeable {
+ /** Flushes pending output; what this means is up to the implementation. */
+ void flush();
+
+ /**
+  * Writes one row change event.
+  *
+  * @param rowChangeEvent the event to write
+  * @return implementation-defined flag — presumably {@code true} on success; confirm per writer
+  */
+ boolean writeRow(RowChangeEvent rowChangeEvent);
+
+ /**
+  * Writes one piece of transaction metadata.
+  *
+  * @param transactionMetadata the metadata to write
+  * @return implementation-defined flag — presumably {@code true} on success; confirm per writer
+  */
+ boolean writeTrans(SinkProto.TransactionMetadata transactionMetadata);
+
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/PixelsSinkWriterFactory.java b/src/main/java/io/pixelsdb/pixels/sink/writer/PixelsSinkWriterFactory.java
new file mode 100644
index 0000000..dd884b5
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/PixelsSinkWriterFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.writer;
+
+import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.writer.csv.CsvWriter;
+import io.pixelsdb.pixels.sink.writer.flink.FlinkPollingWriter;
+import io.pixelsdb.pixels.sink.writer.proto.ProtoWriter;
+import io.pixelsdb.pixels.sink.writer.retina.RetinaWriter;
+
+import java.io.IOException;
+
+/**
+ * Lazily creates and caches the single {@link PixelsSinkWriter} selected by the
+ * configured sink mode.
+ */
+public class PixelsSinkWriterFactory {
+    private static final PixelsSinkConfig config = PixelsSinkConfigFactory.getInstance();
+
+    private static volatile PixelsSinkWriter writer = null;
+
+
+    /**
+     * Returns the shared writer, constructing it on first use. Construction happens under
+     * the class lock after a second null check, so only one writer is ever created.
+     *
+     * @return the cached writer for the configured mode
+     * @throws RuntimeException wrapping an {@link IOException} raised while creating the writer
+     */
+    static public PixelsSinkWriter getWriter() {
+        if (writer != null) {
+            return writer;
+        }
+        synchronized (PixelsSinkWriterFactory.class) {
+            if (writer == null) {
+                try {
+                    switch (config.getPixelsSinkMode()) {
+                        case CSV -> writer = new CsvWriter();
+                        case RETINA -> writer = new RetinaWriter();
+                        case PROTO -> writer = new ProtoWriter();
+                        case FLINK -> writer = new FlinkPollingWriter();
+                        case NONE -> writer = new NoneWriter();
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException("Can't create writer", e);
+                }
+            }
+        }
+        return writer;
+    }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/sink/CsvWriter.java b/src/main/java/io/pixelsdb/pixels/sink/writer/csv/CsvWriter.java
similarity index 82%
rename from src/main/java/io/pixelsdb/pixels/sink/sink/CsvWriter.java
rename to src/main/java/io/pixelsdb/pixels/sink/writer/csv/CsvWriter.java
index 536bed8..f3baf07 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/sink/CsvWriter.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/csv/CsvWriter.java
@@ -1,20 +1,24 @@
/*
* Copyright 2025 PixelsDB.
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * This file is part of Pixels.
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
*
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
*/
-package io.pixelsdb.pixels.sink.sink;
+
+package io.pixelsdb.pixels.sink.writer.csv;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.pixelsdb.pixels.sink.SinkProto;
@@ -22,6 +26,8 @@
import io.pixelsdb.pixels.sink.config.PixelsSinkDefaultConfig;
import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.writer.PixelsSinkMode;
+import io.pixelsdb.pixels.sink.writer.PixelsSinkWriter;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,10 +50,9 @@
public class CsvWriter implements PixelsSinkWriter {
private static final Logger log = LoggerFactory.getLogger(CsvWriter.class);
- private Long recordCnt = 0L;
-
@Getter
private static final PixelsSinkMode pixelsSinkMode = PixelsSinkMode.CSV;
+ private static final PixelsSinkConfig config = PixelsSinkConfigFactory.getInstance();
private final ReentrantLock lock = new ReentrantLock();
private final ConcurrentMap tableWriters = new ConcurrentHashMap<>();
private final ScheduledExecutorService flushScheduler;
@@ -57,9 +62,8 @@ public class CsvWriter implements PixelsSinkWriter {
private final ReentrantLock globalLock = new ReentrantLock();
private final ReentrantLock writeLock = new ReentrantLock(true);
private final AtomicInteger writeCounter = new AtomicInteger(0);
-
- private static final PixelsSinkConfig config = PixelsSinkConfigFactory.getInstance();
private final String CSV_DELIMITER = "|";
+ private final Long recordCnt = 0L;
public CsvWriter() throws IOException {
this.databaseName = config.getCaptureDatabase();
@@ -94,7 +98,7 @@ public void flush() {
}
@Override
- public boolean write(RowChangeEvent event) {
+ public boolean writeRow(RowChangeEvent event) {
final String tableName = event.getTable();
if (event.getOp() == SinkProto.OperationType.DELETE) {
return true;
@@ -123,9 +127,16 @@ public boolean write(RowChangeEvent event) {
}
}
+ @Override
+ public boolean writeTrans(SinkProto.TransactionMetadata transactionMetadata) {
+ // TODO(AntiO2): Write Trans info
+ return false;
+ }
+
private FileChannel getOrCreateChannel(RowChangeEvent event) throws IOException {
String tableName = event.getTable();
- return tableWriters.computeIfAbsent(tableName, key -> {
+ return tableWriters.computeIfAbsent(tableName, key ->
+ {
try {
Path tablePath = baseOutputPath.resolve(tableName + ".csv");
FileChannel channel = FileChannel.open(tablePath,
@@ -146,7 +157,8 @@ private FileChannel getOrCreateChannel(RowChangeEvent event) throws IOException
private String convertToCSV(Map message) {
return message.values().stream()
- .map(obj -> {
+ .map(obj ->
+ {
if (obj == null) return "";
return obj.toString();
})
@@ -154,6 +166,7 @@ private String convertToCSV(Map message) {
}
private List getHeaderFields(RowChangeEvent event) {
+
return event.getSchema().getFieldNames();
}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/flink/FlinkPollingWriter.java b/src/main/java/io/pixelsdb/pixels/sink/writer/flink/FlinkPollingWriter.java
new file mode 100644
index 0000000..4fdb4ae
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/flink/FlinkPollingWriter.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.writer.flink;
+
+import io.pixelsdb.pixels.common.metadata.SchemaTableName;
+import io.pixelsdb.pixels.sink.SinkProto;
+import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
+import io.pixelsdb.pixels.sink.config.PixelsSinkConstants;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.writer.AbstractBucketedWriter;
+import io.pixelsdb.pixels.sink.writer.PixelsSinkWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * FlinkPollingWriter is a PixelsSinkWriter implementation designed for a long-polling pattern.
+ * It maintains in-memory blocking queues per table, acting as a buffer between the upstream
+ * data source (producer) and the gRPC service (consumer).
+ * This class is thread-safe and integrates FlushRateLimiter to control ingress traffic.
+ * It also manages the lifecycle of the gRPC server.
+ */
+public class FlinkPollingWriter extends AbstractBucketedWriter implements PixelsSinkWriter {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlinkPollingWriter.class);
+ // Core data structure: A thread-safe map from table name to a thread-safe blocking queue.
+ private final Map> tableQueues;
+ // The gRPC server instance managed by this writer.
+ private final PollingRpcServer pollingRpcServer;
+
+ /**
+ * Constructor for FlinkPollingWriter.
+ * Initializes the data structures, rate limiter, and starts the gRPC server.
+ */
+ public FlinkPollingWriter() {
+ this.tableQueues = new ConcurrentHashMap<>();
+
+ // --- START: New logic to initialize and start the gRPC server ---
+ try {
+ // 1. Get configuration
+ PixelsSinkConfig config = PixelsSinkConfigFactory.getInstance();
+ int rpcPort = config.getSinkFlinkServerPort();
+ // 2. Create the gRPC service implementation first, passing a reference to this writer.
+ PixelsPollingServiceImpl service = new PixelsPollingServiceImpl(this);
+
+ // 3. Create the PollingRpcServer instance with the service and port.
+ LOGGER.info("Attempting to start gRPC Polling Server on port {}...", rpcPort);
+ this.pollingRpcServer = new PollingRpcServer(service, rpcPort);
+ // 4. Start the server.
+ this.pollingRpcServer.start();
+ LOGGER.info("gRPC Polling Server successfully started and is managed by FlinkPollingWriter.");
+ } catch (IOException e) {
+ // If the server fails to start, the writer cannot function.
+ // Throw a RuntimeException to fail the Flink task initialization.
+ LOGGER.error("Failed to start gRPC server during FlinkPollingWriter initialization.", e);
+ throw new RuntimeException("Could not start gRPC server", e);
+ }
+ // --- END: New logic ---
+ }
+
+ /**
+ * [Producer side] Receives row change events from the data source, applies rate limiting,
+ * converts them, and places them into the in-memory queue.
+ *
+ * @param event The row change event
+ * @return always returns true, unless an interruption occurs.
+ */
+ @Override
+ public boolean writeRow(RowChangeEvent event) {
+ if (event == null) {
+ LOGGER.warn("Received a null RowChangeEvent, skipping.");
+ return false;
+ }
+
+ try {
+ writeRowChangeEvent(event, null);
+ return true;
+ } catch (Exception e) {
+ LOGGER.error(
+ "Failed to process and write row for table: {}",
+ event.getFullTableName(),
+ e
+ );
+ return false;
+ }
+ }
+
+ /**
+ * [Consumer side] The gRPC service calls this method to pull data.
+ * Implements long-polling logic: if the queue is empty, it blocks for a specified timeout.
+ * batchSize acts as an upper limit on the number of records pulled to prevent oversized RPC responses.
+ *
+ * @param tableName The name of the table to pull data from
+ * @param bucketId
+ * @param batchSize The maximum number of records to pull
+ * @param timeout The maximum time to wait for data
+ * @param unit The time unit for the timeout
+ * @return A list of RowRecords, which will be empty if no data is available before the timeout.
+ * @throws InterruptedException if the thread is interrupted while waiting
+ */
+ public List pollRecords(
+ SchemaTableName tableName,
+ int bucketId,
+ int batchSize,
+ long timeout,
+ TimeUnit unit
+ ) throws InterruptedException {
+ List records = new ArrayList<>(batchSize);
+ TableBucketKey key = new TableBucketKey(tableName, bucketId);
+
+ BlockingQueue queue = tableQueues.get(key);
+
+ if (queue == null) {
+ unit.sleep(timeout);
+ return records;
+ }
+
+ SinkProto.RowRecord first = queue.poll(timeout, unit);
+ if (first == null) {
+ return records;
+ }
+
+ records.add(first);
+ queue.drainTo(records, batchSize - 1);
+
+ LOGGER.info(
+ "Polled {} records for table {}, bucket {}",
+ records.size(), tableName, bucketId
+ );
+ return records;
+ }
+
+ /**
+ * This implementation does not involve transactions, so this method is a no-op.
+ */
+ @Override
+ public boolean writeTrans(SinkProto.TransactionMetadata transactionMetadata) {
+ return true;
+ }
+
+ /**
+ * This implementation uses an in-memory queue, so data is immediately available. flush is a no-op.
+ */
+ @Override
+ public void flush() {
+ // No-op
+ }
+
+ /**
+ * Cleans up resources on close. This is where we stop the gRPC server.
+ */
+ @Override
+ public void close() throws IOException {
+ LOGGER.info("Closing FlinkPollingWriter...");
+ if (this.pollingRpcServer != null) {
+ LOGGER.info("Attempting to shut down the gRPC Polling Server...");
+ this.pollingRpcServer.stop();
+ LOGGER.info("gRPC Polling Server shut down.");
+ }
+ LOGGER.info("Clearing all table queues.");
+ tableQueues.clear();
+ LOGGER.info("FlinkPollingWriter closed.");
+ }
+
+ @Override
+ protected void emit(RowChangeEvent event, int bucketId, Void unused) {
+ TableBucketKey key =
+ new TableBucketKey(event.getSchemaTableName(), bucketId);
+
+ BlockingQueue queue =
+ tableQueues.computeIfAbsent(
+ key,
+ k -> new LinkedBlockingQueue<>(PixelsSinkConstants.MAX_QUEUE_SIZE)
+ );
+
+ try {
+ queue.put(event.getRowRecord());
+ LOGGER.debug(
+ "Enqueued row for table {}, bucket {}, queueSize={}",
+ event.getFullTableName(), bucketId, queue.size()
+ );
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ "Interrupted while enqueueing row for " + event.getFullTableName(),
+ e
+ );
+ }
+ }
+
+ record TableBucketKey(SchemaTableName table, int bucketId) {
+ }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/flink/PixelsPollingServiceImpl.java b/src/main/java/io/pixelsdb/pixels/sink/writer/flink/PixelsPollingServiceImpl.java
new file mode 100644
index 0000000..94ed1aa
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/flink/PixelsPollingServiceImpl.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.writer.flink;
+
+import io.grpc.stub.StreamObserver;
+import io.pixelsdb.pixels.common.metadata.SchemaTableName;
+import io.pixelsdb.pixels.sink.PixelsPollingServiceGrpc;
+import io.pixelsdb.pixels.sink.SinkProto;
+import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.util.FlushRateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class PixelsPollingServiceImpl extends PixelsPollingServiceGrpc.PixelsPollingServiceImplBase {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PixelsPollingServiceImpl.class);
+ private final FlinkPollingWriter writer;
+ private final int pollBatchSize;
+ private final long pollTimeoutMs;
+ private final FlushRateLimiter flushRateLimiter;
+
+ public PixelsPollingServiceImpl(FlinkPollingWriter writer) {
+ if (writer == null) {
+ throw new IllegalArgumentException("FlinkPollingWriter cannot be null.");
+ }
+ this.writer = writer;
+ PixelsSinkConfig config = PixelsSinkConfigFactory.getInstance();
+ this.pollBatchSize = config.getCommitBatchSize();
+ this.pollTimeoutMs = config.getTimeoutMs();
+ this.flushRateLimiter = FlushRateLimiter.getInstance();
+ LOGGER.info("PixelsPollingServiceImpl initialized. Using 'sink.commit.batch.size' for pollBatchSize ({}) " +
+ "and 'sink.timeout.ms' for pollTimeoutMs ({}).",
+ this.pollBatchSize, this.pollTimeoutMs);
+ }
+
+ @Override
+ public void pollEvents(SinkProto.PollRequest request, StreamObserver responseObserver) {
+ SchemaTableName schemaTableName = new SchemaTableName(request.getSchemaName(), request.getTableName());
+ LOGGER.debug("Received poll request for table '{}'", schemaTableName);
+ List records = new ArrayList<>(pollBatchSize);
+
+ try {
+ for (int bucketId : request.getBucketsList()) {
+ if (records.size() >= pollBatchSize) {
+ break;
+ }
+
+ List polled =
+ writer.pollRecords(
+ schemaTableName,
+ bucketId,
+ pollBatchSize - records.size(),
+ 0,
+ TimeUnit.MILLISECONDS
+ );
+
+ if (polled != null && !polled.isEmpty()) {
+ records.addAll(polled);
+ }
+ }
+
+ SinkProto.PollResponse.Builder responseBuilder = SinkProto.PollResponse.newBuilder();
+ if (records != null && !records.isEmpty()) {
+ responseBuilder.addAllRecords(records);
+ this.flushRateLimiter.acquire(records.size());
+ }
+
+ responseObserver.onNext(responseBuilder.build());
+ responseObserver.onCompleted();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.error("Polling thread was interrupted for table: " + schemaTableName, e);
+ responseObserver.onError(io.grpc.Status.INTERNAL
+ .withDescription("Server polling was interrupted")
+ .asRuntimeException());
+ } catch (Exception e) {
+ LOGGER.error("An unexpected error occurred while polling for table: " + schemaTableName, e);
+ responseObserver.onError(io.grpc.Status.UNKNOWN
+ .withDescription("An unexpected error occurred: " + e.getMessage())
+ .asRuntimeException());
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/flink/PollingRpcServer.java b/src/main/java/io/pixelsdb/pixels/sink/writer/flink/PollingRpcServer.java
new file mode 100644
index 0000000..4e95836
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/flink/PollingRpcServer.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.writer.flink;
+
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public class PollingRpcServer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PollingRpcServer.class);
+    private final Server server;
+    private final int port;
+
+    /** Builds, without starting it, a gRPC server that serves {@code serviceImpl} on {@code port}. */
+    public PollingRpcServer(PixelsPollingServiceImpl serviceImpl, int port) {
+        this.port = port;
+        // Bind the concrete service implementation to the server.
+        this.server = ServerBuilder.forPort(port).addService(serviceImpl).build();
+    }
+
+    /** Starts the server and logs the port it listens on. */
+    public void start() throws IOException {
+        server.start();
+        LOGGER.info("gRPC Polling Server started, listening on port " + port);
+    }
+
+    /** Requests a shutdown, waits up to 5 seconds, then forces it if the server is still running. */
+    public void stop() {
+        LOGGER.info("Attempting to shut down gRPC Polling Server...");
+        try {
+            if (server != null && !server.isTerminated()) {
+                server.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+            }
+        } catch (InterruptedException e) {
+            LOGGER.error("gRPC server shutdown interrupted.", e);
+            Thread.currentThread().interrupt();
+        } finally {
+            if (server != null && !server.isTerminated()) {
+                LOGGER.warn("gRPC server did not terminate gracefully. Forcing shutdown.");
+                server.shutdownNow();
+            }
+        }
+        LOGGER.info("gRPC Polling Server shut down.");
+    }
+
+    /** Blocks the calling thread until the server has terminated. */
+    public void awaitTermination() throws InterruptedException {
+        if (server != null) {
+            server.awaitTermination();
+        }
+    }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/proto/ProtoWriter.java b/src/main/java/io/pixelsdb/pixels/sink/writer/proto/ProtoWriter.java
new file mode 100644
index 0000000..d451332
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/proto/ProtoWriter.java
@@ -0,0 +1,302 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.writer.proto;
+
+
+import io.pixelsdb.pixels.common.physical.PhysicalWriter;
+import io.pixelsdb.pixels.sink.SinkProto;
+import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.exception.SinkException;
+import io.pixelsdb.pixels.sink.metadata.TableMetadataRegistry;
+import io.pixelsdb.pixels.sink.util.TableCounters;
+import io.pixelsdb.pixels.sink.writer.PixelsSinkWriter;
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link PixelsSinkWriter} that buffers row changes per transaction and appends each completed
+ * transaction (BEGIN, rows, END) as length-prefixed protobuf frames via a {@link RotatingWriterManager}.
+ * @author: AntiO2
+ * @date: 2025/10/5 07:10
+ */
+public class ProtoWriter implements PixelsSinkWriter {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProtoWriter.class);
+    private final RotatingWriterManager writerManager;
+    private final TableMetadataRegistry instance;
+    // Held by writeRow and writeTrans for their whole duration.
+    private final ReentrantLock lock = new ReentrantLock();
+    /**
+     * Data structure to track transaction progress:
+     * transaction id -> context of a transaction that has not been written out yet.
+     */
+    private final Map<String, TransactionContext> transTracker = new ConcurrentHashMap<>();
+
+    public ProtoWriter() throws IOException {
+        PixelsSinkConfig sinkConfig = PixelsSinkConfigFactory.getInstance();
+        this.writerManager = new RotatingWriterManager(sinkConfig.getSinkProtoData());
+        this.instance = TableMetadataRegistry.Instance();
+    }
+
+    /**
+     * Checks if all tables within a transaction have reached their expected row count.
+     * If complete, the transaction is removed from the tracker and written to the output file.
+     * A transaction whose TX END lists no tables is removed without being written.
+     *
+     * @param transId The ID of the transaction to check.
+     */
+    private void checkAndCleanupTransaction(String transId) {
+        TransactionContext context = transTracker.get(transId);
+        if (context == null || !context.isEndReceived()) {
+            // Transaction has not received TX END or has been cleaned up already.
+            return;
+        }
+
+        Map<String, TableCounters> tableMap = context.tableCounters;
+        if (tableMap == null || tableMap.isEmpty()) {
+            // Empty transaction with no tables. Clean up immediately.
+            transTracker.remove(transId);
+            LOGGER.info("Transaction {} (empty) successfully completed and removed from tracker.", transId);
+            return;
+        }
+
+        // Stop at the first table that is still waiting for rows.
+        for (TableCounters counters : tableMap.values()) {
+            if (!counters.isComplete()) {
+                return;
+            }
+        }
+
+        transTracker.remove(transId);
+        ByteBuffer transInfo = getTransBuffer(context);
+        if (transInfo == null) {
+            // A row could not be framed (write() has logged why); calling rewind() on null would throw.
+            LOGGER.error("Transaction {} could not be serialized and is dropped.", transId);
+            return;
+        }
+        transInfo.rewind();
+        writeBuffer(transInfo);
+    }
+
+    /**
+     * Records transaction metadata: BEGIN is stored on the context; END installs the expected row counts
+     * and may complete the transaction; any other status is ignored. Always returns {@code true}.
+     */
+    @Override
+    public boolean writeTrans(SinkProto.TransactionMetadata transactionMetadata) {
+        lock.lock(); // acquired before try so that finally never unlocks a lock that is not held
+        try {
+            String transId = transactionMetadata.getId();
+            if (transactionMetadata.getStatus() == SinkProto.TransactionStatus.BEGIN) {
+                // 1. BEGIN: Create context if not exists (in case ROWChange arrived first).
+                TransactionContext transactionContext = transTracker.computeIfAbsent(transId, k -> new TransactionContext());
+                LOGGER.debug("Transaction {} BEGIN received.", transId);
+                transactionContext.txBegin = transactionMetadata;
+            } else if (transactionMetadata.getStatus() == SinkProto.TransactionStatus.END) {
+                // 2. END: Finalize tracker state, merge pre-counts, and trigger cleanup.
+                handleTransactionEnd(transId, transactionMetadata);
+            }
+            return true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** Applies a TX END to the transaction's context; must be called with {@code lock} held. */
+    private void handleTransactionEnd(String transId, SinkProto.TransactionMetadata transactionMetadata) {
+        // Get existing context or create a new one (in case BEGIN was missed).
+        TransactionContext context = transTracker.computeIfAbsent(transId, k -> new TransactionContext());
+        // --- Initialization Step: Set Total Counts ---
+        Map<String, TableCounters> newTableCounters = new ConcurrentHashMap<>();
+        for (SinkProto.DataCollection dataCollection : transactionMetadata.getDataCollectionsList()) {
+            // Create official counter with total count
+            newTableCounters.put(dataCollection.getDataCollection(), new TableCounters((int) dataCollection.getEventCount()));
+        }
+        // Set the final state (must be volatile write)
+        context.setEndReceived(newTableCounters);
+        // --- Merge Step: Apply pre-received rows ---
+        for (Map.Entry<String, AtomicInteger> preEntry : context.preEndCounts.entrySet()) {
+            String table = preEntry.getKey();
+            int accumulatedCount = preEntry.getValue().get();
+            TableCounters finalCounter = newTableCounters.get(table);
+            if (finalCounter == null) {
+                LOGGER.warn("Pre-received rows for table {} (count: {}) but table was not in TX END metadata. Discarding accumulated count.", table, accumulatedCount);
+                continue;
+            }
+            // Apply the accumulated count to the official counter
+            for (int i = 0; i < accumulatedCount; i++) {
+                finalCounter.increment();
+            }
+        }
+        context.txEnd = transactionMetadata;
+        // Trigger cleanup. This will validate if all rows (pre and post END) have satisfied the total counts.
+        checkAndCleanupTransaction(transId);
+    }
+
+    /**
+     * Serializes a transaction as: BEGIN frame (when BEGIN was received), one frame per buffered row
+     * in arrival order, END frame. Returns {@code null} if one of the rows could not be framed.
+     */
+    private ByteBuffer getTransBuffer(TransactionContext transactionContext) {
+        List<byte[]> frames = new ArrayList<>(transactionContext.rowChangeEventList.size() + 2);
+        if (transactionContext.txBegin != null) {
+            frames.add(writeData(-1, transactionContext.txBegin.toByteArray()).array());
+        } else {
+            // BEGIN was missed (writeTrans allows for that); dereferencing txBegin would throw.
+            LOGGER.warn("Transaction {} has no BEGIN metadata; writing its rows and END only.", transactionContext.txEnd.getId());
+        }
+        for (RowChangeEvent rowChangeEvent : transactionContext.rowChangeEventList) {
+            ByteBuffer rowFrame = write(rowChangeEvent.getRowRecord());
+            if (rowFrame == null) {
+                return null;
+            }
+            frames.add(rowFrame.array());
+        }
+        frames.add(writeData(-1, transactionContext.txEnd.toByteArray()).array());
+
+        int total = frames.stream().mapToInt(frame -> frame.length).sum();
+        ByteBuffer buffer = ByteBuffer.allocate(total);
+        for (byte[] frame : frames) {
+            buffer.put(frame);
+        }
+        return buffer;
+    }
+
+    /** Frames one row record, keyed by its table id; returns {@code null} (and logs) if the id lookup fails. */
+    public ByteBuffer write(SinkProto.RowRecord rowRecord) {
+        byte[] rowData = rowRecord.toByteArray();
+        String tableName = rowRecord.getSource().getTable();
+        String schemaName = rowRecord.getSource().getDb();
+
+        long tableId;
+        try {
+            tableId = instance.getTableId(schemaName, tableName);
+        } catch (SinkException e) {
+            LOGGER.error("Error while getting schema table id.", e);
+            return null;
+        }
+        // NOTE(review): the frame key is a 32-bit int, so a larger table id would be truncated here.
+        return writeData((int) tableId, rowData);
+    }
+
+    // key: -1 means transaction, else means table id; frame layout is key + value len + data (big-endian ints)
+    private ByteBuffer writeData(int key, byte[] data) {
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + Integer.BYTES + data.length).order(ByteOrder.BIG_ENDIAN);
+        buf.putInt(key).putInt(data.length).put(data);
+        return buf;
+    }
+
+    /** Appends the buffer's backing array to the current file; returns {@code false} (and logs) on I/O error. */
+    private synchronized boolean writeBuffer(ByteBuffer buf) {
+        try {
+            PhysicalWriter writer = writerManager.current();
+            writer.prepare(buf.remaining());
+            writer.append(buf.array());
+            return true;
+        } catch (IOException e) {
+            LOGGER.error("Error while writing row record.", e);
+            return false;
+        }
+    }
+
+    /**
+     * Buffers the row in its transaction's context. Before TX END the row is tallied per table; after
+     * TX END it is counted against the announced total and may complete the transaction. Always returns {@code true}.
+     */
+    @Override
+    public boolean writeRow(RowChangeEvent rowChangeEvent) {
+        lock.lock();
+        try {
+            String transId = rowChangeEvent.getTransaction().getId();
+            String fullTable = rowChangeEvent.getFullTableName();
+            // 1. Get or create the transaction context
+            TransactionContext context = transTracker.computeIfAbsent(transId, k -> new TransactionContext());
+            context.rowChangeEventList.add(rowChangeEvent);
+            // 2. Check if TX END has arrived
+            if (!context.isEndReceived()) {
+                context.incrementPreEndCount(fullTable);
+                LOGGER.debug("Row received for TransId {} / Table {} before TX END. Accumulating count.", transId, fullTable);
+                return true;
+            }
+            // TX END arrived: Use official TableCounters
+            TableCounters counters = context.tableCounters.get(fullTable);
+            if (counters == null) {
+                LOGGER.warn("Row received for TransId {} / Table {} but was not included in TX END metadata.", transId, fullTable);
+                return true;
+            }
+            counters.increment();
+            // If this table completed, check if the entire transaction is complete.
+            if (counters.isComplete()) {
+                checkAndCleanupTransaction(transId);
+            }
+            return true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** Does nothing. */
+    @Override
+    public void flush() {
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.writerManager.close();
+    }
+
+    /** Per-transaction state: buffered rows, BEGIN/END metadata and per-table row counters. */
+    private static class TransactionContext {
+        // Key: Full Table Name, Value: Row Count
+        private final Map<String, AtomicInteger> preEndCounts = new ConcurrentHashMap<>();
+        public List<RowChangeEvent> rowChangeEventList = new ArrayList<>();
+        public SinkProto.TransactionMetadata txBegin;
+        public SinkProto.TransactionMetadata txEnd;
+        @Getter
+        private volatile boolean endReceived = false;
+        // Key: Full Table Name
+        private Map<String, TableCounters> tableCounters = null;
+
+        public void setEndReceived(Map<String, TableCounters> counters) {
+            this.tableCounters = counters;
+            this.endReceived = true;
+        }
+
+        /**
+         * @param table Full table name
+         */
+        public void incrementPreEndCount(String table) {
+            preEndCounts.computeIfAbsent(table, k -> new AtomicInteger(0)).incrementAndGet();
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/proto/RotatingWriterManager.java b/src/main/java/io/pixelsdb/pixels/sink/writer/proto/RotatingWriterManager.java
new file mode 100644
index 0000000..32dd2a9
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/proto/RotatingWriterManager.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.writer.proto;
+
+
+import io.pixelsdb.pixels.common.physical.PhysicalWriter;
+import io.pixelsdb.pixels.common.physical.PhysicalWriterUtil;
+import io.pixelsdb.pixels.common.physical.Storage;
+import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.util.EtcdFileRegistry;
+
+import java.io.IOException;
+
+/**
+ * Hands out the {@link PhysicalWriter} of the current output file and switches to a new file
+ * once {@code maxRecordsPerFile} calls to {@link #current()} have been served by the current one.
+ * @author: AntiO2
+ * @date: 2025/10/5 07:34
+ */
+public class RotatingWriterManager {
+    private final String baseDir;
+    private final String topic;
+    private final int maxRecordsPerFile;
+    private final Storage.Scheme scheme;
+    private final EtcdFileRegistry registry;
+    private int currentCount = 0;
+    private PhysicalWriter currentWriter;
+    private String currentFileName;
+
+    public RotatingWriterManager(String topic) throws IOException {
+        PixelsSinkConfig sinkConfig = PixelsSinkConfigFactory.getInstance();
+        this.topic = topic;
+        this.baseDir = sinkConfig.getSinkProtoDir();
+        this.maxRecordsPerFile = sinkConfig.getMaxRecordsPerFile();
+        this.registry = new EtcdFileRegistry(topic, baseDir);
+        this.scheme = Storage.Scheme.fromPath(this.baseDir);
+        rotate();
+    }
+
+    private void rotate() throws IOException {
+        finishCurrentFile();
+        currentFileName = registry.createNewFile();
+        currentWriter = PhysicalWriterUtil.newPhysicalWriter(scheme, currentFileName);
+        currentCount = 0;
+    }
+
+    /** Returns the writer to use for the next record, rotating first if the current file is full. */
+    public PhysicalWriter current() throws IOException {
+        if (currentCount >= maxRecordsPerFile) {
+            rotate();
+        }
+        currentCount++;
+        return currentWriter;
+    }
+
+    public void close() throws IOException {
+        finishCurrentFile();
+    }
+
+    // Shared by rotate() and close(): closes the open writer, if any, and marks its file completed.
+    private void finishCurrentFile() throws IOException {
+        if (currentWriter != null) {
+            currentWriter.close();
+            registry.markFileCompleted(registry.getCurrentFileKey());
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/RetinaBucketDispatcher.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/RetinaBucketDispatcher.java
new file mode 100644
index 0000000..ab4bcfe
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/RetinaBucketDispatcher.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina;
+
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.writer.AbstractBucketedWriter;
+
+public class RetinaBucketDispatcher extends AbstractBucketedWriter {
+ private final TableWriterProxy tableWriterProxy;
+
+ public RetinaBucketDispatcher() {
+ this.tableWriterProxy = TableWriterProxy.getInstance();
+ }
+
+ @Override
+ protected void emit(RowChangeEvent event, int bucketId, SinkContext ctx) {
+ tableWriterProxy
+ .getTableWriter(event.getTable(), event.getTableId(), bucketId)
+ .write(event, ctx);
+ }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/RetinaServiceProxy.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/RetinaServiceProxy.java
new file mode 100644
index 0000000..245e309
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/RetinaServiceProxy.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina;
+
+import io.pixelsdb.pixels.common.exception.RetinaException;
+import io.pixelsdb.pixels.common.node.BucketCache;
+import io.pixelsdb.pixels.common.retina.RetinaService;
+import io.pixelsdb.pixels.common.utils.RetinaUtils;
+import io.pixelsdb.pixels.retina.RetinaProto;
+import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.util.MetricsFacade;
+import io.pixelsdb.pixels.sink.writer.PixelsSinkMode;
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class RetinaServiceProxy {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RetinaServiceProxy.class);
+ @Getter
+ private static final PixelsSinkMode pixelsSinkMode = PixelsSinkMode.RETINA;
+ private static final PixelsSinkConfig config = PixelsSinkConfigFactory.getInstance();
+ // private static final IndexService indexService = IndexService.Instance();
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ private final RetinaService retinaService;
+ private final MetricsFacade metricsFacade = MetricsFacade.getInstance();
+ private final int vNodeId;
+ private RetinaService.StreamHandler retinaStream = null;
+
+ public RetinaServiceProxy(int bucketId) {
+ if (bucketId == -1) {
+ this.retinaService = RetinaService.Instance();
+ } else {
+ this.retinaService = RetinaUtils.getRetinaServiceFromBucketId(bucketId);
+ }
+
+
+ if (config.getRetinaWriteMode() == RetinaWriteMode.STREAM) {
+ retinaStream = retinaService.startUpdateStream();
+ } else {
+ retinaStream = null;
+ }
+
+ this.vNodeId = BucketCache.getInstance().getRetinaNodeInfoByBucketId(bucketId).getVirtualNodeId();
+ }
+
+ public boolean writeTrans(String schemaName, List tableUpdateData) {
+ if (config.getRetinaWriteMode() == RetinaWriteMode.STUB) {
+ try {
+ retinaService.updateRecord(schemaName, vNodeId, tableUpdateData);
+ } catch (RetinaException e) {
+ e.printStackTrace();
+ return false;
+ }
+ } else {
+ try {
+ retinaStream.updateRecord(schemaName, vNodeId, tableUpdateData);
+ } catch (RetinaException e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public CompletableFuture writeBatchAsync
+ (String schemaName, List tableUpdateData) {
+ if (config.getRetinaWriteMode() == RetinaWriteMode.STUB) {
+ try {
+ retinaService.updateRecord(schemaName, vNodeId, tableUpdateData);
+ } catch (RetinaException e) {
+ e.printStackTrace();
+ }
+ return null;
+ } else {
+ try {
+ return retinaStream.updateRecord(schemaName, vNodeId, tableUpdateData);
+ } catch (RetinaException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+ }
+
+ public void close() throws IOException {
+ isClosed.compareAndSet(false, true);
+ if (config.getRetinaWriteMode() == RetinaWriteMode.STREAM) {
+ retinaStream.close();
+ }
+ }
+
+ public enum RetinaWriteMode {
+ STREAM,
+ STUB;
+
+ public static RetinaWriteMode fromValue(String value) {
+ for (RetinaWriteMode mode : values()) {
+ if (mode.name().equalsIgnoreCase(value)) {
+ return mode;
+ }
+ }
+ throw new RuntimeException(String.format("Can't convert %s to Retina writer type", value));
+ }
+ }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/RetinaWriter.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/RetinaWriter.java
new file mode 100644
index 0000000..6ecc33c
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/RetinaWriter.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina;
+
+import io.pixelsdb.pixels.sink.SinkProto;
+import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.exception.SinkException;
+import io.pixelsdb.pixels.sink.util.FlushRateLimiter;
+import io.pixelsdb.pixels.sink.util.MetricsFacade;
+import io.pixelsdb.pixels.sink.writer.PixelsSinkWriter;
+import org.apache.commons.lang3.RandomUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/** PixelsSinkWriter that routes transaction metadata and row changes through SinkContextManager. */
+public class RetinaWriter implements PixelsSinkWriter {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RetinaWriter.class);
+ final ExecutorService dispatchExecutor = Executors.newCachedThreadPool();
+ private final ScheduledExecutorService timeoutScheduler = Executors.newSingleThreadScheduledExecutor();
+ private final FlushRateLimiter flushRateLimiter;
+ private final MetricsFacade metricsFacade = MetricsFacade.getInstance();
+ private final SinkContextManager sinkContextManager;
+ private final TransactionMode transactionMode;
+
+ public RetinaWriter() {
+ PixelsSinkConfig config = PixelsSinkConfigFactory.getInstance();
+ this.sinkContextManager = SinkContextManager.getInstance();
+ this.flushRateLimiter = FlushRateLimiter.getInstance();
+ this.transactionMode = config.getTransactionMode();
+ }
+
+ /** Handles BEGIN/END transaction metadata; in RECORD mode it does nothing and returns true. */
+ @Override
+ public boolean writeTrans(SinkProto.TransactionMetadata txMeta) {
+ if (transactionMode.equals(TransactionMode.RECORD)) {
+ return true;
+ }
+
+ try {
+ SinkProto.TransactionStatus status = txMeta.getStatus();
+ if (status == SinkProto.TransactionStatus.BEGIN) {
+ handleTxBegin(txMeta);
+ } else if (status == SinkProto.TransactionStatus.END) {
+ handleTxEnd(txMeta);
+ }
+ } catch (SinkException e) {
+ LOGGER.error(e.getMessage(), e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Writes one row-change event.
+ * Events without a transaction id are wrapped in a virtual transaction; in RECORD mode the
+ * event is written without a context; otherwise it is written only when its transaction
+ * context reports that it can be written (the context manager buffers it if not).
+ *
+ * @return {@code false} if {@code event} is null or a {@link SinkException} occurs
+ */
+ @Override
+ public boolean writeRow(RowChangeEvent event) {
+ if (event == null) {
+ return false;
+ }
+ try {
+ metricsFacade.recordRowChange(event.getTable(), event.getOp());
+ event.startLatencyTimer();
+ if (event.getTransaction() == null || event.getTransaction().getId().isEmpty()) {
+ handleNonTxEvent(event);
+ return true;
+ }
+
+ if (transactionMode.equals(TransactionMode.RECORD)) {
+ sinkContextManager.writeRowChangeEvent(null, event);
+ } else {
+ AtomicBoolean canWrite = new AtomicBoolean(false);
+ SinkContext ctx = sinkContextManager.getActiveTxContext(event, canWrite);
+ if (canWrite.get()) {
+ sinkContextManager.writeRowChangeEvent(ctx, event);
+ }
+ }
+ return true;
+ } catch (SinkException e) {
+ LOGGER.error(e.getMessage(), e);
+ return false;
+ }
+ }
+
+ /** Starts the sink-side transaction for a BEGIN record, re-wrapping any SinkException. */
+ private void handleTxBegin(SinkProto.TransactionMetadata txBegin) throws SinkException {
+ try {
+ startTransSync(txBegin.getId());
+ } catch (SinkException e) {
+ throw new SinkException("Failed to start trans", e);
+ }
+ }
+
+ private void startTransSync(String sourceTxId) throws SinkException {
+ sinkContextManager.startTransSync(sourceTxId);
+ }
+
+ private void handleTxEnd(SinkProto.TransactionMetadata txEnd) {
+ sinkContextManager.processTxCommit(txEnd);
+ }
+
+ private void handleNonTxEvent(RowChangeEvent event) throws SinkException {
+ // Wrap the event in a one-row virtual transaction keyed by a timestamp plus a random long.
+ String randomId = Long.toString(System.currentTimeMillis()) + RandomUtils.nextLong();
+ writeTrans(buildBeginTransactionMetadata(randomId));
+ sinkContextManager.writeRandomRowChangeEvent(randomId, event);
+ writeTrans(buildEndTransactionMetadata(event.getFullTableName(), randomId));
+ }
+
+ public void shutdown() {
+ dispatchExecutor.shutdown();
+ timeoutScheduler.shutdown();
+ }
+
+ @Override
+ public void close() throws IOException {
+ shutdown(); // releases both executors; ExecutorService.shutdown() is safe to call again
+ }
+
+ @Override
+ public void flush() {
+ // Intentionally empty: this class keeps no buffer of its own.
+ }
+
+ private SinkProto.TransactionMetadata buildBeginTransactionMetadata(String id) {
+ SinkProto.TransactionMetadata.Builder builder = SinkProto.TransactionMetadata.newBuilder();
+ builder.setStatus(SinkProto.TransactionStatus.BEGIN)
+ .setId(id);
+ return builder.build();
+ }
+
+ private SinkProto.TransactionMetadata buildEndTransactionMetadata(String fullTableName, String id) {
+ SinkProto.TransactionMetadata.Builder builder = SinkProto.TransactionMetadata.newBuilder();
+ builder.setStatus(SinkProto.TransactionStatus.END)
+ .setId(id)
+ .setEventCount(1L);
+
+ SinkProto.DataCollection.Builder dataCollectionBuilder = SinkProto.DataCollection.newBuilder();
+ dataCollectionBuilder.setDataCollection(fullTableName)
+ .setEventCount(1L);
+ builder.addDataCollections(dataCollectionBuilder);
+ return builder.build();
+ }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/SinkContext.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/SinkContext.java
new file mode 100644
index 0000000..a420efe
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/SinkContext.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina;
+
+import io.pixelsdb.pixels.common.transaction.TransContext;
+import io.pixelsdb.pixels.core.utils.Pair;
+import io.pixelsdb.pixels.sink.SinkProto;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.metadata.TableMetadataRegistry;
+import lombok.Getter;
+import lombok.Setter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class SinkContext {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SinkContext.class);
+ @Getter
+ final ReentrantLock lock = new ReentrantLock();
+ @Getter
+ final Condition cond = lock.newCondition(); // this cond is wait for pixels tx
+
+ @Getter
+ final ReentrantLock tableCounterLock = new ReentrantLock();
+ @Getter
+ final Condition tableCounterCond = tableCounterLock.newCondition();
+
+
+ @Getter
+ final String sourceTxId;
+ @Getter
+ final AtomicInteger pendingEvents = new AtomicInteger(0);
+ @Getter
+ final CompletableFuture completionFuture = new CompletableFuture<>();
+ @Getter
+ final TableMetadataRegistry tableMetadataRegistry = TableMetadataRegistry.Instance();
+ private final Queue> recordTimes = new ConcurrentLinkedQueue<>();
+ @Getter
+ Map tableCounters = new ConcurrentHashMap<>();
+ @Getter
+ @Setter
+ Queue orphanEvent = new ConcurrentLinkedQueue<>();
+ @Getter
+ @Setter
+ SinkProto.TransactionMetadata endTx;
+ @Getter
+ private TransContext pixelsTransCtx;
+ @Setter
+ @Getter
+ private boolean failed = false;
+ @Getter
+ @Setter
+ private volatile Long startTime = null;
+
+ public SinkContext(String sourceTxId) {
+ this.sourceTxId = sourceTxId;
+ this.pixelsTransCtx = null;
+ setCurrStartTime();
+ }
+
+ public SinkContext(String sourceTxId, TransContext pixelsTransCtx) {
+ this.sourceTxId = sourceTxId;
+ this.pixelsTransCtx = pixelsTransCtx;
+ setCurrStartTime();
+ }
+
+
+ void updateCounter(String table) {
+ updateCounter(table, 1L);
+ }
+
+ public void setPixelsTransCtx(TransContext pixelsTransCtx) {
+ if (this.pixelsTransCtx != null) {
+ throw new IllegalStateException("Pixels Trans Context Already Set");
+ }
+ this.pixelsTransCtx = pixelsTransCtx;
+ }
+
+ public void recordTimestamp(String table, LocalDateTime timestamp) {
+ recordTimes.offer(new Pair<>(table, timestamp));
+ }
+
+ public void updateCounter(String table, long count) {
+ tableCounterLock.lock();
+ tableCounters.compute(table, (k, v) ->
+ (v == null) ? count : v + count);
+ tableCounterCond.signalAll();
+ tableCounterLock.unlock();
+ }
+
+ public boolean isCompleted() {
+ try {
+ tableCounterLock.lock();
+ if (endTx == null) {
+ return false;
+ }
+ for (SinkProto.DataCollection dataCollection : endTx.getDataCollectionsList()) {
+ Long targetEventCount = tableCounters.get(dataCollection.getDataCollection());
+ long target = targetEventCount == null ? 0 : targetEventCount;
+ LOGGER.debug("TX {}, Table {}, event count {}, tableCursors {}", endTx.getId(), dataCollection.getDataCollection(), dataCollection.getEventCount(), target);
+ if (dataCollection.getEventCount() > target) {
+ return false;
+ }
+ }
+ return true;
+ } finally {
+ tableCounterLock.unlock();
+ }
+
+ }
+
+ public int getProcessedRowsNum() {
+ long num = 0;
+ try {
+ tableCounterLock.lock();
+ for (Long counter : tableCounters.values()) {
+ num += counter;
+ }
+ } finally {
+ tableCounterLock.unlock();
+ }
+ return (int) num;
+ }
+
+ public long getTimestamp() {
+ if (pixelsTransCtx == null) {
+ throw new RuntimeException("PixelsTransCtx is NULL");
+ }
+ return pixelsTransCtx.getTimestamp();
+ }
+
+ public void bufferOrphanedEvent(RowChangeEvent event) {
+ orphanEvent.add(event);
+ }
+
+ public void setCurrStartTime() {
+ if (startTime != null) {
+ return;
+ }
+
+ synchronized (this) {
+ if (startTime == null) {
+ startTime = System.currentTimeMillis();
+ }
+ }
+ }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/SinkContextManager.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/SinkContextManager.java
new file mode 100644
index 0000000..13782f7
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/SinkContextManager.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina;
+
+import io.pixelsdb.pixels.common.transaction.TransContext;
+import io.pixelsdb.pixels.sink.SinkProto;
+import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.exception.SinkException;
+import io.pixelsdb.pixels.sink.freshness.FreshnessClient;
+import io.pixelsdb.pixels.sink.util.BlockingBoundedMap;
+import io.pixelsdb.pixels.sink.util.DataTransform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SinkContextManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SinkContextManager.class);
+ private static final Logger BUCKET_TRACE_LOGGER = LoggerFactory.getLogger("bucket_trace");
+ private static volatile SinkContextManager instance;
+ private final BlockingBoundedMap activeTxContexts = new BlockingBoundedMap<>(100000);
+ // private final ConcurrentMap activeTxContexts = new ConcurrentHashMap<>(10000);
+ private final TransactionProxy transactionProxy = TransactionProxy.Instance();
+ private final CommitMethod commitMethod;
+ private final String freshnessLevel;
+ private final RetinaBucketDispatcher retinaBucketDispatcher;
+
+ private SinkContextManager() {
+ PixelsSinkConfig config = PixelsSinkConfigFactory.getInstance();
+ if (config.getCommitMethod().equals("sync")) {
+ this.commitMethod = CommitMethod.Sync;
+ } else {
+ this.commitMethod = CommitMethod.Async;
+ }
+ this.freshnessLevel = config.getSinkMonitorFreshnessLevel();
+ this.retinaBucketDispatcher = new RetinaBucketDispatcher();
+ }
+
+ public static SinkContextManager getInstance() {
+ if (instance == null) {
+ synchronized (SinkContextManager.class) {
+ if (instance == null) {
+ instance = new SinkContextManager();
+ }
+ }
+ }
+ return instance;
+ }
+
+ protected SinkContext getActiveTxContext(RowChangeEvent event, AtomicBoolean canWrite) {
+ String txId = event.getTransaction().getId();
+ return activeTxContexts.compute(txId, (sourceTxId, sinkContext) ->
+ {
+ if (sinkContext == null) {
+ LOGGER.trace("Allocate new tx {}\torder:{}", sourceTxId, event.getTransaction().getTotalOrder());
+ SinkContext newSinkContext = new SinkContext(sourceTxId);
+ newSinkContext.bufferOrphanedEvent(event);
+ return newSinkContext;
+ } else {
+ try {
+ sinkContext.getLock().lock();
+ if (sinkContext.getPixelsTransCtx() == null) {
+ LOGGER.trace("Buffer in tx {}\torder:{}", sourceTxId, event.getTransaction().getTotalOrder());
+ canWrite.set(false);
+ sinkContext.bufferOrphanedEvent(event);
+ return sinkContext;
+ }
+ LOGGER.trace("Ready to write in tx {}\torder:{}", sourceTxId, event.getTransaction().getTotalOrder());
+ canWrite.set(true);
+ return sinkContext;
+ } finally {
+ sinkContext.getCond().signalAll();
+ sinkContext.getLock().unlock();
+ }
+
+ }
+ });
+ }
+
+ protected void startTransSync(String sourceTxId) {
+ LOGGER.trace("Start trans {}", sourceTxId);
+ TransContext pixelsTransContext = transactionProxy.getNewTransContext(sourceTxId);
+ activeTxContexts.compute(
+ sourceTxId,
+ (k, oldCtx) ->
+ {
+ if (oldCtx == null) {
+ LOGGER.trace("Start trans {} without buffered events", sourceTxId);
+ return new SinkContext(sourceTxId, pixelsTransContext);
+ } else {
+ oldCtx.getLock().lock();
+ try {
+ if (oldCtx.getPixelsTransCtx() != null) {
+ LOGGER.warn("Previous tx {} has been released, maybe due to loop process", sourceTxId);
+ oldCtx.tableCounters = new ConcurrentHashMap<>();
+ }
+ LOGGER.trace("Start trans with buffered events {}", sourceTxId);
+ oldCtx.setPixelsTransCtx(pixelsTransContext);
+ handleOrphanEvents(oldCtx);
+ oldCtx.getCond().signalAll();
+ } catch (SinkException e) {
+ throw new RuntimeException(e);
+ } finally {
+ oldCtx.getLock().unlock();
+ }
+ return oldCtx;
+ }
+ }
+ );
+ LOGGER.trace("Begin Tx Sync: {}", sourceTxId);
+ }
+
+ void processTxCommit(SinkProto.TransactionMetadata txEnd) {
+ String txId = txEnd.getId();
+ SinkContext ctx = getSinkContext(txId);
+ if (ctx == null) {
+ throw new RuntimeException("Sink Context is null");
+ }
+
+ try {
+ ctx.tableCounterLock.lock();
+ ctx.setEndTx(txEnd);
+ long startTs = System.currentTimeMillis();
+ if (ctx.isCompleted()) {
+ endTransaction(ctx);
+ }
+ } finally {
+ ctx.tableCounterLock.unlock();
+ }
+ }
+
+ void endTransaction(SinkContext ctx) {
+ String txId = ctx.getSourceTxId();
+ removeSinkContext(txId);
+ boolean failed = ctx.isFailed();
+ if (!failed) {
+ LOGGER.trace("Committed transaction: {}, Pixels Trans is {}", txId, ctx.getPixelsTransCtx().getTransId());
+ switch (commitMethod) {
+ case Sync -> {
+ transactionProxy.commitTransSync(ctx);
+ }
+ case Async -> {
+ transactionProxy.commitTransAsync(ctx);
+ }
+ }
+ if (freshnessLevel.equals("embed")) {
+ for (String table : ctx.getTableCounters().keySet()) {
+ String tableName = DataTransform.extractTableName(table);
+ FreshnessClient.getInstance().addMonitoredTable(tableName);
+ }
+ }
+ } else {
+ LOGGER.info("Abort transaction: {}", txId);
+ CompletableFuture.runAsync(() ->
+ {
+ transactionProxy.rollbackTrans(ctx.getPixelsTransCtx());
+ }).whenComplete((v, ex) ->
+ {
+ if (ex != null) {
+ LOGGER.error("Rollback failed", ex);
+ }
+ });
+ }
+ }
+
+ private void handleOrphanEvents(SinkContext ctx) throws SinkException {
+ Queue buffered = ctx.getOrphanEvent();
+ ctx.setOrphanEvent(null);
+ if (buffered != null) {
+ LOGGER.trace("Handle Orphan Events in {}", ctx.sourceTxId);
+ for (RowChangeEvent event : buffered) {
+ writeRowChangeEvent(ctx, event);
+ }
+ }
+ }
+
+ protected void writeRowChangeEvent(SinkContext ctx, RowChangeEvent event) throws SinkException {
+ if (ctx != null) {
+ event.setTimeStamp(ctx.getTimestamp());
+ }
+ retinaBucketDispatcher.writeRowChangeEvent(event, ctx);
+ }
+
+ protected SinkContext getSinkContext(String txId) {
+ return activeTxContexts.get(txId);
+ }
+
+ protected void removeSinkContext(String txId) {
+ activeTxContexts.remove(txId);
+ }
+
+ protected void writeRandomRowChangeEvent(String randomId, RowChangeEvent event) throws SinkException {
+ writeRowChangeEvent(getSinkContext(randomId), event);
+ }
+
+ public int getActiveTxnsNum() {
+ return activeTxContexts.size();
+ }
+
+ public String findMinActiveTx() {
+ Comparator customComparator = (key1, key2) ->
+ {
+ try {
+ String[] parts1 = key1.split("_");
+ int int1 = Integer.parseInt(parts1[0]);
+ int loopId1 = Integer.parseInt(parts1[1]);
+
+ String[] parts2 = key2.split("_");
+ int int2 = Integer.parseInt(parts2[0]);
+ int loopId2 = Integer.parseInt(parts2[1]);
+
+ int loopIdComparison = Integer.compare(loopId1, loopId2);
+ if (loopIdComparison != 0) {
+ return loopIdComparison;
+ }
+ return Integer.compare(int1, int2);
+ } catch (Exception e) {
+ System.err.println("Key format error for comparison: " + e.getMessage());
+ return 0;
+ }
+ };
+
+ Optional min = activeTxContexts.keySet().stream().min(customComparator);
+ return min.orElse("None");
+ }
+
+ private enum CommitMethod {
+ Sync,
+ Async
+ }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableCrossTxWriter.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableCrossTxWriter.java
new file mode 100644
index 0000000..51c597c
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableCrossTxWriter.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina;
+
+
+import io.pixelsdb.pixels.retina.RetinaProto;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.exception.SinkException;
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * @package: io.pixelsdb.pixels.sink.writer.retina
+ * @className: TableCrossTxWriter
+ * @author: AntiO2
+ * @date: 2025/9/27 09:36
+ */
+public class TableCrossTxWriter extends TableWriter {
+ protected final ReentrantLock writeLock = new ReentrantLock();
+ @Getter
+ private final Logger LOGGER = LoggerFactory.getLogger(TableCrossTxWriter.class);
+ private final int flushBatchSize;
+
+ public TableCrossTxWriter(String t, int bucketId) {
+ super(t, bucketId);
+ flushBatchSize = config.getFlushBatchSize();
+ }
+
+ /**
+ * Flush any buffered events for the current transaction.
+ */
+ public void flush(List batch) {
+ writeLock.lock();
+ try {
+ String txId = null;
+ List smallBatch = null;
+ List txIds = new ArrayList<>();
+ List fullTableName = new ArrayList<>();
+ List tableUpdateDataBuilderList = new LinkedList<>();
+ List tableUpdateCount = new ArrayList<>();
+ for (RowChangeEvent event : batch) {
+ String currTxId = event.getTransaction().getId();
+ if (!currTxId.equals(txId)) {
+ if (smallBatch != null && !smallBatch.isEmpty()) {
+ RetinaProto.TableUpdateData.Builder builder = buildTableUpdateDataFromBatch(txId, smallBatch);
+ if (builder == null) {
+ continue;
+ }
+ tableUpdateDataBuilderList.add(builder);
+ tableUpdateCount.add(smallBatch.size());
+ }
+ txIds.add(currTxId);
+ fullTableName.add(event.getFullTableName());
+ txId = currTxId;
+ smallBatch = new LinkedList<>();
+ }
+ smallBatch.add(event);
+ }
+
+ if (smallBatch != null) {
+ RetinaProto.TableUpdateData.Builder builder = buildTableUpdateDataFromBatch(txId, smallBatch);
+ if (builder != null) {
+ tableUpdateDataBuilderList.add(buildTableUpdateDataFromBatch(txId, smallBatch));
+ tableUpdateCount.add(smallBatch.size());
+ }
+ }
+
+ // flushRateLimiter.acquire(batch.size());
+ long txStartTime = System.currentTimeMillis();
+
+// if(freshnessLevel.equals("embed"))
+// {
+// long freshness_ts = txStartTime * 1000;
+// FreshnessClient.getInstance().addMonitoredTable(tableName);
+// DataTransform.updateTimeStamp(tableUpdateDataBuilderList, freshness_ts);
+// }
+
+ List tableUpdateData = new ArrayList<>(tableUpdateDataBuilderList.size());
+ for (RetinaProto.TableUpdateData.Builder tableUpdateDataItem : tableUpdateDataBuilderList) {
+ tableUpdateData.add(tableUpdateDataItem.build());
+ }
+ CompletableFuture updateRecordResponseCompletableFuture =
+ delegate.writeBatchAsync(batch.get(0).getSchemaName(), tableUpdateData);
+
+ updateRecordResponseCompletableFuture.thenAccept(
+ resp ->
+ {
+ if (resp.getHeader().getErrorCode() != 0) {
+ failCtxs(txIds);
+ } else {
+ long txEndTime = System.currentTimeMillis();
+ if (freshnessLevel.equals("row")) {
+ metricsFacade.recordFreshness(txEndTime - txStartTime);
+ }
+ updateCtxCounters(txIds, fullTableName, tableUpdateCount);
+ }
+ }
+ );
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private void failCtxs(List txIds) {
+ for (String writeTxId : txIds) {
+ SinkContext sinkContext = SinkContextManager.getInstance().getSinkContext(writeTxId);
+ if (sinkContext != null) {
+ sinkContext.setFailed(true);
+ }
+ }
+ }
+
+ private void updateCtxCounters(List txIds, List fullTableName, List tableUpdateCount) {
+ writeLock.lock();
+ for (int i = 0; i < txIds.size(); i++) {
+ metricsFacade.recordRowEvent(tableUpdateCount.get(i));
+ String writeTxId = txIds.get(i);
+ SinkContext sinkContext = SinkContextManager.getInstance().getSinkContext(writeTxId);
+
+ try {
+ sinkContext.tableCounterLock.lock();
+ sinkContext.recordTimestamp(fullTableName.get(i), LocalDateTime.now());
+ sinkContext.updateCounter(fullTableName.get(i), tableUpdateCount.get(i));
+ if (sinkContext.isCompleted()) {
+ SinkContextManager.getInstance().endTransaction(sinkContext);
+ }
+ } finally {
+ sinkContext.tableCounterLock.unlock();
+ }
+ }
+ writeLock.unlock();
+ }
+
+ protected RetinaProto.TableUpdateData.Builder buildTableUpdateDataFromBatch(String txId, List smallBatch) {
+ SinkContext sinkContext = SinkContextManager.getInstance().getSinkContext(txId);
+ if (sinkContext == null) {
+ return null;
+ }
+ try {
+ sinkContext.getLock().lock();
+ while (sinkContext.getPixelsTransCtx() == null) {
+ LOGGER.warn("Wait for tx to begin trans: {}", txId); // CODE SHOULD NOT REACH HERE
+ sinkContext.getCond().await();
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ sinkContext.getLock().unlock();
+ }
+ RowChangeEvent event1 = smallBatch.get(0);
+
+ RetinaProto.TableUpdateData.Builder builder = RetinaProto.TableUpdateData.newBuilder()
+ .setTimestamp(sinkContext.getTimestamp())
+ .setPrimaryIndexId(event1.getTableMetadata().getPrimaryIndexKeyId())
+ .setTableName(tableName);
+ try {
+ for (RowChangeEvent smallEvent : smallBatch) {
+ addUpdateData(smallEvent, builder);
+ }
+ } catch (SinkException e) {
+ throw new RuntimeException("Flush failed for table " + tableName, e);
+ }
+ return builder;
+ }
+
+ @Override
+ protected boolean needFlush() {
+ return buffer.size() >= flushBatchSize;
+ }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableSingleRecordWriter.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableSingleRecordWriter.java
new file mode 100644
index 0000000..773181b
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableSingleRecordWriter.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina;
+
+import io.pixelsdb.pixels.common.transaction.TransContext;
+import io.pixelsdb.pixels.retina.RetinaProto;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.exception.SinkException;
+import io.pixelsdb.pixels.sink.freshness.FreshnessClient;
+import io.prometheus.client.Summary;
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Table writer created for {@link TransactionMode#RECORD}: every flushed batch is written to
+ * Retina under its own transaction context, which is committed or rolled back once the
+ * asynchronous write completes.
+ */
+public class TableSingleRecordWriter extends TableCrossTxWriter {
+    // Instance field: Lombok's @Getter generates the getLOGGER() accessor for this class's logger.
+    @Getter
+    private final Logger LOGGER = LoggerFactory.getLogger(TableSingleRecordWriter.class);
+    private final TransactionProxy transactionProxy;
+
+    public TableSingleRecordWriter(String t, int bucketId) {
+        super(t, bucketId);
+        this.transactionProxy = TransactionProxy.Instance();
+    }
+
+    /**
+     * Writes {@code batch} to Retina under a transaction context obtained for this call only.
+     * <p>
+     * Every event is stamped with that context's timestamp before the payload is built. The
+     * write is asynchronous: the transaction is committed when the response header carries
+     * error code 0 and rolled back otherwise. It is also rolled back when the write future
+     * completes exceptionally or when preparing the request fails on the calling thread, so a
+     * failed flush does not leave its transaction open.
+     *
+     * @param batch events to write; a {@code null} or empty batch is ignored
+     * @throws RuntimeException if preparing or submitting the write fails
+     */
+    @Override
+    public void flush(List batch) {
+        if (batch == null || batch.isEmpty()) {
+            // Return before a transaction context is requested that nothing would ever finish.
+            return;
+        }
+        TransContext pixelsTransContext = transactionProxy.getNewTransContext(tableName);
+        writeLock.lock();
+        try {
+            for (RowChangeEvent event : batch) {
+                event.setTimeStamp(pixelsTransContext.getTimestamp());
+                event.updateIndexKey();
+            }
+
+            RetinaProto.TableUpdateData.Builder builder = buildTableUpdateDataFromBatch(pixelsTransContext, batch);
+            long txStartTime = System.currentTimeMillis();
+
+            List<RetinaProto.TableUpdateData> tableUpdateData = new ArrayList<>(1);
+            if (builder != null) {
+                tableUpdateData.add(builder.build());
+            }
+
+            final Summary.Timer startWriteLatencyTimer = metricsFacade.startWriteLatencyTimer(tableName);
+            delegate.writeBatchAsync(batch.get(0).getSchemaName(), tableUpdateData).whenComplete(
+                    (resp, error) ->
+                    {
+                        if (error != null) {
+                            // No response arrived, so the write must not be committed.
+                            LOGGER.error("Async write failed for table {}", tableName, error);
+                            transactionProxy.rollbackTrans(pixelsTransContext);
+                            return;
+                        }
+                        if (freshness_embed) {
+                            FreshnessClient.getInstance().addMonitoredTable(tableName);
+                        }
+                        if (resp.getHeader().getErrorCode() != 0) {
+                            transactionProxy.rollbackTrans(pixelsTransContext);
+                            return;
+                        }
+                        metricsFacade.recordRowEvent(batch.size());
+                        long txEndTime = System.currentTimeMillis();
+                        if ("row".equals(freshnessLevel)) {
+                            metricsFacade.recordFreshness(txEndTime - txStartTime);
+                        }
+                        transactionProxy.commitTrans(pixelsTransContext);
+                        if (startWriteLatencyTimer != null) {
+                            startWriteLatencyTimer.observeDuration();
+                        }
+                    }
+            );
+        } catch (SinkException e) {
+            transactionProxy.rollbackTrans(pixelsTransContext);
+            throw new RuntimeException(e);
+        } catch (RuntimeException e) {
+            transactionProxy.rollbackTrans(pixelsTransContext);
+            throw e;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Builds the update payload for {@code smallBatch}, stamped with the timestamp of
+     * {@code transContext}.
+     *
+     * @param transContext transaction context whose timestamp is written into the payload
+     * @param smallBatch   events to encode; must be non-empty, because the primary index id is
+     *                     read from the first event
+     * @return the populated builder
+     * @throws RuntimeException if an event cannot be encoded
+     */
+    protected RetinaProto.TableUpdateData.Builder buildTableUpdateDataFromBatch(TransContext transContext, List smallBatch) {
+        RowChangeEvent event1 = smallBatch.get(0);
+        RetinaProto.TableUpdateData.Builder builder = RetinaProto.TableUpdateData.newBuilder()
+                .setTimestamp(transContext.getTimestamp())
+                .setPrimaryIndexId(event1.getTableMetadata().getPrimaryIndexKeyId())
+                .setTableName(tableName);
+        try {
+            for (RowChangeEvent smallEvent : smallBatch) {
+                addUpdateData(smallEvent, builder);
+            }
+        } catch (SinkException e) {
+            throw new RuntimeException("Flush failed for table " + tableName, e);
+        }
+        return builder;
+    }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableSingleTxWriter.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableSingleTxWriter.java
new file mode 100644
index 0000000..ba61d69
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableSingleTxWriter.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina;
+
+import io.pixelsdb.pixels.retina.RetinaProto;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.exception.SinkException;
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Table writer created for {@link TransactionMode#SINGLE}: buffered events are sent to Retina
+ * with one {@code writeTrans} call per flush.
+ */
+public class TableSingleTxWriter extends TableWriter {
+    private static final long TX_TIMEOUT_MS = 3000;
+    @Getter
+    private final Logger LOGGER = LoggerFactory.getLogger(TableSingleTxWriter.class);
+
+    public TableSingleTxWriter(String tableName, int bucketId) {
+        super(tableName, bucketId);
+    }
+
+    /**
+     * Writes the events currently held in {@code buffer} for the transaction recorded in
+     * {@code currentTxId}. Returns without writing when the buffer is empty or no transaction
+     * id is recorded.
+     * <p>
+     * While holding {@code bufferLock}, this blocks until the sink context of that transaction
+     * has a Pixels transaction context.
+     * <p>
+     * NOTE(review): {@code batchToFlush} is never read. The flusher in {@link TableWriter}
+     * detaches the buffer and hands the detached list to this method, so those events appear
+     * to be skipped by this implementation - confirm whether it should consume
+     * {@code batchToFlush} instead of re-reading {@code buffer}.
+     *
+     * @param batchToFlush batch handed over by the flusher; currently ignored
+     * @throws IllegalStateException if no sink context is registered for the transaction
+     * @throws RuntimeException      if the wait is interrupted or the write fails
+     */
+    @Override
+    public void flush(List batchToFlush) {
+        List<RowChangeEvent> batch;
+        String flushedTxId;
+        SinkContext sinkContext;
+        bufferLock.lock();
+        try {
+            if (buffer.isEmpty() || currentTxId == null) {
+                return;
+            }
+            flushedTxId = currentTxId;
+            currentTxId = null;
+
+            sinkContext = SinkContextManager.getInstance().getSinkContext(flushedTxId);
+            if (sinkContext == null) {
+                // Fail with a message instead of a bare NullPointerException on the next line.
+                throw new IllegalStateException("No sink context registered for txId=" + flushedTxId);
+            }
+            sinkContext.getLock().lock();
+            try {
+                while (sinkContext.getPixelsTransCtx() == null) {
+                    LOGGER.warn("Wait for prev tx to begin trans: {}", flushedTxId);
+                    sinkContext.getCond().await();
+                }
+            } finally {
+                sinkContext.getLock().unlock();
+            }
+
+            // Detach the buffered events so the write below runs without bufferLock.
+            batch = buffer;
+            buffer = new ArrayList<>();
+        } catch (InterruptedException e) {
+            // Restore the flag so the calling thread still sees the interrupt.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } finally {
+            bufferLock.unlock();
+        }
+
+        RowChangeEvent event1 = batch.get(0);
+        RetinaProto.TableUpdateData.Builder builder = RetinaProto.TableUpdateData.newBuilder()
+                .setPrimaryIndexId(event1.getTableMetadata().getPrimaryIndexKeyId())
+                .setTableName(tableName);
+
+        try {
+            for (RowChangeEvent event : batch) {
+                addUpdateData(event, builder);
+            }
+            List<RetinaProto.TableUpdateData> tableUpdateData = List.of(builder.build());
+            delegate.writeTrans(event1.getSchemaName(), tableUpdateData);
+            sinkContext.updateCounter(fullTableName, batch.size());
+            LOGGER.info("Flushing {} events for table {} txId={}", batch.size(), fullTableName, flushedTxId);
+        } catch (SinkException e) {
+            throw new RuntimeException("Flush failed for table " + tableName, e);
+        }
+    }
+
+    /**
+     * Reports whether events are buffered while {@code currentTxId} is unset or differs from
+     * {@code txId}.
+     */
+    @Override
+    protected boolean needFlush() {
+        boolean sameTransaction = currentTxId != null && currentTxId.equals(txId);
+        return !sameTransaction && !buffer.isEmpty();
+    }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableWriter.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableWriter.java
new file mode 100644
index 0000000..cbd251d
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableWriter.java
@@ -0,0 +1,308 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina;
+
+
+import io.pixelsdb.pixels.retina.RetinaProto;
+import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.exception.SinkException;
+import io.pixelsdb.pixels.sink.util.FlushRateLimiter;
+import io.pixelsdb.pixels.sink.util.MetricsFacade;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Base class for per-table writers: buffers {@link RowChangeEvent}s and hands them to the
+ * subclass-defined {@link #flush(List)} in batches.
+ * <p>
+ * {@link #write(RowChangeEvent, SinkContext)} appends to {@code buffer} under {@code bufferLock}.
+ * A dedicated flusher thread detaches the buffer as soon as {@link #needFlush()} reports true,
+ * or otherwise after waiting up to {@code flushInterval} milliseconds, and submits it to a
+ * single-threaded executor that calls {@link #flush(List)}.
+ * <p>
+ * NOTE(review): the flusher thread and the optional report task are started inside the
+ * constructor, so {@link #needFlush()} and {@link #getLOGGER()} can be invoked before a
+ * subclass constructor has finished.
+ *
+ * @package: io.pixelsdb.pixels.sink.writer.retina
+ * @className: TableWriter
+ * @author: AntiO2
+ * @date: 2025/9/27 09:58
+ */
+public abstract class TableWriter {
+
+    protected final RetinaServiceProxy delegate; // physical writer
+    protected final ReentrantLock bufferLock = new ReentrantLock();
+    protected final Condition flushCondition = bufferLock.newCondition();
+    protected final Thread flusherThread;
+    protected final String tableName;
+    protected final long flushInterval;
+    protected final FlushRateLimiter flushRateLimiter;
+    protected final SinkContextManager sinkContextManager;
+    protected final String freshnessLevel;
+    protected final boolean freshness_embed;
+    private final ScheduledExecutorService flushExecutor = Executors.newSingleThreadScheduledExecutor();
+    private final ScheduledExecutorService logScheduler = Executors.newScheduledThreadPool(1);
+    private final AtomicInteger counter = new AtomicInteger();
+    protected volatile boolean running = true;
+    // Shared state (protected by lock)
+    protected List buffer = new LinkedList<>();
+    protected volatile String currentTxId = null;
+    protected String txId = null;
+    protected String fullTableName;
+    protected PixelsSinkConfig config;
+    protected MetricsFacade metricsFacade = MetricsFacade.getInstance();
+    protected TransactionMode transactionMode;
+
+    /**
+     * Reads the sink configuration, creates the Retina proxy for {@code bucketId}, optionally
+     * schedules the periodic buffer report, and starts the flusher thread.
+     *
+     * @param tableName table name; also used in log messages and the flusher thread name
+     * @param bucketId  bucket id passed to {@link RetinaServiceProxy}
+     */
+    protected TableWriter(String tableName, int bucketId) {
+        this.config = PixelsSinkConfigFactory.getInstance();
+        this.tableName = tableName;
+        this.flushInterval = config.getFlushIntervalMs();
+        this.flushRateLimiter = FlushRateLimiter.getInstance();
+        this.sinkContextManager = SinkContextManager.getInstance();
+        this.freshnessLevel = config.getSinkMonitorFreshnessLevel();
+        this.delegate = new RetinaServiceProxy(bucketId);
+        this.transactionMode = config.getTransactionMode();
+        // Constant-first equals: a null freshness level disables embedding instead of throwing.
+        this.freshness_embed = "embed".equals(this.freshnessLevel);
+        if (this.config.isMonitorReportEnabled() && this.config.isRetinaLogQueueEnabled()) {
+            long interval = this.config.getMonitorReportInterval();
+            logScheduler.scheduleAtFixedRate(writerInfoTask(tableName), 0, interval, TimeUnit.MILLISECONDS);
+        }
+        this.flusherThread = new Thread(new FlusherRunnable(), "Pixels-Flusher-" + tableName);
+        this.flusherThread.start();
+    }
+
+    /**
+     * Helper: add insert/delete data into proto builder.
+     * <p>
+     * SNAPSHOT and INSERT become insert data, UPDATE becomes update data (both carry the after
+     * key and the after values), and DELETE becomes delete data carrying the before key.
+     *
+     * @throws SinkException if the operation is {@code UNRECOGNIZED}
+     */
+    protected static void addUpdateData(RowChangeEvent rowChangeEvent,
+                                        RetinaProto.TableUpdateData.Builder builder) throws SinkException {
+        switch (rowChangeEvent.getOp()) {
+            case SNAPSHOT, INSERT -> {
+                RetinaProto.InsertData.Builder insertDataBuilder = RetinaProto.InsertData.newBuilder();
+                insertDataBuilder.addIndexKeys(rowChangeEvent.getAfterKey());
+                insertDataBuilder.addAllColValues(rowChangeEvent.getAfterData());
+                builder.addInsertData(insertDataBuilder);
+            }
+            case UPDATE -> {
+                RetinaProto.UpdateData.Builder updateDataBuilder = RetinaProto.UpdateData.newBuilder();
+                updateDataBuilder.addIndexKeys(rowChangeEvent.getAfterKey());
+                updateDataBuilder.addAllColValues(rowChangeEvent.getAfterData());
+                builder.addUpdateData(updateDataBuilder);
+            }
+            case DELETE -> {
+                RetinaProto.DeleteData.Builder deleteDataBuilder = RetinaProto.DeleteData.newBuilder();
+                deleteDataBuilder.addIndexKeys(rowChangeEvent.getBeforeKey());
+                builder.addDeleteData(deleteDataBuilder);
+            }
+            case UNRECOGNIZED -> {
+                throw new SinkException("Unrecognized op: " + rowChangeEvent.getOp());
+            }
+        }
+    }
+
+    /**
+     * Queues {@code batch} for {@link #flush(List)} on the flush executor; empty batches are dropped.
+     */
+    private void submitFlushTask(List batch) {
+        if (batch == null || batch.isEmpty()) {
+            return;
+        }
+        flushExecutor.submit(() ->
+        {
+            try {
+                flush(batch);
+            } catch (RuntimeException e) {
+                // The Future returned by submit() is discarded, so without this log a failed
+                // flush would disappear without a trace.
+                getLOGGER().error("Flush failed for table {}", tableName, e);
+            }
+        });
+    }
+
+    /**
+     * Creates the periodic report task: while the buffer is non-empty it logs the transaction id
+     * of the first buffered event, the buffer length and the total number of written events.
+     */
+    private Runnable writerInfoTask(String tableName) {
+        final AtomicInteger reportId = new AtomicInteger();
+        return () ->
+        {
+            RowChangeEvent firstEvent = null;
+            int len;
+            bufferLock.lock();
+            try {
+                len = buffer.size();
+                if (!buffer.isEmpty()) {
+                    firstEvent = buffer.get(0);
+                }
+            } finally {
+                bufferLock.unlock();
+            }
+            if (firstEvent != null) {
+                String firstTx = firstEvent.getTransaction().getId();
+                int count = counter.get();
+                getLOGGER().info("{} Writer {}: Tx Now is {}. Buffer Len is {}. Total Count {}", reportId.incrementAndGet(), tableName, firstTx, len, count);
+            }
+        };
+    }
+
+    /** Returns the logger of the concrete writer. */
+    protected abstract Logger getLOGGER();
+
+    /**
+     * Appends {@code event} to the buffer and wakes the flusher when {@link #needFlush()} reports
+     * true. Unless the transaction mode is {@code RECORD}, the source transaction id of
+     * {@code ctx} is recorded first.
+     *
+     * @return {@code true} when the event was buffered, {@code false} when buffering failed
+     */
+    public boolean write(RowChangeEvent event, SinkContext ctx) {
+        try {
+            bufferLock.lock();
+            try {
+                if (!transactionMode.equals(TransactionMode.RECORD)) {
+                    txId = ctx.getSourceTxId();
+                }
+                currentTxId = txId;
+                if (fullTableName == null) {
+                    fullTableName = event.getFullTableName();
+                }
+                counter.incrementAndGet();
+                buffer.add(event);
+
+                if (needFlush()) {
+                    flushCondition.signalAll();
+                }
+            } finally {
+                bufferLock.unlock();
+            }
+            return true;
+        } catch (Exception e) {
+            getLOGGER().error("Write failed for table {}", tableName, e);
+            return false;
+        }
+    }
+
+    /**
+     * Writes one detached batch; this class invokes it on the flush executor with the list
+     * detached by the flusher thread.
+     */
+    public abstract void flush(List batchToFlush);
+
+    /**
+     * Decides whether the buffer should be handed to {@link #flush(List)} now. This class only
+     * calls it while holding {@code bufferLock}.
+     */
+    protected abstract boolean needFlush();
+
+    /**
+     * Stops the flusher thread and both executors, then closes the Retina proxy.
+     * <p>
+     * The flusher thread is joined (for up to five seconds) before the flush executor is shut
+     * down, so that it does not submit to an executor that already rejects tasks; batches
+     * submitted earlier still run. The proxy is closed even if this thread is interrupted while
+     * waiting.
+     * <p>
+     * NOTE(review): events still sitting in {@code buffer} are not flushed by this method.
+     *
+     * @throws RuntimeException if closing the Retina proxy fails
+     */
+    public void close() {
+        this.running = false;
+        this.flusherThread.interrupt();
+        logScheduler.shutdown();
+        try {
+            this.flusherThread.join(5000);
+            // Without shutdown() the awaitTermination below can never succeed and the executor's
+            // worker thread would outlive this writer.
+            flushExecutor.shutdown();
+            logScheduler.awaitTermination(5, TimeUnit.SECONDS);
+            flushExecutor.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller, but still finish releasing resources.
+            Thread.currentThread().interrupt();
+        }
+        // Idempotent; covers an interrupt that arrived before the in-order shutdown() above.
+        flushExecutor.shutdown();
+        try {
+            delegate.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Loop of the flusher thread: waits until a flush is due, detaches the buffer under
+     * {@code bufferLock}, and submits the detached list outside the lock.
+     */
+    private class FlusherRunnable implements Runnable {
+        @Override
+        public void run() {
+            while (running) {
+                List batchToFlush;
+                bufferLock.lock();
+                try {
+                    if (!needFlush()) {
+                        // Conditional wait: will wait until signaled by write() or timeout
+                        flushCondition.await(flushInterval, TimeUnit.MILLISECONDS);
+                    }
+                    batchToFlush = buffer;
+                    buffer = new LinkedList<>();
+                } catch (InterruptedException e) {
+                    // Exit loop if interrupted during shutdown
+                    running = false;
+                    Thread.currentThread().interrupt();
+                    return;
+                } finally {
+                    bufferLock.unlock();
+                }
+                // The lock is released exactly once above; submitting afterwards keeps writers
+                // from waiting on the executor hand-off.
+                submitFlushTask(batchToFlush);
+            }
+        }
+    }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableWriterProxy.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableWriterProxy.java
new file mode 100644
index 0000000..3733e3c
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableWriterProxy.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina;
+
+import io.pixelsdb.pixels.common.node.BucketCache;
+import io.pixelsdb.pixels.daemon.NodeProto;
+import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Singleton registry that creates and caches one {@link TableWriter} per
+ * (table id, Retina node, client slot) key, choosing the writer class from the configured
+ * {@link TransactionMode}.
+ */
+public class TableWriterProxy {
+    private final static TableWriterProxy INSTANCE = new TableWriterProxy();
+
+    private final TransactionMode transactionMode;
+    private final int retinaCliNum;
+    private final Map<WriterKey, TableWriter> WRITER_REGISTRY = new ConcurrentHashMap<>();
+
+    private TableWriterProxy() {
+        PixelsSinkConfig sinkConfig = PixelsSinkConfigFactory.getInstance();
+        this.transactionMode = sinkConfig.getTransactionMode();
+        this.retinaCliNum = sinkConfig.getRetinaClientNum();
+    }
+
+    protected static TableWriterProxy getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * Returns the writer registered for this table, the Retina node serving {@code bucket} and
+     * the client slot {@code bucket % retinaCliNum}, creating it on first use.
+     *
+     * @param tableName table name handed to a newly created writer
+     * @param tableId   table id, part of the registry key
+     * @param bucket    bucket id; selects the Retina node and the client slot
+     * @return the cached or newly created writer
+     * @throws IllegalArgumentException if the transaction mode has no writer implementation
+     */
+    protected TableWriter getTableWriter(String tableName, long tableId, int bucket) {
+        int clientSlot = bucket % retinaCliNum;
+        NodeProto.NodeInfo retinaNode = BucketCache.getInstance().getRetinaNodeInfoByBucketId(bucket);
+        WriterKey registryKey = new WriterKey(tableId, retinaNode, clientSlot);
+        return WRITER_REGISTRY.computeIfAbsent(registryKey, ignored -> createWriter(tableName, bucket));
+    }
+
+    /** Instantiates the writer class that corresponds to the configured transaction mode. */
+    private TableWriter createWriter(String tableName, int bucket) {
+        return switch (transactionMode) {
+            case SINGLE -> new TableSingleTxWriter(tableName, bucket);
+            case BATCH -> new TableCrossTxWriter(tableName, bucket);
+            case RECORD -> new TableSingleRecordWriter(tableName, bucket);
+            default -> throw new IllegalArgumentException("Unknown transaction mode: " + transactionMode);
+        };
+    }
+
+    /** Registry key: table id, Retina node info and client slot. */
+    record WriterKey(long tableId, NodeProto.NodeInfo nodeInfo, int cliNo) {
+    }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TransactionMode.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TransactionMode.java
new file mode 100644
index 0000000..145095e
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TransactionMode.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina;
+
+/**
+ * Transaction mode of the sink; {@link TableWriterProxy} picks the table writer implementation
+ * from it.
+ */
+public enum TransactionMode {
+    SINGLE,
+    RECORD,
+    BATCH;
+
+    /**
+     * Parses a mode name, ignoring case.
+     *
+     * @param value mode name, e.g. {@code "batch"}
+     * @return the matching constant
+     * @throws IllegalArgumentException if {@code value} is {@code null} or names no constant
+     */
+    public static TransactionMode fromValue(String value) {
+        for (TransactionMode mode : values()) {
+            if (mode.name().equalsIgnoreCase(value)) {
+                return mode;
+            }
+        }
+        // IllegalArgumentException is still a RuntimeException, so existing catch blocks keep working.
+        throw new IllegalArgumentException(String.format("Can't convert %s to transaction mode", value));
+    }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TransactionProxy.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TransactionProxy.java
new file mode 100644
index 0000000..d1092c7
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TransactionProxy.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina;
+
+import io.pixelsdb.pixels.common.exception.TransException;
+import io.pixelsdb.pixels.common.transaction.TransContext;
+import io.pixelsdb.pixels.common.transaction.TransService;
+import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.util.MetricsFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class is for the Pixels transaction service
+ *
+ * @author AntiO2
+ */
+public class TransactionProxy {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TransactionProxy.class);
+ private static volatile TransactionProxy instance;
+ private final TransService transService;
+ private final Queue transContextQueue;
+ private final Object batchLock = new Object();
+ private final ExecutorService batchCommitExecutor;
+ private final MetricsFacade metricsFacade = MetricsFacade.getInstance();
+ private final BlockingQueue toCommitTransContextQueue;
+ private final String freshnessLevel;
+ private final int BATCH_SIZE;
+ private final int WORKER_COUNT;
+ private final int MAX_WAIT_MS;
+
+ private AtomicInteger beginCount = new AtomicInteger(0);
+ private AtomicInteger commitCount = new AtomicInteger(0);
+
+ private TransactionProxy() {
+ PixelsSinkConfig pixelsSinkConfig = PixelsSinkConfigFactory.getInstance();
+ BATCH_SIZE = pixelsSinkConfig.getCommitBatchSize();
+ WORKER_COUNT = pixelsSinkConfig.getCommitBatchWorkers();
+ MAX_WAIT_MS = pixelsSinkConfig.getCommitBatchDelay();
+
+ this.transService = TransService.Instance();
+ this.transContextQueue = new ConcurrentLinkedDeque<>();
+ this.toCommitTransContextQueue = new LinkedBlockingQueue<>();
+ this.batchCommitExecutor = Executors.newFixedThreadPool(
+ WORKER_COUNT,
+ r ->
+ {
+ Thread t = new Thread(r);
+ t.setName("commit-trans-batch-thread");
+ t.setDaemon(true);
+ return t;
+ }
+ );
+ for (int i = 0; i < WORKER_COUNT; i++) {
+ batchCommitExecutor.submit(this::batchCommitWorker);
+ }
+
+ this.freshnessLevel = pixelsSinkConfig.getSinkMonitorFreshnessLevel();
+ }
+
+ public static TransactionProxy Instance() {
+ if (instance == null) {
+ synchronized (TransactionProxy.class) {
+ if (instance == null) {
+ instance = new TransactionProxy();
+ }
+ }
+ }
+ return instance;
+ }
+
+ public static void staticClose() {
+ if (instance != null) {
+ instance.close();
+ }
+ }
+
+ private void requestTransactions() {
+ try {
+ List