Skip to content

Commit

Permalink
feat(#3280): Add support for pipeline templates
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer committed Oct 3, 2024
1 parent a4c5d00 commit 3f5c07c
Show file tree
Hide file tree
Showing 50 changed files with 1,232 additions and 1,213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,72 +18,93 @@

package org.apache.streampipes.connect.management.compact;

import org.apache.streampipes.manager.template.PipelineTemplateManagement;
import org.apache.streampipes.manager.pipeline.PipelineManager;
import org.apache.streampipes.manager.pipeline.compact.CompactPipelineManagement;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.adapter.compact.CreateOptions;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import org.apache.streampipes.model.pipeline.compact.CompactPipeline;
import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
import org.apache.streampipes.model.template.PipelineTemplateInvocation;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.model.template.CompactPipelineTemplate;
import org.apache.streampipes.storage.api.CRUDStorage;
import org.apache.streampipes.vocabulary.SO;

public class PersistPipelineHandler {
import java.util.List;
import java.util.Map;

import static org.apache.streampipes.manager.template.instances.PersistDataLakePipelineTemplate.DATA_LAKE_CONNECTOR_ID;
import static org.apache.streampipes.manager.template.instances.PersistDataLakePipelineTemplate.DATA_LAKE_DIMENSIONS_FIELD;
import static org.apache.streampipes.manager.template.instances.PersistDataLakePipelineTemplate.DATA_LAKE_MEASUREMENT_FIELD;
import static org.apache.streampipes.manager.template.instances.PersistDataLakePipelineTemplate.DATA_LAKE_TEMPLATE_ID;
import static org.apache.streampipes.manager.template.instances.PersistDataLakePipelineTemplate.DATA_LAKE_TIMESTAMP_FIELD;

private static final String templateId = "org.apache.streampipes.manager.template.instances.DataLakePipelineTemplate";
private static final String configPrefix = "jsplumb_domId2";
public class PersistPipelineHandler {

private final PipelineTemplateManagement pipelineTemplateManagement;
private final CRUDStorage<CompactPipelineTemplate> templateStorage;
private final CompactPipelineManagement pipelineManagement;
private final String authenticatedUserSid;

public PersistPipelineHandler(PipelineTemplateManagement pipelineTemplateManagement,
public PersistPipelineHandler(CRUDStorage<CompactPipelineTemplate> templateStorage,
CompactPipelineManagement pipelineManagement,
String authenticatedUserSid) {
this.pipelineTemplateManagement = pipelineTemplateManagement;
this.templateStorage = templateStorage;
this.pipelineManagement = pipelineManagement;
this.authenticatedUserSid = authenticatedUserSid;
}

public PipelineOperationStatus createAndStartPersistPipeline(AdapterDescription adapterDescription) {
var pipelineTemplateInvocation = pipelineTemplateManagement.prepareInvocation(
adapterDescription.getCorrespondingDataStreamElementId(),
templateId
);

applyPipelineName(pipelineTemplateInvocation, adapterDescription.getName());
applyDataLakeConfig(pipelineTemplateInvocation, adapterDescription);

return pipelineTemplateManagement.createAndStartPipeline(pipelineTemplateInvocation, authenticatedUserSid);
public PipelineOperationStatus createAndStartPersistPipeline(AdapterDescription adapterDescription) throws Exception {
var template = getTemplate();
if (template != null) {
var compactPipeline = new CompactPipeline(
String.format("persist-%s", adapterDescription.getName().replaceAll(" ", "-")),
String.format("Persist %s", adapterDescription.getName()),
null,
makeTemplateConfig(adapterDescription, template.getPipeline()),
new CreateOptions(false, true)
);
var pipelineGenerationResult = pipelineManagement.makePipeline(compactPipeline);
if (pipelineGenerationResult.allPipelineElementsValid()) {
String pipelineId = PipelineManager.addPipeline(authenticatedUserSid, pipelineGenerationResult.pipeline());
if (compactPipeline.createOptions().start()) {
return PipelineManager.startPipeline(pipelineId);
}
}
}
throw new IllegalArgumentException("Could not start persist pipeline");
}

private void applyPipelineName(PipelineTemplateInvocation pipelineTemplateInvocation,
String adapterName) {
pipelineTemplateInvocation.setPipelineTemplateId(templateId);
pipelineTemplateInvocation.setKviName(adapterName);
private CompactPipelineTemplate getTemplate() {
return this.templateStorage.getElementById(DATA_LAKE_TEMPLATE_ID);
}

private void applyDataLakeConfig(PipelineTemplateInvocation pipelineTemplateInvocation,
AdapterDescription adapterDescription) {
pipelineTemplateInvocation.getStaticProperties().forEach(sp -> {
if (sp.getInternalName().equalsIgnoreCase(withPrefix("db_measurement"))) {
((FreeTextStaticProperty) sp).setValue(adapterDescription.getName());
}
if (sp.getInternalName().equalsIgnoreCase(withPrefix("timestamp_mapping"))) {
((MappingPropertyUnary) sp).setSelectedProperty(
String.format("s0::%s", getTimestampField(adapterDescription)
));
}
if (sp.getInternalName().equalsIgnoreCase(withPrefix("schema_update"))) {
((OneOfStaticProperty) sp).getOptions().forEach(o -> {
if (o.getName().equals("Update schema")) {
o.setSelected(true);
}
});
}
});
private List<CompactPipelineElement> makeTemplateConfig(AdapterDescription adapterDescription,
List<CompactPipelineElement> pipelineElements) {
pipelineElements.get(0).configuration().addAll(
List.of(
Map.of(DATA_LAKE_MEASUREMENT_FIELD, adapterDescription.getName()),
Map.of(DATA_LAKE_TIMESTAMP_FIELD, String.format("s0::%s", getTimestampField(adapterDescription))),
Map.of(DATA_LAKE_DIMENSIONS_FIELD, getDimensions(adapterDescription))
)
);
pipelineElements.add(new CompactPipelineElement(
"stream",
DATA_LAKE_CONNECTOR_ID,
adapterDescription.getCorrespondingDataStreamElementId(),
null,
null
));
return pipelineElements;
}

private String withPrefix(String config) {
return configPrefix + config;
private List<String> getDimensions(AdapterDescription adapterDescription) {
return adapterDescription.getEventSchema().getEventProperties()
.stream()
.filter(ep -> ep.getPropertyScope().equalsIgnoreCase(PropertyScope.DIMENSION_PROPERTY.name()))
.map(EventProperty::getRuntimeName)
.toList();
}

private String getTimestampField(AdapterDescription adapterDescription) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.apache.streampipes.model.pipeline.compact;

import org.apache.streampipes.model.connect.adapter.compact.CreateOptions;
import org.apache.streampipes.model.shared.annotation.TsModel;

import java.util.List;

@TsModel
public record CompactPipeline(
String id,
String name,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.model.template;

import org.apache.streampipes.model.pipeline.compact.CompactPipeline;
import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
import org.apache.streampipes.model.shared.annotation.TsModel;
import org.apache.streampipes.model.shared.api.Storable;

import com.fasterxml.jackson.annotation.JsonAlias;
import com.google.gson.annotations.SerializedName;

import java.util.List;

@TsModel
public class CompactPipelineTemplate implements Storable {

@JsonAlias("id")
protected @SerializedName("_id") String elementId;

@JsonAlias("_rev")
protected @SerializedName("_rev") String rev;

private String name;
private String description;
private List<CompactPipelineElement> pipeline;
private PipelinePlaceholders placeholders;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getDescription() {
return description;
}

public void setDescription(String description) {
this.description = description;
}

public List<CompactPipelineElement> getPipeline() {
return pipeline;
}

public void setPipeline(List<CompactPipelineElement> pipeline) {
this.pipeline = pipeline;
}

public PipelinePlaceholders getPlaceholders() {
return placeholders;
}

public void setPlaceholders(PipelinePlaceholders placeholders) {
this.placeholders = placeholders;
}

@Override
public String getRev() {
return rev;
}

@Override
public void setRev(String rev) {
this.rev = rev;
}

@Override
public String getElementId() {
return elementId;
}

@Override
public void setElementId(String elementId) {
this.elementId = elementId;
}

public CompactPipeline toCompactPipeline() {
return new CompactPipeline(
null,
null,
null,
this.getPipeline(),
null
);
}
}

This file was deleted.

Loading

0 comments on commit 3f5c07c

Please sign in to comment.