Skip to content

Commit

Permalink
Splitting out processing to dedicated method
Browse files Browse the repository at this point in the history
This is intended to allow the processing to be decoupled from the CloudEvent method that is specific to how Cloud Functions will process and allow the processing to be performed directly such as from Cloud Run
  • Loading branch information
JaredHatfield committed Aug 1, 2024
1 parent 9e2fdd9 commit 6b268e2
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,30 @@ private boolean isConfigured() {
@Override
public void accept(CloudEvent event) throws InvalidProtocolBufferException {

byte[] data = event.getData().toBytes();

// Parse the Firestore data
DocumentEventData firestoreEventData = DocumentEventData.parseFrom(data);

// Process the request
process(firestoreEventData, data);
}

/**
* Process the Firestore event for replication.
*
* @param firestoreEventData the Firestore event; parsed from the data
* @param data the raw data; used for PubSub message replication
*/
public void process(DocumentEventData firestoreEventData, byte[] data) {

// Check if the consumer is configured properly
if (!this.configured) {
logger.severe(
"Not configured, document will not be replicated and databases will be out of sync.");
return;
}

// Parse the Firestore data
DocumentEventData firestoreEventData =
DocumentEventData.parseFrom(event.getData().toBytes());

// Get the resource name for the document for insert/update/delete
String resourceName = null;
if (firestoreEventData.hasValue()) {
Expand Down Expand Up @@ -190,8 +203,7 @@ public void accept(CloudEvent event) throws InvalidProtocolBufferException {

// Prepare the message to be published
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setOrderingKey(documentPath)
.setData(ByteString.copyFrom(event.getData().toBytes()))
.putAllAttributes(attributes).build();
.setData(ByteString.copyFrom(data)).putAllAttributes(attributes).build();

// Publish the message
this.publisher.publishMessage(pubsubMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,20 +118,36 @@ private boolean isConfigured() {
return valid;
}


@Override
public void accept(CloudEvent event) throws InvalidProtocolBufferException {

// Parse the PubSub payload
String cloudEventData = new String(event.getData().toBytes());
PubSubPublish data = gson.fromJson(cloudEventData, PubSubPublish.class);

// Process the Pub/Sub message
process(data);

// Log the event
logger.finest("Pub/Sub message: " + event);
}

/**
* Process the Pub/Sub message.
*
* @param data The Pub/Sub message
* @throws InvalidProtocolBufferException
*/
public void process(PubSubPublish data) throws InvalidProtocolBufferException {

// Check if the consumer is configured properly
if (!this.configured) {
logger.severe(
"Not configured, document will not be replicated and databases will be out of sync.");
return;
}

// Parse the payload
String cloudEventData = new String(event.getData().toBytes());
PubSubPublish data = gson.fromJson(cloudEventData, PubSubPublish.class);

String pubsubDatabase = data.getMessage().getAttribute("database");

// Do not process updates when database change is for the same region
Expand Down Expand Up @@ -214,8 +230,5 @@ public void accept(CloudEvent event) throws InvalidProtocolBufferException {
logger.info("Document deleted: " + documentPath);
}
}

logger.info("Pub/Sub message: " + event);
}

}

0 comments on commit 6b268e2

Please sign in to comment.