From f53d8fa49309db1753d08aee47af977906b71a9b Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Tue, 21 May 2024 14:12:33 +0200 Subject: [PATCH 1/5] update individual list rows --- CHANGELOG.md | 5 ++ .../sharepoint-online-append-list/recipe.json | 15 +++- .../sharepoint-online-append-list/recipe.py | 44 ++++++++-- python-lib/dss_constants.py | 2 +- python-lib/sharepoint_client.py | 48 +++++++++-- python-lib/sharepoint_lists.py | 80 +++++++++++++++++-- 6 files changed, 174 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d036f26..24f9a5d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## [Version 1.1.2](https://github.com/dataiku/dss-plugin-sharepoint-online/releases/tag/v1.1.2) - Feature and bugfix release - 2024-05-21 + +- Update of individual list records +- Fix access to list with special characters in name + ## [Version 1.1.1](https://github.com/dataiku/dss-plugin-sharepoint-online/releases/tag/v1.1.1) - Bugfix release - 2024-01-24 - Fix file creation when using username / password authentication diff --git a/custom-recipes/sharepoint-online-append-list/recipe.json b/custom-recipes/sharepoint-online-append-list/recipe.json index 0c05aba..b60da28 100644 --- a/custom-recipes/sharepoint-online-append-list/recipe.json +++ b/custom-recipes/sharepoint-online-append-list/recipe.json @@ -131,14 +131,25 @@ "name": "write_mode", "label": "Write mode", "type": "SELECT", - "defaultValue": "append", "selectChoices": [ { "value": "append", "label": "Append to existing list" + }, + { + "value": "update", + "label": "Update items in existing list" } ], - "visibilityCondition": false + "defaultValue": "append", + "visibilityCondition": "model.advanced_parameters == true" + }, + { + "name": "columns_to_update", + "label": "Columns to update", + "type": "COLUMNS", + "columnRole": "input_dataset", + "visibilityCondition": "model.advanced_parameters == true && model.write_mode == 'update'" }, { "name": "max_workers", diff --git a/custom-recipes/sharepoint-online-append-list/recipe.py b/custom-recipes/sharepoint-online-append-list/recipe.py index 502c3ef..1d7395e 100644 --- a/custom-recipes/sharepoint-online-append-list/recipe.py +++ b/custom-recipes/sharepoint-online-append-list/recipe.py @@ -21,9 +21,35 @@ def convert_date_format(json_row): return json_row +def newprocess_nones(input_row, columns_types): + output_row = {} + for key in input_row: + column_value = input_row.get(key) + if key == "ID": + output_row[key] = str(column_value) + continue + column_type = columns_types.get(key, "string") + if column_type in ["int"] and (not column_value or pandas.isna(column_value)): + continue + output_row[key] = str(column_value) + return output_row + +def process_nones(json_row, columns_types): + for key in json_row: + value = json_row.get(key) + if not isinstance(value, str): + json_row[key] = "{}".format(value) + if not value or pandas.isna(value): + target_type = columns_types.get(key) + if target_type in ["int", "bigint", "float"]: + json_row[key] = None + return json_row + + input_dataset_names = get_input_names_for_role('input_dataset') input_dataset = dataiku.Dataset(input_dataset_names[0]) input_dataframe = input_dataset.get_dataframe() + input_schema = input_dataset.read_schema() output_dataset_names = get_output_names_for_role('api_output') output_dataset = dataiku.Dataset(output_dataset_names[0]) @@ -43,7 +69,11 @@ def convert_date_format(json_row): expand_lookup = config.get("expand_lookup", False) metadata_to_retrieve = 
config.get("metadata_to_retrieve", []) advanced_parameters = config.get("advanced_parameters", False) -write_mode = "append" +write_mode = config.get("write_mode", "append") +columns_to_update = config.get("columns_to_update", []) +if columns_to_update and "ID" not in columns_to_update: + columns_to_update.append("ID") + if not advanced_parameters: max_workers = 1 # no multithread per default batch_size = 100 @@ -57,11 +87,13 @@ def convert_date_format(json_row): display_metadata = len(metadata_to_retrieve) > 0 client = SharePointClient(config) -sharepoint_writer = client.get_writer({"columns": input_schema}, None, None, max_workers, batch_size, write_mode) +sharepoint_writer = client.get_writer({"columns": input_schema}, None, None, max_workers, + batch_size, write_mode, columns_to_update) + + with output_dataset.get_writer() as writer: for index, input_parameters_row in input_dataframe.iterrows(): - json_row = input_parameters_row.to_dict() - json_row = convert_date_format(json_row) - sharepoint_writer.write_row_dict(json_row) - writer.write_row_dict(json_row) + straighten_json_row = sharepoint_writer.pandas_row_to_json(input_parameters_row) + sharepoint_writer.write_row_dict(straighten_json_row) + writer.write_row_dict(sharepoint_writer.fix_dates_for_pandas_output(input_parameters_row)) sharepoint_writer.close() diff --git a/python-lib/dss_constants.py b/python-lib/dss_constants.py index 919870b..e69bd48 100644 --- a/python-lib/dss_constants.py +++ b/python-lib/dss_constants.py @@ -28,7 +28,7 @@ class DSSConstants(object): "sharepoint_oauth": "The access token is missing" } PATH = 'path' - PLUGIN_VERSION = "1.1.1" + PLUGIN_VERSION = "1.1.2-beta.1" SECRET_PARAMETERS_KEYS = ["Authorization", "sharepoint_username", "sharepoint_password", "client_secret"] SITE_APP_DETAILS = { "sharepoint_tenant": "The tenant name is missing", diff --git a/python-lib/sharepoint_client.py b/python-lib/sharepoint_client.py index 3c92c50..2672bef 100644 --- a/python-lib/sharepoint_client.py +++ b/python-lib/sharepoint_client.py @@ -500,6 +500,21 @@ def get_item_structure(self, list_title, item): "checkInComment": None } + def get_update_list_item_kwargs(self, list_title, list_item_entity_type_full_name, item_id, item_update): + # https://sharepoint.stackexchange.com/questions/250659/how-implement-rest-queries-in-sharepoint + list_item_update_info = self.get_list_item_update(item_update, list_item_entity_type_full_name) + headers = DSSConstants.JSON_HEADERS + headers["X-HTTP-Method"] = "MERGE" + headers["If-Match"] = "*" + update_item_url = self.get_update_item_url(list_title, item_id) + kwargs = { + "verb": "PATCH", + "url": update_item_url, + "headers": headers, + "json": list_item_update_info + } + return kwargs + @staticmethod def get_form_value(field_name, field_value): return { @@ -522,6 +537,17 @@ def get_list_item_create_info(self, list_title): } } + @staticmethod + def get_list_item_update(item, list_item_entity_type_full_name): + # https://learn.microsoft.com/en-us/sharepoint/dev/sp-add-ins/working-with-lists-and-list-items-with-rest + ret = {} + ret["__metadata"] = { + "type": "{}".format(list_item_entity_type_full_name) + } + for field_name in item: + ret[field_name] = item.get(field_name) + return ret + def process_batch(self, kwargs_array): batch_id = self.get_random_guid() change_set_id = self.get_random_guid() @@ -586,6 +612,7 @@ def log_batch_errors(self, response, kwargs_array): logger.info("Batch error analysis") statuses = re.findall('HTTP/1.1 (.*?) 
', str(response.content)) dump_response_content = False + reason_to_raise = None for status, kwarg in zip(statuses, kwargs_array): if not status.startswith("20"): if dump_response_content: @@ -602,6 +629,7 @@ def log_batch_errors(self, response, kwargs_array): error_messages = re.findall('"ErrorMessage":"(.*?)}', str(response.content)) for error_message in error_messages: logger.warning("Error:'{}'".format(error_message)) + reason_to_raise = error_message if dump_response_content: if self.number_dumped_logs == 0: logger.warning("response.content={}".format(response.content)) @@ -610,6 +638,8 @@ def log_batch_errors(self, response, kwargs_array): self.number_dumped_logs += 1 else: logger.info("Batch error analysis OK") + if reason_to_raise: + raise SharePointClientError("There was at least one issue during batch processing ({}). Look into the logs for more details.".format(reason_to_raise)) def get_base_url(self): return "{}/{}/_api/Web".format( @@ -647,6 +677,13 @@ def get_list_add_item_using_path_url(self, list_title): self.escape_path(list_title) ) + def get_update_item_url(self, list_title, item_id): + return self.get_base_url() + "/GetList(@a1)/items({})?@a1='/{}/Lists/{}'".format( + item_id, + self.sharepoint_site, + self.escape_path(list_title) + ) + def get_list_fields_url(self, list_title): return self.get_lists_by_title_url(list_title) + "/fields" @@ -665,16 +702,16 @@ def get_folder_url(self, full_path): if full_path == '/': full_path = "" return self.get_base_url() + "/GetFolderByServerRelativeUrl({})".format( - self.get_site_path(full_path) + self.get_site_path(full_path.replace("#", "%23")) ) def get_file_url(self, full_path): - return self.get_base_url() + "/GetFileByServerRelativeUrl({})".format( + return self.get_base_url() + "/GetFileByServerRelativePath(decodedUrl={})".format( self.get_site_path(full_path) ) def get_file_content_url(self, full_path): - return self.get_file_url(full_path) + "/$value" + return self.get_file_url(full_path.replace("#", "%23")) + "/$value" def get_move_url(self, from_path, to_path): return self.get_file_url(from_path) + "/moveto(newurl={},flags=1)".format( @@ -853,7 +890,7 @@ def escape_path(path): return path.replace("'", "''") def get_writer(self, dataset_schema, dataset_partitioning, - partition_id, max_workers, batch_size, write_mode): + partition_id, max_workers, batch_size, write_mode, columns_to_update=[]): return SharePointListWriter( self.config, self, @@ -862,7 +899,8 @@ def get_writer(self, dataset_schema, dataset_partitioning, partition_id, max_workers=max_workers, batch_size=batch_size, - write_mode=write_mode + write_mode=write_mode, + columns_to_update=columns_to_update ) def get_read_schema(self, display_metadata=False, metadata_to_retrieve=[]): diff --git a/python-lib/sharepoint_lists.py b/python-lib/sharepoint_lists.py index 5effbf0..3545f39 100644 --- a/python-lib/sharepoint_lists.py +++ b/python-lib/sharepoint_lists.py @@ -1,4 +1,5 @@ import datetime +import pandas from concurrent.futures import ThreadPoolExecutor, as_completed from sharepoint_constants import SharePointConstants from dss_constants import DSSConstants @@ -54,6 +55,8 @@ def assert_list_title(list_title): def dss_to_sharepoint_date(date): + if "T" not in date or "Z" not in date: + return date return format_date(date, DSSConstants.DATE_FORMAT, SharePointConstants.DATE_FORMAT) @@ -78,9 +81,18 @@ def format_date(date, from_format, to_format): return date +def get_columns_types(input_schema): + columns_types = {} + for column_schema in input_schema: + column_name 
= column_schema.get("name", "")
+        column_type = column_schema.get("type", "string")
+        columns_types[column_name] = column_type
+    return columns_types
+
+
 class SharePointListWriter(object):
 
-    def __init__(self, config, client, dataset_schema, dataset_partitioning, partition_id, max_workers=5, batch_size=100, write_mode="create"):
+    def __init__(self, config, client, dataset_schema, dataset_partitioning, partition_id, max_workers=5, batch_size=100, write_mode="create", columns_to_update=[]):
         self.client = client
         self.config = config
         self.dataset_schema = dataset_schema
@@ -89,10 +101,13 @@ def __init__(self, config, client, dataset_schema, dataset_partitioning, partiti
         self.buffer = []
         logger.info('init SharepointListWriter with {} workers and batch size of {}'.format(max_workers, batch_size))
         self.columns = dataset_schema[SharePointConstants.COLUMNS]
+        self.dss_columns_types = get_columns_types(self.columns)
         self.sharepoint_column_ids = {}
         self.sharepoint_existing_column_names = {}
         self.sharepoint_existing_column_entity_property_names = {}
         self.web_name = self.client.sharepoint_list_title
+        self.write_mode = write_mode
+        self.columns_to_update = columns_to_update
 
         if write_mode == SharePointConstants.WRITE_MODE_CREATE:
             logger.info('flush:recycle_list "{}"'.format(self.client.sharepoint_list_title))
@@ -132,14 +147,52 @@ def write_row(self, row):
     def write_row_dict(self, row_dict):
         row = []
         for element in row_dict:
-            row.append(str(row_dict.get(element)))
+            row.append(row_dict.get(element))
         self.write_row(row)
 
+    def pandas_row_to_json(self, input_pandas_row):
+        input_row = input_pandas_row.to_dict()
+        # Undo the type coercions pandas applied while building the dataframe
+        output_row = {}
+        for key in input_row:
+            target_type = self.dss_columns_types.get(key)
+            value = input_row.get(key)
+            if not value or pandas.isna(value):
+                straighten_value = None
+            elif target_type in ["int", "bigint"]:
+                if isinstance(value, int):
+                    straighten_value = str(value)
+                else:
+                    # A single NaN in an int column makes pandas convert the whole column to float,
+                    # so cast back through int before converting to string
+ straighten_value = str(int(value)) + else: + straighten_value = str(value) + output_row[key] = straighten_value + return output_row + + def fix_dates_for_pandas_output(self, input_pandas_row): + input_row = input_pandas_row.to_dict() + fixed_output_row = {} + for key in input_row: + target_type = self.dss_columns_types.get(key) + value = input_row.get(key) + if pandas.isna(value): + fixed_output_row[key] = None + elif target_type == "date": + fixed_output_row[key] = value.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + else: + fixed_output_row[key] = value + return fixed_output_row + def flush(self): - if self.max_workers > 1: - self.upload_rows_multithreaded() + if self.write_mode == "update": + self.update_rows() else: - self.upload_rows() + if self.max_workers > 1: + self.upload_rows_multithreaded() + else: + self.upload_rows() def upload_rows_multithreaded(self): logger.info("Starting multithreaded rows adding") @@ -171,6 +224,18 @@ def upload_rows(self): self.client.process_batch(kwargs) logger.info("{} items written".format(len(kwargs))) + def update_rows(self): + logger.info("Starting updating items") + kwargs = [] + for row in self.buffer: + item = self.build_row_dictionary(row, self.columns_to_update) + item_id = item.pop("ID", None) + if item_id is None: + raise Exception("Item in column 'ID' cannot be left empty") + kwargs.append(self.client.get_update_list_item_kwargs(self.web_name, self.list_item_entity_type_full_name, item_id, item)) + self.client.process_batch(kwargs) + logger.info("{} items written".format(len(kwargs))) + def create_sharepoint_columns(self): """ Create the list's columns on SP, retrieve their SP id and map it to their DSS column name """ logger.info("create_sharepoint_columns") @@ -198,9 +263,12 @@ def create_sharepoint_columns(self): else: self.sharepoint_column_ids[dss_column_name] = dss_column_name - def build_row_dictionary(self, row): + def build_row_dictionary(self, row, columns_to_update=None): ret = {} for column, structure in zip(row, self.columns): + if columns_to_update: + if structure[SharePointConstants.NAME_COLUMN] not in columns_to_update: + continue key_to_use = self.sharepoint_existing_column_names.get( structure[SharePointConstants.NAME_COLUMN], self.sharepoint_column_ids[structure[SharePointConstants.NAME_COLUMN]] From f754aed17596401ac6fc6f7a221cb53fee2317b5 Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Tue, 21 May 2024 15:38:11 +0200 Subject: [PATCH 2/5] Adding integration test --- tests/python/integration/test_scenario.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/python/integration/test_scenario.py b/tests/python/integration/test_scenario.py index 011af71..1843508 100644 --- a/tests/python/integration/test_scenario.py +++ b/tests/python/integration/test_scenario.py @@ -37,3 +37,6 @@ def test_run_sharepoint_online_file_overwrite(user_dss_clients): def test_run_sharepoint_online_append_to_list_recipe(user_dss_clients): dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id="APPENDTOLISTRECIPE") + +def test_run_sharepoint_online_update_individual_list_rows(user_dss_clients): + dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id="UPDATEINDIVIDUALLISTROWS") From 0b00f587dd0c7e5cfa719834d585da3a563f7091 Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Tue, 28 May 2024 10:23:04 +0200 Subject: [PATCH 3/5] cleaning --- .../sharepoint-online-append-list/recipe.py | 26 ------------------- 1 file changed, 26 deletions(-) diff --git 
a/custom-recipes/sharepoint-online-append-list/recipe.py b/custom-recipes/sharepoint-online-append-list/recipe.py index 1d7395e..459372a 100644 --- a/custom-recipes/sharepoint-online-append-list/recipe.py +++ b/custom-recipes/sharepoint-online-append-list/recipe.py @@ -21,35 +21,9 @@ def convert_date_format(json_row): return json_row -def newprocess_nones(input_row, columns_types): - output_row = {} - for key in input_row: - column_value = input_row.get(key) - if key == "ID": - output_row[key] = str(column_value) - continue - column_type = columns_types.get(key, "string") - if column_type in ["int"] and (not column_value or pandas.isna(column_value)): - continue - output_row[key] = str(column_value) - return output_row - -def process_nones(json_row, columns_types): - for key in json_row: - value = json_row.get(key) - if not isinstance(value, str): - json_row[key] = "{}".format(value) - if not value or pandas.isna(value): - target_type = columns_types.get(key) - if target_type in ["int", "bigint", "float"]: - json_row[key] = None - return json_row - - input_dataset_names = get_input_names_for_role('input_dataset') input_dataset = dataiku.Dataset(input_dataset_names[0]) input_dataframe = input_dataset.get_dataframe() - input_schema = input_dataset.read_schema() output_dataset_names = get_output_names_for_role('api_output') output_dataset = dataiku.Dataset(output_dataset_names[0]) From 33517cdacf681ccd51d008fdee8cade92fb7877f Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Wed, 5 Jun 2024 10:51:09 +0200 Subject: [PATCH 4/5] remove 'create' from possible write modes for append recipe --- custom-recipes/sharepoint-online-append-list/recipe.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/custom-recipes/sharepoint-online-append-list/recipe.py b/custom-recipes/sharepoint-online-append-list/recipe.py index 459372a..fec59a7 100644 --- a/custom-recipes/sharepoint-online-append-list/recipe.py +++ b/custom-recipes/sharepoint-online-append-list/recipe.py @@ -21,6 +21,13 @@ def convert_date_format(json_row): return json_row +def get_write_mode(config): + write_mode = config.get("write_mode", "append") + if write_mode == "create": + write_mode = "append" + return write_mode + + input_dataset_names = get_input_names_for_role('input_dataset') input_dataset = dataiku.Dataset(input_dataset_names[0]) input_dataframe = input_dataset.get_dataframe() @@ -43,7 +50,7 @@ def convert_date_format(json_row): expand_lookup = config.get("expand_lookup", False) metadata_to_retrieve = config.get("metadata_to_retrieve", []) advanced_parameters = config.get("advanced_parameters", False) -write_mode = config.get("write_mode", "append") +write_mode = get_write_mode(config) columns_to_update = config.get("columns_to_update", []) if columns_to_update and "ID" not in columns_to_update: columns_to_update.append("ID") From a040edb87edce55c88f164903ae97b10aef1dd77 Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Tue, 11 Jun 2024 12:33:38 +0200 Subject: [PATCH 5/5] change behavior if no ID --- python-lib/sharepoint_lists.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python-lib/sharepoint_lists.py b/python-lib/sharepoint_lists.py index 3545f39..3dff4a0 100644 --- a/python-lib/sharepoint_lists.py +++ b/python-lib/sharepoint_lists.py @@ -231,8 +231,10 @@ def update_rows(self): item = self.build_row_dictionary(row, self.columns_to_update) item_id = item.pop("ID", None) if item_id is None: - raise Exception("Item in column 'ID' cannot be left empty") - 
kwargs.append(self.client.get_update_list_item_kwargs(self.web_name, self.list_item_entity_type_full_name, item_id, item))
+                # rows without an "ID" value are appended to the list as new items instead of raising
+                kwargs.append(self.client.get_add_list_item_kwargs(self.web_name, item))
+            else:
+                kwargs.append(self.client.get_update_list_item_kwargs(self.web_name, self.list_item_entity_type_full_name, item_id, item))
         self.client.process_batch(kwargs)
         logger.info("{} items written".format(len(kwargs)))
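
---
Note for reviewers (an illustrative sketch, not part of the patches above): the update path added by this series reduces to one SharePoint REST MERGE request per item, which process_batch() then wraps into a $batch changeset. Below is a minimal standalone version of that single request, sent directly with `requests` using the documented POST + X-HTTP-Method form rather than the plugin's batch machinery. The site URL, list path, item id, token, and SP.Data type name are placeholder assumptions, not values from the plugin.

    import requests

    # Placeholder values, for illustration only
    SITE_URL = "https://contoso.sharepoint.com/sites/demo"
    LIST_PATH = "/sites/demo/Lists/MyList"   # server-relative path, as in get_update_item_url()
    ITEM_ID = 3                              # taken from the row's "ID" column
    ACCESS_TOKEN = "<oauth-access-token>"

    # Address the list by path and the item by id, mirroring get_update_item_url()
    url = "{}/_api/Web/GetList(@a1)/items({})?@a1='{}'".format(SITE_URL, ITEM_ID, LIST_PATH)

    headers = {
        "Authorization": "Bearer {}".format(ACCESS_TOKEN),
        "Accept": "application/json; odata=verbose",
        "Content-Type": "application/json; odata=verbose",
        "X-HTTP-Method": "MERGE",  # MERGE updates only the fields present in the payload
        "If-Match": "*",           # "*" bypasses the etag concurrency check
    }

    # Payload shape mirrors get_list_item_update(): the list's ListItemEntityTypeFullName
    # in __metadata, then one key per column to update
    payload = {
        "__metadata": {"type": "SP.Data.MyListListItem"},
        "Title": "updated title",
    }

    response = requests.post(url, headers=headers, json=payload)
    response.raise_for_status()  # a successful MERGE returns 204 No Content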