-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcode1.py
52 lines (41 loc) · 1.6 KB
/
code1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
# Streaming job: consume stock-tick JSON messages from a Kafka topic, parse
# them, and continuously print a per-symbol moving average of the close price
# over a 10-minute sliding window.
#
# Run with:
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 code1.py
spark = SparkSession.builder.appName('abc1').master('local[1]').getOrCreate()
spark.sparkContext.setLogLevel('WARN')

# Expected shape of each Kafka message value (a JSON document).
schema = StructType([
    StructField("symbol", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("priceData", StructType([
        StructField("close", FloatType(), True),
        StructField("high", FloatType(), True),
        StructField("low", FloatType(), True),
        StructField("open", FloatType(), True),
        StructField("volume", FloatType(), True)
    ]), True)
])

# FIX: the Structured Streaming Kafka source does not honor the Kafka
# consumer option "auto.offset.reset"; the supported source option is
# "startingOffsets" (see Spark's Kafka integration guide).
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "52.55.237.11:9092") \
    .option("subscribe", "stockData") \
    .option("startingOffsets", "latest") \
    .load()

# Kafka delivers the value as binary: cast it to a string, parse the JSON
# against the schema, and flatten the nested priceData struct into
# top-level columns.
df = df.selectExpr('CAST(value AS STRING)') \
    .select(from_json('value', schema).alias("value")) \
    .select("value.symbol", "value.timestamp",
            "value.priceData.high", "value.priceData.low",
            "value.priceData.open", "value.priceData.close",
            "value.priceData.volume")

# FIX: the original also grouped by the raw event timestamp, which placed
# every event in its own group — avg(close) was just each row's own close,
# never a windowed average. Group by symbol and the sliding window only
# (10-minute windows sliding every 5 minutes); the 1-minute watermark lets
# Spark discard state for windows too old to receive late data.
df = df \
    .withWatermark("timestamp", "1 minutes") \
    .groupBy(
        df.symbol,
        window(df.timestamp, "10 minutes", "5 minutes")) \
    .agg(F.avg(df.close).alias("Moving Average"))

# Emit updated window aggregates to the console; block until the stream
# is stopped externally.
df.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start() \
    .awaitTermination()