From 3716e4b4662f2c023095cff401d46f7f351f40e2 Mon Sep 17 00:00:00 2001
From: Saul Frank
Date: Wed, 8 Feb 2023 21:00:57 +0000
Subject: [PATCH] moved to pickle from parquet for Redis transfer

---
 .devcontainer/docker-compose.yaml                            | 3 +++
 .devcontainer/requirements.txt                               | 1 -
 pyproject.toml                                               | 2 +-
 src/dataplane/Microsoft/Sharepoint/sharepoint_download.py    | 2 +-
 src/dataplane/Microsoft/Sharepoint/sharepoint_upload.py      | 2 +-
 src/dataplane/pipelinerun/data_persist/pandas_redis_store.py | 4 ++--
 src/dataplane/pipelinerun/data_persist/pandas_s3_store.py    | 4 ++--
 7 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/.devcontainer/docker-compose.yaml b/.devcontainer/docker-compose.yaml
index b9c3136..01f7f4b 100644
--- a/.devcontainer/docker-compose.yaml
+++ b/.devcontainer/docker-compose.yaml
@@ -55,6 +55,9 @@ services:
     # Overrides default command so things don't shut down after the process ends.
     command: /bin/sh -c "while sleep 1000; do :; done"
 
+    environment:
+      S3_HOST: "http://minio:9000"
+      REDIS_HOST: "redis-service"
 
   # Use "forwardPorts" in **devcontainer.json** to forward an app port locally.
   # (Adding the "ports" property to this file will not forward from a Codespace.)
diff --git a/.devcontainer/requirements.txt b/.devcontainer/requirements.txt
index 91db433..53854ef 100644
--- a/.devcontainer/requirements.txt
+++ b/.devcontainer/requirements.txt
@@ -4,7 +4,6 @@ redis==4.3.4
 pandas==1.5.1
 boto3==1.24.93
 pytest==7.1.3
-pyarrow==9.0.0
 nanoid==0.1
 requests==2.28.1
 python-dotenv==0.21.0
diff --git a/pyproject.toml b/pyproject.toml
index 75b3fb4..80e854a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,5 +1,5 @@
 [build-system]
-requires = [ "setuptools>=61.0", "redis>=1.0.0", "pyarrow>1.0.0", "pandas>1.0.0", "requests>1.0.0",]
+requires = [ "setuptools>=61.0", "redis>=1.0.0", "pandas>1.0.0", "requests>1.0.0", "boto3>1.0.0",]
 build-backend = "setuptools.build_meta"
 
 [project]
diff --git a/src/dataplane/Microsoft/Sharepoint/sharepoint_download.py b/src/dataplane/Microsoft/Sharepoint/sharepoint_download.py
index 97a5cb7..e616018 100644
--- a/src/dataplane/Microsoft/Sharepoint/sharepoint_download.py
+++ b/src/dataplane/Microsoft/Sharepoint/sharepoint_download.py
@@ -95,7 +95,7 @@ def sharepoint_download(Host, TenantID, ClientID, Secret, SiteName, SharepointF
 
     if ItemID.status_code != 200:
         duration = datetime.now() - start
-        return {"result":"Fail", "reason":"Get upload session", "duration": str(duration), "status": ItemID.status_code, "error": ItemID.json(), "payload": json.dumps(payload), "url": url}
+        return {"result":"Fail", "reason":"Get download session", "duration": str(duration), "status": ItemID.status_code, "error": ItemID.json(), "payload": json.dumps(payload), "url": url}
 
     ItemID = ItemID.json()
 
diff --git a/src/dataplane/Microsoft/Sharepoint/sharepoint_upload.py b/src/dataplane/Microsoft/Sharepoint/sharepoint_upload.py
index 436d53e..724d28c 100644
--- a/src/dataplane/Microsoft/Sharepoint/sharepoint_upload.py
+++ b/src/dataplane/Microsoft/Sharepoint/sharepoint_upload.py
@@ -135,7 +135,7 @@ def sharepoint_upload(Host, TenantID, ClientID, Secret, SiteName, TargetFilePath
     upload = requests.put(UploadUrl["uploadUrl"], data=UploadObject, headers=headers, proxies=proxies)
     if upload.status_code != 201:
         duration = datetime.now() - start
-        return {"result":"Fail", "reason":"Upload file", "duration": str(duration), "status": upload.status_code, "error": upload.json()}
+        return {"result":"non 201", "reason":"Upload file", "duration": str(duration), "status": upload.status_code, "response": upload.json()}
 
     duration = datetime.now() - start
 
diff --git a/src/dataplane/pipelinerun/data_persist/pandas_redis_store.py b/src/dataplane/pipelinerun/data_persist/pandas_redis_store.py
index ef69ec1..b954313 100644
--- a/src/dataplane/pipelinerun/data_persist/pandas_redis_store.py
+++ b/src/dataplane/pipelinerun/data_persist/pandas_redis_store.py
@@ -33,7 +33,7 @@ def pipeline_pandas_redis_store(StoreKey, DataFrame, Redis, Expire=True, ExpireD
         raise Exception("Redis connection failed.")
 
     buffer = io.BytesIO()
-    DataFrame.to_parquet(buffer, compression='gzip')
+    DataFrame.to_pickle(buffer, compression='gzip')
     buffer.seek(0) # re-set the pointer to the beginning after reading
 
     if Expire:
@@ -69,7 +69,7 @@ def pipeline_pandas_redis_get(StoreKey, Redis):
     buffer = io.BytesIO(Redis.get(InsertKey))
     buffer.seek(0)
     import pandas as pd
-    df = pd.read_parquet(buffer)
+    df = pd.read_pickle(buffer,compression='gzip')
 
     duration = datetime.now() - start
 
diff --git a/src/dataplane/pipelinerun/data_persist/pandas_s3_store.py b/src/dataplane/pipelinerun/data_persist/pandas_s3_store.py
index 22a9bac..cab4412 100644
--- a/src/dataplane/pipelinerun/data_persist/pandas_s3_store.py
+++ b/src/dataplane/pipelinerun/data_persist/pandas_s3_store.py
@@ -23,7 +23,7 @@ def pipeline_pandas_s3_store(StoreKey, DataFrame, S3Client, Bucket, Expire=True,
     InsertKey = f"/dataplane-transfer/{EnvID}/" + StoreKey+ "-" +os.getenv("DP_RUNID")+".parquet"
 
     output_buffer=BytesIO()
-    DataFrame.to_parquet(output_buffer,index=False,compression='gzip',engine='pyarrow',allow_truncated_timestamps=True)
+    DataFrame.to_pickle(output_buffer,compression='gzip')
     S3Client.put_object(Bucket=Bucket,Key=InsertKey,Body=output_buffer.getvalue())
 
     duration = datetime.now() - start
@@ -53,7 +53,7 @@ def pipeline_pandas_s3_get(StoreKey, S3Client, Bucket):
     # buffer = BytesIO()
     objectGet = S3Client.get_object(Bucket=Bucket, Key=InsertKey, ChecksumMode='ENABLED')["Body"].read()
     import pandas as pd
-    df = pd.read_parquet(BytesIO(objectGet))
+    df = pd.read_pickle(BytesIO(objectGet),compression='gzip')
 
     duration = datetime.now() - start