SearchIndexingPlan - exception when using addOrUpdate in different order

Hello :smiley:

I find something awkward when i use the SearchIndexingPlan, in my use case i receive a batch with entities to index. To illustrate my issue i’m not using a loop to iterate throught my entites.

SearchSession searchSession = Search.session((EntityManager) sessionFactory.getCurrentSession());
SearchIndexingPlan searchWritePlan = searchSession.indexingPlan();
// this order endup to the exception below
searchWritePlan.addOrUpdate(rootRepository.load(ClientPhysique.class), 12445l));
searchWritePlan.addOrUpdate(rootRepository.load(ClientPhysique.class, 12444l));
searchWritePlan.addOrUpdate(rootRepository.load(ClientPhysique.class, 12443l));
// this order is fine
searchWritePlan.addOrUpdate(rootRepository.load(ClientPhysique.class), 12443l));
searchWritePlan.addOrUpdate(rootRepository.load(ClientPhysique.class, 12444l));
searchWritePlan.addOrUpdate(rootRepository.load(ClientPhysique.class, 12445l));
searchWritePlan.process();
searchWritePlan.execute();
java.util.ConcurrentModificationException: null
	at java.base/java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
	at java.base/java.util.LinkedHashMap$LinkedValueIterator.next(LinkedHashMap.java:746)
	at org.hibernate.search.mapper.pojo.work.impl.PojoIndexedTypeIndexingPlan.resolveDirty(PojoIndexedTypeIndexingPlan.java:87)
	at org.hibernate.search.mapper.pojo.work.impl.PojoIndexingPlanImpl.process(PojoIndexingPlanImpl.java:100)
	at org.hibernate.search.mapper.orm.work.impl.SearchIndexingPlanImpl.process(SearchIndexingPlanImpl.java:55)
	at com.xxxxx.services.elasticsearch.SdkUtilsBS.indexBatch(SdkUtilsBS.java:79)
	at com.xxxxxx.services.facade.SdkUtilsFacade.indexBatch(SdkUtilsFacade.java:26)
	at com.xxxxxx.services.facade.SdkUtilsFacade$$FastClassBySpringCGLIB$$4ae56d67.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294)
	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
	at com.xxxxx.services.facade.SdkUtilsFacade$$EnhancerBySpringCGLIB$$77255ebf.indexBatch(<generated>)
	at com.xxxxxxx.controllers.SdkUtilsController.index(SdkUtilsController.java:29)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189)
	at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
	at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:800)
	at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
	at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1038)
	at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942)
	at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005)
	at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:908)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:660)
	at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:209)
	at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178)
	at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:357)
	at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:270)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at com.xxxxx.context.filter.LogbackCrpenceFilter.doFilter(LogbackCrpenceFilter.java:51)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at com.xxxxxx.context.filter.CheckContextFilter.doFilter(CheckContextFilter.java:74)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
	at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
	at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
	at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490)
	at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
	at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
	at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:678)
	at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
	at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
	at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408)
	at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
	at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:853)
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1587)
	at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
	at java.base/java.lang.Thread.run(Thread.java:834)

Do you have any tips to troubleshoot this ? Or maybe an idea of what is going on :smiley:

Thank you

Hello,

When you use an indexing plan, calling addOrUpdate will not only index that entity, but also any other entity that could be affected by changes to that entity. That’s probably not ideal in your case, but that’s another matter.

There seems to be a bug related to the resolution of the entities that should be reindexed. I suppose ClientPhysique has an @IndexedEmbedded that includes another ClientPhysique?

The problem is probably in org.hibernate.search.mapper.pojo.work.impl.PojoIndexingPlanImpl#process, in this loop:

			for ( PojoIndexedTypeIndexingPlan<?, ?, ?> delegate : indexedTypeDelegates.values() ) {
				delegate.resolveDirty( this::updateBecauseOfContained );
			}

The call to resolveDirty will trigger a loop on indexingPlansPerId that executes an operation of each element of that map.
The problem is, this operation might trigger the addition of an element to the map indexedTypeDelegates (through org.hibernate.search.mapper.pojo.work.impl.PojoIndexingPlanImpl#getOrCreateIndexedDelegateForContainedUpdate), or to the map indexingPlansPerId (through org.hibernate.search.mapper.pojo.work.impl.PojoIndexedTypeIndexingPlan#getPlan), when we discover a type/entity that wasn’t present in the map initially.

The concurrent modification is not a problem per se, because we don’t need to call resolveDirty on newly added elements. However, it’s a problem because the iterator will detect the concurrent modification and throw an exception.

Would you mind creating a ticket? If you want to have a try at solving the problem, that’s even better :slight_smile:

The most important part of fixing the problem will be to create a reproducer. We will need two:

  • one where an entity A1 of type A is indexed-embedded in an entity A2 of type A. A1 and A2 are created in a first transaction, then in a second transaction A1 is modified, which should trigger reindexing of A2 and thus the bug.
  • one where an entity A1 of type A is indexed-embedded in an entity B2 of type B. A1 and B2 are created in a first transaction, then in a second transaction A1 is modified, which should trigger reindexing of B2 and thus the bug.

A good starting point for the test would be to copy/paste org.hibernate.search.integrationtest.mapper.orm.automaticindexing.AutomaticIndexingBasicIT#directPersistUpdateDelete in a second test class, e.g. AutomaticIndexingConcurrentModificationIT.

As to the solution… the easiest solution will probably be to dump the content of the two maps I mentioned above to an array before iterating on them, like we used to do in Search 5:

		PerClassWork[] worksFromEvents = byClass.values().toArray( new PerClassWork[byClass.size()] );

		// We need to iterate on a "frozen snapshot" of the byClass values
		// because of HSEARCH-647. This method is not recursive, invoked
		// only after the current unit of work is complete, and all additional
		// work we add through recursion is already complete, so we don't need
		// to process again new classes we add during the process.
		for ( PerClassWork perClassWork : worksFromEvents ) {
			perClassWork.processContainedInAndPrepareExecution();
		}

@yrodiere
Thank you for the answer, i can reproduce it with something like that, is it ok for you ?

package org.hibernate.search.integrationtest.mapper.orm.automaticindexing;

import javax.persistence.Basic;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;

import org.hibernate.SessionFactory;
import org.hibernate.search.engine.backend.work.execution.DocumentCommitStrategy;
import org.hibernate.search.engine.backend.work.execution.DocumentRefreshStrategy;
import org.hibernate.search.mapper.orm.Search;
import org.hibernate.search.mapper.orm.automaticindexing.AutomaticIndexingStrategyName;
import org.hibernate.search.mapper.orm.cfg.HibernateOrmMapperSettings;
import org.hibernate.search.mapper.orm.work.SearchIndexingPlan;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.AssociationInverseSide;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.GenericField;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.Indexed;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.IndexedEmbedded;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.IndexingDependency;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.ObjectPath;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.PropertyValue;
import org.hibernate.search.util.impl.integrationtest.common.rule.BackendMock;
import org.hibernate.search.util.impl.integrationtest.mapper.orm.OrmSetupHelper;
import org.hibernate.search.util.impl.integrationtest.mapper.orm.OrmUtils;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class AutomaticIndexingConcurrentModificationIT {
	@Rule
	public BackendMock backendMock = new BackendMock( "stubBackend" );

	@Rule
	public OrmSetupHelper ormSetupHelper = OrmSetupHelper.withBackendMock( backendMock );

	private SessionFactory sessionFactory;

	@Before
	public void setup() {
		backendMock.expectSchema( IndexedEntity.INDEX, b -> b
				.field( "firstName", String.class )
				.field( "name", String.class )
				.objectField( "child", b2 -> b2
						.field( "firstName", String.class )
						.field( "name", String.class )
				)
		);

		sessionFactory = ormSetupHelper.start().withProperty(
				HibernateOrmMapperSettings.AUTOMATIC_INDEXING_STRATEGY,
				AutomaticIndexingStrategyName.NONE
		).setup( IndexedEntity.class );

		backendMock.verifyExpectationsMet();
	}

	@Test
	public void directPersistUpdateDelete() {

		IndexedEntity entity1 = new IndexedEntity();
		entity1.setId( 1 );
		entity1.setName( "edouard" );
		entity1.setFirstName( "zobocare" );

		IndexedEntity entity2 = new IndexedEntity();
		entity2.setId( 2 );
		entity2.setName( "yann" );
		entity2.setFirstName( "bonduel" );

		IndexedEntity entity3 = new IndexedEntity();
		entity3.setId( 3 );
		entity3.setName( "antoine" );
		entity3.setFirstName( "owl" );

		IndexedEntity entity4 = new IndexedEntity();
		entity4.setId( 4 );
		entity4.setName( "princess" );
		entity4.setFirstName( "yossrage" );

		entity1.setChild( entity2 );
		entity2.setChild( entity1 );

		entity3.setChild( entity4 );
		entity4.setChild( entity3 );

		// First transaction
		OrmUtils.withinTransaction( sessionFactory, session -> {
			session.persist( entity1 );
			session.persist( entity2 );
			session.persist( entity3 );
			session.persist( entity4 );
			session.flush();
			backendMock.verifyExpectationsMet();
		} );

		// Second transaction
		OrmUtils.withinTransaction( sessionFactory, session -> {
			SearchIndexingPlan indexingPlan = Search.session( session ).indexingPlan();
			indexingPlan.addOrUpdate( session.load( IndexedEntity.class, 3 ) );
			indexingPlan.addOrUpdate( session.load( IndexedEntity.class, 1 ) );
			indexingPlan.addOrUpdate( session.load( IndexedEntity.class, 2 ) );

			backendMock.expectWorksAnyOrder( IndexedEntity.INDEX, DocumentCommitStrategy.FORCE, DocumentRefreshStrategy.NONE )

					.update( entity1.getId().toString(), b -> b
							.field( "name", entity1.getName() )
							.field( "firstName", entity1.getFirstName() )
							.objectField( "child", b2 -> b2
									.field( "firstName", entity2.getFirstName() )
									.field( "name", entity2.getName() )
							)
					)
					.update( entity2.getId().toString(), b -> b
							.field( "name", entity2.getName() )
							.field( "firstName", entity2.getFirstName() )
							.objectField( "child", b2 -> b2
									.field( "firstName", entity1.getFirstName() )
									.field( "name", entity1.getName() )
							)
					)
					.update( entity3.getId().toString(), b -> b
							.field( "name", entity3.getName() )
							.field( "firstName", entity3.getFirstName() )
							.objectField( "child", b2 -> b2
									.field( "firstName", entity4.getFirstName() )
									.field( "name", entity4.getName() )
							)
					)
					.update( entity4.getId().toString(), b -> b
							.field( "name", entity4.getName() )
							.field( "firstName", entity4.getFirstName() )
							.objectField( "child", b2 -> b2
									.field( "firstName", entity3.getFirstName() )
									.field( "name", entity3.getName() )
							)
					)
					.processedThenExecuted();
		} );

		backendMock.verifyExpectationsMet();
	}

	@Entity(name = "indexed")
	@Indexed(index = IndexedEntity.INDEX)
	public static class IndexedEntity {
		static final String INDEX = "IndexedEntity";

		@Id
		private Integer id;

		@GenericField
		@Basic
		private String firstName;

		@GenericField
		@Basic
		private String name;

		@IndexedEmbedded(includePaths = {
				"firstName", "name"
		})
		@OneToOne
		@IndexingDependency(
				derivedFrom = {
						@ObjectPath({ @PropertyValue(propertyName = "name") }),
						@ObjectPath({ @PropertyValue(propertyName = "firstName") })
				}
		)
		@AssociationInverseSide(inversePath = @ObjectPath(@PropertyValue(propertyName = "child")))
		private IndexedEntity child;

		public Integer getId() {
			return id;
		}

		public void setId(Integer id) {
			this.id = id;
		}

		public String getFirstName() {
			return firstName;
		}

		public void setFirstName(String firstName) {
			this.firstName = firstName;
		}

		public String getName() {
			return name;
		}

		public void setName(String name) {
			this.name = name;
		}

		public IndexedEntity getChild() {
			return child;
		}

		public void setChild(
				IndexedEntity child) {
			this.child = child;
		}
	}
}

This seems to fix my issue as you expected

// in PojoIndexedTypeIndexingPlan.class
void resolveDirty(PojoReindexingCollector containingEntityCollector) {
		List<IndexedEntityIndexingPlan> plansPerID = new ArrayList<>( indexingPlansPerId.values() );
		for ( IndexedEntityIndexingPlan plan : plansPerID ) {
			plan.resolveDirty( containingEntityCollector );
		}
	}

// PojoContainedTypeIndexingPlan.class
void resolveDirty(PojoReindexingCollector containingEntityCollector) {
		List<ContainedEntityIndexingPlan> plansPerID = new ArrayList<>( indexingPlansPerId.values() );
		for ( ContainedEntityIndexingPlan plan : plansPerID ) {
			plan.resolveDirty( containingEntityCollector );
		}
	}

I will create the ticket later, and put a PR after i check all the test are still ok.

PR: https://github.com/hibernate/hibernate-search/pull/2240

Thanks, I will have a look ASAP.

1 Like