Skip to content

Commit

Permalink
Fix: EntityTrackerSubscriber to track entities separately for each co…
Browse files Browse the repository at this point in the history
…nnection

Fix: Index managers for the same connection to reuse the same ConnectionManager instance
  • Loading branch information
pmishev committed Jun 17, 2019
1 parent 555bf15 commit 311da4a
Show file tree
Hide file tree
Showing 14 changed files with 144 additions and 22 deletions.
4 changes: 2 additions & 2 deletions DependencyInjection/Compiler/AddConnectionsPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public function process(ContainerBuilder $container)
$connectionDefinition->setFactory(
[
new Reference('sfes.connection_factory'),
'createConnectionManager'
'createConnectionManager',
]
);

Expand All @@ -42,7 +42,7 @@ public function process(ContainerBuilder $container)
$connectionDefinition
);

if ($connectionName === 'default') {
if ('default' === $connectionName) {
$container->setAlias('sfes.connection', 'sfes.connection.default');
}
}
Expand Down
6 changes: 2 additions & 4 deletions DependencyInjection/Compiler/AddIndexManagersPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use Symfony\Component\DependencyInjection\Reference;

/**
* Compiles elastic search data.
* Registers index manager service definitions
*/
class AddIndexManagersPass implements CompilerPassInterface
{
Expand All @@ -27,9 +27,7 @@ public function process(ContainerBuilder $container)
// Make sure the connection service definition exists
$connectionService = sprintf('sfes.connection.%s', $indexSettings['connection']);
if (!$container->hasDefinition($connectionService)) {
throw new InvalidConfigurationException(
'There is no ES connection with name '.$indexSettings['connection']
);
throw new InvalidConfigurationException(sprintf('There is no ES connection with name %s', $indexSettings['connection']));
}

$indexManagerClass = $container->getParameter('sfes.index_manager.class');
Expand Down
7 changes: 7 additions & 0 deletions Document/Repository/Repository.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ class Repository
*/
protected $metadata;

/**
* @var DocumentMetadataCollector
*/
protected $documentMetadataCollector;

/**
* Constructor.
*
Expand Down Expand Up @@ -90,6 +95,7 @@ public function getById($id, $resultType = Finder::RESULTS_OBJECT)
* @param int $resultsType Bitmask value determining how the results are returned
* @param array $additionalRequestParams Additional params to pass to the ES client's search() method
* @param int $totalHits The total hits of the query response
*
* @return mixed
*/
public function find(array $searchBody, $resultsType = Finder::RESULTS_OBJECT, array $additionalRequestParams = [], &$totalHits = null)
Expand All @@ -102,6 +108,7 @@ public function find(array $searchBody, $resultsType = Finder::RESULTS_OBJECT, a
*
* @param array $searchBody
* @param array $additionalRequestParams
*
* @return int
*/
public function count(array $searchBody = [], array $additionalRequestParams = [])
Expand Down
20 changes: 18 additions & 2 deletions Event/PostCommitEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Sineflow\ElasticsearchBundle\Event;

use Sineflow\ElasticsearchBundle\Manager\ConnectionManager;
use Symfony\Component\EventDispatcher\Event;

/**
Expand All @@ -15,11 +16,18 @@ class PostCommitEvent extends Event
private $bulkResponse;

/**
* @param array $bulkResponse
* @var string
*/
public function __construct(array $bulkResponse)
private $connectionName;

/**
* @param array $bulkResponse
* @param ConnectionManager $connectionManager
*/
public function __construct(array $bulkResponse, ConnectionManager $connectionManager)
{
$this->bulkResponse = $bulkResponse;
$this->connectionName = $connectionManager->getConnectionName();
}

/**
Expand All @@ -29,4 +37,12 @@ public function getBulkResponse()
{
return $this->bulkResponse;
}

/**
* @return string
*/
public function getConnectionName()
{
return $this->connectionName;
}
}
23 changes: 19 additions & 4 deletions Event/PrePersistEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Sineflow\ElasticsearchBundle\Event;

use Sineflow\ElasticsearchBundle\Document\DocumentInterface;
use Sineflow\ElasticsearchBundle\Manager\ConnectionManager;
use Symfony\Component\EventDispatcher\Event;

/**
Expand All @@ -15,19 +16,25 @@ class PrePersistEvent extends Event
*/
private $document;

/**
* @var string
*/
private $connectionName;

/**
* @var int
*/
private $bulkOperationIndex;

/**
* @param DocumentInterface $document
* @param int $bulkOperationIndex
* @param ConnectionManager $connectionManager
*/
public function __construct(DocumentInterface $document, $bulkOperationIndex)
public function __construct(DocumentInterface $document, ConnectionManager $connectionManager)
{
$this->document = $document;
$this->bulkOperationIndex = $bulkOperationIndex;
$this->document = $document;
$this->connectionName = $connectionManager->getConnectionName();
$this->bulkOperationIndex = $connectionManager->getBulkOperationsCount();
}

/**
Expand All @@ -38,6 +45,14 @@ public function getDocument()
return $this->document;
}

/**
* @return string
*/
public function getConnectionName()
{
return $this->connectionName;
}

/**
* @return int
*/
Expand Down
6 changes: 5 additions & 1 deletion Manager/ConnectionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public function commit($forceRefresh = true)
}

if ($this->eventDispatcher) {
$this->eventDispatcher->dispatch(Events::POST_COMMIT, new PostCommitEvent($response));
$this->eventDispatcher->dispatch(Events::POST_COMMIT, new PostCommitEvent($response, $this));
}
}

Expand Down Expand Up @@ -329,7 +329,9 @@ public function getAliases()
*
* $params['index'] = (list) A comma-separated list of indices/aliases to check (Required)
* @param array $params Associative array of parameters
*
* @return bool
*
* @throws InvalidArgumentException
*/
public function existsIndexOrAlias(array $params)
Expand Down Expand Up @@ -370,7 +372,9 @@ public function existsIndexOrAlias(array $params)
*
* @param array $params
* $params['name'] = (list) A comma-separated list of alias names to return (Required)
*
* @return bool
*
* @throws InvalidArgumentException
*/
public function existsAlias(array $params)
Expand Down
17 changes: 16 additions & 1 deletion Manager/ConnectionManagerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class ConnectionManagerFactory
private $tracer;

/**
* @var boolean
* @var bool
*/
private $kernelDebug;

Expand All @@ -32,6 +32,13 @@ class ConnectionManagerFactory
*/
private $eventDispatcher;

/**
* Array to keep track of already created connection managers, so the same instance is returned for subsequent service requests
*
* @var array ConnectionManager[]
*/
private $connectionManagers = [];

/**
* @param boolean $kernelDebug
* @param LoggerInterface $tracer
Expand Down Expand Up @@ -63,10 +70,16 @@ public function setEventDispatcher($eventDispatcher)
/**
* @param string $connectionName
* @param array $connectionSettings
*
* @return ConnectionManager
*/
public function createConnectionManager($connectionName, $connectionSettings)
{
// If we already have a ConnectionManager instance for the required connection, do not create a new one
if (isset($this->connectionManagers[$connectionName])) {
return $this->connectionManagers[$connectionName];
}

$clientBuilder = ClientBuilder::create();

$clientBuilder->setHosts($connectionSettings['hosts']);
Expand All @@ -88,6 +101,8 @@ public function createConnectionManager($connectionName, $connectionSettings)
$connectionManager->setLogger($this->logger ?: new NullLogger());
$connectionManager->setEventDispatcher($this->eventDispatcher);

$this->connectionManagers[$connectionName] = $connectionManager;

return $connectionManager;
}
}
3 changes: 1 addition & 2 deletions Manager/IndexManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -719,8 +719,7 @@ public function update($documentClass, $id, array $fields = [], $script = null,
public function persist(DocumentInterface $document, array $metaParams = [])
{
if ($this->eventDispatcher) {
$bulkOperationIndex = $this->getConnection()->getBulkOperationsCount();
$this->eventDispatcher->dispatch(Events::PRE_PERSIST, new PrePersistEvent($document, $bulkOperationIndex));
$this->eventDispatcher->dispatch(Events::PRE_PERSIST, new PrePersistEvent($document, $this->getConnection()));
}

$documentArray = $this->documentConverter->convertToArray($document);
Expand Down
1 change: 1 addition & 0 deletions Manager/IndexManagerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public function setEventDispatcher($eventDispatcher)
* @param string $managerName
* @param ConnectionManager $connection
* @param array $indexSettings
*
* @return IndexManager
*/
public function createManager(
Expand Down
16 changes: 12 additions & 4 deletions Subscriber/EntityTrackerSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public static function getSubscribedEvents()

/**
* @param PrePersistEvent $prePersistEvent
*
* @throws \ReflectionException
*/
public function onPrePersist(PrePersistEvent $prePersistEvent)
{
Expand All @@ -56,8 +58,8 @@ public function onPrePersist(PrePersistEvent $prePersistEvent)
);
if (isset($propertiesMetadata['_id'])) {
$bulkOperationIndex = $prePersistEvent->getBulkOperationIndex();
$this->entitiesData[$bulkOperationIndex]['entity'] = $prePersistEvent->getDocument();
$this->entitiesData[$bulkOperationIndex]['metadata'] = $propertiesMetadata;
$this->entitiesData[$prePersistEvent->getConnectionName()][$bulkOperationIndex]['entity'] = $prePersistEvent->getDocument();
$this->entitiesData[$prePersistEvent->getConnectionName()][$bulkOperationIndex]['metadata'] = $propertiesMetadata;
}
}

Expand All @@ -66,7 +68,13 @@ public function onPrePersist(PrePersistEvent $prePersistEvent)
*/
public function onPostCommit(PostCommitEvent $postCommitEvent)
{
foreach ($this->entitiesData as $bulkOperationIndex => $entityData) {
// No need to do anything if there are no persisted entities for that connection
if (empty($this->entitiesData[$postCommitEvent->getConnectionName()])) {
return;
}

// Update the ids of persisted entity objects
foreach ($this->entitiesData[$postCommitEvent->getConnectionName()] as $bulkOperationIndex => $entityData) {
$idValue = current($postCommitEvent->getBulkResponse()['items'][$bulkOperationIndex])['_id'];
$idPropertyMetadata = $entityData['metadata']['_id'];
$entity = $entityData['entity'];
Expand All @@ -78,6 +86,6 @@ public function onPostCommit(PostCommitEvent $postCommitEvent)
}

// Clear the array to avoid any memory leaks
$this->entitiesData = [];
$this->entitiesData[$postCommitEvent->getConnectionName()] = [];
}
}
31 changes: 30 additions & 1 deletion Tests/Functional/Subscriber/EntityTrackerSubscriberTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Sineflow\ElasticsearchBundle\Finder\Finder;
use Sineflow\ElasticsearchBundle\Tests\AbstractElasticsearchTestCase;
use Sineflow\ElasticsearchBundle\Tests\app\fixture\Acme\FooBundle\Document\Customer;
use Sineflow\ElasticsearchBundle\Tests\app\fixture\Acme\FooBundle\Document\Log;

/**
* Class EntityTrackerSubscriberTest
Expand All @@ -19,22 +20,47 @@ public function testPersistWithSeveralBulkOps()
$converter = $this->getContainer()->get('sfes.document_converter');

$imWithAliases = $this->getIndexManager('customer');
$imWithAliases->getConnection()->setAutocommit(false);

// Another index manager on the same connection
$imNoAliases = $this->getIndexManager('bar');

// Make sure both index managers share the same connection object instance
$this->assertSame($imWithAliases->getConnection(), $imNoAliases->getConnection());

$imNoAliases->getConnection()->setAutocommit(false);

// Index manager on another connection
$backupIm = $this->getIndexManager('backup');
$backupIm->getConnection()->setAutocommit(false);

// Make sure this index manager has a separate connection manager
$this->assertNotSame($imWithAliases->getConnection(), $backupIm->getConnection());


// Persist raw document - ignored by the subscriber as there's no entity to update
$rawCustomer = new Customer();
$rawCustomer->name = 'firstRaw';
$documentArray = $converter->convertToArray($rawCustomer);
$imWithAliases->persistRaw('AcmeFooBundle:Customer', $documentArray);

// Persist entity - handled by the subscriber
$customer = new Customer();
$customer->name = 'batman';
$imWithAliases->persist($customer);

// Persist another raw document - ignored by the subscriber as there's no entity to update
$secondRawCustomer = new Customer();
$secondRawCustomer->name = 'secondRaw';
$documentArray = $converter->convertToArray($secondRawCustomer);
$imWithAliases->persistRaw('AcmeFooBundle:Customer', $documentArray);

// Persist an entity to another connection to make sure the subscriber handles the 2 commits independently
$log = new Log();
$log->id = 123;
$log->entry = 'test log entry';
$backupIm->persist($log);

// Persist another entity to the first connection - handled by the subscriber
$secondCustomer = new Customer();
$secondCustomer->id = '555';
$secondCustomer->name = 'joker';
Expand All @@ -45,12 +71,15 @@ public function testPersistWithSeveralBulkOps()
$this->assertNull($secondRawCustomer->id);
$this->assertEquals('555', $secondCustomer->id);


$imWithAliases->getConnection()->commit();
$backupIm->getConnection()->commit();

$this->assertNull($rawCustomer->id, 'id should not have been set');
$this->assertNotNull($customer->id, 'id should have been set');
$this->assertNull($secondRawCustomer->id, 'id should not have been set');
$this->assertEquals('555', $secondCustomer->id);
$this->assertEquals(123, $log->id);

// Get the customer from ES by name
$finder = $this->getContainer()->get('sfes.finder');
Expand Down
11 changes: 11 additions & 0 deletions Tests/app/config/config_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ sineflow_elasticsearch:
logging: true
bulk_batch_size: 100

backup_conn:
hosts: '%elasticsearch_hosts%'
profiling: false
logging: false

indices:
_base:
connection: default
Expand Down Expand Up @@ -53,6 +58,12 @@ sineflow_elasticsearch:
types:
- AcmeBarBundle:Product

backup:
connection: backup_conn
name: sineflow-esb-backup
types:
- AcmeFooBundle:Log

services:
app.es.language_provider:
class: Sineflow\ElasticsearchBundle\Tests\app\fixture\Acme\BarBundle\LanguageProvider
Expand Down
Loading

0 comments on commit 311da4a

Please sign in to comment.