DynamoDB: "DynamoDB Streams Kinesis Adapter" project is dead
So, stop investigating that trail, ceasing the "standalone" attempt.
amotl committed Jul 12, 2024
1 parent 31d6744 commit 6c255ac
Showing 3 changed files with 284 additions and 0 deletions.
53 changes: 53 additions & 0 deletions lorrystream/dynamodb_standalone/README.md
@@ -0,0 +1,53 @@
# DynamoDB CDC to CrateDB using DynamoDB Streams Kinesis Adapter


## Introduction
> DynamoDB Streams captures a time-ordered sequence of item-level modifications
> in any DynamoDB table and stores this information in a log for up to 24 hours.
>
> Applications can access this log and view the data items as they appeared
> before and after they were modified, in near-real time.
>
> -- [Change data capture for DynamoDB Streams]

## About
A [change data capture (CDC)] pipeline that reads change events from a
DynamoDB table and writes them to the CrateDB OLAP database, using the
[DynamoDB Streams Kinesis Adapter]
([GitHub][DynamoDB Streams Kinesis Adapter for Java]).

> Using the Amazon Kinesis Adapter is the recommended way to
> consume streams from Amazon DynamoDB.
>
> -- [Using the DynamoDB Streams Kinesis adapter to process stream records]

## What's Inside

- On a compute environment of your choice that supports Python, a traditional
KCL v2 application uses the client-side DynamoDB Streams Kinesis Adapter to
subscribe to a DynamoDB change stream, which presents itself as a Kinesis
stream, and receives the published CDC operations log messages.

- On the egress side, the application re-materializes the items of the
operations log into any database with [SQLAlchemy] support.
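
As a rough illustration of the egress step, the sketch below turns DynamoDB
Streams change records into SQL statements and applies them. It is not the code
from this project. The table name `demo`, its layout (an `id` column plus a JSON
`data` column), and the helper names `to_sql` and `apply_records` are
assumptions made for the example, and the standard-library `sqlite3` module
stands in for an SQLAlchemy-supported database so that the snippet stays
self-contained. Only the `eventName`, `Keys`, and `NewImage` fields are taken
from the DynamoDB Streams record format.

```python
import json
import sqlite3


def to_sql(record):
    """Translate one DynamoDB Streams record into an (sql, params) pair."""
    event = record["eventName"]
    # Keys and images use DynamoDB's typed JSON, e.g. {"id": {"S": "a1"}}.
    # Assumption: the table has a single string partition key named "id".
    key = record["dynamodb"]["Keys"]["id"]["S"]
    if event in ("INSERT", "MODIFY"):
        image = json.dumps(record["dynamodb"]["NewImage"], sort_keys=True)
        # "INSERT OR REPLACE" is SQLite syntax; other databases spell upserts differently.
        return ("INSERT OR REPLACE INTO demo (id, data) VALUES (?, ?)", (key, image))
    if event == "REMOVE":
        return ("DELETE FROM demo WHERE id = ?", (key,))
    raise ValueError(f"Unknown eventName: {event}")


def apply_records(connection, records):
    """Apply a batch of change records in order, then commit."""
    for record in records:
        sql, params = to_sql(record)
        connection.execute(sql, params)
    connection.commit()


def make_record(event, key, temperature=None):
    """Build a minimal, hand-written stream record for the demo."""
    body = {"Keys": {"id": {"S": key}}}
    if temperature is not None:
        body["NewImage"] = {"id": {"S": key}, "temperature": {"N": temperature}}
    return {"eventName": event, "dynamodb": body}


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE demo (id TEXT PRIMARY KEY, data TEXT)")

records = [
    make_record("INSERT", "a1", "42.5"),
    make_record("MODIFY", "a1", "43.0"),
    make_record("INSERT", "b2", "17.0"),
    make_record("REMOVE", "b2"),
]
apply_records(connection, records)
rows = connection.execute("SELECT id, data FROM demo ORDER BY id").fetchall()
```

After the batch is applied, `rows` holds the current state of the table rather
than the history of changes, which is what "re-materializing" the operations log
means here.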


## Dead End

The "DynamoDB Streams Kinesis Adapter" project appears to be dead, so this
standalone approach is not pursued any further. See these upstream issues:

- https://github.com/awslabs/dynamodb-streams-kinesis-adapter/issues/40
- https://github.com/awslabs/dynamodb-streams-kinesis-adapter/issues/42
- https://github.com/awslabs/dynamodb-streams-kinesis-adapter/issues/46


[change data capture (CDC)]: https://en.wikipedia.org/wiki/Change_data_capture
[Change data capture for DynamoDB Streams]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html
[DynamoDB]: https://aws.amazon.com/dynamodb/
[DynamoDB Streams Kinesis Adapter]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html
[DynamoDB Streams Kinesis Adapter for Java]: https://github.com/awslabs/dynamodb-streams-kinesis-adapter
[Kinesis]: https://aws.amazon.com/kinesis/
[SQLAlchemy]: https://www.sqlalchemy.org/
[Using the DynamoDB Streams Kinesis adapter to process stream records]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html
Empty file.
231 changes: 231 additions & 0 deletions lorrystream/dynamodb_standalone/amazon_kclpy_helper.py
@@ -0,0 +1,231 @@
#!/usr/bin/env python
# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
# ruff: noqa: B006,E501
"""
This script provides two utility functions:

    ``--print_classpath``
        which prints a java class path. It optionally takes --properties
        and any number of --path options. It will generate a java class path which will include
        the properties file and paths and the location of the KCL jars based on the location of
        the amazon_kclpy.kcl module.

    ``--print_command``
        which prints a command to run an Amazon KCLpy application. It requires a --java
        and --properties argument and optionally takes any number of --path arguments to prepend
        to the classpath that it generates for the command.
"""
from __future__ import print_function

import argparse
import os
import sys
from glob import glob
from pathlib import Path

import samples
from amazon_kclpy import kcl


def get_dir_of_file(f):
    """
    Returns the absolute path to the directory containing the specified file.
    :type f: str
    :param f: A path to a file, either absolute or relative
    :rtype: str
    :return: The absolute path of the directory represented by the relative path provided.
    """
    return os.path.dirname(os.path.abspath(f))


def get_kcl_dir():
    """
    Returns the absolute path to the dir containing the amazon_kclpy.kcl module.
    :rtype: str
    :return: The absolute path of the KCL package.
    """
    return get_dir_of_file(kcl.__file__)


def get_kcl_jar_path():
    """
    Returns the absolute path to the KCL jars needed to run an Amazon KCLpy app.
    :rtype: str
    :return: The absolute path of the KCL jar files needed to run the MultiLangDaemon.
    """
    return ":".join(glob(os.path.join(get_kcl_dir(), "jars", "*jar")))


def get_kcl_classpath(properties=None, paths=[]):
    """
    Generates a classpath that includes the location of the kcl jars, the
    properties file and the optional paths.
    :type properties: str
    :param properties: Path to properties file.
    :type paths: list
    :param paths: List of strings. The paths that will be prepended to the classpath.
    :rtype: str
    :return: A java class path that will allow your properties to be found and the MultiLangDaemon and its deps and
        any custom paths you provided.
    """
    # First make all the user provided paths absolute
    paths = [os.path.abspath(p) for p in paths]
    # We add our paths after the user provided paths because this permits users to
    # potentially inject stuff before our paths (otherwise our stuff would always
    # take precedence).
    paths.append(get_kcl_jar_path())
    if properties:
        # Add the dir that the props file is in
        dir_of_file = get_dir_of_file(properties)
        paths.append(dir_of_file)

    # HACK: Add additional JARs to classpath, in order to satisfy DynamoDB Streams Kinesis Adapter for Python.
    # https://github.com/awslabs/dynamodb-streams-kinesis-adapter/issues/46#issuecomment-1260222792
    # The JARs are expected in the current working directory. Fetch them with:
    """
    wget https://repo1.maven.org/maven2/com/amazonaws/amazon-kinesis-client/1.14.10/amazon-kinesis-client-1.14.10.jar
    wget https://repo1.maven.org/maven2/com/amazonaws/dynamodb-streams-kinesis-adapter/1.6.0/dynamodb-streams-kinesis-adapter-1.6.0.jar
    wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.12.760/aws-java-sdk-1.12.760.jar
    wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-cloudwatch/1.12.760/aws-java-sdk-cloudwatch-1.12.760.jar
    wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.12.760/aws-java-sdk-dynamodb-1.12.760.jar
    wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-kinesis/1.12.760/aws-java-sdk-kinesis-1.12.760.jar
    """
    paths.append(str(Path.cwd() / "amazon-kinesis-client-1.14.10.jar"))
    paths.append(str(Path.cwd() / "dynamodb-streams-kinesis-adapter-1.6.0.jar"))
    paths.append(str(Path.cwd() / "aws-java-sdk-1.12.760.jar"))
    paths.append(str(Path.cwd() / "aws-java-sdk-cloudwatch-1.12.760.jar"))
    paths.append(str(Path.cwd() / "aws-java-sdk-dynamodb-1.12.760.jar"))
    paths.append(str(Path.cwd() / "aws-java-sdk-kinesis-1.12.760.jar"))

    return ":".join([p for p in paths if p != ""])


def get_kcl_app_command(args, multi_lang_daemon_class, properties, log_configuration, paths=[]):
    """
    Generates a command to run the MultiLangDaemon.
    :type args: argparse.Namespace
    :param args: Parsed command-line arguments. ``args.java`` is the path to java, and the
        directory of ``args.properties`` is added to the classpath.
    :type multi_lang_daemon_class: str
    :param multi_lang_daemon_class: Name of multi language daemon class e.g. com.amazonaws.services.kinesis.multilang.MultiLangDaemon
    :type properties: str
    :param properties: Properties file argument passed to the daemon.
    :type log_configuration: str
    :param log_configuration: Log configuration argument passed to the daemon. May be an empty string.
    :type paths: list
    :param paths: List of strings. Additional paths to prepend to the classpath.
    :rtype: str
    :return: A command that will run the MultiLangDaemon with your properties and custom paths and java.
    """
    return "{java} -cp {cp} {daemon} {props} {log_config}".format(
        java=args.java,
        cp=get_kcl_classpath(args.properties, paths),
        daemon=multi_lang_daemon_class,
        # The properties argument is passed through exactly as the caller provided it.
        props=properties,
        log_config=log_configuration,
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser("A script for generating a command to run an Amazon KCLpy app")
    parser.add_argument(
        "--print_classpath",
        dest="print_classpath",
        action="store_true",
        default=False,
        help="Print a java class path.\noptional arguments: --path",
    )
    parser.add_argument(
        "--print_command",
        dest="print_command",
        action="store_true",
        default=False,
        help="Print a command for running an Amazon KCLpy app.\nrequired "
        + "args: --java --properties\noptional args: --classpath",
    )
    parser.add_argument(
        "-j",
        "--java",
        dest="java",
        help="The path to the java executable e.g. <some root>/jdk/bin/java",
        metavar="PATH_TO_JAVA",
    )
    parser.add_argument(
        "-p",
        "--properties",
        "--props",
        "--prop",
        dest="properties",
        help="The path to a properties file (relative to where you are running this script)",
        metavar="PATH_TO_PROPERTIES",
    )
    parser.add_argument(
        "--sample",
        "--sample-props",
        "--use-sample-properties",
        dest="use_sample_props",
        help="This will use the sample.properties file included in this package as the properties file.",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "-c",
        "--classpath",
        "--path",
        dest="paths",
        action="append",
        default=[],
        help="Additional path to add to java class path. May be specified any number of times",
        metavar="PATH",
    )
    parser.add_argument(
        "-l",
        "--log-configuration",
        dest="log_configuration",
        help="This will use the logback.xml which will be used by the KCL to log.",
        metavar="PATH_TO_LOG_CONFIGURATION",
    )
    args = parser.parse_args()
    # Possibly replace the properties with the sample. Useful if they just want to run the sample app.
    if args.use_sample_props:
        if args.properties:
            sys.stderr.write("Replacing provided properties with sample properties due to arg --sample\n")
        args.properties = os.path.join(get_dir_of_file(samples.__file__), "record_processor.properties")

    # Print what they asked for
    if args.print_classpath:
        print(get_kcl_classpath(args.properties, args.paths))
    elif args.print_command:
        if args.java and args.properties:

            # HACK: The DynamoDB assignment below deliberately overrides the Kinesis default.

            # Kinesis backend.
            multi_lang_daemon_class = "software.amazon.kinesis.multilang.MultiLangDaemon"

            # DynamoDB backend.
            # https://github.com/awslabs/dynamodb-streams-kinesis-adapter/issues/46#issuecomment-1260222792
            multi_lang_daemon_class = "com.amazonaws.services.dynamodbv2.streamsadapter.StreamsMultiLangDaemon"

            properties_argument = "{props}".format(props=args.properties)
            log_argument = ""
            if args.log_configuration is not None:
                log_argument = "--log-configuration {log}".format(log=args.log_configuration)
            print(
                get_kcl_app_command(args, multi_lang_daemon_class, properties_argument, log_argument, paths=args.paths)
            )
        else:
            sys.stderr.write("Must provide arguments: --java and --properties\n")
            parser.print_usage()
    else:
        parser.print_usage()
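
The ordering that `get_kcl_classpath` applies (user paths first, then the KCL
jars, then the directory of the properties file) can be sketched in isolation.
The function name `build_classpath`, its parameters, and the example paths are
hypothetical and not part of `amazon_kclpy`. The sketch also joins entries with
`os.pathsep` instead of a hard-coded `:`, on the assumption that the result
should work on Windows as well.

```python
import os


def build_classpath(user_paths=(), jar_paths=(), properties=None):
    """Compose a Java classpath in the same order as the helper above."""
    # User-provided paths go first so that they can take precedence over the bundled jars.
    entries = [os.path.abspath(p) for p in user_paths]
    entries.extend(jar_paths)
    if properties:
        # Add the directory that contains the properties file, as the helper does.
        entries.append(os.path.dirname(os.path.abspath(properties)))
    # Drop empty entries and join with the platform's path separator.
    return os.pathsep.join(e for e in entries if e)


classpath = build_classpath(
    user_paths=["lib"],
    jar_paths=["/opt/kcl/jars/a.jar", ""],
    properties="conf/app.properties",
)
print(classpath)
```

Immutable tuple defaults are used so that the function needs no `B006`
suppression for mutable default arguments.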
