diff --git a/src/main/java/com/unitvectory/crossfiresync/FirestoreChangePublisher.java b/src/main/java/com/unitvectory/crossfiresync/FirestoreChangePublisher.java index e2e0582..2c40b69 100644 --- a/src/main/java/com/unitvectory/crossfiresync/FirestoreChangePublisher.java +++ b/src/main/java/com/unitvectory/crossfiresync/FirestoreChangePublisher.java @@ -129,6 +129,23 @@ private boolean isConfigured() { @Override public void accept(CloudEvent event) throws InvalidProtocolBufferException { + byte[] data = event.getData().toBytes(); + + // Parse the Firestore data + DocumentEventData firestoreEventData = DocumentEventData.parseFrom(data); + + // Process the request + process(firestoreEventData, data); + } + + /** + * Process the Firestore event for replication. + * + * @param firestoreEventData the Firestore event; parsed from the data + * @param data the raw data; used for PubSub message replication + */ + public void process(DocumentEventData firestoreEventData, byte[] data) { + // Check if the consumer is configured properly if (!this.configured) { logger.severe( @@ -136,10 +153,6 @@ public void accept(CloudEvent event) throws InvalidProtocolBufferException { return; } - // Parse the Firestore data - DocumentEventData firestoreEventData = - DocumentEventData.parseFrom(event.getData().toBytes()); - // Get the resource name for the document for insert/update/delete String resourceName = null; if (firestoreEventData.hasValue()) { @@ -190,8 +203,7 @@ public void accept(CloudEvent event) throws InvalidProtocolBufferException { // Prepare the message to be published PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setOrderingKey(documentPath) - .setData(ByteString.copyFrom(event.getData().toBytes())) - .putAllAttributes(attributes).build(); + .setData(ByteString.copyFrom(data)).putAllAttributes(attributes).build(); // Publish the message this.publisher.publishMessage(pubsubMessage); diff --git a/src/main/java/com/unitvectory/crossfiresync/PubSubChangeConsumer.java b/src/main/java/com/unitvectory/crossfiresync/PubSubChangeConsumer.java index 38efb45..ea8d7d8 100644 --- a/src/main/java/com/unitvectory/crossfiresync/PubSubChangeConsumer.java +++ b/src/main/java/com/unitvectory/crossfiresync/PubSubChangeConsumer.java @@ -118,9 +118,29 @@ private boolean isConfigured() { return valid; } + @Override public void accept(CloudEvent event) throws InvalidProtocolBufferException { + // Parse the PubSub payload + String cloudEventData = new String(event.getData().toBytes()); + PubSubPublish data = gson.fromJson(cloudEventData, PubSubPublish.class); + + // Process the Pub/Sub message + process(data); + + // Log the event + logger.finest("Pub/Sub message: " + event); + } + + /** + * Process the Pub/Sub message. + * + * @param data The Pub/Sub message + * @throws InvalidProtocolBufferException + */ + public void process(PubSubPublish data) throws InvalidProtocolBufferException { + // Check if the consumer is configured properly if (!this.configured) { logger.severe( @@ -128,10 +148,6 @@ public void accept(CloudEvent event) throws InvalidProtocolBufferException { return; } - // Parse the payload - String cloudEventData = new String(event.getData().toBytes()); - PubSubPublish data = gson.fromJson(cloudEventData, PubSubPublish.class); - String pubsubDatabase = data.getMessage().getAttribute("database"); // Do not process updates when database change is for the same region @@ -214,8 +230,5 @@ public void accept(CloudEvent event) throws InvalidProtocolBufferException { logger.info("Document deleted: " + documentPath); } } - - logger.info("Pub/Sub message: " + event); } - }