Skip to content
This repository has been archived by the owner on Nov 28, 2023. It is now read-only.

Update of Dropwizard Extra for newer Kafka, dropwizard, and asynchbase #38

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dropwizard-extra-curator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.datasift.dropwizard</groupId>
<artifactId>dropwizard-extra</artifactId>
<version>0.7.1-2-SNAPSHOT</version>
<version>0.9.1-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

/**
* A factory for creating and managing {@link CuratorFramework} instances.
* <p/>
* <p>
* The resulting {@link CuratorFramework} will have its lifecycle managed by the {@link Environment}
* and will have {@link com.codahale.metrics.health.HealthCheck}s installed for the underlying ZooKeeper
* ensemble.
Expand Down Expand Up @@ -118,7 +118,7 @@ public void setMaxRetries(final int maxRetries) {

/**
* Returns the initial time to wait before retrying a failed connection.
* <p/>
* <p>
* Subsequent retries will wait an exponential amount of time more than this.
*
* @return the initial time to wait before trying to connect again.
Expand All @@ -130,7 +130,7 @@ public Duration getBackOffBaseTime() {

/**
* Sets the initial time to wait before retrying a failed connection.
* <p/>
* <p>
* Subsequent retries will wait an exponential amount of time more than this.
*
* @param backOffBaseTime the initial time to wait before trying to connect again.
Expand All @@ -142,7 +142,7 @@ public void setBackOffBaseTime(final Duration backOffBaseTime) {

/**
* Returns a {@link RetryPolicy} for handling failed connection attempts.
* <p/>
* <p>
* Always configures an {@link ExponentialBackoffRetry} based on the {@link #getMaxRetries()
* maximum retries} and {@link #getBackOffBaseTime() initial back-off} configured.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

/**
* Provides integration for Dropwizard's ZooKeeper functionality with Curator.
* <p/>
* <p>
* This ensures that {@link ZooKeeper} instances created by Curator integrate properly with the
* Dropwizard application life-cycle.
*/
Expand Down
4 changes: 2 additions & 2 deletions dropwizard-extra-hbase/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.datasift.dropwizard</groupId>
<artifactId>dropwizard-extra</artifactId>
<version>0.7.1-2-SNAPSHOT</version>
<version>0.9.1-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand All @@ -30,7 +30,7 @@
<dependency>
<groupId>org.hbase</groupId>
<artifactId>asynchbase</artifactId>
<version>1.4.1</version>
<version>1.7.0</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@

/**
* An {@link HBaseClient} that constrains the maximum number of concurrent asynchronous requests.
* <p/>
* <p>
* This client places an upper-bounds on the number of concurrent asynchronous requests awaiting
* completion. When this limit is reached, subsequent requests will block until an existing request
* completes.
* <p/>
* <p>
* This behaviour is particularly useful for throttling high-throughput applications where HBase is
* the bottle-neck. Without backing-off, such an application may run out of memory. By constraining
* the maximum number of requests to a sufficiently high limit, but low enough so that it can be
* reached without running out of memory, such applications can organically throttle and back-off
* their requests.
* <p/>
* <p>
* Book-keeping of in-flight requests is done using a {@link Semaphore} which is configured as
* "non-fair" to reduce its impact on request throughput.
*/
Expand Down Expand Up @@ -55,7 +55,7 @@ public BoundedHBaseClient(final HBaseClient client, final int maxRequests) {
/**
* Create a new instance with the given semaphore for the given underlying {@link HBaseClient}
* implementation.
* <p/>
* <p>
* <i>Note: this is only really useful for sharing a {@link Semaphore} between two {@link
* BoundedHBaseClient} instances, which only really makes sense for instances configured for
* the same cluster, but with different client-side settings. <b>Use with caution!!</b></i>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@

/**
* Client for interacting with an HBase cluster.
* <p/>
* <p>
* To create an instance, use {@link HBaseClientFactory}.
* <p/>
* <p>
* All implementations are wrapper proxies around {@link org.hbase.async.HBaseClient} providing
* additional functionality.
*
Expand Down Expand Up @@ -172,6 +172,7 @@ public interface HBaseClient {
* Ensures that a specific table exists.
*
* @param table the table to check.
* @param family the family to check.
*
* @return a {@link Deferred} indicating the completion of the assertion.
*
Expand All @@ -186,6 +187,7 @@ public interface HBaseClient {
* Ensures that a specific table exists.
*
* @param table the table to check.
* @param family the family to check.
*
* @return a {@link Deferred} indicating the completion of the assertion.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@

/**
* A factory for creating and managing {@link HBaseClient} instances.
* <p/>
* <p>
* The resulting {@link HBaseClient} will have its lifecycle managed by an {@link Environment} and
* will have {@link com.codahale.metrics.health.HealthCheck}s installed for the {@code .META.} and
* {@code -ROOT-} tables.
* will have {@link com.codahale.metrics.health.HealthCheck}s installed for the {@code hbase:meta}.
*
* @see HBaseClient
*/
Expand Down Expand Up @@ -90,7 +89,7 @@ public void setFlushInterval(final Duration flushInterval) {

/**
* Returns the maximum size of the buffer for increment operations.
* <p/>
* <p>
* Once this buffer is full, a flush is forced irrespective of the {@link #getFlushInterval()
* flushInterval}.
*
Expand All @@ -105,7 +104,7 @@ public Size getIncrementBufferSize() {

/**
* Sets the maximum size of the buffer for increment operations.
* <p/>
* <p>
* Once this buffer is full, a flush is forced irrespective of the {@link #getFlushInterval()
* flushInterval}.
*
Expand All @@ -120,10 +119,10 @@ public void setIncrementBufferSize(final Size incrementBufferSize) {

/**
* Returns maximum number of concurrent asynchronous requests for the client.
* <p/>
* <p>
* Useful for throttling high-throughput applications when HBase is the bottle-neck to prevent
* the client running out of memory.
* <p/>
* <p>
* With this is zero ("0"), no limit will be placed on the number of concurrent asynchronous
* requests.
*
Expand All @@ -138,10 +137,10 @@ public int getMaxConcurrentRequests() {

/**
* Sets the maximum number of concurrent asynchronous requests for the client.
* <p/>
* <p>
* Useful for throttling high-throughput applications when HBase is the bottle-neck to prevent
* the client running out of memory.
* <p/>
* <p>
* With this is zero ("0"), no limit will be placed on the number of concurrent asynchronous
* requests.
*
Expand Down Expand Up @@ -228,9 +227,8 @@ public HBaseClient build(final Environment environment, final String name) {
client.setFlushInterval(getFlushInterval());
client.setIncrementBufferSize(getIncrementBufferSize());

// add healthchecks for META and ROOT tables
environment.healthChecks().register(name + "-meta", new HBaseHealthCheck(client, ".META."));
environment.healthChecks().register(name + "-root", new HBaseHealthCheck(client, "-ROOT-"));
// add healthchecks for hbase:meta table
environment.healthChecks().register(name + "-meta", new HBaseHealthCheck(client, "hbase:meta"));

// manage client
environment.lifecycle().manage(new ManagedHBaseClient(
Expand All @@ -241,11 +239,11 @@ public HBaseClient build(final Environment environment, final String name) {

/**
* Builds a new {@link HBaseClient} according to the given {@link HBaseClientFactory}.
* <p/>
* <p>
* If instrumentation {@link #instrumented is enabled} in the
* configuration, this will build an {@link InstrumentedHBaseClient} wrapping the given {@link
* HBaseClient}.
* <p/>
* <p>
* If instrumentation is not enabled, the given {@link HBaseClient} will be returned verbatim.
*
* @param client an underlying {@link HBaseClient} implementation.
Expand All @@ -263,10 +261,10 @@ private HBaseClient instrument(final HBaseClient client,

/**
* Builds a new {@link HBaseClient} according to the given {@link HBaseClientFactory}.
* <p/>
* <p>
* If the {@link #maxConcurrentRequests} is non-zero in the
* configuration, this will build a {@link BoundedHBaseClient} that wraps the given client.
* <p/>
* <p>
* If {@link #maxConcurrentRequests} is zero, the given {@link
* HBaseClient} will be returned verbatim.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

/**
* An {@link HBaseClient} that is instrumented with {@link Metric}s.
* <p/>
* <p>
* For each asynchronous request method, a {@link Timer} tracks the time taken for the request.
* <p/>
* <p>
* This implementation proxies all requests through an underlying {@link HBaseClient}; it merely
* layers instrumentation on top of the underlying {@link HBaseClient}.
*
Expand All @@ -38,9 +38,9 @@ public class InstrumentedHBaseClient implements HBaseClient {

/**
* Creates a new {@link InstrumentedHBaseClient} for the given underlying client.
* <p/>
* <p>
* Instrumentation will be registered with the given {@link MetricRegistry}.
* <p/>
* <p>
* A new {@link HBaseInstrumentation} container will be created for this {@link HBaseClient}
* with the given {@link MetricRegistry}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ public ManagedHBaseClient(final HBaseClient client, final Duration connectionTim
* To force the connection, we look for the prescence of the .META. table.
*
* @throws com.stumbleupon.async.TimeoutException if there is a problem connecting to HBase.
* @throws org.hbase.async.TableNotFoundException if the .META. table can't be found.
* @throws Exception if there is a problem verifying the .META. table exists.
* @throws org.hbase.async.TableNotFoundException if the hbase:meta table can't be found.
* @throws Exception if there is a problem verifying the hbase:meta table exists.
*/
public void start() throws Exception {
client.ensureTableExists(".META.").joinUninterruptibly(connectionTimeout.toMilliseconds());
client.ensureTableExists("hbase:meta").joinUninterruptibly(connectionTimeout.toMilliseconds());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class HBaseInstrumentation {

/**
* Initialises instrumentation for the given {@link HBaseClient} using the given {@link
* MetricsRegistry}.
* com.codahale.metrics.MetricRegistry}.
*
* @param client the client to create metrics for.
* @param registry the registry to register the metrics with.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

/**
* A Scanner that constraints concurrent requests with a {@link Semaphore}.
* <p/>
* <p>
* To obtain an instance of a {@link RowScanner}, call {@link BoundedHBaseClient#scan(byte[])}.
*/
public class BoundedRowScanner implements RowScanner {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

/**
* A {@link RowScanner} that is instrumented with {@link Metric}s.
* <p/>
* <p>
* To obtain an instance of a {@link RowScanner}, call {@link InstrumentedHBaseClient#scan(byte[])}.
*/
public class InstrumentedRowScanner implements RowScanner {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@

/**
* Client for scanning over a selection of rows.
* <p/>
* <p>
* To obtain an instance of a {@link RowScanner}, call {@link
* com.datasift.dropwizard.hbase.HBaseClient#scan(byte[])}.
* <p/>
* <p>
* All implementations are wrapper proxies around {@link org.hbase.async.Scanner} providing
* additional functionality.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@

/**
* Client for scanning over a selection of rows.
* <p/>
* <p>
* To obtain an instance of a {@link RowScanner}, call {@link
* com.datasift.dropwizard.hbase.HBaseClient#scan(byte[])}.
* <p/>
* <p>
* This implementation is a proxy for a {@link org.hbase.async.Scanner}.
*/
public class RowScannerProxy implements RowScanner {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,34 @@
import com.codahale.metrics.Timer;

/**
* A {@link Callback} for stopping a {@link TimerContext} on completion.
* A {@link com.stumbleupon.async.Callback} for stopping a {@link com.codahale.metrics.Timer.Context} on completion.
*/
public class TimerStoppingCallback<T> implements Callback<T, T> {

/**
* The context of the active {@link com.yammer.metrics.core.Timer} to stop.
* The context of the active {@link com.codahale.metrics.Timer} to stop.
*/
private final Timer.Context timer;

/**
* Creates a new {@link Callback} that stops the given active timer on completion.
*
* @param timer the active {@link com.yammer.metrics.core.Timer} to stop on completion of the
* @param timer the active {@link com.codahale.metrics.Timer} to stop on completion of the
* {@link Callback}.
*/
public TimerStoppingCallback(final Timer.Context timer) {
this.timer = timer;
}

/**
* Stops the registered {@link com.yammer.metrics.core.Timer} and proxies any argument through
* Stops the registered {@link com.codahale.metrics.Timer} and proxies any argument through
* verbatim.
*
* @param arg the argument (if any) to pass-through.
*
* @return the argument (if any), proxied verbatim.
*
* @throws Exception if an error occurs stopping the {@link com.yammer.metrics.core.Timer}.
* @throws Exception if an error occurs stopping the {@link com.codahale.metrics.Timer}.
*/
public T call(final T arg) throws Exception {
timer.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

/**
* Tests {@link InstrumentedHBaseClient}.
* <p/>
* <p>
* Each method is tested first, that it proxies its implementation to the underlying {@link
* HBaseClient}, and then that the method is timed as expected.
*/
Expand Down
8 changes: 4 additions & 4 deletions dropwizard-extra-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.datasift.dropwizard</groupId>
<artifactId>dropwizard-extra</artifactId>
<version>0.7.1-2-SNAPSHOT</version>
<version>0.9.1-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand All @@ -18,10 +18,10 @@
</description>

<properties>
<kafka.version>0.8.1.1</kafka.version>
<kafka.version>0.8.2.2</kafka.version>
<scala.version>2.10</scala.version>
</properties>

<!--
<dependencyManagement>
<dependencies>
<dependency>
Expand All @@ -31,7 +31,7 @@
</dependency>
</dependencies>
</dependencyManagement>

-->
<dependencies>
<dependency>
<groupId>io.dropwizard</groupId>
Expand Down
Loading