Skip to content

Commit

Permalink
updated for redis store
Browse files Browse the repository at this point in the history
  • Loading branch information
saul-data committed Dec 28, 2022
1 parent bd44d48 commit 15d3149
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 18 deletions.
7 changes: 7 additions & 0 deletions src/dataplane/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
pipeline_pandas_redis_get,
)

from dataplane.pipelinerun.data_persist.redis_store import (
pipeline_redis_store,
pipeline_redis_get,
)

from dataplane.pipelinerun.data_persist.pandas_s3_store import (
pipeline_pandas_s3_get,
pipeline_pandas_s3_store,
Expand All @@ -27,6 +32,8 @@
"hello",

# Pipeline transfers
"pipeline_redis_store",
"pipeline_redis_get",
"pipeline_pandas_redis_store",
"pipeline_pandas_redis_get",
"pipeline_pandas_s3_get",
Expand Down
69 changes: 69 additions & 0 deletions src/dataplane/pipelinerun/data_persist/redis_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from datetime import timedelta

def RedisCheck(r):
    """Return True if the Redis client answers a ping, otherwise False.

    r: a Redis client exposing ping(),
       e.g. redis.Redis(host='redis-service', port=6379, db=0)

    A connection failure is reported on stdout and signalled by returning
    False; it is not re-raised.
    """
    import redis

    try:
        r.ping()
    except (redis.exceptions.ConnectionError, ConnectionRefusedError) as err:
        # Print the caught error itself so the cause of the failure is visible.
        print("Redis connection error.", err)
        return False
    return True

""" StoreKey: is the key to look up for retrieval later on.
Redis: e.g. Redis = redis.Redis(host='redis-service', port=6379, db=0)
Value: The value to pass
Expire: Expires the data if true.
ExpireDuration: If Expire is true, how long until the data expires. Default 15 mins
"""
def pipeline_redis_store(StoreKey, Value, Redis, Expire=True, ExpireDuration=timedelta(minutes=15)):
    """Store a value in Redis under a key scoped to the current pipeline run.

    StoreKey: the key to look up for retrieval later on.
    Value: the value to store.
    Redis: a Redis client, e.g. redis.Redis(host='redis-service', port=6379, db=0)
    Expire: expire the data if true.
    ExpireDuration: if Expire is true, how long until the data expires.
        Default 15 mins.

    Returns a dict with:
        "result": "OK"
        "duration": elapsed time of the call, as a string
        "key": the full Redis key used, i.e. StoreKey + "-" + DP_RUNID

    Raises:
        RuntimeError: the DP_RUNID environment variable is not set.
        Exception: the Redis connection check failed.
    """
    import os
    from datetime import datetime

    # Start the timer
    start = datetime.now()

    # The run id scopes the key to this pipeline run. Fail with a clear
    # message when it is missing instead of a TypeError from str + None.
    run_id = os.getenv("DP_RUNID")
    if run_id is None:
        raise RuntimeError("DP_RUNID environment variable is not set.")

    InsertKey = StoreKey + "-" + run_id

    # Check the connection before writing
    if not RedisCheck(Redis):
        raise Exception("Redis connection failed.")

    if Expire:
        Redis.setex(InsertKey, ExpireDuration, value=Value)
    else:
        Redis.set(InsertKey, value=Value)

    duration = datetime.now() - start

    return {"result": "OK", "duration": str(duration), "key": InsertKey}


"""
StoreKey: is the key to look up for retrieval (the same StoreKey passed to pipeline_redis_store).
Redis: e.g. Redis = redis.Redis(host='redis-service', port=6379, db=0)
"""
def pipeline_redis_get(StoreKey, Redis):
    """Fetch a value stored for the current pipeline run from Redis.

    StoreKey: the key used when the value was stored with
        pipeline_redis_store (the run id is appended automatically).
    Redis: a Redis client, e.g. redis.Redis(host='redis-service', port=6379, db=0)

    Returns a dict with:
        "result": "OK"
        "duration": elapsed time of the call, as a string
        "key": the full Redis key used, i.e. StoreKey + "-" + DP_RUNID
        "value": whatever Redis.get returned for that key

    NOTE: the value is passed through untouched. With a redis-py client this
    is bytes unless the client was created with decode_responses=True, and
    None when the key does not exist (for example after it expired).

    Raises:
        RuntimeError: the DP_RUNID environment variable is not set.
        Exception: the Redis connection check failed.
    """
    import os
    from datetime import datetime

    # Start the timer
    start = datetime.now()

    # The run id scopes the key to this pipeline run. Fail with a clear
    # message when it is missing instead of a TypeError from str + None.
    run_id = os.getenv("DP_RUNID")
    if run_id is None:
        raise RuntimeError("DP_RUNID environment variable is not set.")

    InsertKey = StoreKey + "-" + run_id

    # Check the connection before reading
    if not RedisCheck(Redis):
        raise Exception("Redis connection failed.")

    # Retrieve the stored value for the key
    value = Redis.get(InsertKey)

    duration = datetime.now() - start

    return {"result": "OK", "duration": str(duration), "key": InsertKey, "value": value}
50 changes: 50 additions & 0 deletions src/dataplane/pipelinerun/data_persist/test_pandas_redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@

import os
from .pandas_redis_store import pipeline_pandas_redis_store
from .pandas_redis_store import pipeline_pandas_redis_get
import redis
from datetime import timedelta
from nanoid import generate
from dotenv import load_dotenv

def test_pandas_redis_store():
    """Round-trip a small DataFrame through Redis and compare row counts.

    Needs REDIS_HOST in the environment (or a .env file) pointing at a
    reachable Redis instance listening on port 6379.
    """
    load_dotenv()

    # ---------- Dataplane pipeline run ------------
    redis_host = os.environ["REDIS_HOST"]
    print("Redis:", redis_host)

    # Fresh run id so keys from different test runs do not collide
    os.environ["DP_RUNID"] = generate('1234567890abcdef', 10)

    # Sample frame that will be stored and read back
    import pandas as pd
    source_frame = pd.DataFrame(
        {
            "calories": [420, 380, 390],
            "duration": [50, 40, 45],
        }
    )
    expected_rows = source_frame.shape[0]

    # Redis connection
    client = redis.Redis(host=redis_host, port=6379, db=0)

    # ---------- STORE THE DATAFRAME IN REDIS ------------

    # Stored under key "hello"; the run id is appended by the store function
    store_result = pipeline_pandas_redis_store(
        StoreKey="hello",
        DataFrame=source_frame,
        Redis=client,
        Expire=True,
        ExpireDuration=timedelta(minutes=15),
    )
    print(store_result)
    assert store_result["result"] == "OK"

    # ---------- RETRIEVE THE DATAFRAME FROM REDIS ------------

    get_result = pipeline_pandas_redis_get(StoreKey="hello", Redis=client)
    print(get_result)
    loaded_frame = get_result["dataframe"]
    print(loaded_frame.shape[0])

    # The number of rows must survive the round trip
    assert loaded_frame.shape[0] == expected_rows
    assert get_result["result"] == "OK"
24 changes: 6 additions & 18 deletions src/dataplane/pipelinerun/data_persist/test_redis.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

import os
from .pandas_redis_store import pipeline_pandas_redis_store
from .pandas_redis_store import pipeline_pandas_redis_get
from .redis_store import pipeline_redis_store
from .redis_store import pipeline_redis_get
import redis
from datetime import timedelta
from nanoid import generate
Expand All @@ -19,32 +19,20 @@ def test_redis_store():
os.environ["DP_RUNID"] = generate('1234567890abcdef', 10)

# Data to store in Redis as a plain string value
data = {
"calories": [420, 380, 390],
"duration": [50, 40, 45]
}
import pandas as pd
df = pd.DataFrame(data)
dfrows = df.shape[0]
data = "hi there 123"

# Redis connection
redisConnect = redis.Redis(host=REDIS_HOST, port=6379, db=0)


# ---------- STORE VALUE TO REDIS ------------

# Store the data with key hello - run id will be attached
rs = pipeline_pandas_redis_store(StoreKey="hello", DataFrame=df, Redis=redisConnect, Expire=True, ExpireDuration=timedelta(minutes=15))
rs = pipeline_redis_store(StoreKey="hello", Value=data, Redis=redisConnect, Expire=True, ExpireDuration=timedelta(minutes=15))
print(rs)
assert rs["result"]=="OK"

# ---------- RETRIEVE VALUE FROM REDIS ------------

# Get the data
rsget = pipeline_pandas_redis_get(StoreKey="hello", Redis=redisConnect)
print(rsget)
df = rsget["dataframe"]
print(df.shape[0])
rsget = pipeline_redis_get(StoreKey="hello", Redis=redisConnect)
# Test that the retrieved value matches the stored value
assert df.shape[0] == dfrows
assert rsget["value"] == data
assert rsget["result"]=="OK"

0 comments on commit 15d3149

Please sign in to comment.