feat: add sdc metadata (#9)
kgpayne authored Jan 5, 2023
1 parent e319c01 commit d26ab2b
Showing 2 changed files with 67 additions and 27 deletions.
4 changes: 4 additions & 0 deletions meltano.yml
@@ -46,6 +46,8 @@ plugins:
       kind: string
     - name: s3.paths
       kind: array
+    - name: add_record_metadata
+      kind: boolean
   - name: tap-carbon-intensity
     variant: meltano
     pip_url: git+https://gitlab.com/meltano/tap-carbon-intensity.git
@@ -66,6 +68,8 @@ plugins:
       kind: string
     - name: s3.prefix
       kind: string
+    - name: add_record_metadata
+      kind: boolean
 jobs:
 - name: job-simple-test
   tasks:
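The setting is declared for both plugins above; to actually turn it on in a project, it would be set in the plugin's config block. A minimal sketch, assuming the usual Meltano config nesting (not part of this commit):

    config:
      add_record_metadata: true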
90 changes: 63 additions & 27 deletions target_singer_jsonl/__init__.py
@@ -5,6 +5,7 @@
 import json
 import logging
 import sys
+import time
 from datetime import datetime
 from functools import reduce
 from pathlib import Path
@@ -23,11 +24,13 @@
     "destination": "local",
     "s3": {"bucket": "my-s3-bucket", "prefix": "put/files/in/here/"},
     "local": {"folder": ".secrets/output/"},
+    "add_record_metadata": False,
 }
 
 stream_files = {}
 stream_lines = {}
-now = datetime.now().strftime("%Y%m%dT%H%M%S%z")
+file_timestamp = datetime.now().strftime("%Y%m%dT%H%M%S%z")
+target_start_timestamp = datetime.now().isoformat()
 
 
 def join_slash(a, b):
@@ -47,7 +50,7 @@ def emit_state(state):
 
 
 def get_file_path(stream, destination, config):
-    filename = f"{stream}/{stream}-{now}.singer.gz"
+    filename = f"{stream}/{stream}-{file_timestamp}.singer.gz"
     if destination == "local":
         return Path(config["folder"]).joinpath(filename)
     elif destination == "s3":
@@ -68,7 +71,7 @@ def write_lines_local(destination, config, stream, lines):
     with stream_files[stream].open("w", encoding="utf-8") as outfile:
         logging.info(f"Writing to file: {stream_files[stream]}")
         for line in lines:
-            outfile.write(line)
+            outfile.write(line + "\n")
 
 
 def write_lines_s3(destination, config, stream, lines):
@@ -79,7 +82,7 @@ def write_lines_s3(destination, config, stream, lines):
     with open(stream_files[stream], "w", encoding="utf-8") as outfile:
         logging.info(f"Writing to file: {stream_files[stream]}")
         for line in lines:
-            outfile.write(line)
+            outfile.write(line + "\n")
 
 
 def write_lines(config, stream, lines):
@@ -104,60 +107,93 @@ def persist_lines(config, lines):
     state = None
     schemas = {}
     key_properties = {}
     headers = {}
     validators = {}
+    add_record_metadata = config.get("add_record_metadata", True)
 
     # Loop over lines from stdin
     for line in lines:
         try:
-            o = json.loads(line)
+            message = json.loads(line)
         except json.decoder.JSONDecodeError:
             logger.error(f"Unable to parse:\n{line}")
             raise
 
-        if "type" not in o:
+        if "type" not in message:
             raise Exception(f"Line is missing required key 'type': {line}")
-        t = o["type"]
+        t = message["type"]
 
         if t != "STATE":
-            if "stream" not in o:
+            if "stream" not in message:
                 raise Exception(f"Line is missing required key 'stream': {line}")
 
-            stream = o["stream"]
+            stream = message["stream"]
 
             if stream not in stream_lines:
                 stream_lines[stream] = []
 
-            # persisting STATE messages is problematic when splitting records into separate
-            # files, therefore we omit them and allow tap-singer-jsonl to create new
-            # state messages from observed records
-            stream_lines[stream].append(line)
-
         if t == "RECORD":
-
             if stream not in schemas:
                 raise Exception(
                     f"A record for stream {stream} was encountered before a corresponding schema"
                 )
 
+            record = message["record"]
             # Get schema for this record's stream
             schema = schemas[stream]
 
             # Validate record
-            validators[stream].validate(o["record"])
-
+            validators[stream].validate(record)
+            # Process record
+            if add_record_metadata:
+                now = datetime.now().isoformat()
+                record.update(
+                    {
+                        "_sdc_extracted_at": message.get(
+                            "time_extracted", target_start_timestamp
+                        ),
+                        "_sdc_received_at": now,
+                        "_sdc_batched_at": now,
+                        "_sdc_deleted_at": record.get("_sdc_deleted_at"),
+                        "_sdc_sequence": int(round(time.time() * 1000)),
+                        "_sdc_table_version": message.get("version"),
+                    }
+                )
+            # Queue message for write
             state = None
-        elif t == "STATE":
-            logger.debug(f'Setting state to {o["value"]}')
-            state = o["value"]
+            stream_lines[stream].append(json.dumps(message))
 
         elif t == "SCHEMA":
-            schemas[stream] = o["schema"]
-            validators[stream] = Draft4Validator(o["schema"])
-            if "key_properties" not in o:
+            schemas[stream] = message["schema"]
+            validators[stream] = Draft4Validator(message["schema"])
+            if "key_properties" not in message:
                 raise Exception("key_properties field is required")
-            key_properties[stream] = o["key_properties"]
+            key_properties[stream] = message["key_properties"]
+            # Add metadata properties
+            if add_record_metadata:
+                properties_dict = schemas[stream]["properties"]
+                for col in {
+                    "_sdc_extracted_at",
+                    "_sdc_received_at",
+                    "_sdc_batched_at",
+                    "_sdc_deleted_at",
+                }:
+                    properties_dict[col] = {
+                        "type": ["null", "string"],
+                        "format": "date-time",
+                    }
+                for col in {"_sdc_sequence", "_sdc_table_version"}:
+                    properties_dict[col] = {"type": ["null", "integer"]}
+            # Queue message for write
+            stream_lines[stream].append(json.dumps(message))
+
+        elif t == "STATE":
+            # persisting STATE messages is problematic when splitting records into separate
+            # files, therefore we omit them and allow tap-singer-jsonl to create new
+            # state messages from observed records on read
+            logger.debug(f'Setting state to {message["value"]}')
+            state = message["value"]
 
         else:
-            raise Exception(f"Unknown message type {t} in message {o}")
+            raise Exception(f"Unknown message type {t} in message {message}")
 
     for stream, messages in stream_lines.items():
         write_lines(config, stream, messages)
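Taken together, the target now stamps six _sdc_* columns onto every RECORD message and declares them in the stream's SCHEMA as nullable columns, so validation still passes. A standalone sketch of the stamping logic above, using a hypothetical "animals" stream (the stream, record, and timestamp literals are illustrative only):

    import json
    import time
    from datetime import datetime

    # Captured once at startup, mirroring the module-level assignment above.
    target_start_timestamp = datetime.now().isoformat()

    # A hypothetical incoming RECORD message.
    message = {
        "type": "RECORD",
        "stream": "animals",
        "record": {"id": 1, "name": "aardvark"},
        "time_extracted": "2023-01-05T12:00:00+00:00",
    }

    record = message["record"]
    now = datetime.now().isoformat()
    record.update(
        {
            # Tap-reported extraction time, falling back to target start time.
            "_sdc_extracted_at": message.get("time_extracted", target_start_timestamp),
            "_sdc_received_at": now,
            "_sdc_batched_at": now,
            # Preserved if the tap already set it; otherwise null.
            "_sdc_deleted_at": record.get("_sdc_deleted_at"),
            # Epoch milliseconds, usable as a coarse ordering key.
            "_sdc_sequence": int(round(time.time() * 1000)),
            # Most taps omit "version", so this is usually null.
            "_sdc_table_version": message.get("version"),
        }
    )
    print(json.dumps(message, indent=2))

One detail worth noting: DEFAULT_CONFIG ships add_record_metadata as False, while persist_lines falls back to True when the key is missing; which default applies depends on how the target merges DEFAULT_CONFIG with the user's config.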
