From 15d3149fc12e944f850f52ea4e2ac1f493d3eb69 Mon Sep 17 00:00:00 2001 From: saul-data Date: Wed, 28 Dec 2022 23:29:59 +0400 Subject: [PATCH] updated for redis store --- src/dataplane/__init__.py | 7 ++ .../pipelinerun/data_persist/redis_store.py | 69 +++++++++++++++++++ .../data_persist/test_pandas_redis.py | 50 ++++++++++++++ .../pipelinerun/data_persist/test_redis.py | 24 ++----- 4 files changed, 132 insertions(+), 18 deletions(-) create mode 100644 src/dataplane/pipelinerun/data_persist/redis_store.py create mode 100644 src/dataplane/pipelinerun/data_persist/test_pandas_redis.py diff --git a/src/dataplane/__init__.py b/src/dataplane/__init__.py index eb59423..19ef4dd 100644 --- a/src/dataplane/__init__.py +++ b/src/dataplane/__init__.py @@ -3,6 +3,11 @@ pipeline_pandas_redis_get, ) +from dataplane.pipelinerun.data_persist.redis_store import ( + pipeline_redis_store, + pipeline_redis_get, +) + from dataplane.pipelinerun.data_persist.pandas_s3_store import ( pipeline_pandas_s3_get, pipeline_pandas_s3_store, @@ -27,6 +32,8 @@ "hello", # Pipeline transfers + "pipeline_redis_store", + "pipeline_redis_get", "pipeline_pandas_redis_store", "pipeline_pandas_redis_get", "pipeline_pandas_s3_get", diff --git a/src/dataplane/pipelinerun/data_persist/redis_store.py b/src/dataplane/pipelinerun/data_persist/redis_store.py new file mode 100644 index 0000000..45e00ba --- /dev/null +++ b/src/dataplane/pipelinerun/data_persist/redis_store.py @@ -0,0 +1,69 @@ +from datetime import timedelta + +def RedisCheck(r): + + import redis + + try: + r.ping() + except (redis.exceptions.ConnectionError, ConnectionRefusedError): + print("Redis connection error.", redis.exceptions.ConnectionError, ConnectionRefusedError) + return False + return True + +""" StoreKey: is the key to look up for retrieval later on. +Redis: e.g. Redis = redis.Redis(host='redis-service', port=6379, db=0) +Value: The value to pass +Expire: Expires the data if true. +ExpireDuration: If expires is true, how much time to expire. Default 15 mins +""" +def pipeline_redis_store(StoreKey, Value, Redis, Expire=True, ExpireDuration=timedelta(minutes=15)): + + import os + import io + from datetime import datetime + + # Start the timer + start = datetime.now() + + InsertKey = StoreKey+ "-" +os.getenv("DP_RUNID") + + # Connect to Redis + if RedisCheck(Redis) == False: + raise Exception("Redis connection failed.") + + if Expire: + Redis.setex(InsertKey, ExpireDuration, value=Value) + else: + Redis.set(InsertKey, value=Value) + + duration = datetime.now() - start + + return {"result":"OK", "duration": str(duration), "key":InsertKey} + + +""" +StoreKey: is the key to look up for retrieval (set with RedisStore). +Redis: e.g. Redis = redis.Redis(host='redis-service', port=6379, db=0) +""" +def pipeline_redis_get(StoreKey, Redis): + + import os + import io + from datetime import datetime + + # Start the timer + start = datetime.now() + + InsertKey = StoreKey+ "-" +os.getenv("DP_RUNID") + + # Connect to Redis + if RedisCheck(Redis) == False: + raise Exception("Redis connection failed.") + + # Retrieve dataframe from key + value = Redis.get(InsertKey) + + duration = datetime.now() - start + + return {"result":"OK", "duration": str(duration), "key":InsertKey,"value": value} \ No newline at end of file diff --git a/src/dataplane/pipelinerun/data_persist/test_pandas_redis.py b/src/dataplane/pipelinerun/data_persist/test_pandas_redis.py new file mode 100644 index 0000000..b1370dc --- /dev/null +++ b/src/dataplane/pipelinerun/data_persist/test_pandas_redis.py @@ -0,0 +1,50 @@ + +import os +from .pandas_redis_store import pipeline_pandas_redis_store +from .pandas_redis_store import pipeline_pandas_redis_get +import redis +from datetime import timedelta +from nanoid import generate +from dotenv import load_dotenv + +def test_pandas_redis_store(): + + load_dotenv() + + # ---------- Dataplane pipeline run ------------ + REDIS_HOST = os.environ["REDIS_HOST"] + print("Redis:", REDIS_HOST) + + # Dataplane run id + os.environ["DP_RUNID"] = generate('1234567890abcdef', 10) + + # Data to store in Redis as parquet + data = { + "calories": [420, 380, 390], + "duration": [50, 40, 45] + } + import pandas as pd + df = pd.DataFrame(data) + dfrows = df.shape[0] + + # Redis connection + redisConnect = redis.Redis(host=REDIS_HOST, port=6379, db=0) + + + # ---------- STORE PARQUET TO REDIS ------------ + + # Store the data with key hello - run id will be attached + rs = pipeline_pandas_redis_store(StoreKey="hello", DataFrame=df, Redis=redisConnect, Expire=True, ExpireDuration=timedelta(minutes=15)) + print(rs) + assert rs["result"]=="OK" + + # ---------- RETRIEVE PARQUET FROM REDIS ------------ + + # Get the data + rsget = pipeline_pandas_redis_get(StoreKey="hello", Redis=redisConnect) + print(rsget) + df = rsget["dataframe"] + print(df.shape[0]) + # Test before and after rows + assert df.shape[0] == dfrows + assert rsget["result"]=="OK" \ No newline at end of file diff --git a/src/dataplane/pipelinerun/data_persist/test_redis.py b/src/dataplane/pipelinerun/data_persist/test_redis.py index 2fadb92..6621eb3 100644 --- a/src/dataplane/pipelinerun/data_persist/test_redis.py +++ b/src/dataplane/pipelinerun/data_persist/test_redis.py @@ -1,7 +1,7 @@ import os -from .pandas_redis_store import pipeline_pandas_redis_store -from .pandas_redis_store import pipeline_pandas_redis_get +from .redis_store import pipeline_redis_store +from .redis_store import pipeline_redis_get import redis from datetime import timedelta from nanoid import generate @@ -19,32 +19,20 @@ def test_redis_store(): os.environ["DP_RUNID"] = generate('1234567890abcdef', 10) # Data to store in Redis as parquet - data = { - "calories": [420, 380, 390], - "duration": [50, 40, 45] - } - import pandas as pd - df = pd.DataFrame(data) - dfrows = df.shape[0] + data = "hi there 123" # Redis connection redisConnect = redis.Redis(host=REDIS_HOST, port=6379, db=0) - - - # ---------- STORE PARQUET TO REDIS ------------ # Store the data with key hello - run id will be attached - rs = pipeline_pandas_redis_store(StoreKey="hello", DataFrame=df, Redis=redisConnect, Expire=True, ExpireDuration=timedelta(minutes=15)) + rs = pipeline_redis_store(StoreKey="hello", Value=data, Redis=redisConnect, Expire=True, ExpireDuration=timedelta(minutes=15)) print(rs) assert rs["result"]=="OK" # ---------- RETRIEVE PARQUET FROM REDIS ------------ # Get the data - rsget = pipeline_pandas_redis_get(StoreKey="hello", Redis=redisConnect) - print(rsget) - df = rsget["dataframe"] - print(df.shape[0]) + rsget = pipeline_redis_get(StoreKey="hello", Redis=redisConnect) # Test before and after rows - assert df.shape[0] == dfrows + assert rsget["value"] == data assert rsget["result"]=="OK" \ No newline at end of file