-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDBReader.py
32 lines (26 loc) · 1.21 KB
/
DBReader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import pyspark.sql
from pyspark.sql.types import *
import os
#url = jdbc:postgresql://localhost:5432/<db_name>
PROJECT_PATH = os.getcwd()
def readDB(spark):
    """Read the limits_per_hour table from PostgreSQL via JDBC and return,
    for each (limit_name, limit_value) pair, the most recent effective_date.

    Args:
        spark: an active SparkSession.

    Returns:
        DataFrame with columns limit_name, limit_value, last_date.
    """
    # NOTE(review): credentials are hard-coded here; move them to a config
    # file or environment variables before deploying.
    dataframe = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/traffic_limits") \
        .option("dbtable", "limits_per_hour") \
        .option("user", "postgres") \
        .option("password", "newpassword") \
        .load()
    dataframe.show()
    # registerTempTable was deprecated in Spark 2.0; use the modern API.
    dataframe.createOrReplaceTempView("df_table")
    limits = spark.sql("SELECT limit_name, limit_value, MAX(effective_date) as last_date "+ \
    "FROM df_table GROUP BY limit_name, limit_value")
    return limits
def createDBstream(spark):
    """Build a streaming DataFrame over the traffic-limits JSON source.

    Args:
        spark: an active SparkSession.

    Returns:
        An unstarted streaming DataFrame with the traffic-limit schema.
    """
    # Explicit schema: streaming sources cannot infer one at runtime.
    limit_schema = StructType([
        StructField("limit_name", StringType(), True),
        StructField("limit_value", IntegerType(), True),
        StructField("effective_date", TimestampType(), True),
    ])
    # NOTE(review): readStream.json normally expects a directory of files;
    # this points at a single file — confirm intended.
    source_path = PROJECT_PATH + "/traffic_limits.json"
    reader = spark.readStream.schema(limit_schema)
    reader = reader.option("maxFilesPerTrigger", 1)  # one file per micro-batch
    return reader.json(source_path)