Motivation
Some colleagues of mine use a database and Elasticsearch(ELK) together where the database is the main data source, and ELK is the search facade: data is first stored in the database, and then the changes are written to the index.
Here is a typical example of such code that uses Spring Data:
@Override @Transactional public ProductDto update(ProductDto dto) { var jpaEntity = jpaRepository.save(dto2jpa(Objects.requireNonNull(dto))); var elkEntity = elkRepository.save(jpa2elk(jpaEntity)); return elk2dto(elkEntity); }
Unfortunately, this approach does not guarantee the consistency of the database and the index, not only permanently, but also with a delay. Namely, there may be such a moment that the state of the database at that moment will not correspond to the state of the index at any moment. And vice versa. In particular, some changes in the database may never reach the index.
Since I’ve seen code similar to the above in several projects, I thought a little project showing how sync could be done would be helpful.
This is an example of such a Java project using Postgresql CDC(Change Data Capture), Elasticsearch, String Boot, String Data(JPA, Elasticsearch). The project does not use any external components such as Kafka, etc. Only the ones listed. There are ten classes in total.
Configuration
In order to see how the project works, you must first set up the database.
Open some client (psql, DBeaver, etc) and run the command
SHOW wal_level;
If the value is not "logical", then you need to execute the command
ALTER SYSTEM SET wal_level = logical;
Then you need to restart the database server or a container with it (at least for PostgreSQL 14).
After that, open the client, look at the available slots
SELECT * FROM pg_replication_slots;
and create a slot with a unique name
SELECT * FROM pg_create_logical_replication_slot('elk_slot', 'test_decoding', false, true);
The unique name of the slot (in the example 'elk_slot') can be chosen by yourself.
Let’s leave the plugin name ('test_decoding') as it is for now.
The rest of the settings can be left unchanged.
In order to return to the previous configuration, you must delete the created slot
SELECT * FROM pg_drop_replication_slot ( 'elk_slot' );
restore the previous value of wal_level
ALTER SYSTEM RESET wal_level;
and restart the server or the container again.
Project description
To make the project compact, but still fully functional, I limited myself to working with one entity - Product, which corresponds to the table of the same name in the database (see script resources/create_table.sql), the index of the same name and three Java classes: ProductDto, JPA entity ProductDB and ELK document ProductELK.
Two repositories are used to work with the entity
ProductJPARepository extends JpaRepository<ProductDB, Long>
ProductELKRepository extends ElasticsearchRepository<ProductELK,Long>
The ProductVanillaService service works with these repositories. This service does not care about database and index synchronization. The code from the Motivation section is taken from this service.
The controller that works with the service is ProductController. Only CRUD endpoints are implemented for brevity.
If the application is launched with the "vanilla" profile, then it will work without synchronization.
In order to enable synchronization, you need to run the application with the “sync” profile.
To keep the database and the index in sync ProductCDCService service is used instead of ProductVanillaService. The sync version uses the same repositories as the vanilla one.
Here is an example of processing data changes from ProductCDCService, similar to the example given in the Motivation section.
@Override @Transactional(propagation= Propagation.NEVER) public ProductDto update(ProductDto dto) { var id = Objects.requireNonNull(Objects.requireNonNull(dto).getId()); jpaRepository.save(dto2jpa(dto)); processNextCDCChunk(); return elkRepository.findById(id).map(ProductMapper::elk2dto).orElse(null); }
The both services work with the database in the same way. Except that the methods of the vanilla service are transactional and therefore saving data to the index occurs in the same transaction as saving data to the database.
The methods of the ProductCDCService service are not transactional.
Also, the ProductCDCService methods, unlike ProductVanillaService, do not store data in the index. Instead, it calls processNextCDCChunk() method, which starts scanning completed transactions from WAL and finaly upload changes in the index.
processNextCDCChunk() starts its work outside a transaction. The transaction is opened in
dbRepository.save(dto2db(dto))
and closed on exit from it.
Therefore, by the time processNextCDCChunk() is called, the data saved in the previous step is already in WAL and available for scanning by this method. And, as a result, by the time processNextCDCChunk() ends, this data will already be in the index.
With the next call of elkRepository.findById(id) we get this data from the index and return it to the controller.
Here the next question arises. Does the state of the returned object match the state the object was at call of update(dto)? For example, if user A changed the product name to "prodA", will the name stay that way in the output? The answer to this question is negative. The correct answer is as follows. We return the state of the product at SOME point in time after the database transaction was committed.
Let’s take an example. Suppose user A renamed the product to "prodA" and user B followed him to "prodB". If by the time processNextCDCChunk() method of user A completed WAL scan, the transaction with user B’s change has been committed, then the result of this commit will be included in the scan of user A, and user A’s elkRepository.findById(id) will return the product with the value of the field name "prodB".
There is nothing strange in this, as it is clear that the state of the database is saved in the index with some delay. The main thing is that all database changes reach the index.
In particular, after successfully saving changes to the database and to the index, we may return nothing to the user. This can happen if, following the changes that we process in client A’s transaction, this object is deleted in client B’s transaction. And if by the time user A scans WAL, the deletion is already committed, then user A’s processNextCDCChunk() will remove the object from the index. In this case, upon return from processNextCDCChunk(), this object will no longer be in the index. Unless, of course, some third user C re-createed the deleted product and this product has uploaded into the index by the time elkRepository.findById(id) of user A is called. In this case, we will return the product of user C to the user A, not the product of user A with the name "prodA".
The processNextCDCChunk() method is simple. It calls the TestDecodingCDCService.processNextCDCChunk() method and handles exceptions. It is important to note two points here.
The first is that the method is called asynchronously.
And the second is that the executor that executes the method uses one thread. This ensures that WAL is processed sequentially, and therefore data changes are loaded into the index in exactly the same order as those changes were loaded into the database. Therefore, the state of the index at each moment will correspond to the state of the database at some moment in the past.
TestDecodingCDCService.processNextCDCChunk() is the central method. It takes all committed unprocessed transactions, parses and uploads changes from scanned transactions into the index.
Uploading changes to the index (TransactionOperationProcessor.processOp()) consists of the following steps:
-
Based on the table name from the transaction operation (TransactionOperation.tableName), determine the JPA entity class that is persisted in this table. If the class is not defined, then the operation is skipped.
-
Based on the JPA class, find the ELK service that processes the entities of this class. This service wraps ProductELKRepository. ProductVanillaService uses this repository to upload JPA entity into the index. If no such service is found, then the operation is skipped.
-
Using JPA, restore the JPA entity by TransactionOperation.restoreSQLStatement. Since the operation is associated with a specific table (TransactionOperation.tableName), only the part of the entity that is persisted in that table is restored. JPA properties annotated with @OneToOne, @OneToMany, etc. are not initialized.
-
Upload the restored JPA entity into the index with the service found in step 2.
If there are any problems during the processing of operations, you need to fix the problems and re-run TestDecodingCDCService.processNextCDCChunk(). This can be done because the ProductELKRepository.save and ProductELKRepository.delete are idempotent.
If all scanned transactions are successfully processed, then the corresponding records are removed from WAL.
I hope provided information is helpful.
Please contact me if you have any comments, suggestions or questions.
Stay in sync,
Sergey