Skip to content

Commit 750dbe9

Browse files
shalev007Shalev Avhar
andauthored
fix resync-state kafka listener bug (#931)
- **fix: kafka listener constant resync loop** - **feat: bump version** # Description *Bugfix* ```diff ! What - Fix kafka listener never ending loop of resyncs bug - Why - This bug was created when we started to updated the resyncState along with integration data, - therefor each update would create a new audit-log change which triggered the Kafka listener to start a new resync + How - we validate that it was not an update created by the resyc-state by comparing them ``` ## Type of change Please leave one option from the following and delete the rest: - [X] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] New Integration (non-breaking change which adds a new integration) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] Non-breaking change (fix of existing functionality that will not change current behavior) - [ ] Documentation (added/updated documentation) ## Screenshots Include screenshots from your environment showing how the resources of the integration will look. ## API Documentation Provide links to the API documentation used for this integration. --------- Co-authored-by: Shalev Avhar <shalev@getport.io>
1 parent d54b97b commit 750dbe9

File tree

2 files changed

+8
-3
lines changed

2 files changed

+8
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
1212
### Improvements
1313

1414
- Add support for reporting the integration resync state to expose more information about the integration state in the portal
15+
- Fix kafka listener never ending resync loop due to resyncState updates
1516

1617

1718
## 0.9.14 (2024-08-19)

port_ocean/core/event_listener/kafka.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,13 @@ def _should_be_processed(self, msg_value: dict[Any, Any], topic: str) -> bool:
9999
return False
100100

101101
integration_identifier = after.get("identifier")
102-
if integration_identifier == self.integration_identifier and (
103-
"change.log" in topic
104-
):
102+
if integration_identifier != self.integration_identifier:
103+
return False
104+
105+
if after.get("updatedAt") == after.get("resyncState", {}).get("updatedAt"):
106+
return False
107+
108+
if "change.log" in topic:
105109
return msg_value.get("changelogDestination", {}).get("type", "") == "KAFKA"
106110

107111
return False

0 commit comments

Comments
 (0)