Skip to content

Commit

Permalink
Add Kinesis Firehose adapter and tests (#42)
Browse files Browse the repository at this point in the history
* Add Kinesis Firehose adapter and tests

* Simplify message structure and support configurable batch size through options

* Remove unused use statement

* Add (and fix) failed enqueue test

* Move enqueue failure test to integration tests
  • Loading branch information
joemeehan authored and Harry Bragg committed Oct 25, 2017
1 parent c1b2ad7 commit 3084e3f
Show file tree
Hide file tree
Showing 3 changed files with 357 additions and 0 deletions.
159 changes: 159 additions & 0 deletions src/Adapter/FirehoseAdapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
<?php

/**
* This file is part of graze/queue.
*
* Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* @license https://github.com/graze/queue/blob/master/LICENSE MIT
*
* @link https://github.com/graze/queue
*/

namespace Graze\Queue\Adapter;

use Aws\Firehose\FirehoseClient;
use Graze\Queue\Adapter\Exception\MethodNotSupportedException;
use Graze\Queue\Adapter\Exception\FailedEnqueueException;
use Graze\Queue\Message\MessageFactoryInterface;
use Graze\Queue\Message\MessageInterface;

/**
* Amazon AWS Kinesis Firehose Adapter.
*
* This method only supports the enqueue method to send messages to a Kinesiss
* Firehose stream
*
* @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Firehose.FirehoseClient.html#putRecordBatch
*/
final class FirehoseAdapter implements AdapterInterface
{
const BATCHSIZE_SEND = 100;

/** @var FirehoseClient */
protected $client;

/** @var array */
protected $options;

/** @var string */
protected $deliveryStreamName;

/**
* @param FirehoseClient $client
* @param string $deliveryStreamName
* @param array $options - BatchSize <integer> The number of messages to send in each batch.
*/
public function __construct(FirehoseClient $client, $deliveryStreamName, array $options = [])
{
$this->client = $client;
$this->deliveryStreamName = $deliveryStreamName;
$this->options = $options;
}

/**
* @param MessageInterface[] $messages
*
* @throws MethodNotSupportedException
*/
public function acknowledge(array $messages)
{
throw new MethodNotSupportedException(
__FUNCTION__,
$this,
$messages
);
}

/**
* @param MessageFactoryInterface $factory
* @param int $limit
*
* @throws MethodNotSupportedException
*/
public function dequeue(MessageFactoryInterface $factory, $limit)
{
throw new MethodNotSupportedException(
__FUNCTION__,
$this,
[]
);
}

/**
* @param MessageInterface[] $messages
*
* @throws FailedEnqueueException
*/
public function enqueue(array $messages)
{
$failed = [];
$batches = array_chunk(
$messages,
$this->getOption('BatchSize', self::BATCHSIZE_SEND)
);

foreach ($batches as $batch) {
$requestRecords = array_map(function (MessageInterface $message) {
return [
'Data' => $message->getBody()
];
}, $batch);

$request = [
'DeliveryStreamName' => $this->deliveryStreamName,
'Records' => $requestRecords,
];

$results = $this->client->putRecordBatch($request);

foreach ($results->get('RequestResponses') as $idx => $response) {
if (isset($response['ErrorCode'])) {
$failed[] = $batch[$idx];
}
}
}

if (!empty($failed)) {
throw new FailedEnqueueException($this, $failed);
}
}

/**
* @param string $name
* @param mixed $default
*
* @return mixed
*/
protected function getOption($name, $default = null)
{
return isset($this->options[$name]) ? $this->options[$name] : $default;
}

/**
* @throws MethodNotSupportedException
*/
public function purge()
{
throw new MethodNotSupportedException(
__FUNCTION__,
$this,
[]
);
}

/**
* @throws MethodNotSupportedException
*/
public function delete()
{
throw new MethodNotSupportedException(
__FUNCTION__,
$this,
[]
);
}
}
80 changes: 80 additions & 0 deletions tests/integration/FirehoseIntegrationTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?php

/**
* This file is part of graze/queue.
*
* Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* @license https://github.com/graze/queue/blob/master/LICENSE MIT
*
* @link https://github.com/graze/queue
*/

namespace Graze\Queue;

use Aws\ResultInterface;
use Aws\Firehose\FirehoseClient;
use Graze\Queue\Adapter\Exception\FailedEnqueueException;
use Graze\Queue\Adapter\FirehoseAdapter;
use Mockery as m;
use Mockery\MockInterface;
use PHPUnit_Framework_TestCase as TestCase;

class FirehoseIntegrationTest extends TestCase
{
/** @var string */
private $deliveryStreamName;
/** @var FirehoseClient|MockInterface */
private $firehoseClient;
/** @var Client */
private $client;

public function setUp()
{
$this->deliveryStreamName = 'delivery_stream_foo';
$this->firehoseClient = m::mock(FirehoseClient::class);
$this->client = new Client(new FirehoseAdapter($this->firehoseClient, 'delivery_stream_foo'));
}

public function testSend()
{
$model = m::mock(ResultInterface::class);
$model->shouldReceive('get')->once()->with('RequestResponses')->andReturn([]);

$this->firehoseClient->shouldReceive('putRecordBatch')->once()->with([
'DeliveryStreamName' => $this->deliveryStreamName,
'Records' => [
['Data' => 'foo']
]
])->andReturn($model);

$this->client->send([$this->client->create('foo')]);
}

/**
* @expectedException \Graze\Queue\Adapter\Exception\FailedEnqueueException
*/
public function testSendError()
{
$model = m::mock(ResultInterface::class);
$model->shouldReceive('get')->once()->with('RequestResponses')->andReturn([
[
'ErrorCode' => 'fooError',
'ErrorMessage' => 'Some error message',
'RecordId' => 'foo',
]
]);

$this->firehoseClient->shouldReceive('putRecordBatch')->once()->with([
'DeliveryStreamName' => $this->deliveryStreamName,
'Records' => [
['Data' => 'foo'],
],
])->andReturn($model);

$this->client->send([$this->client->create('foo')]);
}
}
118 changes: 118 additions & 0 deletions tests/unit/Adapter/FirehoseAdapterTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
<?php

/**
* This file is part of graze/queue.
*
* Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* @license https://github.com/graze/queue/blob/master/LICENSE MIT
*
* @link https://github.com/graze/queue
*/

namespace Graze\Queue\Adapter;

use Aws\ResultInterface;
use Aws\Firehose\FirehoseClient;
use Graze\Queue\Adapter\Exception\MethodNotSupportedException;
use Graze\Queue\Message\MessageFactoryInterface;
use Graze\Queue\Message\MessageInterface;
use Mockery as m;
use Mockery\MockInterface;
use PHPUnit_Framework_TestCase as TestCase;

class FirehoseAdapterTest extends TestCase
{
/** @var MessageInterface|MockInterface */
private $messageA;
/** @var MessageInterface|MockInterface */
private $messageB;
/** @var MessageInterface|MockInterface */
private $messageC;
/** @var MessageInterface[]|MockInterface[] */
private $messages;
/** @var ResultInterface|MockInterface */
private $model;
/** @var MessageFactoryInterface|MockInterface */
private $factory;
/** @var FirehoseClient */
private $client;

public function setUp()
{
$this->client = m::mock(FirehoseClient::class);
$this->model = m::mock(ResultInterface::class);
$this->factory = m::mock(MessageFactoryInterface::class);

$this->messageA = $a = m::mock(MessageInterface::class);
$this->messageB = $b = m::mock(MessageInterface::class);
$this->messageC = $c = m::mock(MessageInterface::class);
$this->messages = [$a, $b, $c];
}

public function testInterface()
{
assertThat(new FirehoseAdapter($this->client, 'foo'), is(anInstanceOf('Graze\Queue\Adapter\AdapterInterface')));
}

public function testEnqueue()
{
$adapter = new FirehoseAdapter($this->client, 'foo');

$this->messageA->shouldReceive('getBody')->once()->withNoArgs()->andReturn('foo');
$this->messageB->shouldReceive('getBody')->once()->withNoArgs()->andReturn('bar');
$this->messageC->shouldReceive('getBody')->once()->withNoArgs()->andReturn('baz');

$this->model->shouldReceive('get')->once()->with('RequestResponses')->andReturn([]);

$this->client->shouldReceive('putRecordBatch')->once()->with([
'DeliveryStreamName' => 'foo',
'Records' => [
['Data' => 'foo'],
['Data' => 'bar'],
['Data' => 'baz'],
],
])->andReturn($this->model);

$adapter->enqueue($this->messages);
}

/**
* @expectedException \Graze\Queue\Adapter\Exception\MethodNotSupportedException
*/
public function testAcknowledge()
{
$adapter = new FirehoseAdapter($this->client, 'foo');
$adapter->acknowledge($this->messages);
}

/**
* @expectedException \Graze\Queue\Adapter\Exception\MethodNotSupportedException
*/
public function testDequeue()
{
$adapter = new FirehoseAdapter($this->client, 'foo');
$adapter->dequeue($this->factory, 10);
}

/**
* @expectedException \Graze\Queue\Adapter\Exception\MethodNotSupportedException
*/
public function testPurge()
{
$adapter = new FirehoseAdapter($this->client, 'foo');
$adapter->purge();
}

/**
* @expectedException \Graze\Queue\Adapter\Exception\MethodNotSupportedException
*/
public function testDelete()
{
$adapter = new FirehoseAdapter($this->client, 'foo');
$adapter->delete();
}
}

0 comments on commit 3084e3f

Please sign in to comment.