Skip to content

Commit

Permalink
Add command for creating pipelines
Browse files Browse the repository at this point in the history
Add a new command `datasets create-pipeline` for creating pipelines
for existing datasets.
  • Loading branch information
simenheg committed Sep 25, 2024
1 parent 047172b commit 1928f72
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 34 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## ?.?.? - Unreleased

* New command `datasets create-pipeline` for creating pipelines for existing
datasets.
* More robust dataset/version/edition URI parsing.

## 4.2.0 - 2024-06-18
Expand Down
9 changes: 8 additions & 1 deletion okdata/cli/commands/datasets/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
from requests.exceptions import HTTPError

from okdata.cli.command import BaseCommand, BASE_COMMAND_OPTIONS
from okdata.cli.commands.datasets.wizards import DatasetCreateWizard
from okdata.cli.commands.datasets.wizards import (
DatasetCreateWizard,
PipelineCreateWizard,
)
from okdata.cli.io import read_json, resolve_output_filepath
from okdata.cli.output import create_output

Expand All @@ -22,6 +25,7 @@ class DatasetsCommand(BaseCommand):
okdata datasets create-version <datasetid> [options]
okdata datasets create-edition <datasetid> [<versionid>] [options]
okdata datasets create-distribution <datasetid> [<versionid> <editionid>] [options]
okdata datasets create-pipeline <datasetid> [options]
Examples:
okdata datasets ls
Expand All @@ -32,6 +36,7 @@ class DatasetsCommand(BaseCommand):
okdata datasets ls my-dataset/1/20240101T102030 --format=json
okdata datasets create --file=dataset.json
okdata datasets cp /tmp/file.csv ds:my-dataset-id
okdata datasets create-pipeline my-dataset
Options:{BASE_COMMAND_OPTIONS}
--file=<file> # Use this file for configuration or upload
Expand Down Expand Up @@ -60,6 +65,8 @@ def handler(self):
self.create_edition()
elif self.cmd("create-distribution"):
self.create_distribution()
elif self.cmd("create-pipeline"):
PipelineCreateWizard(self, self.arg("datasetid")).start()
else:
self.help()

Expand Down
14 changes: 13 additions & 1 deletion okdata/cli/commands/datasets/questions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
]


def qs_create():
def qs_create_dataset():
return [
{
**required_style,
Expand Down Expand Up @@ -114,3 +114,15 @@ def qs_create():
"when": lambda x: x["sourceType"] in pipeline_choices,
},
]


def qs_create_pipeline():
    """Return the questions asked when creating a pipeline.

    The questionnaire consists of a single select prompt whose answer is
    stored under the ``pipeline`` key; its options are the ``"file"``
    entry of ``pipeline_choices``.
    """
    processor_question = {
        **required_style,
        "type": "select",
        "name": "pipeline",
        "message": "Prosessering",
        "choices": pipeline_choices["file"],
    }
    return [processor_question]
80 changes: 48 additions & 32 deletions okdata/cli/commands/datasets/wizards.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,41 @@
from okdata.sdk.pipelines.client import PipelineApiClient

from okdata.cli.command import confirm_to_continue
from okdata.cli.commands.datasets.questions import qs_create
from okdata.cli.commands.datasets.questions import qs_create_dataset, qs_create_pipeline
from okdata.cli.commands.wizard import run_questionnaire


def _pipeline_config(pipeline_processor_id, dataset_id, version):
return {
"pipelineProcessorId": pipeline_processor_id,
"id": dataset_id,
"datasetUri": f"output/{dataset_id}/{version}",
}


def _pipeline_input_config(pipeline_id, dataset_id, version):
return {
"pipelineInstanceId": pipeline_id,
"datasetUri": f"input/{dataset_id}/{version}",
"stage": "raw",
}


def _create_pipeline(command, env, dataset_id, pipeline):
    """Create a pipeline instance and its input for `dataset_id`.

    `command` is used for printing progress messages, `env` selects the
    environment of the pipeline API client, and `pipeline` is the ID of
    the pipeline processor to use. Both the pipeline and its input are
    created against version "1" of the dataset.
    """
    version = "1"

    command.print("Creating pipeline...")
    client = PipelineApiClient(env=env)
    instance_config = _pipeline_config(pipeline, dataset_id, version)
    # The returned ID appears to come wrapped in double quotes (presumably a
    # JSON-encoded string -- TODO confirm), so they are stripped before use.
    pipeline_id = client.create_pipeline_instance(instance_config).strip('"')
    command.print(f"Created pipeline with ID: {pipeline_id}")

    command.print("Creating pipeline input...")
    input_config = _pipeline_input_config(pipeline_id, dataset_id, version)
    # Same quoting quirk as for the pipeline instance ID above.
    pipeline_input_id = client.create_pipeline_input(input_config).strip('"')
    command.print(f"Created pipeline input with ID: {pipeline_input_id}")


class DatasetCreateWizard:
"""Wizard for the `datasets create` command.
Expand Down Expand Up @@ -39,23 +70,9 @@ def dataset_config(self, choices):

return config

def pipeline_config(self, pipeline_processor_id, dataset_id, version):
return {
"pipelineProcessorId": pipeline_processor_id,
"id": dataset_id,
"datasetUri": f"output/{dataset_id}/{version}",
}

def pipeline_input_config(self, pipeline_id, dataset_id, version):
return {
"pipelineInstanceId": pipeline_id,
"datasetUri": f"input/{dataset_id}/{version}",
"stage": "raw",
}

def start(self):
env = self.command.opt("env")
choices = run_questionnaire(*qs_create())
choices = run_questionnaire(*qs_create_dataset())

confirm_to_continue(
"Will create a new dataset '{}'.{}".format(
Expand All @@ -76,22 +93,7 @@ def start(self):
self.command.print(f"Created dataset with ID: {dataset_id}")

if choices.get("pipeline"):
self.command.print("Creating pipeline...")
pipeline_client = PipelineApiClient(env=env)
pipeline_config = self.pipeline_config(choices["pipeline"], dataset_id, "1")
pipeline_id = pipeline_client.create_pipeline_instance(pipeline_config)
pipeline_id = pipeline_id.strip('"') # What's up with these?
self.command.print(f"Created pipeline with ID: {pipeline_id}")

self.command.print("Creating pipeline input...")
pipeline_input_config = self.pipeline_input_config(
pipeline_id, dataset_id, "1"
)
pipeline_input_id = pipeline_client.create_pipeline_input(
pipeline_input_config
)
pipeline_input_id = pipeline_input_id.strip('"') # What's up with these?
self.command.print(f"Created pipeline input with ID: {pipeline_input_id}")
_create_pipeline(self.command, env, dataset_id, choices["pipeline"])

if choices["sourceType"] == "file":
self.command.print(
Expand All @@ -102,3 +104,17 @@ def start(self):
)
else:
self.command.print("Done!")


class PipelineCreateWizard:
    """Wizard for the `datasets create-pipeline` command.

    Asks which pipeline processor to use and then creates a pipeline for
    an already existing dataset.
    """

    def __init__(self, command, dataset_id):
        self.command = command
        self.dataset_id = dataset_id

    def start(self):
        """Run the questionnaire and create the chosen pipeline."""
        answers = run_questionnaire(*qs_create_pipeline())
        # The environment is read after the questionnaire has completed.
        env = self.command.opt("env")
        processor_id = answers["pipeline"]
        _create_pipeline(self.command, env, self.dataset_id, processor_id)

0 comments on commit 1928f72

Please sign in to comment.