NPE for outbox-polling strategy for @Entity with @RoutingBinderRef

\"cause\":{\"commonElementCount\":36,\"localizedMessage\":\"HSEARCH850012: Unable to serialize OutboxEvent payload with Avro\",\"message\":\"HSEARCH850012: Unable to serialize OutboxEvent payload with Avro\",\"name\":\"org.hibernate.search.util.common.SearchException\",\"cause\":{\"commonElementCount\":36,\"name\":\"java.lang.NullPointerException\",\"extendedStackTrace\":\"java.lang.NullPointerException: null
at org.hibernate.search.mapper.orm.coordination.outboxpolling.avro.impl.EventPayloadToDtoConverterUtils.convert(EventPayloadToDtoConverterUtils.java:62) ~[hibernate-search-mapper-orm-coordination-outbox-polling-6.1.3.Final.jar!/:6.1.3.Final]
at org.hibernate.search.mapper.orm.coordination.outboxpolling.avro.impl.EventPayloadToDtoConverterUtils.convert(EventPayloadToDtoConverterUtils.java:51) ~[hibernate-search-mapper-orm-coordination-outbox-polling-6.1.3.Final.jar!/:6.1.3.Final]
at org.hibernate.search.mapper.orm.coordination.outboxpolling.avro.impl.EventPayloadToDtoConverterUtils.convert(EventPayloadToDtoConverterUtils.java:32) ~[hibernate-search-mapper-orm-coordination-outbox-polling-6.1.3.Final.jar!/:6.1.3.Final]
at org.hibernate.search.mapper.orm.coordination.outboxpolling.avro.impl.EventPayloadSerializationUtils.serialize(EventPayloadSerializationUtils.java:41) ~[hibernate-search-mapper-orm-coordination-outbox-polling-6.1.3.Final.jar!/:6.1.3.Final]
at org.hibernate.search.mapper.orm.coordination.outboxpolling.event.impl.OutboxPollingOutboxEventSendingPlan.append(OutboxPollingOutboxEventSendingPlan.java:45) ~[hibernate-search-mapper-orm-coordination-outbox-polling-6.1.3.Final.jar!/:6.1.3.Final]
at org.hibernate.search.mapper.orm.automaticindexing.impl.HibernateOrmIndexingQueueEventSendingPlan.append(HibernateOrmIndexingQueueEventSendingPlan.java:28) ~[hibernate-search-mapper-orm-6.1.3.Final.jar!/:6.1.3.Final]
at org.hibernate.search.mapper.pojo.work.impl.PojoTypeIndexingPlanEventQueueDelegate.delete(PojoTypeIndexingPlanEventQueueDelegate.java:92) ~[hibernate-search-mapper-pojo-base-6.1.3.Final.jar!/:6.1.3.Final]
at org.hibernate.search.mapper.pojo.work.impl.AbstractPojoTypeIndexingPlan$AbstractEntityState.delegateDelete(AbstractPojoTypeIndexingPlan.java:408) ~[hibernate-search-mapper-pojo-base-6.1.3.Final.jar!/:6.1.3.Final]
at org.hibernate.search.mapper.pojo.work.impl.AbstractPojoTypeIndexingPlan$AbstractEntityState.sendCommandsToDelegate(AbstractPojoTypeIndexingPlan.java:334) ~[hibernate-search-mapper-pojo-base-6.1.3.Final.jar!/:6.1.3.Final]
at org.hibernate.search.mapper.pojo.work.impl.AbstractPojoTypeIndexingPlan.process(AbstractPojoTypeIndexingPlan.java:106) ~[hibernate-search-mapper-pojo-base-6.1.3.Final.jar!/:6.1.3.Final]
at org.hibernate.search.mapper.pojo.work.impl.PojoIndexingPlanImpl.process(PojoIndexingPlanImpl.java:138) ~[hibernate-search-mapper-pojo-base-6.1.3.Final.jar!/:6.1.3.Final]
at org.hibernate.search.mapper.orm.event.impl.HibernateSearchEventListener.onFlush(HibernateSearchEventListener.java:199)

I can provide additional details if required

The NPE seems to be happening on this line in the Hibernate Search code:

				.setRoutingKey( route.routingKey() )

Suggesting that route is null. But a route should never be null.

So, I will need the code of your routing binder and routing bridge. Also, if you’re making explicit calls to the indexing plan, I’ll need that code as well. Did you call delete or purge, by any chance?

public class RuleRoutingBinder implements RoutingBinder {
    @Override
    public void bind(RoutingBindingContext context) {
        context.dependencies().use("subPolicy.name").use("ruleNumber").use("subRules");
        context.bridge(Rule.class, new RuleBridge());
    }


     static class RuleBridge implements RoutingBridge<Rule> {

        @Override
        public void route(DocumentRoutes routes, Object entityIdentifier, Rule indexedEntity,
                          RoutingBridgeRouteContext context) {
            String ruleNumber = indexedEntity.getRuleNumber();
            Set<SubRule> subRules = Optional.ofNullable(indexedEntity.getSubRules()).orElse(Collections.emptySet());
            String[] customRulePrefixes =
                    Arrays.stream(RULE_PREFIX.values()).map(RULE_PREFIX::getRulePrefix).toArray(String[]::new);
            if (StringUtils.startsWithIgnoreCase(ruleNumber, "CR-") ||
                    subRules.stream().filter(Objects::nonNull).map(SubRule::getCodeRule).filter(Objects::nonNull)
                            .map(CodeRule::getRuleNumber).anyMatch(
                                    codeRuleNumber -> StringUtils.startsWithAny(codeRuleNumber, customRulePrefixes))) {
                routes.addRoute();
            } else {
                routes.notIndexed();
            }

        }

        @Override
        public void previousRoutes(DocumentRoutes routes, Object entityIdentifier, Rule indexedEntity,
                                   RoutingBridgeRouteContext context) {
            routes.addRoute();
        }
    }
}

We did not make any explicit delete or purge calls in this flow which we are encountering the error, we only use them in certain scenarios of reindexing. I am attaching that code also, just in case.

@Transactional
    public Map<String, Boolean> reIndexEntites(List<String> entities) {
        SearchSession searchSession = Search.session(entityManager);
        SearchMapping searchMapping = Search.mapping(entityManager.getEntityManagerFactory());

        Map<String, Boolean> resultMap = new HashMap<>();

        for(String entity : entities) {
            long t1;
            log.info("Reindex triggered for entity: {}", entity);
            Class entityClass = classNameMap.get(entity);
            if (Objects.isNull(entityClass)) {
                log.error("Reindex cannot be run for entity: {}", entity);
                resultMap.put(entity, false);
                continue;
            }

            String entityName = searchMapping.indexedEntity(entityClass).jpaName().toLowerCase();
            String writeAlias = entityName + "-write";
            String readAlias = entityName + "-read";

            //remove read and write alias from current index
            t1 = System.currentTimeMillis();
            String currentIndex = gsRestHighLevelClient.getIndexForAlias(writeAlias);
            if (Objects.isNull(currentIndex)) {
                resultMap.put(entity, false);
                continue;
            }
            if (!gsRestHighLevelClient.removeAlias(currentIndex, writeAlias) ||
                    !gsRestHighLevelClient.removeAlias(currentIndex, readAlias)) {
                resultMap.put(entity, false);
                continue;
            }
            MonitoringDataService.logDataToES("remove_alias_current_index", System.currentTimeMillis() - t1,
                    MonitoringData.ESDataType.MASS_INDEXING, generateMassIndexingTags(entity));

            //create new index
            t1 = System.currentTimeMillis();
            try {
                SearchSchemaManager searchSchemaManager = searchSession.schemaManager(entityClass);
                searchSchemaManager.createOrUpdate();
            } catch (Exception e) {
                log.error("Error occurred while creating new index for entity: {}", entity, e);
                resultMap.put(entity, false);
                continue;
            }
            String newIndex = gsRestHighLevelClient.getIndexForAlias(writeAlias);
            if (Objects.isNull(newIndex)) {
                resultMap.put(entity, false);
                continue;
            }
            MonitoringDataService.logDataToES("create_new_index", System.currentTimeMillis() - t1,
                    MonitoringData.ESDataType.MASS_INDEXING, generateMassIndexingTags(entity));

            //remove read alias from new index and add to current index
            t1 = System.currentTimeMillis();
            if (!gsRestHighLevelClient.removeAlias(newIndex, readAlias) ||
                    !gsRestHighLevelClient.addAlias(currentIndex, readAlias, false)) {
                resultMap.put(entity, false);
                continue;
            }
            MonitoringDataService.logDataToES("switch_read_alias", System.currentTimeMillis() - t1,
                    MonitoringData.ESDataType.MASS_INDEXING, generateMassIndexingTags(entity));

            //run mass indexing on new index
            try {
                t1 = System.currentTimeMillis();
                MassIndexer massIndexer = searchSession.massIndexer(entityClass);
                massIndexer.type(Rule.class).reindexOnly(
                        "e.ruleNumber like 'CR-%' or e.id in (select r.id from Rule r join SubRule sr on sr.rule.id " +
                                "=" + " r.id" + " join " + "CodeRule cr " + "on" + " sr" +
                                ".codeRule.id = cr.id where cr.ruleNumber like 'ARC-%' or cr" +
                                ".ruleNumber like 'AZC-%')");
                massIndexer.startAndWait();
                MonitoringDataService.logDataToES("mass_index", System.currentTimeMillis() - t1,
                        MonitoringData.ESDataType.MASS_INDEXING, generateMassIndexingTags(entity));
                TimeUnit.SECONDS.sleep(configurationService.findKey("gs.indexing.interval.seconds", 60));
                // change this time
            } catch (Exception e) {
                log.error("Error occurred while mass indexing on entity: {}", entity, e);
                //continuing as GSDataIntegrity cron will take care of missing entries in new index
            }

            //add read index to new index
            t1 = System.currentTimeMillis();
            if (!gsRestHighLevelClient.addAlias(newIndex, readAlias, false)) {
                resultMap.put(entity, false);
                continue;
            }
            MonitoringDataService.logDataToES("new_index_read_alias", System.currentTimeMillis() - t1,
                    MonitoringData.ESDataType.MASS_INDEXING, generateMassIndexingTags(entity));

            //delete current index
            t1 = System.currentTimeMillis();
            if (!gsRestHighLevelClient.deleteIndex(currentIndex)) {
                resultMap.put(entity, false);
                continue;
            }
            MonitoringDataService.logDataToES("delete_current_index", System.currentTimeMillis() - t1,
                    MonitoringData.ESDataType.MASS_INDEXING, generateMassIndexingTags(entity));

            resultMap.put(entity, true);
        }

        return resultMap;
    }

    private Map<String, String> generateMassIndexingTags(String entity) {
        Map<String, String> tags = new HashMap<>();
        tags.put("entity", entity);
        return tags;
    }

    @Transactional
    public Map<String, Boolean> reindexParticularEntities(String searchKey, List<String> ids) {
        SearchSession searchSession = Search.session(entityManager);
        SearchIndexingPlan searchIndexingPlan = searchSession.indexingPlan();

        Map<String, Boolean> resultMap = new HashMap<>();
        Class entityClass = classNameMap.get(searchKey);
        if (Objects.nonNull(entityClass)) {
            for (String id : ids) {
                //entityManager.getReference throws Exception if id is not present in db, .find returns null
                Object entityObject = entityManager.find(entityClass, Long.parseLong(id));
                if (Objects.nonNull(entityObject)) {
                    //if object is present in db, then send index request
                    searchIndexingPlan.addOrUpdate(entityObject);
                    resultMap.put(id, true);
                } else {
                    //if object is not present in db, send delete doc request
                    searchIndexingPlan.purge(entityClass, Long.parseLong(id), null);
                    resultMap.put(id, false);
                }
            }
        }

        return resultMap;
    }

    @Transactional
    public Map<String, Boolean> reindexEntitiesForCustomerId(List<String> entities, String customerId) {
        SearchSession searchSession = Search.session(entityManager);
        SearchIndexingPlan searchIndexingPlan = searchSession.indexingPlan();
        SearchMapping searchMapping = Search.mapping(entityManager.getEntityManagerFactory());

        Map<String, Boolean> resultMap = new HashMap<>();

        for (String entity : entities) {
            Class entityClass = classNameMap.get(entity);
            if (Objects.isNull(entityClass)) {
                resultMap.put(entity, false);
                continue;
            }

            Object entityObject = null;
            String tableName;
            try {
                entityObject = entityClass.newInstance();
            } catch (ReflectiveOperationException e) {
                log.error("Error while creating entity instance for entity: {}", entity);
                resultMap.put(entity, false);
            }

            SessionImpl session = entityManager.unwrap(SessionImpl.class);

            EntityPersister persister = session.getEntityPersister(null, entityObject);

            if (persister instanceof AbstractEntityPersister) {
                AbstractEntityPersister persisterImpl = (AbstractEntityPersister) persister;
                tableName = persisterImpl.getTableName();
            } else {
                log.error("Unexpected persister type; a subtype of AbstractEntityPersister expected for entity: {}",
                        entity);
                resultMap.put(entity, false);
                continue;
            }

            //get all db object ids for searchkey
            String dbQuery;
            if (crossCustomerEntities.containsKey(entityClass)) {
                log.error("Entity {} is cross-customer type, try reindexing using other endpoints", entity);
                resultMap.put(entity, false);
                continue;
            } else if (customerLevelEntities.containsKey(entityClass)) {
                dbQuery = generateDbQueryForSpecialEntities(tableName);
                if (dbQuery.isEmpty()) {
                    log.error("Invalid entity {}", entity);
                    resultMap.put(entity, false);
                    continue;
                }
            } else {
                dbQuery = "select id from " + tableName + " where customer_id=?";
            }
            Object[] args = {customerId};
            int[] argsType = {4}; //argType code INTEGER
            List<Map<String, Object>> dbObjects = jdbcTemplate.queryForList(dbQuery, args, argsType);

            //delete all docs from ES for searchkey
            String entityName = searchMapping.indexedEntity(entityClass).jpaName().toLowerCase();
            String writeAlias = entityName + "-write";
            String index = gsRestHighLevelClient.getIndexForAlias(writeAlias);
            if (StringUtils.isBlank(index)) {
                resultMap.put(entity, false);
                continue;
            }
            BoolQueryBuilder finalBoolQuery = ESUtil.getBoolQuery();
            MatchQueryBuilder customerIdQuery = ESUtil.constructMatchQuery("customerId", customerId);
            finalBoolQuery.must(customerIdQuery);
            MatchQueryBuilder searchKeyQuery = ESUtil.constructMatchQuery("searchKey", entity);
            finalBoolQuery.must(searchKeyQuery);
            DeleteByQueryRequest esQuery = new DeleteByQueryRequest(index);
            esQuery.setQuery(finalBoolQuery);
            try {
                gsRestHighLevelClient.getClient().deleteByQuery(esQuery, RequestOptions.DEFAULT);
            } catch (IOException e) {
                log.error("IOException while deleting docs for entity: {}", entity);
                resultMap.put(entity, false);
                continue;
            }

            //reindex db objects to ES
            for (Map<String, Object> idObj : dbObjects) {
                Long id = (Long) idObj.get("id");
                Object entityObjectToIndex = entityManager.getReference(entityClass, id);
                searchIndexingPlan.addOrUpdate(entityObjectToIndex);
            }

            //refresh index (optional step)
            RefreshRequest refreshRequest = new RefreshRequest(index);
            try {
                gsRestHighLevelClient.getClient().indices().refresh(refreshRequest, RequestOptions.DEFAULT);
            } catch (IOException e) {
                log.error("IOException while refreshing index for entity: {}", entity);
            }

            resultMap.put(entity, true);
        }

        return resultMap;
    }

    private String generateDbQueryForSpecialEntities(String tableName) {
        switch (tableName.toUpperCase()) {
            case "RULE":
                return "select r.id from rule r left join sub_rule sr on r.id = sr.rule_id left join code_rule cr on " +
                        "cr" + ".id " +
                        "= sr.code_rule_id join sub_policy sp on sp.id = r.subpolicy_id join policy p on p.id = sp" +
                        ".policy_id where ((r.rule_number like 'CR-%' or cr.rule_number like any(array['ARC-%', " +
                        "'AZC-%'])) and r.is_deleted = false and p.customer_id = ?);";
            // Add more cases according to requirements
            default:
                return "";
        }
    }

    //used in ITs
    @Transactional
    public void purgeEntitiesFromES(List<Object> entities) {
        SearchSession searchSession = Search.session(entityManager);
        SearchIndexingPlan searchIndexingPlan = searchSession.indexingPlan();

        for (Object entity : entities) {
            String searchKey = ((SearchEntity) entity).getSearchKey();
            Class entityClass = classNameMap.get(searchKey);
            Long id = ((DomainObject) entity).getId();
            searchIndexingPlan.purge(entityClass, id, null);
        }
    }

    @Transactional
    public void reindexEntity(Object entity) {
        if (!entity.getClass().isAnnotationPresent(Indexed.class))
            return;

        SearchSession searchSession = Search.session(entityManager);
        SearchIndexingPlan searchIndexingPlan = searchSession.indexingPlan();

        searchIndexingPlan.addOrUpdate(entity);
}
    

Thanks. I managed to reproduce the problem.

I opened [HSEARCH-4537] - Hibernate JIRA and will work on this.

Thanks @yrodiere , can we expect this fix in the next release?

Yes you can. I need to fix the bug first, though.

1 Like

This was fixed in Hibernate Search 6.1.5.Final, released just now.