Skip to content

Commit

Permalink
'Version 1.1.1 of the Amazon Kinesis Client Library'
Browse files Browse the repository at this point in the history
  • Loading branch information
kumarumesh committed Sep 11, 2014
1 parent 13aad26 commit 5000008
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 61 deletions.
4 changes: 2 additions & 2 deletions META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
Bundle-Version: 1.0.0
Bundle-Version: 1.1.1
Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.3.0",
Expand All @@ -12,7 +12,7 @@ Require-Bundle: org.apache.commons.codec;bundle-version="1.3.0",
com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.1.1",
org.apache.httpcomponents.httpcore;bundle-version="4.2.0",
org.apache.httpcomponents.httpclient;bundle-version="4.2.0"
com.amazonaws.sdk;bundle-version="1.6.9",
com.amazonaws.sdk;bundle-version="1.7.13",
Export-Package: com.amazonaws.services.kinesis,
com.amazonaws.services.kinesis.clientlibrary,
com.amazonaws.services.kinesis.clientlibrary.exceptions,
Expand Down
40 changes: 1 addition & 39 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name>
<version>1.1.0</version>
<version>1.1.1</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.</description>
<url>https://aws.amazon.com/kinesis</url>

Expand All @@ -24,7 +24,6 @@

<properties>
<aws-java-sdk.version>1.7.13</aws-java-sdk.version>
<jackson.version>2.1.1</jackson.version>
</properties>

<dependencies>
Expand All @@ -33,43 +32,6 @@
<artifactId>aws-java-sdk</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.2</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
<type>jar</type>
<scope>compile</scope>
</dependency>

</dependencies>

<developers>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class KinesisClientLibConfiguration {
/**
* User agent set when Amazon Kinesis Client Library makes AWS requests.
*/
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.1.0";
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.1.1";

/**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,4 +350,13 @@ ShardConsumerState getCurrentState() {
return currentState;
}

/**
* Private/Internal method - has package level access solely for testing purposes.
*
* @return the beginShutdown
*/
boolean isBeginShutdown() {
return beginShutdown;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* Used to pass shard related info among different classes and as a key to the map of shard consumers.
*/
class ShardInfo {

private final String shardId;
private final String concurrencyToken;
// Sorted list of parent shardIds.
Expand All @@ -41,6 +41,8 @@ public ShardInfo(String shardId, String concurrencyToken, Collection<String> par
if (parentShardIds != null) {
this.parentShardIds.addAll(parentShardIds);
}
// ShardInfo stores parent shard Ids in canonical order in the parentShardIds list.
// This makes it easy to check for equality in ShardInfo.equals method.
Collections.sort(this.parentShardIds);
}

Expand Down Expand Up @@ -83,6 +85,14 @@ public int hashCode() {
*/
// CHECKSTYLE:OFF CyclomaticComplexity
// CHECKSTYLE:OFF NPathComplexity
/**
* This method assumes parentShardIds is ordered. The Worker.cleanupShardConsumers() method relies on this method
* returning true for ShardInfo objects which may have been instantiated with parentShardIds in a different order
* (and rest of the fields being the equal). For example shardInfo1.equals(shardInfo2) should return true with
* shardInfo1 and shardInfo2 defined as follows.
* ShardInfo shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2"));
* ShardInfo shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1"));
*/
@Override
public boolean equals(Object obj) {
if (this == obj) {
Expand Down Expand Up @@ -118,8 +128,14 @@ public boolean equals(Object obj) {
}
return true;
}

// CHECKSTYLE:ON CyclomaticComplexity
// CHECKSTYLE:ON NPathComplexity


@Override
public String toString() {
return "ShardInfo [shardId=" + shardId + ", concurrencyToken=" + concurrencyToken + ", parentShardIds="
+ parentShardIds + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ public class Worker implements Runnable {
private boolean shutdown;

// Holds consumers for shards the worker is currently tracking. Key is shard
// id, value is ShardConsumer.
private ConcurrentMap<String, ShardConsumer> shardIdShardConsumerMap =
new ConcurrentHashMap<String, ShardConsumer>();
// info, value is ShardConsumer.
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap =
new ConcurrentHashMap<ShardInfo, ShardConsumer>();
private final boolean cleanupLeasesUponShardCompletion;

/**
Expand Down Expand Up @@ -328,7 +328,7 @@ public void run() {
while (!shutdown) {
try {
boolean foundCompletedShard = false;
Set<String> assignedShardIds = new HashSet<String>();
Set<ShardInfo> assignedShards = new HashSet<ShardInfo>();
for (ShardInfo shardInfo : getShardInfoForAssignments()) {
ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory);
if (shardConsumer.isShutdown()
Expand All @@ -337,15 +337,15 @@ public void run() {
} else {
shardConsumer.consumeShard();
}
assignedShardIds.add(shardInfo.getShardId());
assignedShards.add(shardInfo);
}

if (foundCompletedShard) {
controlServer.syncShardAndLeaseInfo(null);
}

// clean up shard consumers for unassigned shards
cleanupShardConsumers(assignedShardIds);
cleanupShardConsumers(assignedShards);

wlog.info("Sleeping ...");
Thread.sleep(idleTimeInMilliseconds);
Expand Down Expand Up @@ -415,14 +415,24 @@ private void initialize() {
}
}

private void cleanupShardConsumers(Set<String> assignedShardIds) {
for (String shardId : shardIdShardConsumerMap.keySet()) {
if (!assignedShardIds.contains(shardId)) {
/**
* NOTE: This method is internal/private to the Worker class. It has package
* access solely for testing.
*
* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been
* instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example
* shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows.
* ShardInfo shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2"));
* ShardInfo shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1"));
*/
void cleanupShardConsumers(Set<ShardInfo> assignedShards) {
for (ShardInfo shard : shardInfoShardConsumerMap.keySet()) {
if (!assignedShards.contains(shard)) {
// Shutdown the consumer since we are not longer responsible for
// the shard.
boolean isShutdown = shardIdShardConsumerMap.get(shardId).beginShutdown();
boolean isShutdown = shardInfoShardConsumerMap.get(shard).beginShutdown();
if (isShutdown) {
shardIdShardConsumerMap.remove(shardId);
shardInfoShardConsumerMap.remove(shard);
}
}
}
Expand Down Expand Up @@ -468,9 +478,8 @@ public void shutdown() {
* @return ShardConsumer for the shard
*/
ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) {
synchronized (shardIdShardConsumerMap) {
String shardId = shardInfo.getShardId();
ShardConsumer consumer = shardIdShardConsumerMap.get(shardId);
synchronized (shardInfoShardConsumerMap) {
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
// Instantiate a new consumer if we don't have one, or the one we
// had was from an earlier
// lease instance (and was shutdown). Don't need to create another
Expand All @@ -491,9 +500,8 @@ ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFact
executorService,
metricsFactory,
taskBackoffTimeMillis);
shardIdShardConsumerMap.put(shardId, consumer);
wlog.infoForce("Created new shardConsumer for shardId: " + shardId + ", concurrencyToken: "
+ shardInfo.getConcurrencyToken());
shardInfoShardConsumerMap.put(shardInfo, consumer);
wlog.infoForce("Created new shardConsumer for : " + shardInfo);
}
return consumer;
}
Expand Down

0 comments on commit 5000008

Please sign in to comment.