DynamoDB: "DynamoDB Streams Kinesis Adapter" project is dead
So, stop investigating that trail, cease the "standalone" attempt, and focus on the decoder and software testing instead.
Showing 3 changed files with 287 additions and 0 deletions.
@@ -0,0 +1,56 @@
# DynamoDB CDC to CrateDB using DynamoDB Streams Kinesis Adapter


## Introduction
> DynamoDB Streams captures a time-ordered sequence of item-level modifications
> in any DynamoDB table and stores this information in a log for up to 24 hours.
>
> Applications can access this log and view the data items as they appeared
> before and after they were modified, in near-real time.
>
> -- [Change data capture for DynamoDB Streams]

## About
A [change data capture (CDC)] pipeline made of a DynamoDB
egress CDC processor, sinking data into the CrateDB
OLAP database, using the [DynamoDB Streams Kinesis Adapter]
([GitHub][DynamoDB Streams Kinesis Adapter for Java]).

> Using the Amazon Kinesis Adapter is the recommended way to
> consume streams from Amazon DynamoDB.
>
> -- [Using the DynamoDB Streams Kinesis adapter to process stream records]

## What's Inside

- On a compute environment of your choice that supports Python, a traditional
  KCL v2 application uses the client-side DynamoDB Streams Kinesis Adapter to
  subscribe to a DynamoDB change stream, which presents itself as a Kinesis
  stream, in order to receive published CDC opslog messages.

- On the egress side, the application re-materializes the items of the
  operations log into any database with [SQLAlchemy] support.


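The re-materialization step on the egress side can be sketched as follows.
This is a minimal illustration, not the project's decoder: the function names
`decode_value` and `apply_record`, the `readings` table, and the use of the
standard-library `sqlite3` module as a stand-in for a SQLAlchemy-supported
database are all assumptions made for the example. It also assumes that the
stream includes the new item image, so that `NewImage` is present on `INSERT`
and `MODIFY` events.

```python
import sqlite3
from decimal import Decimal


def decode_value(typed):
    """Decode one DynamoDB typed attribute value, e.g. {"S": "foo"} or {"N": "42"}."""
    type_code, value = next(iter(typed.items()))
    if type_code == "S":
        return value
    if type_code == "N":
        # DynamoDB transmits numbers as strings.
        number = Decimal(value)
        return int(number) if number == number.to_integral_value() else float(number)
    if type_code == "BOOL":
        return bool(value)
    if type_code == "NULL":
        return None
    if type_code == "L":
        return [decode_value(item) for item in value]
    if type_code == "M":
        return {key: decode_value(item) for key, item in value.items()}
    # Binary and set types are left out of this sketch.
    raise NotImplementedError(f"Unsupported type descriptor: {type_code}")


def apply_record(connection, table, record):
    """Apply a single stream record (hypothetical helper) to an SQL table."""
    event = record["eventName"]
    data = record["dynamodb"]
    if event in ("INSERT", "MODIFY"):
        item = {name: decode_value(value) for name, value in data["NewImage"].items()}
        columns = ", ".join(item)
        placeholders = ", ".join("?" for _ in item)
        # Table and column names are interpolated verbatim: acceptable for a
        # sketch with trusted names, not for untrusted input.
        connection.execute(
            f"INSERT OR REPLACE INTO {table} ({columns}) VALUES ({placeholders})",
            list(item.values()),
        )
    elif event == "REMOVE":
        keys = {name: decode_value(value) for name, value in data["Keys"].items()}
        where = " AND ".join(f"{name} = ?" for name in keys)
        connection.execute(f"DELETE FROM {table} WHERE {where}", list(keys.values()))
    else:
        raise ValueError(f"Unknown event name: {event}")


if __name__ == "__main__":
    connection = sqlite3.connect(":memory:")
    connection.execute("CREATE TABLE readings (device TEXT PRIMARY KEY, temperature REAL)")
    apply_record(
        connection,
        "readings",
        {
            "eventName": "INSERT",
            "dynamodb": {
                "Keys": {"device": {"S": "foo"}},
                "NewImage": {"device": {"S": "foo"}, "temperature": {"N": "42.5"}},
            },
        },
    )
    print(connection.execute("SELECT * FROM readings").fetchall())
```

Treating `MODIFY` like `INSERT` relies on the target table having a primary key
that matches the DynamoDB key, so that the new image replaces the previous row.
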
## Dead End!

It looks like the "DynamoDB Streams Kinesis Adapter" project is dead.

- https://github.com/awslabs/dynamodb-streams-kinesis-adapter/issues/40
- https://github.com/awslabs/dynamodb-streams-kinesis-adapter/issues/42
- https://github.com/awslabs/dynamodb-streams-kinesis-adapter/issues/46

One option would be to downgrade to KCL v1. We are not sure whether it is
worth trying, though.


[change data capture (CDC)]: https://en.wikipedia.org/wiki/Change_data_capture
[Change data capture for DynamoDB Streams]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html
[DynamoDB]: https://aws.amazon.com/dynamodb/
[DynamoDB Streams Kinesis Adapter]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html
[DynamoDB Streams Kinesis Adapter for Java]: https://github.com/awslabs/dynamodb-streams-kinesis-adapter
[Kinesis]: https://aws.amazon.com/kinesis/
[SQLAlchemy]: https://www.sqlalchemy.org/
[Using the DynamoDB Streams Kinesis adapter to process stream records]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html
Empty file.
@@ -0,0 +1,231 @@
#!/usr/bin/env python
# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
# ruff: noqa: B006,E501
"""
This script provides two utility functions:

    ``--print_classpath``
        which prints a java class path. It optionally takes --properties
        and any number of --path options. It will generate a java class path which will include
        the properties file and paths and the location of the KCL jars based on the location of
        the amazon_kclpy.kcl module.

    ``--print_command``
        which prints a command to run an Amazon KCLpy application. It requires a --java
        and --properties argument and optionally takes any number of --path arguments to prepend
        to the classpath that it generates for the command.
"""
from __future__ import print_function

import argparse
import os
import sys
from glob import glob
from pathlib import Path

import samples
from amazon_kclpy import kcl


def get_dir_of_file(f):
    """
    Returns the absolute path to the directory containing the specified file.

    :type f: str
    :param f: A path to a file, either absolute or relative

    :rtype: str
    :return: The absolute path of the directory represented by the relative path provided.
    """
    return os.path.dirname(os.path.abspath(f))


def get_kcl_dir():
    """
    Returns the absolute path to the dir containing the amazon_kclpy.kcl module.

    :rtype: str
    :return: The absolute path of the KCL package.
    """
    return get_dir_of_file(kcl.__file__)


def get_kcl_jar_path():
    """
    Returns the absolute path to the KCL jars needed to run an Amazon KCLpy app.

    :rtype: str
    :return: The absolute path of the KCL jar files needed to run the MultiLangDaemon.
    """
    return ":".join(glob(os.path.join(get_kcl_dir(), "jars", "*jar")))


def get_kcl_classpath(properties=None, paths=[]):
    """
    Generates a classpath that includes the location of the kcl jars, the
    properties file and the optional paths.

    :type properties: str
    :param properties: Path to properties file.

    :type paths: list
    :param paths: List of strings. The paths that will be prepended to the classpath.

    :rtype: str
    :return: A java class path that will allow your properties to be found and the MultiLangDaemon and its deps and
        any custom paths you provided.
    """
    # First make all the user provided paths absolute
    paths = [os.path.abspath(p) for p in paths]
    # We add our paths after the user provided paths because this permits users to
    # potentially inject stuff before our paths (otherwise our stuff would always
    # take precedence).
    paths.append(get_kcl_jar_path())
    if properties:
        # Add the dir that the props file is in
        dir_of_file = get_dir_of_file(properties)
        paths.append(dir_of_file)

    # HACK: Add additional JARs to classpath, in order to satisfy Dynamodb Streams Kinesis Adapter for Python.
    # https://github.com/awslabs/dynamodb-streams-kinesis-adapter/issues/46#issuecomment-1260222792
    """
    wget https://repo1.maven.org/maven2/com/amazonaws/amazon-kinesis-client/1.14.10/amazon-kinesis-client-1.14.10.jar
    wget https://repo1.maven.org/maven2/com/amazonaws/dynamodb-streams-kinesis-adapter/1.6.0/dynamodb-streams-kinesis-adapter-1.6.0.jar
    wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.12.760/aws-java-sdk-1.12.760.jar
    wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-cloudwatch/1.12.760/aws-java-sdk-cloudwatch-1.12.760.jar
    wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.12.760/aws-java-sdk-dynamodb-1.12.760.jar
    wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-kinesis/1.12.760/aws-java-sdk-kinesis-1.12.760.jar
    """
    paths.append(str(Path.cwd() / "amazon-kinesis-client-1.14.10.jar"))
    paths.append(str(Path.cwd() / "dynamodb-streams-kinesis-adapter-1.6.0.jar"))
    paths.append(str(Path.cwd() / "aws-java-sdk-1.12.760.jar"))
    paths.append(str(Path.cwd() / "aws-java-sdk-cloudwatch-1.12.760.jar"))
    paths.append(str(Path.cwd() / "aws-java-sdk-dynamodb-1.12.760.jar"))
    paths.append(str(Path.cwd() / "aws-java-sdk-kinesis-1.12.760.jar"))

    return ":".join([p for p in paths if p != ""])


def get_kcl_app_command(args, multi_lang_daemon_class, properties, log_configuration, paths=[]):
    """
    Generates a command to run the MultiLangDaemon.

    :type args: argparse.Namespace
    :param args: Parsed command-line arguments. Uses ``args.java`` (path to java) and
        ``args.properties`` (properties file whose directory is added to the classpath).

    :type multi_lang_daemon_class: str
    :param multi_lang_daemon_class: Name of multi language daemon class e.g. com.amazonaws.services.kinesis.multilang.MultiLangDaemon

    :type properties: str
    :param properties: The properties argument passed to the daemon on the command line.

    :type log_configuration: str
    :param log_configuration: The log configuration argument passed to the daemon; may be an empty string.

    :type paths: list
    :param paths: List of strings. Additional paths to prepend to the classpath.

    :rtype: str
    :return: A command that will run the MultiLangDaemon with your properties and custom paths and java.
    """
    return "{java} -cp {cp} {daemon} {props} {log_config}".format(
        java=args.java,
        cp=get_kcl_classpath(args.properties, paths),
        daemon=multi_lang_daemon_class,
        # The properties argument is passed through as given by the caller.
        props=properties,
        log_config=log_configuration,
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser("A script for generating a command to run an Amazon KCLpy app")
    parser.add_argument(
        "--print_classpath",
        dest="print_classpath",
        action="store_true",
        default=False,
        help="Print a java class path.\noptional arguments: --path",
    )
    parser.add_argument(
        "--print_command",
        dest="print_command",
        action="store_true",
        default=False,
        help="Print a command for running an Amazon KCLpy app.\nrequired "
        + "args: --java --properties\noptional args: --classpath",
    )
    parser.add_argument(
        "-j",
        "--java",
        dest="java",
        help="The path to the java executable e.g. <some root>/jdk/bin/java",
        metavar="PATH_TO_JAVA",
    )
    parser.add_argument(
        "-p",
        "--properties",
        "--props",
        "--prop",
        dest="properties",
        help="The path to a properties file (relative to where you are running this script)",
        metavar="PATH_TO_PROPERTIES",
    )
    parser.add_argument(
        "--sample",
        "--sample-props",
        "--use-sample-properties",
        dest="use_sample_props",
        help="This will use the sample.properties file included in this package as the properties file.",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "-c",
        "--classpath",
        "--path",
        dest="paths",
        action="append",
        default=[],
        help="Additional path to add to java class path. May be specified any number of times",
        metavar="PATH",
    )
    parser.add_argument(
        "-l",
        "--log-configuration",
        dest="log_configuration",
        help="This will use the logback.xml which will be used by the KCL to log.",
        metavar="PATH_TO_LOG_CONFIGURATION",
    )
    args = parser.parse_args()
    # Possibly replace the properties with the sample. Useful if they just want to run the sample app.
    if args.use_sample_props:
        if args.properties:
            sys.stderr.write("Replacing provided properties with sample properties due to arg --sample\n")
        args.properties = os.path.join(get_dir_of_file(samples.__file__), "record_processor.properties")

    # Print what was asked for
    if args.print_classpath:
        print(get_kcl_classpath(args.properties, args.paths))
    elif args.print_command:
        if args.java and args.properties:

            # HACK

            # Kinesis backend.
            multi_lang_daemon_class = "software.amazon.kinesis.multilang.MultiLangDaemon"

            # DynamoDB backend. This assignment overrides the Kinesis backend above.
            # https://github.com/awslabs/dynamodb-streams-kinesis-adapter/issues/46#issuecomment-1260222792
            multi_lang_daemon_class = "com.amazonaws.services.dynamodbv2.streamsadapter.StreamsMultiLangDaemon"

            properties_argument = "{props}".format(props=args.properties)
            log_argument = ""
            if args.log_configuration is not None:
                log_argument = "--log-configuration {log}".format(log=args.log_configuration)
            print(
                get_kcl_app_command(args, multi_lang_daemon_class, properties_argument, log_argument, paths=args.paths)
            )
        else:
            sys.stderr.write("Must provide arguments: --java and --properties\n")
            parser.print_usage()
    else:
        parser.print_usage()
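
The classpath assembly performed by the helper above can be illustrated in isolation. The following is a minimal sketch, not part of the commit: the function name `build_classpath` and its parameters are hypothetical, and it deliberately differs from the script by joining entries with `os.pathsep` instead of a hard-coded `":"` and by avoiding a mutable default argument.

```python
import os
from pathlib import Path


def build_classpath(jar_dir, extra_paths=None, properties=None):
    """Assemble a Java classpath from user paths, JAR files, and the properties directory."""
    # Build a new list, so the caller's list is never mutated.
    entries = [os.path.abspath(p) for p in (extra_paths or [])]
    # User-provided paths come first, so they take precedence over the JARs.
    jar_root = Path(jar_dir).resolve()
    entries.extend(str(jar) for jar in sorted(jar_root.glob("*.jar")))
    if properties:
        # Add the directory that contains the properties file.
        entries.append(os.path.dirname(os.path.abspath(properties)))
    # os.pathsep is ":" on POSIX systems and ";" on Windows.
    return os.pathsep.join(entry for entry in entries if entry)


if __name__ == "__main__":
    # Hypothetical invocation: JARs in the current directory, one custom path.
    print(build_classpath(".", extra_paths=["custom"], properties="app.properties"))
```

Listing each JAR as its own entry mirrors what the script does with `glob`, while sorting the entries keeps the resulting classpath stable across runs.
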