Skip to content

Commit

Permalink
feat(event-subscriptions): event consumer extension
Browse files Browse the repository at this point in the history
- implemented DI for event consumers
- added custom config for event consumers
- initialize the event scheduler along with application
- added workflow custom type
  • Loading branch information
sushi30 committed Jan 29, 2025
1 parent 58d5988 commit e354c40
Show file tree
Hide file tree
Showing 16 changed files with 161 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ public void run(OpenMetadataApplicationConfig catalogConfig, Environment environ
EventPubSub.start();

ApplicationHandler.initialize(catalogConfig);
EventSubscriptionScheduler.initialize(catalogConfig);
registerResources(catalogConfig, environment, jdbi);

// Register Event Handler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.Entity;
import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.events.scheduled.ConsumerService;
import org.openmetadata.service.util.JsonUtils;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
Expand All @@ -53,7 +54,7 @@ public abstract class AbstractEventConsumer
public static final String OFFSET_EXTENSION = "eventSubscription.Offset";
public static final String METRICS_EXTENSION = "eventSubscription.metrics";
public static final String FAILED_EVENT_EXTENSION = "eventSubscription.failedEvent";

protected final ConsumerService consumerService;
private long offset = -1;
private long startingOffset = -1;

Expand All @@ -63,7 +64,9 @@ public abstract class AbstractEventConsumer
protected EventSubscription eventSubscription;
protected Map<UUID, Destination<ChangeEvent>> destinationMap;

protected AbstractEventConsumer() {}
protected AbstractEventConsumer(ConsumerService consumerService) {
this.consumerService = consumerService;
}

private void init(JobExecutionContext context) {
EventSubscription sub =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.events.scheduled.ConsumerService;

@Slf4j
public class AlertPublisher extends AbstractEventConsumer {
public AlertPublisher(ConsumerService consumerService) {
super(consumerService);
}

@Override
public void sendAlert(UUID receiverId, ChangeEvent event) throws EventPublisherException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.openmetadata.service.events.scheduled;

import org.openmetadata.schema.entity.automations.Workflow;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse;

public interface ConsumerService {
PipelineServiceClientResponse runAutomationWorkflow(Workflow wf);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.openmetadata.service.events.scheduled;

import org.openmetadata.schema.entity.automations.Workflow;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse;
import org.openmetadata.sdk.PipelineServiceClientInterface;
import org.openmetadata.service.util.DIContainer;

public class ConsumerServiceImpl implements ConsumerService {
private final DIContainer di;

public ConsumerServiceImpl(DIContainer diContainer) {
di = diContainer;
}

@Override
public PipelineServiceClientResponse runAutomationWorkflow(Workflow wf) {
return di.getResource(PipelineServiceClientInterface.class).runAutomationsWorkflow(wf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,19 @@
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.entity.events.SubscriptionStatus;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.sdk.PipelineServiceClientInterface;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.apps.bundles.changeEvent.AbstractEventConsumer;
import org.openmetadata.service.apps.bundles.changeEvent.AlertPublisher;
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
import org.openmetadata.service.events.subscription.AlertUtil;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.EventSubscriptionRepository;
import org.openmetadata.service.resources.events.subscription.TypedEvent;
import org.openmetadata.service.util.DIContainer;
import org.openmetadata.service.util.JsonUtils;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
Expand All @@ -56,31 +61,60 @@
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.spi.JobFactory;
import org.quartz.spi.TriggerFiredBundle;

@Slf4j
public class EventSubscriptionScheduler {
public static final String ALERT_JOB_GROUP = "OMAlertJobGroup";
public static final String ALERT_TRIGGER_GROUP = "OMAlertJobGroup";
private static EventSubscriptionScheduler instance;
private static volatile boolean initialized = false;

private final Scheduler alertsScheduler = new StdSchedulerFactory().getScheduler();

private EventSubscriptionScheduler() throws SchedulerException {
private record CustomJobFactory(ConsumerService consumerService) implements JobFactory {

@Override
public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException {
try {
JobDetail jobDetail = bundle.getJobDetail();
Class<? extends Job> jobClass = jobDetail.getJobClass();
Job job =
jobClass.getDeclaredConstructor(ConsumerService.class).newInstance(consumerService);
return job;
} catch (Exception e) {
throw new SchedulerException("Failed to create job instance", e);
}
}
}

private EventSubscriptionScheduler(PipelineServiceClientInterface pipelineServiceClient)
throws SchedulerException {
DIContainer di = new DIContainer();
di.registerResource(PipelineServiceClientInterface.class, pipelineServiceClient);
ConsumerService consumerService = new ConsumerServiceImpl(di);
this.alertsScheduler.setJobFactory(new CustomJobFactory(consumerService));
this.alertsScheduler.start();
}

@SneakyThrows
public static EventSubscriptionScheduler getInstance() {
if (!initialized) {
initialize();
throw new RuntimeException("Event Subscription Scheduler is not initialized");
}
return instance;
}

private static void initialize() throws SchedulerException {
public static void initialize(OpenMetadataApplicationConfig openMetadataApplicationConfig) {
PipelineServiceClientInterface pipelineServiceClient =
PipelineServiceClientFactory.createPipelineServiceClient(
openMetadataApplicationConfig.getPipelineServiceClientConfiguration());
if (!initialized) {
instance = new EventSubscriptionScheduler();
try {
instance = new EventSubscriptionScheduler(pipelineServiceClient);
} catch (SchedulerException e) {
throw new RuntimeException("Failed to initialize Event Subscription Scheduler", e);
}
initialized = true;
} else {
LOG.info("Event Subscription Scheduler is already initialized");
Expand All @@ -101,7 +135,9 @@ public void addSubscriptionPublisher(EventSubscription eventSubscription, boolea
Optional.ofNullable(eventSubscription.getClassName())
.orElse(defaultClass.getCanonicalName()))
.asSubclass(AbstractEventConsumer.class);
AbstractEventConsumer publisher = clazz.getDeclaredConstructor().newInstance();
ConsumerService consumerService = new ConsumerServiceImpl(null);
AbstractEventConsumer publisher =
clazz.getDeclaredConstructor(ConsumerService.class).newInstance(consumerService);
if (reinstall && isSubscriptionRegistered(eventSubscription)) {
deleteEventSubscriptionPublisher(eventSubscription);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public void entitySpecificUpdate(boolean consolidatingChanges) {
objectMatch,
false);
recordChange("trigger", original.getTrigger(), updated.getTrigger(), true);
recordChange("config", original.getConfig(), updated.getConfig(), true);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public EventSubscription createToEntity(CreateEventSubscription create, String u
.withClassName(
validateConsumerClass(
Optional.ofNullable(create.getClassName())
.orElse(AlertPublisher.class.getCanonicalName())));
.orElse(AlertPublisher.class.getCanonicalName())))
.withConfig(create.getConfig());
}

private String validateConsumerClass(String className) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
"The `Events` are changes to metadata and are sent when entities are created, modified, or updated. External systems can subscribe to events using event subscription API over Webhooks, Slack, or Microsoft Teams.")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "events/subscriptions")
@Collection(name = "events/subscriptions", order = 7) // needs to initialize before applications
public class EventSubscriptionResource
extends EntityResource<EventSubscription, EventSubscriptionRepository> {
public static final String COLLECTION_PATH = "/v1/events/subscriptions";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.openmetadata.service.util;

import java.util.HashMap;
import java.util.Map;

public class DIContainer {
private final Map<Class<?>, Object> resources = new HashMap<>();

public <T> void registerResource(Class<T> resourceClass, T resource) {
resources.put(resourceClass, resource);
}

public <T> T getResource(Class<T> resourceClass) {
T resource = resourceClass.cast(resources.get(resourceClass));
if (resource == null) {
throw new IllegalStateException("Resource not initialized: " + resourceClass.getName());
}
return resource;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"description": "This enum defines the type for which this workflow applies to.",
"type": "string",
"enum": [
"TEST_CONNECTION"
"TEST_CONNECTION",
"CUSTOM"
]
},
"workflowStatus": {
Expand Down Expand Up @@ -47,6 +48,10 @@
"description": "Type of the workflow.",
"$ref": "#/definitions/workflowType"
},
"pythonClass": {
"description": "Python class to be executed for this workflow (for custom workflows)",
"type": "string"
},
"status": {
"description": "Workflow computation status.",
"$ref": "#/definitions/workflowStatus",
Expand All @@ -57,6 +62,9 @@
"oneOf": [
{
"$ref": "testServiceConnection.json"
},
{
"$ref": "../../type/basic.json#/definitions/map"
}
]
},
Expand All @@ -65,6 +73,9 @@
"oneOf": [
{
"$ref": "../services/connections/testConnectionResult.json"
},
{
"$ref": "../../type/basic.json#/definitions/map"
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@
"domain" : {
"description": "Fully qualified name of the domain the Table belongs to.",
"type": "string"
},
"config": {
"$ref": "../../type/basic.json#/definitions/map"
}
},
"required": ["name", "alertType"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@
"domain" : {
"description": "Domain the asset belongs to. When not set, the asset inherits the domain from the parent it belongs to.",
"$ref": "../type/entityReference.json"
},
"config": {
"$ref": "../type/basic.json#/definitions/map"
}
},
"required": ["id", "name", "alertType", "destinations"],
Expand Down
Loading

0 comments on commit e354c40

Please sign in to comment.