Commit

Initial public release

brunodebus authored Sep 23, 2018
1 parent 618a92b commit dd4a725
Showing 17 changed files with 1,630 additions and 1 deletion.
10 changes: 10 additions & 0 deletions .gitignore
@@ -0,0 +1,10 @@
target/

.idea/
*.iml

tags
.*.swp
.*.swo
~*

23 changes: 23 additions & 0 deletions Dockerfile
@@ -0,0 +1,23 @@
FROM openjdk:8
MAINTAINER Bruno De Bus <Bruno.DeBus@klarrio.com>

ARG IMAGE_VERSION

# for training purposes, configure uid to be 1024 for dshdemo1 or 1025 for
# dshdemo2
ENV id 1024
ADD get_signed_certificate.sh /get_signed_certificate.sh
ADD docker-entrypoint.sh /docker-entrypoint.sh

RUN groupadd --gid $id dshdemo
RUN useradd --no-create-home --uid $id --gid $id dshdemo

RUN mkdir -p /usr/share/tenant-example/conf
ADD target/lib /usr/share/tenant-example/lib
ADD target/tenant-example-${IMAGE_VERSION}.jar /usr/share/tenant-example/tenant-example.jar

RUN chown -R $id:$id /usr/share/tenant-example \
&& chmod -R o-rwx /usr/share/tenant-example
USER $id

ENTRYPOINT [ "/docker-entrypoint.sh" ]
191 changes: 190 additions & 1 deletion README.md
@@ -1 +1,190 @@
# tenant-example
# DSH tenant example

This example shows how tenants on the DSH platform should interact with the DSH
data streams. The particular points to pay attention to are:
- getting Kafka configuration from the Kafka configuration service
- key and data envelopes for serialization/deserialization on public streams
(the `stream.<something>` streams).
- partitioning for public streams.

## Kafka configuration

On startup, every DSH container that wants to interact with the data streams
needs to fetch its Kafka configuration from the Kafka configuration service
(pikachu.dsh.marathon.mesos).

**Fetching the Kafka configuration *must* happen on container startup.** It
depends on a limited-lifetime token that is configured in the container's
environment on startup. If you wait for the token to expire, there is no way
for the container to ever fetch its Kafka configuration.

The full flow for fetching the Kafka configuration is a rather involved
multi-step process. Documenting it here in its entirety would lead us too far
afield, but the good news is that this example contains a fully working shell
script that does all the work for you. You can copy it into your own
containers and invoke it from the entrypoint.

See the `get_signed_certificate.sh` script.

### Consumer groups

In order to consume data from Kafka, you need to set a consumer group for your
consumers. How you choose your consumer group has a profound impact on the
operation of your program:
- if you choose a unique consumer group name, your program will see all data
from all partitions of the topics you subscribe to.
- if you share a consumer group with other programs, each program will see
a subset of the total data on the Kafka topics you subscribe to. Kafka
balances the partitions among all members of a consumer group automatically.

#### Consumer group enforcement

The DSH platform enforces restrictions on the consumer groups each container
is allowed to join. When you retrieve the Kafka configuration from the Kafka
configuration service, it will include a list of consumer group names you are
allowed to use. To cater for the two options described above, the list is
split into two parts:
- **private** consumer groups are unique to this particular container, and
can be used to consume all data on a Kafka topic.
- **shared** consumer groups are shared among all instances of this container,
to allow you to spin up multiple instances and balance the load of a (set
of) Kafka topics among them.

Note that the DSH platform does not currently offer provisions for sharing a
consumer group among instances of different containers.

See [below](#configuration-data) for a description of how the consumer group
names are encoded in the `datastreams.properties` file.

#### Setting the consumer group

To set a specific consumer group, pass the `group.id` property to the Kafka
Consumer constructor.
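
A minimal sketch of what this can look like, assuming the properties file was
written to `/usr/share/tenant-example/conf/datastreams.properties` (the path is
an assumption based on this example's Dockerfile) and using plain string and
byte-array deserializers for brevity:

```java
import java.io.FileInputStream;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerGroupExample {
    public static void main(String[] args) throws Exception {
        // Load the properties generated by get_signed_certificate.sh
        // (the file location is an assumption, see the Dockerfile).
        Properties props = new Properties();
        try (FileInputStream in =
                new FileInputStream("/usr/share/tenant-example/conf/datastreams.properties")) {
            props.load(in);
        }

        // Pick one of the shared consumer groups so that multiple instances of
        // this container balance the partitions among themselves; use a private
        // group instead if this instance must see all data.
        String sharedGroups = props.getProperty("consumerGroups.shared");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, sharedGroups.split(",")[0].trim());

        // The loaded properties already carry the bootstrap servers and
        // security settings; pass them straight to the consumer.
        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(
                props, new StringDeserializer(), new ByteArrayDeserializer())) {
            // ... subscribe and poll as usual
        }
    }
}
```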

### Configuration data

The configuration data fetched by the `get_signed_certificate.sh` script
consists of an SSL certificate used for the connection to Kafka, and a Java
properties file that contains the necessary security settings, a list of
bootstrap servers and a description of the streams you have access to.

On startup, load the properties from the `datastreams.properties` file
generated by the script, and pass them along to any Kafka consumer or producer
you create.

To learn more about the streams you have access to, you can inspect the
contents of the properties file. All configuration entries for a given stream
are prefixed with `datastream.<streamname>`. Note that the streamname includes
the stream type prefix, so concrete examples are `datastream.stream.foo` for a
public stream called "foo" and `datastream.internal.bar` for an internal stream
named "bar".

For each stream, the following configuration
values are defined:
- `datastream.<streamname>.cluster`: identifies the Kafka cluster carrying this
data stream (right now, the only supported value is "tt")
- `datastream.<streamname>.partitioner`: partitioning scheme used on this
stream (see below). Allowed values are: `"default-partitioner"`,
`"topic-level-partitioner"` and `"topic-char-depth-partitioner"`.
- `datastream.<streamname>.canretain`: indicates whether this stream supports
retained MQTT messages. This is only applicable to public streams. Allowed
values are `"true"` and `"false"`
- `datastream.<streamname>.partitions`: number of Kafka partitions for the
stream
- `datastream.<streamname>.replication`: Kafka replication factor for the
stream
- `datastream.<streamname>.partitioningDepth`: the partitioning depth
configuration parameter for the topic level and character depth partitioners.
This value has no meaning for the default partitioner.
- `datastream.<streamname>.read`: the **regex pattern** you can use to
subscribe to this stream on Kafka. You should always subscribe with a regex
pattern, to ensure you subscribe to all Kafka topics that underlie this data
stream (see the subscription sketch after this list). If this configuration
value is empty, it means you do not have read access to this stream.
- `datastream.<streamname>.write`: the Kafka topic name you can use to produce
data on the stream. If this configuration value is empty, it means that you
do not have write permissions for this stream.
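
The subscription sketch referenced above, assuming `props` was loaded from
`datastreams.properties` and the consumer was created as in the earlier
consumer-group sketch; `stream.foo` is a hypothetical stream name:

```java
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.KafkaConsumer;

class StreamSubscription {
    static void subscribeToStream(KafkaConsumer<String, byte[]> consumer, Properties props) {
        // "stream.foo" is a hypothetical public stream; the property key keeps
        // the stream-type prefix, as described above.
        String readPattern = props.getProperty("datastream.stream.foo.read");
        if (readPattern == null || readPattern.isEmpty()) {
            throw new IllegalStateException("no read access to stream.foo");
        }
        // Always subscribe with the regex pattern so that every Kafka topic
        // underlying the data stream is covered.
        consumer.subscribe(Pattern.compile(readPattern));
    }
}
```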

In addition to the stream descriptions, the `datastreams.properties` file
defines the following helpful values:
- `consumerGroups.private`: a comma-separated list of "private" consumer group names
your container is allowed to use. A private consumer group is one that is not
shared with any other container in the system, even other instances of the same
container.
- `consumerGroups.shared`: a comma-separated list of "shared" consumer group names
your container is allowed to use. A shared consumer group is one that is available
for use in all instances of the same container.
- `envelope.streams`: a comma-separated list of public streams that do _NOT_
yet have envelopes. Use this value to configure your (de)serializers to
expect the right kind of data (enveloped or non-enveloped) on each topic.
Note that the list contains only the middle part of the stream name, not the
`stream` prefix or tenant name suffix.


## Envelopes on public data streams

Public data streams (the `stream.<something>` streams) have specific
requirements with respect to the structure of the key and payload values. In
essence, the "plain" key and payload are wrapped in envelope structures. The
envelope structures are defined as Protobuf messages. The definition can be
found in `src/main/proto/envelope.proto`.

**Note:** at this moment, the DSH platform is in a transition phase where the
use of envelopes on the public topics is gradually phased in. Right now,
envelopes MUST be used on all Kafka topics ending in `.dsh`, and MUST NOT be
used on any other topics. The example code deals with this transparently, and
allows you to write your application code as if envelopes are mandatory
everywhere.
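
For reference, a one-line sketch of that transition rule (a hypothetical
helper, not part of the example code, which already handles this internally):

```java
// Transition rule described above: envelopes are mandatory on Kafka topics
// ending in ".dsh" and must not be used on any other topic. Remove this kind
// of check once envelopes become mandatory everywhere.
static boolean requiresEnvelope(String kafkaTopic) {
    return kafkaTopic.endsWith(".dsh");
}
```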

**Note:** while the usage of envelopes on the public streams is strictly
regulated and enforced, you are free to choose whether to use envelopes on the
other stream types (scratch, internal): they are neither required nor
forbidden there, so use them wherever you find them useful.

### Envelope definition

The envelope definition can be found [here](doc/envelopes.md).

### Envelope usage

The Kafka serializers and deserializers for enveloped keys and data can be
found in the `KeyEnvelopeSerializer`, `KeyEnvelopeDeserializer`,
`DataEnvelopeSerializer` and `DataEnvelopeDeserializer` classes. As noted
above, these implementations deal automatically with enveloped (.dsh) and
non-enveloped (all other) topics. *If you reuse these implementations in your
own containers, take care to remove this logic when envelopes become mandatory
for all public data streams.*

#### Convenience methods

For your convenience, the serializer classes define some utility methods that
facilitate wrapping plain keys and payloads in envelopes (see the usage sketch
after this list):
- `KeyEnvelopeSerializer.setIdentifier()` is used to set the static identifier
(tenant id and publisher id) for every message produced by this container.
- `KeyEnvelopeSerializer.wrap()` wraps a plain string key in an envelope, with
optional `qos` and `retain` settings (defaults are QoS 0 and non-retained).
- `DataEnvelopeSerializer.wrap()` wraps a plain byte array payload in a data
envelope.
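
A usage sketch for these helpers; the exact method signatures are defined in
the example sources, so the argument lists shown here are assumptions for
illustration only:

```java
// Sketch only: argument lists are assumed, not copied from the sources.
KeyEnvelopeSerializer keySerializer = new KeyEnvelopeSerializer();
DataEnvelopeSerializer dataSerializer = new DataEnvelopeSerializer();

// Static identity (tenant id and publisher id) stamped on every message
// produced by this container.
keySerializer.setIdentifier("mytenant", "my-publisher");

// Wrap a plain string key; defaults are QoS 0 (BEST_EFFORT) and non-retained.
KeyEnvelope key = keySerializer.wrap("a/b/c");

// Wrap a plain byte array payload in a data envelope.
DataEnvelope value = dataSerializer.wrap("hello".getBytes(java.nio.charset.StandardCharsets.UTF_8));
```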

## Partitioning for public streams

If your container publishes data on a public stream, it needs to abide by the
stream contract, which defines (among other things) the partitioning scheme for
this stream.

Currently, there are three supported partitioning schemes:
- topic level partitioning: the first `n` levels of the topic path (i.e. the
plain string key) are hashed using the Murmur2 hashing scheme
- character level partitioning: the first `n` characters of the topic path are
hashed using the Murmur2 hashing scheme
- default partitioning: Murmur2 hash of the entire topic path

As of now, the only partitioning scheme that is used in practice on the public
streams is the topic level partitioner. Therefore, that is the only partitioner
for which we have included an example implementation, in the
`TopicLevelPartitioner` class.

The `TopicLevelPartitioner` class will figure out the partitioning depth (the
aforementioned `n` parameter) from the configuration values you fetched from
the Kafka configuration service and passed to your Kafka producer constructor.
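
To illustrate the principle, here is a sketch of topic-level partitioning in
general (not the `TopicLevelPartitioner` implementation itself), assuming
Kafka's `Utils.murmur2` as the hash function:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.kafka.common.utils.Utils;

class TopicLevelPartitioningSketch {
    // Hash the first `depth` levels of the topic path (the plain string key)
    // with Murmur2 and map the result onto the available partitions. `depth`
    // corresponds to the partitioningDepth configuration value.
    static int partitionFor(String plainKey, int depth, int numPartitions) {
        String[] levels = plainKey.split("/");
        String prefix = String.join("/",
                Arrays.copyOfRange(levels, 0, Math.min(depth, levels.length)));
        byte[] bytes = prefix.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(bytes)) % numPartitions;
    }
}
```

With a depth of 2, for example, keys `a/b/c` and `a/b/d` share the prefix
`a/b` and therefore land in the same partition.
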
167 changes: 167 additions & 0 deletions doc/envelopes.md
@@ -0,0 +1,167 @@
# Message Envelopes

On public streams, the DSH platform mandates the use of so-called message
envelopes. The message key must be wrapped in a key envelope, and the message
payload must be wrapped in a data envelope.

This document discusses the structure and usage of these message envelopes.

## Formal definition

The serialization scheme used on enveloped streams is [Google Protocol
Buffers](https://developers.google.com/protocol-buffers/).

### Key envelopes

Key envelopes are defined in the protocol buffers IDL as follows:

```protobuf
syntax = "proto3";
message KeyEnvelope {
KeyHeader header = 1; // header is mandatory on stream topics
string key = 2; // MQTT topic (minus prefixes) on stream topics
reserved 3; // deprecated field
}
message KeyHeader {
// identifies the message origin
Identity identifier = 1;
// marks the message as 'retained'
// this makes the Latest Value Store keep a copy of the message in memory for later retrieval
bool retained = 2;
// the QOS with which the message is handled within the system
QoS qos = 3;
}
// identifies the data origin
message Identity {
string tenant = 1;
string publisher = 2;
}
// System QOS identifiers
enum QoS {
BEST_EFFORT = 0; // might be dropped in case of resources running low (~ highest throughput)
RELIABLE = 1; // will *never* be dropped and retried until success (~ lowest throughput)
}
```

The key envelope mostly contains metadata that allows the DSH
protocol adapter(s) (currently the DSH platform only supports an MQTT protocol
adapter) to figure out how to properly handle the message.

- the `header` consists of
- the `identifier` that identifies the message origin
- the `tenant` field contains the tenant ID of the message (regardless of
whether it entered the stream via a protocol adapter or via direct
injection on Kafka). This field _must_ be set correctly; otherwise,
messages will be dropped (i.e., they will not be retained or forwarded to
device clients).
- the `publisher` field is a free-form string that identifies a particular
data source. For messages that are ingested over a protocol adapter, this
field contains the client ID of the device that published the message.
For messages injected directly on Kafka (i.e. from a container that runs
on the DSH platform), the tenant is free to choose its own naming
scheme. This field is purely informative; the DSH platform does not
attach any semantic meaning to it.
- the `retained` field indicates whether this message should be retained in
the Latest Value Store - i.e. whether it should be delivered to late
subscribers on the MQTT side.
- the `qos` field indicates the MQTT QoS level with which the protocol
adapter will treat the message.
- `BEST_EFFORT` (0) QoS means messages may be dropped in rare cases if the
platform is under high load. Use this for messages that are frequently
refreshed (e.g. a vehicle that publishes its position every second)
- `RELIABLE` (1) QoS means that delivery of messages will be retried until
successful. This option has a cost in terms of throughput, so reserve it
for messages that absolutely must be delivered in all cases.
- the `key` field contains the actual message key, which corresponds to the
MQTT topic on which the message will be exposed on the protocol adapter,
minus the prefix and stream name. For example, a message that is exposed on
the MQTT side on topic `/tt/foo/a/b/c` corresponds to a message on DSH
stream `foo` with key `a/b/c` (note that there is no `/` character in front
of the key).

**Note:** the QoS field in the key envelope only applies to QoS on the MQTT
side: once a message has been ingested on a Kafka stream, it is reliably
available to all Kafka (i.e. on-platform) consumers, regardless of the QoS
value.

### Data envelopes

Data envelopes are defined in the protocol buffers IDL as follows:

```protobuf
syntax = "proto3";
message DataEnvelope {
oneof kind { // main payload for data messages;
// leave empty for DELETE semantics in Latest Value Store
bytes payload = 1;
}
map<string, string> tracing = 2; // tracing data: used for passing span contexts between applications
// tenant-specific fields are ONLY allowed in the 500-1000 range
}
```

- the `payload` field contains the actual message payload. Note the use of the
`oneof` construct: it makes it possible to distinguish a data envelope with an
empty payload from a data envelope with _no_ payload at all. The difference
between the two is relevant for the interpretation of the message by the
Latest Value Store and the MQTT protocol adapter. See
[below](#empty-payload) for a detailed discussion.
- the `tracing` field is used to transport span contexts between different
platform components. See the [Distributed Tracing](tracing.md) document for
more details.

#### Empty payload

There are three ways in which one could conceivably encode a DELETE message
(i.e. a message that removes a previously retained message from the Latest
Value Store) as a Kafka message:

- as a Kafka message with a properly enveloped key, and a `null` value.
**This is not allowed. Messages on public streams must _always_ have a
properly enveloped value.**
- as a Kafka message with a properly enveloped key, and a data envelope value
that has a zero-length binary payload. **This will not be treated as a
DELETE message, but rather as a pathological case of a regular message with
a zero-length content.** In other words, injecting such a message on a Kafka
stream will cause the message (if it is marked as retained) to be stored in
the Latest Value Store.
- as a Kafka message with a properly enveloped key, and a data envelope value
that has an _empty_ `kind`. **This is the only correct way to encode a
DELETE message on Kafka.** Upon receipt of such a message, the Latest Value
Store will remove the corresponding key from its state store entirely.

The following code fragment illustrates the three alternatives in Java:

```java
KeyEnvelope key = KeyEnvelope.newBuilder()
.setKey("a/b/c/d")
.setHeader(KeyHeader.newBuilder()
.setQosValue(0)
.setRetained(true)
.setIdentifier(Identity.newBuilder()
.setTenant("foo")
.setPublisher("bar")
)
).build();

/* NOT ALLOWED: Kafka message with null value on public stream */
producer.send(new ProducerMessage<KeyEnvelope, DataEnvelope>(topic, key, null));

/* will not act as a DELETE message: zero-length binary payload */
DataEnvelope data1 = DataEnvelope.newBuilder()
.setPayload(ByteString.EMPTY)
.build();
producer.send(new ProducerMessage<KeyEnvelope, DataEnvelope>(topic, key, data1));

/* will act as a proper DELETE message: empty payload */
DataEnvelope data2 = DataEnvelope.newBuilder()
.clearKind()
.build();
producer.send(new ProducerMessage<KeyEnvelope, DataEnvelope>(topic, key, data2));
```
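
On the consuming side, the `oneof` makes the distinction explicit. A minimal
sketch, assuming the generated protobuf classes for the envelope definition
above:

```java
/* Consumer-side sketch: the oneof lets you tell "no payload" (DELETE) apart
 * from "zero-length payload" (a regular message with empty content). */
void handle(KeyEnvelope key, DataEnvelope value) {
    if (value.getKindCase() == DataEnvelope.KindCase.KIND_NOT_SET) {
        // proper DELETE message: remove key.getKey() from any local state
    } else {
        // regular message, possibly with zero-length content
        byte[] payload = value.getPayload().toByteArray();
    }
}
```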