Skip to content

Commit 9533b64

Browse files
committed
Kinesis: Add playground assets
1 parent 82b9150 commit 9533b64

File tree

11 files changed

+994
-0
lines changed

11 files changed

+994
-0
lines changed

lorrystream/kinesis/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
record_processor.log

lorrystream/kinesis/README.md

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# Kinesis
2+
3+
## About
4+
A stream processor component using the [Kinesis Client Library (KCL)].
5+
It is written in Python, and uses the [amazon-kclpy] Python SDK for KCL
6+
([GitHub][amazon-kclpy-github]).
7+
8+
## What's Inside
9+
- [ ] Publishing and subscribing to fundamental Kinesis streams.
10+
- [ ] Subscribing to DynamoDB CDC streams.
11+
- [Using the DynamoDB Streams Kinesis adapter to process stream records]
12+
- [DynamoDB Streams Kinesis Adapter for Java]
13+
14+
## Setup
15+
This section reflects configuration settings stored in
16+
[record_processor.properties](./record_processor.properties).
17+
18+
### AWS
19+
Configure a Kinesis Stream, and an IAM policy, because Kinesis needs to create
20+
and maintain a "[leases table]" stored in DynamoDB, so it requires corresponding
21+
permissions to do so.
22+
23+
- Create a [Kinesis] stream called `testdrive-stream`.
24+
- [Create an IAM Policy and User], applying the permissions outlined on this page.
25+
Two example ARN IDs, that address relevant resources in Kinesis and DynamoDB, are:
26+
```text
27+
arn:aws:kinesis:us-east-1:841394475918:stream/testdrive-stream
28+
arn:aws:dynamodb:us-east-1:841394475918:table/stream-demo
29+
```
30+
- The leases table in DynamoDB will be automatically created when the first
31+
stream consumer (the KCL application) becomes active.
32+
33+
### KCL Stream Processor
34+
35+
Acquire sources and initialize sandbox.
36+
```shell
37+
git clone https://github.com/daq-tools/lorrystream --branch=kinesis
38+
cd lorrystream
39+
python3 -m venv .venv
40+
source .venv/bin/activate
41+
```
42+
43+
Install dependencies, mainly the [amazon-kclpy] package.
44+
```shell
45+
cd lorrystream/kinesis
46+
pip install --verbose wheel -r requirements.txt
47+
```
48+
Note that the first installation of the [amazon-kclpy] package on your machine
49+
will take a while, because it will download a bunch of JAR files, defined by a
50+
traditional [pom.xml] recipe, before embedding them into the Python package.
51+
52+
On subsequent installations, as long as you don't switch versions, that package
53+
will install from your local package cache, so it will be much faster.
54+
55+
Alternative: Use ready-made wheel package. Note to self: Need to provide this to
56+
the colleagues.
57+
```shell
58+
pip install ./dist/amazon_kclpy-2.1.5-py3-none-any.whl
59+
```
60+
61+
## Usage
62+
You will need two terminal windows. Within both of them, activate the virtualenv
63+
on the top-level directory. Then, navigate to the playground directory, and
64+
seed AWS credentials.
65+
```shell
66+
source .venv/bin/activate
67+
cd lorrystream/kinesis
68+
export AWS_ACCESS_KEY=...
69+
export AWS_SECRET_ACCESS_KEY=...
70+
```
71+
72+
Launch the stream processor, subscribing to the stream.
73+
```shell
74+
$(sh launch.sh)
75+
```
76+
77+
Watch actions of the record processor.
78+
```shell
79+
tail -F record_processor.log
80+
```
81+
82+
Publish a demo message to the stream.
83+
```shell
84+
python publish.py
85+
```
86+
87+
88+
## See Also
89+
- https://github.com/aws-samples/amazon-kinesis-data-processor-aws-fargate
90+
91+
92+
[amazon-kclpy]: https://pypi.org/project/amazon-kclpy
93+
[amazon-kclpy-github]: https://github.com/awslabs/amazon-kinesis-client-python
94+
[Create an IAM Policy and User]: https://docs.aws.amazon.com/streams/latest/dev/tutorial-stock-data-kplkcl2-iam.html
95+
[DynamoDB]: https://console.aws.amazon.com/dynamodbv2/
96+
[DynamoDB Streams Kinesis Adapter for Java]: https://github.com/awslabs/dynamodb-streams-kinesis-adapter
97+
[Kinesis]: https://console.aws.amazon.com/kinesis/
98+
[Kinesis Client Library (KCL)]: https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html
99+
[leases table]: https://aws.amazon.com/blogs/big-data/processing-amazon-dynamodb-streams-using-the-amazon-kinesis-client-library/
100+
[pom.xml]: https://github.com/awslabs/amazon-kinesis-client-python/blob/v2.1.5/pom.xml
101+
[Using the DynamoDB Streams Kinesis adapter to process stream records]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
#!/usr/bin/env python
2+
# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
# SPDX-License-Identifier: Apache-2.0
4+
# ruff: noqa: B006,E501
5+
"""
6+
This script provides two utility functions:
7+
8+
``--print_classpath``
9+
which prints a java class path. It optionally takes --properties
10+
and any number of --path options. It will generate a java class path which will include
11+
the properties file and paths and the location of the KCL jars based on the location of
12+
the amazon_kclpy.kcl module.
13+
14+
``--print_command``
15+
which prints a command to run an Amazon KCLpy application. It requires a --java
16+
and --properties argument and optionally takes any number of --path arguments to prepend
17+
to the classpath that it generates for the command.
18+
"""
19+
from __future__ import print_function
20+
21+
import argparse
22+
import os
23+
import sys
24+
from glob import glob
25+
26+
import samples
27+
from amazon_kclpy import kcl
28+
29+
30+
def get_dir_of_file(f):
31+
"""
32+
Returns the absolute path to the directory containing the specified file.
33+
34+
:type f: str
35+
:param f: A path to a file, either absolute or relative
36+
37+
:rtype: str
38+
:return: The absolute path of the directory represented by the relative path provided.
39+
"""
40+
return os.path.dirname(os.path.abspath(f))
41+
42+
43+
def get_kcl_dir():
44+
"""
45+
Returns the absolute path to the dir containing the amazon_kclpy.kcl module.
46+
47+
:rtype: str
48+
:return: The absolute path of the KCL package.
49+
"""
50+
return get_dir_of_file(kcl.__file__)
51+
52+
53+
def get_kcl_jar_path():
54+
"""
55+
Returns the absolute path to the KCL jars needed to run an Amazon KCLpy app.
56+
57+
:rtype: str
58+
:return: The absolute path of the KCL jar files needed to run the MultiLangDaemon.
59+
"""
60+
return ":".join(glob(os.path.join(get_kcl_dir(), "jars", "*jar")))
61+
62+
63+
def get_kcl_classpath(properties=None, paths=[]):
64+
"""
65+
Generates a classpath that includes the location of the kcl jars, the
66+
properties file and the optional paths.
67+
68+
:type properties: str
69+
:param properties: Path to properties file.
70+
71+
:type paths: list
72+
:param paths: List of strings. The paths that will be prepended to the classpath.
73+
74+
:rtype: str
75+
:return: A java class path that will allow your properties to be found and the MultiLangDaemon and its deps and
76+
any custom paths you provided.
77+
"""
78+
# First make all the user provided paths absolute
79+
paths = [os.path.abspath(p) for p in paths]
80+
# We add our paths after the user provided paths because this permits users to
81+
# potentially inject stuff before our paths (otherwise our stuff would always
82+
# take precedence).
83+
paths.append(get_kcl_jar_path())
84+
if properties:
85+
# Add the dir that the props file is in
86+
dir_of_file = get_dir_of_file(properties)
87+
paths.append(dir_of_file)
88+
return ":".join([p for p in paths if p != ""])
89+
90+
91+
def get_kcl_app_command(args, multi_lang_daemon_class, properties, log_configuration, paths=[]):
92+
"""
93+
Generates a command to run the MultiLangDaemon.
94+
95+
:type java: str
96+
:param java: Path to java
97+
98+
:type multi_lang_daemon_class: str
99+
:param multi_lang_daemon_class: Name of multi language daemon class e.g. com.amazonaws.services.kinesis.multilang.MultiLangDaemon
100+
101+
:type properties: str
102+
:param properties: Optional properties file to be included in the classpath.
103+
104+
:type paths: list
105+
:param paths: List of strings. Additional paths to prepend to the classpath.
106+
107+
:rtype: str
108+
:return: A command that will run the MultiLangDaemon with your properties and custom paths and java.
109+
"""
110+
return "{java} -cp {cp} {daemon} {props} {log_config}".format(
111+
java=args.java,
112+
cp=get_kcl_classpath(args.properties, paths),
113+
daemon=multi_lang_daemon_class,
114+
# Just need the basename because the path is added to the classpath
115+
props=properties,
116+
log_config=log_configuration,
117+
)
118+
119+
120+
if __name__ == "__main__":
121+
parser = argparse.ArgumentParser("A script for generating a command to run an Amazon KCLpy app")
122+
parser.add_argument(
123+
"--print_classpath",
124+
dest="print_classpath",
125+
action="store_true",
126+
default=False,
127+
help="Print a java class path.\noptional arguments: --path",
128+
)
129+
parser.add_argument(
130+
"--print_command",
131+
dest="print_command",
132+
action="store_true",
133+
default=False,
134+
help="Print a command for running an Amazon KCLpy app.\nrequired "
135+
+ "args: --java --properties\noptional args: --classpath",
136+
)
137+
parser.add_argument(
138+
"-j",
139+
"--java",
140+
dest="java",
141+
help="The path to the java executable e.g. <some root>/jdk/bin/java",
142+
metavar="PATH_TO_JAVA",
143+
)
144+
parser.add_argument(
145+
"-p",
146+
"--properties",
147+
"--props",
148+
"--prop",
149+
dest="properties",
150+
help="The path to a properties file (relative to where you are running this script)",
151+
metavar="PATH_TO_PROPERTIES",
152+
)
153+
parser.add_argument(
154+
"--sample",
155+
"--sample-props",
156+
"--use-sample-properties",
157+
dest="use_sample_props",
158+
help="This will use the sample.properties file included in this package as the properties file.",
159+
action="store_true",
160+
default=False,
161+
)
162+
parser.add_argument(
163+
"-c",
164+
"--classpath",
165+
"--path",
166+
dest="paths",
167+
action="append",
168+
default=[],
169+
help="Additional path to add to java class path. May be specified any number of times",
170+
metavar="PATH",
171+
)
172+
parser.add_argument(
173+
"-l",
174+
"--log-configuration",
175+
dest="log_configuration",
176+
help="This will use the logback.xml which will be used by the KCL to log.",
177+
metavar="PATH_TO_LOG_CONFIGURATION",
178+
)
179+
args = parser.parse_args()
180+
# Possibly replace the properties with the sample. Useful if they just want to run the sample app.
181+
if args.use_sample_props:
182+
if args.properties:
183+
sys.stderr.write("Replacing provided properties with sample properties due to arg --sample\n")
184+
args.properties = os.path.join(get_dir_of_file(samples.__file__), "record_processor.properties")
185+
186+
# Print what the asked for
187+
if args.print_classpath:
188+
print(get_kcl_classpath(args.properties, args.paths))
189+
elif args.print_command:
190+
if args.java and args.properties:
191+
multi_lang_daemon_class = "software.amazon.kinesis.multilang.MultiLangDaemon"
192+
properties_argument = "--properties-file {props}".format(props=args.properties)
193+
log_argument = ""
194+
if args.log_configuration is not None:
195+
log_argument = "--log-configuration {log}".format(log=args.log_configuration)
196+
print(
197+
get_kcl_app_command(args, multi_lang_daemon_class, properties_argument, log_argument, paths=args.paths)
198+
)
199+
else:
200+
sys.stderr.write("Must provide arguments: --java and --properties\n")
201+
parser.print_usage()
202+
else:
203+
parser.print_usage()

lorrystream/kinesis/launch.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
python amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties record_processor.properties --log-configuration logback.xml

lorrystream/kinesis/logback.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<configuration>
2+
3+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
4+
<!-- encoders are assigned the type
5+
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
6+
<encoder>
7+
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
8+
</encoder>
9+
</appender>
10+
11+
<root level="info">
12+
<appender-ref ref="STDOUT" />
13+
</root>
14+
</configuration>

0 commit comments

Comments
 (0)