Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
175 commits
Select commit Hold shift + click to select a range
d1ae694
chore: initial and incomplete refactor
alepane21 Mar 3, 2025
bca3e2b
feat(pubsub): refactor pubsub handling and add Kafka/NATS support
alepane21 Mar 18, 2025
d72f8c8
Merge branch 'refs/heads/main' into ale/eng-6482-edfs-refactor-kafka-…
alepane21 Mar 18, 2025
5df3613
feat(plan-generator): add logger to router initialization
alepane21 Mar 18, 2025
efaa122
refactor(pubsub): nats is now working
alepane21 Mar 20, 2025
23eb5bb
test(kafka): fix mutation kafka test
alepane21 Mar 21, 2025
b0660d9
chore: initial refactor to reduce code needed to start new provider
alepane21 Mar 22, 2025
163aa5b
refactor(pubsub): implement lazy connection handling for NATS and Kaf…
alepane21 Mar 24, 2025
1ccf0c7
Merge branch 'main' into ale/eng-6482-edfs-refactor-kafka-and-nats-as…
alepane21 Mar 24, 2025
9dd59b2
Merge branch 'main' into ale/eng-6482-edfs-refactor-kafka-and-nats-as…
alepane21 Mar 24, 2025
33603e8
refactor(pubsub): remove unused variables and comments in datasource …
alepane21 Mar 24, 2025
ebab2e3
Merge remote-tracking branch 'origin/main' into ale/eng-6482-edfs-ref…
alepane21 Apr 4, 2025
a473fdd
make it works with different providers in the same datasource
alepane21 Apr 7, 2025
fa7e378
Merge branch 'main' into ale/eng-6482-edfs-refactor-kafka-and-nats-as…
alepane21 Apr 7, 2025
e6645c8
chore: cleaning up a little, reintroduced host and listener vars
alepane21 Apr 8, 2025
5e900cb
feat: introduce InstanceData struct for improved configuration manage…
alepane21 Apr 8, 2025
3b9ac24
chore: improved naming, simplify code
alepane21 Apr 8, 2025
51f1343
chore: remove unused pub/sub configuration code
alepane21 Apr 8, 2025
60c9b4b
chore: remove unused field
alepane21 Apr 8, 2025
4017c9f
chore: remove registry
alepane21 Apr 8, 2025
cbd58e0
Merge branch 'main' into ale/eng-6482-edfs-refactor-kafka-and-nats-as…
alepane21 Apr 8, 2025
e76ceb3
fix: improve error message in Kafka provider
alepane21 Apr 9, 2025
47e83bd
chore: remove commented error handling code in Planner.ConfigureFetch
alepane21 Apr 9, 2025
546be70
chore: add adapter interfaces and tests
alepane21 Apr 9, 2025
3a023e2
chore: add github.com/stretchr/objx v0.5.2 dependency
alepane21 Apr 9, 2025
2a96355
chore: remove unused bytesBuffer helper from Kafka pubsub tests
alepane21 Apr 9, 2025
04e425e
Merge branch 'main' into ale/eng-6482-edfs-refactor-kafka-and-nats-as…
alepane21 Apr 9, 2025
fe2f388
chore: add tests
alepane21 Apr 10, 2025
d4ed48a
chore: cleaned a bit the implementation, add a README to make it easi…
alepane21 Apr 11, 2025
592cbb6
feat: implement PubSub provider startup and shutdown on graphServer
alepane21 Apr 11, 2025
c2bb1cb
Merge branch 'main' into ale/eng-6482-edfs-refactor-kafka-and-nats-as…
alepane21 Apr 11, 2025
bb77254
Merge branch 'main' into ale/eng-6482-edfs-refactor-kafka-and-nats-as…
alepane21 Apr 14, 2025
8295c26
refactor: update Kafka and NATS adapters to use direct client connect…
alepane21 Apr 14, 2025
0a6e8a0
refactor: enhance NATS adapter with flush timeout and update test exp…
alepane21 Apr 14, 2025
3aeba04
test: log unexpected errors during WebSocket message read in tests
alepane21 Apr 14, 2025
5096734
Merge branch 'main' into ale/eng-6482-edfs-refactor-kafka-and-nats-as…
alepane21 Apr 14, 2025
f5e8e90
feat: add custom JSON unmarshalling for WebSocketMessage to improve e…
alepane21 Apr 14, 2025
ad96e3d
refactor: replace custom JSON unmarshalling in WebSocketMessage with …
alepane21 Apr 15, 2025
85adb37
fix: adapter base interface
alepane21 Apr 16, 2025
c571d9a
Merge branch 'main' into ale/eng-6482-edfs-refactor-kafka-and-nats-as…
alepane21 Apr 16, 2025
1bb2dfd
docs: update README for PubSub Provider usage section
alepane21 Apr 16, 2025
389d330
Merge remote-tracking branch 'origin/main' into ale/eng-6482-edfs-ref…
alepane21 Apr 18, 2025
a1e3cb8
Merge remote-tracking branch 'origin/main' into ale/eng-6482-edfs-ref…
alepane21 Apr 18, 2025
bb4e0a1
Merge remote-tracking branch 'origin/main' into ale/eng-6482-edfs-ref…
alepane21 Apr 23, 2025
43977a0
refactor: remove unused EnginePubSubProviders struct and related impo…
alepane21 Apr 23, 2025
13f8d33
refactor: update MoodHandler and NewSchema to include GetPubSubName p…
alepane21 Apr 23, 2025
5f95739
refactor: enhance NATS publishing logic in UpdateMood resolver to han…
alepane21 Apr 23, 2025
5886403
Merge branch 'main' into ale/eng-6482-edfs-refactor-kafka-and-nats-as…
alepane21 May 7, 2025
01e12bc
chore: pass up providers instead of passing down a callback, small fixes
alepane21 May 7, 2025
b5c7357
fix: rename NatParams to NatsParams for consistency in pubsub.go
alepane21 May 7, 2025
8eb72eb
refactor: replace datasource package with pubsubtest for common PubSu…
alepane21 May 7, 2025
3148ce3
docs: enhance PubSubDataSource interface documentation with detailed …
alepane21 May 7, 2025
02c0a45
fix: improve pubsub provider startup and shutdown processes with time…
alepane21 May 7, 2025
711c37d
feat: add tests for Kafka and NATS startup/shutdown behavior with inc…
alepane21 May 8, 2025
25e15aa
refactor: extract JSON reading and checking logic into a reusable fun…
alepane21 May 8, 2025
1874521
refactor: move ReadAndCheckJSON function to improve code organization…
alepane21 May 8, 2025
8b7fbb1
Merge branch 'main' into ale/eng-6482-edfs-refactor-kafka-and-nats-as…
alepane21 May 8, 2025
92b91d7
refactor: remove outdated comment regarding Kafka client connection b…
alepane21 May 9, 2025
802659a
refactor: rename datasource import to pubsub_datasource for clarity i…
alepane21 May 9, 2025
86fada3
fix: instanceData not set in the structure
alepane21 May 9, 2025
ee87b59
refactor: streamline pubsub provider startup and shutdown with unifie…
alepane21 May 9, 2025
afe6ac6
Merge branch 'main' into ale/eng-6482-edfs-refactor-kafka-and-nats-as…
alepane21 May 9, 2025
39517e8
fix: replace timeout error formatting with errors.New for consistency…
alepane21 May 9, 2025
914f21c
refactor: update ExecutorConfigurationBuilder and Loader to return Pu…
alepane21 May 9, 2025
ddb437d
refactor: update BuildEventDataBytes to use operation from ast.Docume…
alepane21 May 9, 2025
0205b27
refactor: enhance PubSub provider structure by consolidating data sou…
alepane21 May 12, 2025
158313e
fix: change tests to use EventConfiguration istead of EventConfigurat…
alepane21 May 12, 2025
48f305f
Merge branch 'main' into ale/eng-6482-edfs-refactor-filtered-datasources
alepane21 May 13, 2025
73644c8
chore: add PubSubProviderBuilder to allow the filter of datasource me…
alepane21 May 13, 2025
a24c72b
chore: a datasource per event
alepane21 May 13, 2025
3d9ef5b
Merge branch 'main' into ale/eng-6482-edfs-refactor-filtered-datasources
alepane21 May 13, 2025
abdfcfd
Merge branch 'main' into ale/eng-6482-edfs-refactor-kafka-and-nats-as…
devsergiy May 13, 2025
da50cc7
Merge branch 'main' into ale/eng-6482-edfs-refactor-kafka-and-nats-as…
alepane21 May 15, 2025
f26ded9
Merge remote-tracking branch 'origin/ale/eng-6482-edfs-refactor-kafka…
alepane21 May 15, 2025
57c3d7f
feat: refactor data source handling and improve provider initialization
alepane21 May 15, 2025
56217b0
refactor: simplify BuildProvidersAndDataSources function signature an…
alepane21 May 15, 2025
526dc48
refactor: move EngineEventConfiguration interface to provider.go
alepane21 May 15, 2025
c3c5889
chore: improved docs
alepane21 May 15, 2025
a898da3
refactor: reorganize NATS provider structure
alepane21 May 15, 2025
ffff337
feat: reuse provider in different graphs, streamline pub sub builder
alepane21 May 15, 2025
765d731
test: add unit tests for BuildProvidersAndDataSources function
alepane21 May 15, 2025
c5623ca
test: add unit tests for provider
alepane21 May 15, 2025
839e8b7
docs: update README with detailed PubSub provider implementation steps
alepane21 May 15, 2025
5e4a9b8
feat: add Redis event configuration and PubSub support
alepane21 May 16, 2025
0689f07
feat: add EDFS Redis publish and subscribe constants
alepane21 May 26, 2025
703bb29
chore: generate index.global.js
alepane21 May 26, 2025
4c4293b
chore: lint
alepane21 May 26, 2025
5496d7e
chore: rename PublishAndRequestEventConfiguration to PublishEventConf…
alepane21 May 26, 2025
a150cc1
chore: compile index.global.js
alepane21 May 26, 2025
0d0912d
feat: add Redis configuration support in schema and fixtures
alepane21 May 26, 2025
2597f57
chore: fix small typo
alepane21 May 26, 2025
7b89736
Merge branch 'ale/eng-6482-edfs-refactor-filtered-datasources' into a…
alepane21 May 26, 2025
31af4da
test: update base config to include redis operations, update structur…
alepane21 May 26, 2025
89b29c9
chore: add redis config to demo config
alepane21 May 26, 2025
4edb873
Merge remote-tracking branch 'origin/ale/eng-6380-support-redis-pubsu…
alepane21 May 26, 2025
4395be2
feat: update Redis configuration to use the right format in demo conf…
alepane21 May 27, 2025
2f9c870
Merge branch 'main' into ale/eng-6482-edfs-refactor-kafka-and-nats-as…
alepane21 May 27, 2025
c84c844
Merge branch 'ale/eng-6482-edfs-refactor-kafka-and-nats-as-datasource…
alepane21 May 27, 2025
8144278
Merge branch 'ale/eng-6482-edfs-refactor-filtered-datasources' into a…
alepane21 May 27, 2025
b535e9f
refactor: streamline provider and data source construction in PubSub
alepane21 May 27, 2025
cb6ce98
chore: fix unused field
alepane21 May 28, 2025
dec7f6d
chore: testenv now don't add empty providers
alepane21 May 28, 2025
562b96d
Merge remote-tracking branch 'origin/main' into ale/eng-6482-edfs-ref…
alepane21 May 28, 2025
cfecb90
Merge branch 'ale/eng-6482-edfs-refactor-kafka-and-nats-as-datasource…
alepane21 May 28, 2025
49db63a
refactor: update datasource import to use pubsub_datasource in factor…
alepane21 May 28, 2025
0d0429c
Merge branch 'ale/eng-6482-edfs-refactor-kafka-and-nats-as-datasource…
alepane21 May 28, 2025
0e3ffe4
feat: add ProviderNotDefinedError and separated logic by provider
alepane21 May 28, 2025
2e37a40
chore: add a basic provider implementation for common usage and remov…
alepane21 May 28, 2025
eb8f95e
chore: removed reference to *nodev1.EngineEventConfiguration and *nod…
alepane21 May 28, 2025
28f489d
chore: remove unused interface
alepane21 May 28, 2025
72830c9
chore: move EngineEventConfiguration in a more appropriate package
alepane21 May 28, 2025
8276186
chore: generate mocks with mockery
alepane21 May 28, 2025
4764c74
chore: update go.mod and go.sum by removing unused dependencies and a…
alepane21 May 28, 2025
d5c19d8
chore: avoid duplicated code inside BuildProvidersAndDataSources, res…
alepane21 May 28, 2025
8f29bf0
chore: add mock implementations for PubSubProvider and PubSubDataSour…
alepane21 May 29, 2025
0f7fd6f
style: update comment formatting in BuildProvidersAndDataSources func…
alepane21 May 29, 2025
7000f5a
docs: enhance README for PubSub provider integration and update confi…
alepane21 May 29, 2025
22503c9
test: rename existing test and add Kafka provider test for BuildProvi…
alepane21 May 29, 2025
64043e3
Merge branch 'ale/eng-6482-edfs-refactor-filtered-datasources' into a…
alepane21 May 29, 2025
f7e48bf
chore: update redis implementation
alepane21 May 29, 2025
4b6e211
chore: restored test "subscribe async with filter"
alepane21 May 29, 2025
ad15f00
fix: remove unnecessary URLs from Redis event source in tests and cle…
alepane21 May 29, 2025
ce969a2
Merge branch 'main' into ale/eng-6482-edfs-refactor-kafka-and-nats-as…
alepane21 May 29, 2025
4cd2d7e
Merge branch 'ale/eng-6482-edfs-refactor-kafka-and-nats-as-datasource…
alepane21 May 29, 2025
156916b
chore: build the datasource when the planner gets int o ConfigureFetc…
alepane21 May 30, 2025
684a4fc
chore: add test to verify no double transformation happens when the s…
alepane21 May 30, 2025
b72c7b5
chore: go mod tidy
alepane21 May 30, 2025
22bf957
test: update PubSub data source tests to use BuildDataSourceFactory a…
alepane21 May 30, 2025
8f98766
Merge branch 'ale/eng-6482-edfs-refactor-filtered-datasources' into a…
alepane21 Jun 3, 2025
0a93736
refactor: add BuildDataSourceFactory to redis implementation
alepane21 Jun 3, 2025
dfd3bf9
chore: add tests to Redis pubsub provider
alepane21 Jun 3, 2025
aeced3b
chore: reduce parallel things in the matrix
alepane21 Jun 5, 2025
963c37d
Merge remote-tracking branch 'origin/main' into ale/eng-6482-edfs-ref…
alepane21 Jun 5, 2025
ac59b12
chore: go mod tidy
alepane21 Jun 5, 2025
31ae243
chore: use table in the README
alepane21 Jun 5, 2025
5dbef22
chore: made changes suggested in PR
alepane21 Jun 5, 2025
8a1a036
chore: change adapters names
alepane21 Jun 6, 2025
6b50be0
chore: remove BuildDataSourceFactory from ProviderBuilder
alepane21 Jun 6, 2025
dc206b5
Merge remote-tracking branch 'origin/main' into ale/eng-6482-edfs-ref…
alepane21 Jun 6, 2025
17f4df6
Merge branch 'ale/eng-6482-edfs-refactor-kafka-and-nats-as-datasource…
alepane21 Jun 6, 2025
0ff66a9
chore: renamed PubSubDataSource -> EngineDataSourceFactory, removed P…
alepane21 Jun 6, 2025
e971616
Merge branch 'ale/eng-6482-edfs-refactor-filtered-datasources' into a…
alepane21 Jun 6, 2025
3a6875d
chore: port the adapter to new pubsub structure
alepane21 Jun 6, 2025
f794a57
chore: read redis tests
alepane21 Jun 6, 2025
e6b5c67
chore: explain the MarshalJSONTemplate
alepane21 Jun 6, 2025
03f69eb
chore: fix comment
alepane21 Jun 6, 2025
fae3a54
Merge branch 'main' into ale/eng-6482-edfs-refactor-filtered-datasources
alepane21 Jun 6, 2025
6c43253
Merge branch 'ale/eng-6482-edfs-refactor-filtered-datasources' into a…
alepane21 Jun 6, 2025
605fa02
chore: fix redis tests
alepane21 Jun 6, 2025
850939b
Merge remote-tracking branch 'origin/main' into ale/eng-6380-support-…
alepane21 Jun 6, 2025
76b957d
chore: fix mockery config
alepane21 Jun 9, 2025
1348137
chore: go back to basic runner
alepane21 Jun 9, 2025
e8ea17b
chore: restore runner ubuntu-latest-xl
alepane21 Jun 9, 2025
f3b1e82
chore: change runner to ubuntu-latest-l
alepane21 Jun 9, 2025
60e5799
Merge branch 'main' into ale/eng-6380-support-redis-pubsub-in-edfs
alepane21 Jun 9, 2025
299d2c4
feat: add redis cluster support
alepane21 Jun 9, 2025
1990518
Merge branch 'main' into ale/eng-6380-support-redis-pubsub-in-edfs
alepane21 Jun 9, 2025
2cfd1c1
chore: fixed redis adapter behaviour on shutdown and removed nodeev1 …
alepane21 Jun 10, 2025
ca51014
Merge branch 'main' into ale/eng-6380-support-redis-pubsub-in-edfs
alepane21 Jun 10, 2025
7ac3b92
fix: redis event mapping
alepane21 Jun 10, 2025
e2553f3
test: add redis cluster tests
alepane21 Jun 10, 2025
ba21786
fix: redis-cluster-configure is launched only after the redis nodes a…
alepane21 Jun 10, 2025
aa54cec
Merge branch 'main' into ale/eng-6380-support-redis-pubsub-in-edfs
alepane21 Jun 10, 2025
9814f31
test: rewrote test to not use testify.require/assert inside go routin…
alepane21 Jun 11, 2025
6cdda9a
Merge remote-tracking branch 'origin/main' into ale/eng-6380-support-…
alepane21 Jun 11, 2025
fba9f18
test: remove useless test
alepane21 Jun 11, 2025
a2d20f7
Merge remote-tracking branch 'origin/main' into ale/eng-6380-support-…
alepane21 Jun 11, 2025
1c80b98
chore: remove redundant code in a test, fixed config descriptions
alepane21 Jun 11, 2025
7daeecb
Merge branch 'main' into ale/eng-6380-support-redis-pubsub-in-edfs
alepane21 Jun 17, 2025
267a9ae
chore: improve redis-cluster-create reliability
alepane21 Jun 17, 2025
0b64e46
chore: fix redis test after last changes to configuration
alepane21 Jun 17, 2025
2007f56
chore: fix redis test after merge
alepane21 Jun 17, 2025
244e7de
Merge branch 'main' into ale/eng-6380-support-redis-pubsub-in-edfs
alepane21 Jun 25, 2025
7a15d0f
chore: don't use normalizeSubgraph
alepane21 Jun 25, 2025
74ccdb2
chore: formatting
alepane21 Jun 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ setup-dev-tools: setup-build-tools
go install github.com/yannh/kubeconform/cmd/kubeconform@v0.6.3
go install github.com/norwoodj/helm-docs/cmd/helm-docs@v1.11.3
go install github.com/vektra/mockery/v3@v3.3.1
go install github.com/Antonboom/testifylint@v1.6.1

prerequisites: setup-dev-tools
go version
Expand All @@ -20,6 +21,7 @@ prerequisites: setup-dev-tools
docker -v
dbmate -v
mockery version
testifylint -V=full

infra-up: dc-dev

Expand Down
392 changes: 196 additions & 196 deletions composition-go/index.global.js

Large diffs are not rendered by default.

12 changes: 11 additions & 1 deletion composition/src/router-configuration/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ export type NatsEventType = 'subscribe' | 'publish' | 'request';

export type KafkaEventType = 'subscribe' | 'publish';

export type RedisEventType = 'subscribe' | 'publish';

export type StreamConfiguration = {
consumerInactiveThreshold: number;
consumerName: string;
Expand All @@ -25,7 +27,15 @@ export type NatsEventConfiguration = {
streamConfiguration?: StreamConfiguration;
};

export type EventConfiguration = KafkaEventConfiguration | NatsEventConfiguration;
export type RedisEventConfiguration = {
fieldName: string;
providerId: string;
providerType: 'redis';
channels: string[];
type: RedisEventType;
};

export type EventConfiguration = KafkaEventConfiguration | NatsEventConfiguration | RedisEventConfiguration;

export type SubscriptionFilterValue = boolean | null | number | string;

Expand Down
5 changes: 5 additions & 0 deletions composition/src/utils/string-constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export const EDFS_NATS_REQUEST = 'edfs__natsRequest';
export const EDFS_NATS_SUBSCRIBE = 'edfs__natsSubscribe';
export const EDFS_PUBLISH_RESULT = 'edfs__PublishResult';
export const EDFS_NATS_STREAM_CONFIGURATION = 'edfs__NatsStreamConfiguration';
export const EDFS_REDIS_PUBLISH = 'edfs__redisPublish';
export const EDFS_REDIS_SUBSCRIBE = 'edfs__redisSubscribe';
export const ENTITIES = 'entities';
export const ENTITIES_FIELD = '_entities';
export const ENTITY_UNION = '_Entity';
Expand Down Expand Up @@ -83,6 +85,7 @@ export const MUTATION_UPPER = 'MUTATION';
export const PROPAGATE = 'propagate';
export const PROVIDER_TYPE_KAFKA = 'kafka';
export const PROVIDER_TYPE_NATS = 'nats';
export const PROVIDER_TYPE_REDIS = 'redis';
export const NOT_APPLICABLE = 'N/A';
export const NAME = 'name';
export const NON_NULLABLE_EDFS_PUBLISH_EVENT_RESULT = 'edfs__PublishResult!';
Expand Down Expand Up @@ -146,6 +149,8 @@ export const UNION_UPPER = 'UNION';
export const URL_LOWER = 'url';
export const VALUES = 'values';
export const VARIABLE_DEFINITION_UPPER = 'VARIABLE_DEFINITION';
export const CHANNEL = 'channel';
export const CHANNELS = 'channels';

export const EXECUTABLE_DIRECTIVE_LOCATIONS = new Set<string>([
FIELD_UPPER,
Expand Down
70 changes: 70 additions & 0 deletions composition/src/v1/normalization/directive-definition-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import {
EDFS_NATS_PUBLISH_DEFINITION,
EDFS_NATS_REQUEST_DEFINITION,
EDFS_NATS_SUBSCRIBE_DEFINITION,
EDFS_REDIS_PUBLISH_DEFINITION,
EDFS_REDIS_SUBSCRIBE_DEFINITION,
EXTENDS_DEFINITION,
EXTERNAL_DEFINITION,
INACCESSIBLE_DEFINITION,
Expand Down Expand Up @@ -92,6 +94,10 @@ import {
TOPICS,
UNION_UPPER,
URL_LOWER,
CHANNEL,
CHANNELS,
EDFS_REDIS_PUBLISH,
EDFS_REDIS_SUBSCRIBE,
} from '../../utils/string-constants';

export const AUTHENTICATED_DEFINITION_DATA: DirectiveDefinitionData = {
Expand Down Expand Up @@ -433,6 +439,70 @@ export const NATS_SUBSCRIBE_DEFINITION_DATA: DirectiveDefinitionData = {
requiredArgumentNames: new Set<string>([SUBJECTS]),
};

export const REDIS_PUBLISH_DEFINITION_DATA: DirectiveDefinitionData = {
argumentTypeNodeByArgumentName: new Map<string, ArgumentData>([
[
CHANNEL,
{
name: CHANNEL,
typeNode: REQUIRED_STRING_TYPE_NODE,
},
],
[
PROVIDER_ID,
{
name: PROVIDER_ID,
typeNode: REQUIRED_STRING_TYPE_NODE,
defaultValue: {
kind: Kind.STRING,
value: DEFAULT_EDFS_PROVIDER_ID,
},
},
],
]),
isRepeatable: false,
locations: new Set<string>([FIELD_DEFINITION_UPPER]),
name: EDFS_REDIS_PUBLISH,
node: EDFS_REDIS_PUBLISH_DEFINITION,
optionalArgumentNames: new Set<string>([PROVIDER_ID]),
requiredArgumentNames: new Set<string>([CHANNEL]),
};

export const REDIS_SUBSCRIBE_DEFINITION_DATA: DirectiveDefinitionData = {
argumentTypeNodeByArgumentName: new Map<string, ArgumentData>([
[
CHANNELS,
{
name: CHANNELS,
typeNode: {
kind: Kind.NON_NULL_TYPE,
type: {
kind: Kind.LIST_TYPE,
type: REQUIRED_STRING_TYPE_NODE,
},
},
},
],
[
PROVIDER_ID,
{
name: PROVIDER_ID,
typeNode: REQUIRED_STRING_TYPE_NODE,
defaultValue: {
kind: Kind.STRING,
value: DEFAULT_EDFS_PROVIDER_ID,
},
},
],
]),
isRepeatable: false,
locations: new Set<string>([FIELD_DEFINITION_UPPER]),
name: EDFS_REDIS_SUBSCRIBE,
node: EDFS_REDIS_SUBSCRIBE_DEFINITION,
optionalArgumentNames: new Set<string>([PROVIDER_ID]),
requiredArgumentNames: new Set<string>([CHANNELS]),
};

export const OVERRIDE_DEFINITION_DATA: DirectiveDefinitionData = {
argumentTypeNodeByArgumentName: new Map<string, ArgumentData>([
[
Expand Down
110 changes: 108 additions & 2 deletions composition/src/v1/normalization/normalization-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ import {
SUCCESS,
TOPIC,
TOPICS,
CHANNEL,
CHANNELS,
EDFS_REDIS_PUBLISH,
EDFS_REDIS_SUBSCRIBE,
PROVIDER_TYPE_REDIS,
} from '../../utils/string-constants';
import { MAX_INT32 } from '../../utils/integer-constants';
import {
Expand Down Expand Up @@ -2413,6 +2418,89 @@ export class NormalizationFactory {
};
}

getRedisPublishConfiguration(
directive: ConstDirectiveNode,
argumentDataByArgumentName: Map<string, InputValueData>,
fieldName: string,
errorMessages: string[],
): EventConfiguration | undefined {
const channels: string[] = [];
let providerId = DEFAULT_EDFS_PROVIDER_ID;
for (const argumentNode of directive.arguments || []) {
switch (argumentNode.name.value) {
case CHANNEL: {
if (argumentNode.value.kind !== Kind.STRING || argumentNode.value.value.length < 1) {
errorMessages.push(invalidEventSubjectErrorMessage(CHANNEL));
continue;
}
validateArgumentTemplateReferences(argumentNode.value.value, argumentDataByArgumentName, errorMessages);
channels.push(argumentNode.value.value);
break;
}
case PROVIDER_ID: {
if (argumentNode.value.kind !== Kind.STRING || argumentNode.value.value.length < 1) {
errorMessages.push(invalidEventProviderIdErrorMessage);
continue;
}
providerId = argumentNode.value.value;
break;
}
}
}
if (errorMessages.length > 0) {
return;
}
return { fieldName, providerId, providerType: PROVIDER_TYPE_REDIS, channels, type: PUBLISH };
}

getRedisSubscribeConfiguration(
directive: ConstDirectiveNode,
argumentDataByArgumentName: Map<string, InputValueData>,
fieldName: string,
errorMessages: string[],
): EventConfiguration | undefined {
const channels: string[] = [];
let providerId = DEFAULT_EDFS_PROVIDER_ID;
for (const argumentNode of directive.arguments || []) {
switch (argumentNode.name.value) {
case CHANNELS: {
//@TODO list coercion
if (argumentNode.value.kind !== Kind.LIST) {
errorMessages.push(invalidEventSubjectsErrorMessage(CHANNELS));
continue;
}
for (const value of argumentNode.value.values) {
if (value.kind !== Kind.STRING || value.value.length < 1) {
errorMessages.push(invalidEventSubjectsItemErrorMessage(CHANNELS));
break;
}
validateArgumentTemplateReferences(value.value, argumentDataByArgumentName, errorMessages);
channels.push(value.value);
}
break;
}
case PROVIDER_ID: {
if (argumentNode.value.kind !== Kind.STRING || argumentNode.value.value.length < 1) {
errorMessages.push(invalidEventProviderIdErrorMessage);
continue;
}
providerId = argumentNode.value.value;
break;
}
}
}
if (errorMessages.length > 0) {
return;
}
return {
fieldName,
providerId,
providerType: PROVIDER_TYPE_REDIS,
channels,
type: SUBSCRIBE,
};
}

validateSubscriptionFilterDirectiveLocation(node: FieldDefinitionNode) {
if (!node.directives) {
return;
Expand Down Expand Up @@ -2490,6 +2578,24 @@ export class NormalizationFactory {
);
break;
}
case EDFS_REDIS_PUBLISH: {
eventConfiguration = this.getRedisPublishConfiguration(
directive,
argumentDataByArgumentName,
fieldName,
errorMessages,
);
break;
}
case EDFS_REDIS_SUBSCRIBE: {
eventConfiguration = this.getRedisSubscribeConfiguration(
directive,
argumentDataByArgumentName,
fieldName,
errorMessages,
);
break;
}
default:
continue;
}
Expand All @@ -2515,11 +2621,11 @@ export class NormalizationFactory {
getValidEventsDirectiveNamesForOperationTypeNode(operationTypeNode: OperationTypeNode): Set<string> {
switch (operationTypeNode) {
case OperationTypeNode.MUTATION:
return new Set<string>([EDFS_KAFKA_PUBLISH, EDFS_NATS_PUBLISH, EDFS_NATS_REQUEST]);
return new Set<string>([EDFS_KAFKA_PUBLISH, EDFS_NATS_PUBLISH, EDFS_NATS_REQUEST, EDFS_REDIS_PUBLISH]);
case OperationTypeNode.QUERY:
return new Set<string>([EDFS_NATS_REQUEST]);
case OperationTypeNode.SUBSCRIPTION:
return new Set<string>([EDFS_KAFKA_SUBSCRIBE, EDFS_NATS_SUBSCRIBE]);
return new Set<string>([EDFS_KAFKA_SUBSCRIBE, EDFS_NATS_SUBSCRIBE, EDFS_REDIS_SUBSCRIBE]);
}
}

Expand Down
6 changes: 6 additions & 0 deletions composition/src/v1/normalization/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import {
NATS_PUBLISH_DEFINITION_DATA,
NATS_REQUEST_DEFINITION_DATA,
NATS_SUBSCRIBE_DEFINITION_DATA,
REDIS_PUBLISH_DEFINITION_DATA,
REDIS_SUBSCRIBE_DEFINITION_DATA,
OVERRIDE_DEFINITION_DATA,
PROVIDES_DEFINITION_DATA,
REQUIRES_DEFINITION_DATA,
Expand All @@ -57,6 +59,8 @@ import {
EDFS_NATS_PUBLISH,
EDFS_NATS_REQUEST,
EDFS_NATS_SUBSCRIBE,
EDFS_REDIS_PUBLISH,
EDFS_REDIS_SUBSCRIBE,
EXTENDS,
EXTERNAL,
FIELDS,
Expand Down Expand Up @@ -390,6 +394,8 @@ export function initializeDirectiveDefinitionDatas(): Map<string, DirectiveDefin
[EDFS_NATS_PUBLISH, NATS_PUBLISH_DEFINITION_DATA],
[EDFS_NATS_REQUEST, NATS_REQUEST_DEFINITION_DATA],
[EDFS_NATS_SUBSCRIBE, NATS_SUBSCRIBE_DEFINITION_DATA],
[EDFS_REDIS_PUBLISH, REDIS_PUBLISH_DEFINITION_DATA],
[EDFS_REDIS_SUBSCRIBE, REDIS_SUBSCRIBE_DEFINITION_DATA],
[EXTENDS, EXTENDS_DEFINITION_DATA],
[EXTERNAL, EXTERNAL_DEFINITION_DATA],
[INACCESSIBLE, INACCESSIBLE_DEFINITION_DATA],
Expand Down
Loading
Loading