-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlog_parser.py
34 lines (30 loc) · 1.15 KB
/
log_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BooleanType, IntegerType
from pyspark.sql.functions import from_json, col
def get_log_schema():
    """Return the Spark StructType describing one activity-log record."""
    field_specs = [
        ("timestamp", TimestampType()),
        ("user_id", StringType()),
        ("session_id", StringType()),
        ("activity", StringType()),
        ("duration", IntegerType()),
        ("error", BooleanType()),
        ("message", StringType()),
    ]
    # Every field is nullable (third StructField argument True).
    return StructType(
        [StructField(name, dtype, True) for name, dtype in field_specs]
    )
def parse_log(df, json_column="json_string"):
    """Parse the JSON strings held in *json_column* of *df*.

    Returns a new DataFrame whose columns are the individual fields of
    the log schema (the intermediate "data" struct is flattened away).
    """
    log_schema = get_log_schema()
    parsed_struct = from_json(col(json_column), log_schema)
    return df.withColumn("data", parsed_struct).select("data.*")
def parse_json_string(json_string):
    """Deserialize *json_string* into a Python object.

    Returns the parsed value on success, or None when the input is not
    valid JSON or is not a str/bytes-like object at all.
    """
    try:
        return json.loads(json_string)
    except (json.JSONDecodeError, TypeError):
        # Narrowed from a blanket `except Exception`: only malformed JSON
        # or a non-string input should map to None; anything else is a
        # genuine bug and must propagate.
        return None
def validate_log(log):
    """Return True when *log* contains every mandatory log field."""
    # "message" is intentionally not required; all other schema fields are.
    mandatory = (
        "timestamp",
        "user_id",
        "session_id",
        "activity",
        "duration",
        "error",
    )
    return all(name in log for name in mandatory)