Wrong order of documents updated in ES and database

Hi,

I am observing sometimes strange behavior (inconsistency) when I do fast changes (milliseconds) in database for the same record.

  1. I create record in database that results in OpenSearch call:
    curl -iX POST ‘https://xxxx.es.amazonaws.com/_bulk’ -d '{“index”:{“_index”:“xxx-write”,“_id”:“T1_1”,“routing”:“T1”}}
    {“data”:“test1”, “_entity_type”:“Xxxx”,“_tenant_id”:“T1”,“_tenant_doc_id”:“1”}

    Response is:
    HTTP/1.1 200 OK
    {“took”:4,“errors”:false,“items”:[{“index”:{“_index”:“xxx-000001”,“_id”:“T1_1”,“_version”:1,“result”:“created”,“_shards”:{“total”:2,“successful”:2,“failed”:0},“_seq_no”:26463,“_primary_term”:1,“status”:201}}]}

  2. Update of the same record in database - change value to “test2”
    curl -iX POST ‘https://xxxx.es.amazonaws.com/_bulk’ -d '{“index”:{“_index”:“xxx-write”,“_id”:“T1_1”,“routing”:“T1”}}
    {“data”:“test2”, “_entity_type”:“Xxxx”,“_tenant_id”:“T1”,“_tenant_doc_id”:“1”}

    Response is:
    HTTP/1.1 200 OK
    {“took”:5,“errors”:false,“items”:[{“index”:{“_index”:“xxx-000001”,“_id”:“T1_1”,“_version”:3,“result”:“updated”,“_shards”:{“total”:2,“successful”:2,“failed”:0},“_seq_no”:26515,“_primary_term”:1,“status”:200}}]}

  3. Another update of the same record in database - change value to “test3”
    curl -iX POST ‘https://xxxx.es.amazonaws.com/_bulk’ -d '{“index”:{“_index”:“xxx-write”,“_id”:“T1_1”,“routing”:“T1”}}
    {“data”:“test3”, “_entity_type”:“Xxxx”,“_tenant_id”:“T1”,“_tenant_doc_id”:“1”}

    Response is:
    HTTP/1.1 200 OK
    {“took”:6,“errors”:false,“items”:[{“index”:{“_index”:“xxx-000001”,“_id”:“T1_1”,“_version”:2,“result”:“updated”,“_shards”:{“total”:2,“successful”:2,“failed”:0},“_seq_no”:26506,“_primary_term”:1,“status”:200}}]}

I have in database correct value “test3”.
But the issue is with value in OpenSearch where I have value “test2” instead of “test3”.

For some reason the request to OpenSearch for update in point 3) was processed before update in point 2).
It can be seen on version and _seq_no from the OpenSearch response.

I am using “hibernate.search.indexing.plan.synchronization.strategy”=“write-sync”.
“hibernate.core.version” = 6.6.1.Final
“hibernate.search” = 7.2.1.Final

This happens occasionally.
Do you please know what is the reason for this issue and how to fix it?

Thank you for help.

Kind Regards.

Hey,

This looks odd. A few questions:

  1. Your snippets mention curl -iX, why is that? Hibernate Search doesn’t use curl.
  2. Is this all happening in a single application instance?
  3. How do you execute these consecutive changes? Single transaction with multiple flushes? Multiple transactions? Can you show the code?
  4. Do the Hibernate Search logs show three separate _bulk requests, or a single one?
  5. Can you confirm you don’t use coordination?
  6. I see this in responses: total”:2,“successful”:2. Do you have multiple OpenSearch nodes, or just two shards (e.g. primary/replica) on the same node? What’s your OpenSearch setup exactly?

Hi,

  1. That’s what I see in log.
  2. It’s a single application that’s running in multiple nodes. So the call to database can be done from any node.
  3. It’s always a single transaction that can come from any node. So one update can come from node1 and the second one from node2 but those are always independent single transactions Resulting in correct values in database. I have constrains on table and a trigger that will rollback transaction if data are wrong (wrong version).
  4. Logs shows 3 separate _bulk requests.
  5. I am not setting “hibernate.search.coordination.strategy” at all. So no coordination.
  6. OpenSearch 2.11 - OpenSearch_2_11_R20241003, 3-AZ without standby, 3 data nodes. Index split into 64 shards.

Thank you.

Ok, well that’s not Hibernate Search logs. No idea where this comes from.

That’s your problem right there. If your second and third operations happen on different nodes, it’s totally possible that the indexing requests are sent to Elasticsearch out of order, because of e.g. garbage collector pauses or just thread scheduling. Or they are sent in order, but due to network latency variations, are received by Elasticsearch out of order.

With a single application node it’s not usually a problem, because there are in-JVM mechanisms to preserve operation order (ES requests are executed in the same order as the order of transaction completion). With multiple nodes, there’s no such protection.

The only solution right now is to use outbox-polling coordination.

In your particular case optimistic concurrency control could, maybe, have helped, though picking a suitable version number could be challenging. But anyway, nobody cared enough so far to contribute an implementation of that, so you can’t use it right now.

@mbekhta This limitation is maybe not as obvious as I thought, so we should probably document it in Hibernate Search 7.2.2.Final: Reference Documentation? This should probably even be the first limitation… And obviously we’d need to reference it from pros/cons of architecture examples, e.g. there: Hibernate Search 7.2.2.Final: Reference Documentation

1 Like

Thank you for your response.

I am not sure that I can use outbox-polling coordination because it needs a list of tenants up front in the configuration. And I have a solution where new tenants are added dynamically. Also each tenant has it’s own database.

This should not be a problem, as long as it works in Hibernate ORM.

This will be a problem indeed. Hibernate Search needs to start an agent for each tenant – especially if each has its own database. Without knowing the list of tenants, it can’t start agents and can’t process events.

Maybe there could be a new feature in a future version of Hibernate Search where applications can list tenants for Hibernate Search on startup (e.g. they retrieve them from a DB), and later “notify” Hibernate Search about newly added tenants, so that Hibernate Search starts agents accordingly. But so far nobody requested that feature, nor offered to work on it :slight_smile:

I have 2 possibilities to solve the issue:

  1. implement something like outbox-polling coordination that will be able to handle new tenants dynamically
  2. I will make sure, that all the database changes for a specific tenant are done from a single node.

I think I will go for solution 2.

Than you very much for helping me.

1 Like

Hi,

I have changed my code so all the database calls are happening from the same JVM (different threads) but I still see the same issue.

What is the mechanism to make sure that 2 updates that are happening almost at the same time are processed in ES in correct order?

My example:
2 updates are happening almost at the same time.
Commit for “update 1” is done and the call to ES “call 1” is executed.
Commit for “update 2” is done and the call to ES “call 2” is executed.

I am observing that the “call 2” is processed in ES before “call 1”. That can happen because of network latency.

The response for “call 2” has:
“errors”:false, “_version”:2,“result”:“updated”, “_seq_no”:243527, “status”:200

The response for “call 1” has:
“errors”:false, “_version”:3,“result”:“updated”, “_seq_no”:243535, “status”:200

Thank you

There should never be two calls to ES for the same document at the same time. One must finish before the next one starts.

If you want to inspect the code, it starts here: hibernate-search/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchBatchingWorkOrchestrator.java at 0b9e3f0d87250daa800f10223bcc81cdb0da773a · hibernate/hibernate-search · GitHub . There are multiple queues in what’s essentially a hash table based on the document ID. A work item for a given document ID will always end up in the same queue, and work items are processed in queue order.
And the actual relevant logic (queue draining, batching and request triggering) is here: hibernate-search/engine/src/main/java/org/hibernate/search/engine/backend/orchestration/spi/BatchingExecutor.java at 97841fb3a907d88ef738c206385a9b625217b75f · hibernate/hibernate-search · GitHub

That being said:

  1. If the two changes happen very close, they could end up in the same “_bulk” request: two changes in the same request to Elasticsearch/OpenSearch. But if that happens, as far as I know Elasticsearch/OpenSearch will process them in order too.
  2. The logs you mentioned earlier make me doubt if it’s even Hibernate Search doing the Elasticsearch calls. Really, you shouldn’t see anything related to curl. It’s just bizarre.

Also potentially relevant, if your data field is not just mapped to a column, but instead generated in a custom bridge sourcing its data from multiple associations: Hibernate Search 7.2.2.Final: Reference Documentation

Hi,

Here are maven dependencies:

	<dependency>
		<groupId>org.postgresql</groupId>
		<artifactId>postgresql</artifactId>
		<version>42.7.3</version>
	</dependency>

	<dependency>
		<groupId>software.amazon.jdbc</groupId>
		<artifactId>aws-advanced-jdbc-wrapper</artifactId>
		<version>2.5.4</version>
	</dependency>
	
	<dependency>
		<groupId>org.hibernate.search</groupId>
		<artifactId>hibernate-search-mapper-orm</artifactId>
		<version>7.2.1.Final</version>
	</dependency>

	<dependency>
		<groupId>org.hibernate.search</groupId>
		<artifactId>hibernate-search-backend-elasticsearch</artifactId>
		<version>7.2.1.Final</version>
	</dependency>

	<dependency>
		<groupId>org.hibernate.search</groupId>
		<artifactId>hibernate-search-backend-elasticsearch-aws</artifactId>
		<version>7.2.1.Final</version>
	</dependency>
	
	<dependency>
		<groupId>org.hibernate.orm</groupId>
		<artifactId>hibernate-core</artifactId>
		<version>6.6.1.Final</version>
		<exclusions>
			<exclusion>
				<groupId>org.jboss.logging</groupId>
				<artifactId>jboss-logging</artifactId>
			</exclusion>
		</exclusions>
	</dependency>

	<dependency>
		<groupId>jakarta.transaction</groupId>
		<artifactId>jakarta.transaction-api</artifactId>
		<version>2.0.1</version>
	</dependency>

	<dependency>
		<groupId>com.zaxxer</groupId>
		<artifactId>HikariCP</artifactId>
		<version>5.1.0</version>
	</dependency>
	
	<dependency>
		<groupId>com.github.scribejava</groupId>
		<artifactId>scribejava-apis</artifactId>
		<version>8.3.3</version>
	</dependency>
	
	<dependency>
		<groupId>com.github.scribejava</groupId>
		<artifactId>scribejava-httpclient-apache</artifactId>
		<version>8.3.3</version>
		<exclusions>
			<exclusion>
				<groupId>org.apache.httpcomponents</groupId>
				<artifactId>httpclient</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.apache.httpcomponents</groupId>
				<artifactId>httpcore</artifactId>
			</exclusion>
			<exclusion>
				<groupId>com.github.scribejava</groupId>
				<artifactId>scribejava-core</artifactId>
			</exclusion>
			<exclusion>
				<groupId>commons-logging</groupId>
				<artifactId>commons-logging</artifactId>
			</exclusion>
		</exclusions>
	</dependency>

	<dependency>
		<groupId>org.apache.httpcomponents</groupId>
		<artifactId>httpclient</artifactId>
		<version>4.5.14</version>
		<exclusions>
			<exclusion>
				<groupId>commons-logging</groupId>
				<artifactId>commons-logging</artifactId>
			</exclusion>
		</exclusions>
	</dependency>

Hello,

I collected more information and I was able to replicate the issue by debugging the code:

Set breakpoints:
a) ElasticsearchBatchingWorkOrchestrator (method “protected void doSubmit(ElasticsearchBatchedWork<?> work, OperationSubmitter operationSubmitter)”) - line 111: executors.get( work.getQueuingKey() ).submit( work, operationSubmitter );

b) BatchingExecutor (method “public CompletableFuture<?> work()”) - line 170: workBuffer.clear();

Steps to reproduce:

  1. “Thread 1” commit database transaction - data : id = 1, status = ‘OPEN’, version = 1
  2. “Thread 1” execution will stop on breakpoint a)
  • The record is committed in database so it’s visible from other threads
  1. “Thread 2” commit database transaction - data : id = 1, status = ‘DELETED’, version = 2
  • The database now contains those values “id = 1, status = ‘DELETED’, version = 2”
  1. “Thread 2” execution will stop on breakpoint a)
  2. “Thread 2” continue with the code execution
  3. “Thread 1” continue with the code execution
  4. Execution will stop at breakpoint b)
    workQueue contains 2 records:
    [0] - id = 1, status = ‘DELETED’, version = 2
    [1] - id = 1, status = ‘OPEN’, version = 1

The records in the workedQueue are in wrong order

  1. continue with the execution
  2. execution will stop at breakpoint b)
    workQueue contains 0 records

So now I have “id = 1, status = ‘DELETED’, version = 2” in database but in ES there are wrong data “id = 1, status = ‘OPEN’, version = 1”.

Some values from Object “work” in ElasticsearchBatchingWorkOrchestrator.doSubmit:
work.refreshStrategy = DocumentRefreshStrategy.NONE
work.resultAssessor = ElasticsearchRequestSuccessAssessor[ignoredErrorStatuses=[ ], ignoredErrorTypes=[ ], ignoreShardFailures=true]

This is just one possibility to reproduce the issue, but in general the issue occurs when workQueue contains records in wrong order.

Am I missing some settings on entity or in hibernate search that would validate version based on an attribute in entity? Some kind of optimistic locking?

Thank you.

So you’re essentially ending a transaction, and running a whole second transaction (+ submitting indexing work) before the indexing work for the first transaction was submitted.
Okay, that’s… concerning. I suppose we’ve never noticed this because it’s very unlikely that the second transaction would happen so fast… But then you did say yours are very fast.

In your case, I assume the second transaction has all the correct data, including updated data from the first transaction. So we could imagine some workarounds, such as locking the queue(s) before commits and releasing the lock(s) after the work items gets submitted. This could reasonably be introduced as an opt-in “local” coordination strategy.

That would be HSEARCH-5105 and is currently not implemented. The main reason being that documents can be built based on multiple entities (e.g. with @IndexedEmbedded), so it’s unclear how the version number could be generated automatically. Yes we could just combine @Version numbers from all entities involved, but the document’s version number needs to be monotonically increasing, so what happens when an association is set to null (ToOne) or has an element removed (ToMany)?

But if you have a way to maintain such a number in your particular application, Hibernate Search could easily be patched to allow you to annotate a property with @DocumentVersion and automatically use Elasticsearch/OpenSearch versioning – perhaps with a configurable flag to set the behavior on conflicts (ignore or fail).

If you’re interested in this feature, we can help you get this done. Backporting is less certain, we’ll have to talk with @mbekhta.

Hi,

I think I have a solution.

I added a lock in persistence layer that guaranties correct order of items in workQueue (locks the whole transaction).
The lock is per tenant and unique identifier that identifies the record in database.
In this way it will not have performance impact on the application.

In other words I have thread safe persist per a single record in database.

Thank you.

1 Like