
Commit

Moved from parquet to pickle for Redis transfer
Saul Frank committed Feb 8, 2023
1 parent 7aaaa12 commit 3716e4b
Showing 7 changed files with 10 additions and 8 deletions.
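The substance of the commit is a serialization swap: to_parquet/read_parquet become to_pickle/read_pickle in the Redis and S3 transfer helpers, which is what lets pyarrow drop out of the dependency lists below. A minimal sketch of the new round-trip, not taken from the repo; it assumes pandas >= 1.2 (the repo pins 1.5.1) so that to_pickle accepts a file-like object:

import io
import pandas as pd

df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})

# Store: gzip-compressed pickle written into an in-memory buffer.
buffer = io.BytesIO()
df.to_pickle(buffer, compression="gzip")
buffer.seek(0)  # rewind before handing the bytes to Redis or S3

# Load: compression must be passed explicitly, since it cannot be
# inferred from a buffer the way it is from a ".pkl.gz" filename.
restored = pd.read_pickle(buffer, compression="gzip")
assert restored.equals(df)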
3 changes: 3 additions & 0 deletions .devcontainer/docker-compose.yaml
@@ -55,6 +55,9 @@ services:

# Overrides default command so things don't shut down after the process ends.
command: /bin/sh -c "while sleep 1000; do :; done"
+environment:
+  S3_HOST: "http://minio:9000"
+  REDIS_HOST: "redis-service"

# Use "forwardPorts" in **devcontainer.json** to forward an app port locally.
# (Adding the "ports" property to this file will not forward from a Codespace.)
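The two new variables give the dev container the in-cluster endpoints. One plausible way code inside the container would consume them; this wiring is illustrative only and is not part of the commit, with defaults mirroring the compose values:

import os
import boto3
import redis

# Hypothetical client setup: variable names match the compose file,
# but the defaults and this snippet itself are not from this repo.
s3 = boto3.client("s3", endpoint_url=os.getenv("S3_HOST", "http://minio:9000"))
r = redis.Redis(host=os.getenv("REDIS_HOST", "redis-service"), port=6379)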
1 change: 0 additions & 1 deletion .devcontainer/requirements.txt
@@ -4,7 +4,6 @@ redis==4.3.4
pandas==1.5.1
boto3==1.24.93
pytest==7.1.3
-pyarrow==9.0.0
nanoid==0.1
requests==2.28.1
python-dotenv==0.21.0
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,5 +1,5 @@
[build-system]
-requires = [ "setuptools>=61.0", "redis>=1.0.0", "pyarrow>1.0.0", "pandas>1.0.0", "requests>1.0.0",]
+requires = [ "setuptools>=61.0", "redis>=1.0.0", "pandas>1.0.0", "requests>1.0.0", "boto3>1.0.0",]
build-backend = "setuptools.build_meta"

[project]
2 changes: 1 addition & 1 deletion src/dataplane/Microsoft/Sharepoint/sharepoint_download.py
@@ -95,7 +95,7 @@ def sharepoint_download(Host, TenantID, ClientID, Secret, SiteName, SharepointF

if ItemID.status_code != 200:
duration = datetime.now() - start
-return {"result":"Fail", "reason":"Get upload session", "duration": str(duration), "status": ItemID.status_code, "error": ItemID.json(), "payload": json.dumps(payload), "url": url}
+return {"result":"Fail", "reason":"Get download session", "duration": str(duration), "status": ItemID.status_code, "error": ItemID.json(), "payload": json.dumps(payload), "url": url}

ItemID = ItemID.json()

2 changes: 1 addition & 1 deletion src/dataplane/Microsoft/Sharepoint/sharepoint_upload.py
@@ -135,7 +135,7 @@ def sharepoint_upload(Host, TenantID, ClientID, Secret, SiteName, TargetFilePath
upload = requests.put(UploadUrl["uploadUrl"], data=UploadObject, headers=headers, proxies=proxies)
if upload.status_code != 201:
duration = datetime.now() - start
-return {"result":"Fail", "reason":"Upload file", "duration": str(duration), "status": upload.status_code, "error": upload.json()}
+return {"result":"non 201", "reason":"Upload file", "duration": str(duration), "status": upload.status_code, "response": upload.json()}

duration = datetime.now() - start

4 changes: 2 additions & 2 deletions src/dataplane/pipelinerun/data_persist/pandas_redis_store.py
@@ -33,7 +33,7 @@ def pipeline_pandas_redis_store(StoreKey, DataFrame, Redis, Expire=True, ExpireD
raise Exception("Redis connection failed.")

buffer = io.BytesIO()
-DataFrame.to_parquet(buffer, compression='gzip')
+DataFrame.to_pickle(buffer, compression='gzip')
buffer.seek(0) # re-set the pointer to the beginning after reading

if Expire:
@@ -69,7 +69,7 @@ def pipeline_pandas_redis_get(StoreKey, Redis):
buffer = io.BytesIO(Redis.get(InsertKey))
buffer.seek(0)
import pandas as pd
-df = pd.read_parquet(buffer)
+df = pd.read_pickle(buffer,compression='gzip')

duration = datetime.now() - start

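With the change in place, a hedged usage sketch of the two Redis helpers above. The import path is assumed from this repo's file layout (src/ as package root), the DP_RUNID value is an assumed stand-in for what the Dataplane runner would set, and the helpers' full return payloads are not visible in this diff:

import os
import redis
import pandas as pd

# Import path assumed from src/dataplane/pipelinerun/data_persist/.
from dataplane.pipelinerun.data_persist.pandas_redis_store import (
    pipeline_pandas_redis_store,
    pipeline_pandas_redis_get,
)

os.environ.setdefault("DP_RUNID", "local-run")  # run ID the store keys include (assumed)

r = redis.Redis(host=os.getenv("REDIS_HOST", "redis-service"), port=6379)
df = pd.DataFrame({"value": [1, 2, 3]})

stored = pipeline_pandas_redis_store(StoreKey="my_df", DataFrame=df, Redis=r, Expire=True)
fetched = pipeline_pandas_redis_get(StoreKey="my_df", Redis=r)
# Inspect `stored` and `fetched` for the result status and the restored
# DataFrame; their exact shape is defined outside this hunk.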
4 changes: 2 additions & 2 deletions src/dataplane/pipelinerun/data_persist/pandas_s3_store.py
@@ -23,7 +23,7 @@ def pipeline_pandas_s3_store(StoreKey, DataFrame, S3Client, Bucket, Expire=True,
InsertKey = f"/dataplane-transfer/{EnvID}/" + StoreKey+ "-" +os.getenv("DP_RUNID")+".parquet"

output_buffer=BytesIO()
-DataFrame.to_parquet(output_buffer,index=False,compression='gzip',engine='pyarrow',allow_truncated_timestamps=True)
+DataFrame.to_pickle(output_buffer,compression='gzip')
S3Client.put_object(Bucket=Bucket,Key=InsertKey,Body=output_buffer.getvalue())

duration = datetime.now() - start
@@ -53,7 +53,7 @@ def pipeline_pandas_s3_get(StoreKey, S3Client, Bucket):
# buffer = BytesIO()
objectGet = S3Client.get_object(Bucket=Bucket, Key=InsertKey, ChecksumMode='ENABLED')["Body"].read()
import pandas as pd
-df = pd.read_parquet(BytesIO(objectGet))
+df = pd.read_pickle(BytesIO(objectGet),compression='gzip')

duration = datetime.now() - start

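The S3 path follows the same pattern as the Redis one, with put_object/get_object as the transport. A self-contained sketch mirroring the calls in pandas_s3_store.py above; the bucket, key, and endpoint default are hypothetical:

import os
from io import BytesIO

import boto3
import pandas as pd

# Hypothetical bucket and key; the put/get calls mirror the diff above.
s3 = boto3.client("s3", endpoint_url=os.getenv("S3_HOST", "http://minio:9000"))
bucket, key = "dataplane-transfer", "example-run.pickle"

df = pd.DataFrame({"a": [1, 2]})

out = BytesIO()
df.to_pickle(out, compression="gzip")
s3.put_object(Bucket=bucket, Key=key, Body=out.getvalue())

body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
restored = pd.read_pickle(BytesIO(body), compression="gzip")
assert restored.equals(df)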

