Skip to content

Commit eda2c8a

Browse files
authored
Merge pull request #64 from Cosmo-Tech/EMIN_dataset_api_update_function_PROD-15103
feat(dataset-api): extend upload_dataset and add upload_dataset_parts
2 parents d46a29c + 4691163 commit eda2c8a

File tree

4 files changed

+381
-2
lines changed

4 files changed

+381
-2
lines changed

cosmotech/coal/cosmotech_api/apis/dataset.py

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,21 @@ def upload_dataset(
7070
dataset_name: str,
7171
as_files: Optional[list[Union[Path, str]]] = (),
7272
as_db: Optional[list[Union[Path, str]]] = (),
73+
tags: Optional[list[str]] = None,
74+
additional_data: Optional[dict] = None,
7375
) -> Dataset:
76+
"""Upload a new dataset with optional tags and additional data.
77+
78+
Args:
79+
dataset_name: The name of the dataset to create
80+
as_files: List of file paths to upload as FILE type parts
81+
as_db: List of file paths to upload as DB type parts
82+
tags: Optional list of tags to associate with the dataset
83+
additional_data: Optional dictionary of additional metadata
84+
85+
Returns:
86+
The created Dataset object
87+
"""
7488
_parts = list()
7589

7690
for _f in as_files:
@@ -81,6 +95,8 @@ def upload_dataset(
8195

8296
d_request = DatasetCreateRequest(
8397
name=dataset_name,
98+
tags=tags,
99+
additional_data=additional_data,
84100
parts=list(
85101
DatasetPartCreateRequest(
86102
name=_p_name,
@@ -92,12 +108,96 @@ def upload_dataset(
92108
),
93109
)
94110

111+
_files = []
112+
for _p in _parts:
113+
with _p[1].open("rb") as _p_file:
114+
_files.append((_p[0], _p_file.read()))
115+
95116
d_ret = self.create_dataset(
96117
self.configuration.cosmotech.organization_id,
97118
self.configuration.cosmotech.workspace_id,
98119
d_request,
99-
files=list((_p[0], _p[1].open("rb").read()) for _p in _parts),
120+
files=_files,
100121
)
101122

102123
LOGGER.info(T("coal.services.dataset.dataset_created").format(dataset_id=d_ret.id))
103124
return d_ret
125+
126+
def upload_dataset_parts(
    self,
    dataset_id: str,
    as_files: Optional[list[Union[Path, str]]] = (),
    as_db: Optional[list[Union[Path, str]]] = (),
    replace_existing: bool = False,
) -> Dataset:
    """Upload parts to an existing dataset.

    Parts whose name already exists on the dataset are skipped with a warning,
    unless ``replace_existing`` is True, in which case the existing part is
    deleted and re-created from the new file.

    Args:
        dataset_id: The ID of the existing dataset
        as_files: File paths to upload as FILE type parts (None is treated as empty)
        as_db: File paths to upload as DB type parts (None is treated as empty)
        replace_existing: If True, replace existing parts with the same name

    Returns:
        The updated Dataset object, re-fetched after all uploads
    """
    organization_id = self.configuration.cosmotech.organization_id
    workspace_id = self.configuration.cosmotech.workspace_id

    # Get current dataset to check existing parts
    current_dataset = self.get_dataset(
        organization_id=organization_id,
        workspace_id=workspace_id,
        dataset_id=dataset_id,
    )

    # Map existing part source names to their IDs for quick lookup
    existing_parts = {part.source_name: part.id for part in (current_dataset.parts or [])}

    # Collect parts to upload. The parameters are annotated Optional, so an
    # explicit None must behave like "no parts" instead of raising TypeError.
    _parts = list()
    for _f in as_files or ():
        _parts.extend(self.path_to_parts(_f, DatasetPartTypeEnum.FILE))
    for _db in as_db or ():
        _parts.extend(self.path_to_parts(_db, DatasetPartTypeEnum.DB))

    # Process each part
    for _p_name, _p_path, _type in _parts:
        if _p_name in existing_parts:
            if replace_existing:
                # Delete the existing part before creating its replacement
                self.delete_dataset_part(
                    organization_id=organization_id,
                    workspace_id=workspace_id,
                    dataset_id=dataset_id,
                    dataset_part_id=existing_parts[_p_name],
                )
                LOGGER.info(T("coal.services.dataset.part_replaced").format(part_name=_p_name))
            else:
                LOGGER.warning(T("coal.services.dataset.part_skipped").format(part_name=_p_name))
                continue

        # Create new part
        part_request = DatasetPartCreateRequest(
            name=_p_name,
            description=_p_name,
            sourceName=_p_name,
            type=_type,
        )

        # Read inside the context manager so the file handle is always closed
        with _p_path.open("rb") as _p_file:
            self.create_dataset_part(
                organization_id=organization_id,
                workspace_id=workspace_id,
                dataset_id=dataset_id,
                dataset_part_create_request=part_request,
                file=(_p_name, _p_file.read()),
            )
        LOGGER.debug(T("coal.services.dataset.part_uploaded").format(part_name=_p_name))

    # Re-fetch so the returned dataset reflects all newly created parts
    updated_dataset = self.get_dataset(
        organization_id=organization_id,
        workspace_id=workspace_id,
        dataset_id=dataset_id,
    )

    LOGGER.info(T("coal.services.dataset.parts_uploaded").format(dataset_id=dataset_id))
    return updated_dataset

cosmotech/translation/coal/en-US/coal/services/dataset.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,9 @@ text_processed: "Processed text file {file_name} with {lines} lines"
5656
# Dataset API operations
5757
part_downloaded: "Downloaded part {part_name} to {file_path}"
5858
dataset_created: "Created dataset {dataset_id}"
59+
60+
# Dataset parts operations
61+
part_uploaded: "Uploaded part {part_name}"
62+
part_replaced: "Replaced existing part {part_name}"
63+
part_skipped: "Skipped existing part {part_name} (use replace_existing=True to overwrite)"
64+
parts_uploaded: "Successfully uploaded parts to dataset {dataset_id}"

tests/unit/coal/test_azure/test_azure_blob.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from azure.identity import ClientSecretCredential
1414
from azure.storage.blob import BlobServiceClient, ContainerClient
1515

16-
from cosmotech.coal.azure.blob import dump_store_to_azure
16+
from cosmotech.coal.azure.blob import VALID_TYPES, dump_store_to_azure
1717
from cosmotech.coal.store.store import Store
1818
from cosmotech.coal.utils.configuration import Configuration
1919

0 commit comments

Comments
 (0)