/**
 * Routing binder for {@link Rule} entities.
 *
 * <p>Registers a {@link RuleBridge} which adds a route for a rule only when the rule's own number
 * starts with {@code "CR-"} (ignoring case) or when at least one of its sub-rules references a code
 * rule whose number starts with one of the {@code RULE_PREFIX} prefixes. Every other rule is
 * reported as not indexed.
 */
public class RuleRoutingBinder implements RoutingBinder {

    @Override
    public void bind(RoutingBindingContext context) {
        // Paths declared as dependencies of the routing decision.
        // NOTE(review): the bridge also reads subRules -> codeRule -> ruleNumber; confirm that the
        // "subRules" dependency alone is enough to trigger re-routing when a code rule changes.
        context.dependencies()
                .use("subPolicy.name")
                .use("ruleNumber")
                .use("subRules");
        context.bridge(Rule.class, new RuleBridge());
    }

    /** Decides, per {@link Rule}, whether a route is added or the entity is flagged as not indexed. */
    static class RuleBridge implements RoutingBridge<Rule> {

        /**
         * Adds a route when the rule qualifies as a custom rule, otherwise marks it as not indexed.
         *
         * @param routes           collector receiving the routing decision
         * @param entityIdentifier identifier of the entity (unused)
         * @param indexedEntity    the rule being routed
         * @param context          routing context (unused)
         */
        @Override
        public void route(DocumentRoutes routes, Object entityIdentifier, Rule indexedEntity,
                RoutingBridgeRouteContext context) {
            String ownNumber = indexedEntity.getRuleNumber();

            // A rule without a sub-rule collection is treated like a rule with no sub-rules.
            Set<SubRule> children = indexedEntity.getSubRules();
            if (children == null) {
                children = Collections.emptySet();
            }

            String[] prefixes =
                    Arrays.stream(RULE_PREFIX.values()).map(RULE_PREFIX::getRulePrefix).toArray(String[]::new);

            boolean customRule = StringUtils.startsWithIgnoreCase(ownNumber, "CR-")
                    || referencesPrefixedCodeRule(children, prefixes);
            if (!customRule) {
                routes.notIndexed();
                return;
            }
            routes.addRoute();
        }

        /**
         * Adds a single route unconditionally, whatever the state of the entity.
         */
        @Override
        public void previousRoutes(DocumentRoutes routes, Object entityIdentifier, Rule indexedEntity,
                RoutingBridgeRouteContext context) {
            routes.addRoute();
        }

        /**
         * Tells whether any non-null sub-rule has a non-null code rule whose number starts with one
         * of the given prefixes.
         */
        private static boolean referencesPrefixedCodeRule(Set<SubRule> children, String[] prefixes) {
            for (SubRule child : children) {
                if (child == null) {
                    continue;
                }
                CodeRule codeRule = child.getCodeRule();
                if (codeRule == null) {
                    continue;
                }
                if (StringUtils.startsWithAny(codeRule.getRuleNumber(), prefixes)) {
                    return true;
                }
            }
            return false;
        }
    }
}
We do not make any explicit delete or purge calls in the flow in which we are encountering the error; we only use them in certain reindexing scenarios. I am attaching that code as well, just in case.
/**
 * Rebuilds the search index of every requested entity by swapping aliases between the current
 * index and a freshly created one.
 *
 * <p>For each entity name the steps are:
 * <ol>
 *   <li>remove the {@code -write} and {@code -read} aliases from the current index;</li>
 *   <li>let the schema manager create the new index and look it up through the write alias;</li>
 *   <li>move the read alias back to the current index while the new one is being filled;</li>
 *   <li>run the mass indexer, then wait for the configured interval;</li>
 *   <li>add the read alias to the new index and delete the current index.</li>
 * </ol>
 * A failure in any alias/index step records {@code false} for that entity and moves on to the next
 * one; steps already performed are not rolled back. A mass indexing failure is only logged and the
 * alias switch still happens.
 *
 * @param entities keys of {@code classNameMap} identifying the entities to reindex
 * @return for each requested entity, {@code true} when all steps succeeded and {@code false} otherwise
 */
@Transactional
public Map<String, Boolean> reIndexEntites(List<String> entities) {
    SearchSession searchSession = Search.session(entityManager);
    SearchMapping searchMapping = Search.mapping(entityManager.getEntityManagerFactory());
    Map<String, Boolean> resultMap = new HashMap<>();
    for (String entity : entities) {
        long t1;
        log.info("Reindex triggered for entity: {}", entity);
        Class<?> entityClass = classNameMap.get(entity);
        if (Objects.isNull(entityClass)) {
            log.error("Reindex cannot be run for entity: {}", entity);
            resultMap.put(entity, false);
            continue;
        }
        String entityName = searchMapping.indexedEntity(entityClass).jpaName().toLowerCase();
        String writeAlias = entityName + "-write";
        String readAlias = entityName + "-read";

        //remove read and write alias from current index
        t1 = System.currentTimeMillis();
        String currentIndex = gsRestHighLevelClient.getIndexForAlias(writeAlias);
        if (Objects.isNull(currentIndex)) {
            resultMap.put(entity, false);
            continue;
        }
        // NOTE(review): if the write alias is removed but the read alias removal fails, the write
        // alias is not restored before moving on.
        if (!gsRestHighLevelClient.removeAlias(currentIndex, writeAlias) ||
                !gsRestHighLevelClient.removeAlias(currentIndex, readAlias)) {
            resultMap.put(entity, false);
            continue;
        }
        MonitoringDataService.logDataToES("remove_alias_current_index", System.currentTimeMillis() - t1,
                MonitoringData.ESDataType.MASS_INDEXING, generateMassIndexingTags(entity));

        //create new index
        t1 = System.currentTimeMillis();
        try {
            SearchSchemaManager searchSchemaManager = searchSession.schemaManager(entityClass);
            searchSchemaManager.createOrUpdate();
        } catch (Exception e) {
            log.error("Error occurred while creating new index for entity: {}", entity, e);
            resultMap.put(entity, false);
            continue;
        }
        String newIndex = gsRestHighLevelClient.getIndexForAlias(writeAlias);
        if (Objects.isNull(newIndex)) {
            resultMap.put(entity, false);
            continue;
        }
        MonitoringDataService.logDataToES("create_new_index", System.currentTimeMillis() - t1,
                MonitoringData.ESDataType.MASS_INDEXING, generateMassIndexingTags(entity));

        //remove read alias from new index and add to current index
        t1 = System.currentTimeMillis();
        if (!gsRestHighLevelClient.removeAlias(newIndex, readAlias) ||
                !gsRestHighLevelClient.addAlias(currentIndex, readAlias, false)) {
            resultMap.put(entity, false);
            continue;
        }
        MonitoringDataService.logDataToES("switch_read_alias", System.currentTimeMillis() - t1,
                MonitoringData.ESDataType.MASS_INDEXING, generateMassIndexingTags(entity));

        //run mass indexing on new index
        try {
            t1 = System.currentTimeMillis();
            MassIndexer massIndexer = searchSession.massIndexer(entityClass);
            // Restrict Rule loading to the rules that are actually routed to the index.
            // NOTE(review): the 'ARC-'/'AZC-' prefixes are hard-coded here; confirm they stay in
            // sync with the prefixes used by the Rule routing logic.
            massIndexer.type(Rule.class).reindexOnly(
                    "e.ruleNumber like 'CR-%' or e.id in ("
                            + "select r.id from Rule r "
                            + "join SubRule sr on sr.rule.id = r.id "
                            + "join CodeRule cr on sr.codeRule.id = cr.id "
                            + "where cr.ruleNumber like 'ARC-%' or cr.ruleNumber like 'AZC-%')");
            massIndexer.startAndWait();
            MonitoringDataService.logDataToES("mass_index", System.currentTimeMillis() - t1,
                    MonitoringData.ESDataType.MASS_INDEXING, generateMassIndexingTags(entity));
            TimeUnit.SECONDS.sleep(configurationService.findKey("gs.indexing.interval.seconds", 60));
            // change this time
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of silently clearing it; the
            // remaining steps are still attempted, as they were before.
            Thread.currentThread().interrupt();
            log.error("Interrupted while mass indexing on entity: {}", entity, e);
        } catch (Exception e) {
            log.error("Error occurred while mass indexing on entity: {}", entity, e);
            //continuing as GSDataIntegrity cron will take care of missing entries in new index
        }

        //add read index to new index
        t1 = System.currentTimeMillis();
        if (!gsRestHighLevelClient.addAlias(newIndex, readAlias, false)) {
            resultMap.put(entity, false);
            continue;
        }
        MonitoringDataService.logDataToES("new_index_read_alias", System.currentTimeMillis() - t1,
                MonitoringData.ESDataType.MASS_INDEXING, generateMassIndexingTags(entity));

        //delete current index
        t1 = System.currentTimeMillis();
        if (!gsRestHighLevelClient.deleteIndex(currentIndex)) {
            resultMap.put(entity, false);
            continue;
        }
        MonitoringDataService.logDataToES("delete_current_index", System.currentTimeMillis() - t1,
                MonitoringData.ESDataType.MASS_INDEXING, generateMassIndexingTags(entity));
        resultMap.put(entity, true);
    }
    return resultMap;
}
/**
 * Builds the tag map attached to mass-indexing monitoring records.
 *
 * @param entity name of the entity being reindexed
 * @return a new mutable map holding the single tag {@code "entity"} mapped to {@code entity}
 */
private Map<String, String> generateMassIndexingTags(String entity) {
    final Map<String, String> monitoringTags = new HashMap<>();
    monitoringTags.put("entity", entity);
    return monitoringTags;
}
/**
 * Re-synchronises individual documents of one entity type with the database.
 *
 * <p>Each id is looked up in the database: when the row exists an add-or-update is queued on the
 * indexing plan, otherwise a purge of the document with that id is queued.
 *
 * @param searchKey key of {@code classNameMap} identifying the entity type
 * @param ids       identifiers, as decimal strings, of the entities to process
 * @return for each id, {@code true} when an add-or-update was queued and {@code false} when the
 *         entity was missing (purge queued) or the id was not a valid number; empty when
 *         {@code searchKey} is unknown
 */
@Transactional
public Map<String, Boolean> reindexParticularEntities(String searchKey, List<String> ids) {
    SearchSession searchSession = Search.session(entityManager);
    SearchIndexingPlan searchIndexingPlan = searchSession.indexingPlan();
    Map<String, Boolean> resultMap = new HashMap<>();
    Class<?> entityClass = classNameMap.get(searchKey);
    if (Objects.isNull(entityClass)) {
        log.error("Reindex cannot be run for entity: {}", searchKey);
        return resultMap;
    }
    for (String id : ids) {
        long entityId;
        try {
            entityId = Long.parseLong(id);
        } catch (NumberFormatException e) {
            // One malformed id must not abort the whole batch.
            log.error("Invalid id {} for entity: {}", id, searchKey, e);
            resultMap.put(id, false);
            continue;
        }
        //entityManager.getReference throws Exception if id is not present in db, .find returns null
        Object entityObject = entityManager.find(entityClass, entityId);
        if (Objects.nonNull(entityObject)) {
            //if object is present in db, then send index request
            searchIndexingPlan.addOrUpdate(entityObject);
            resultMap.put(id, true);
        } else {
            //if object is not present in db, send delete doc request
            searchIndexingPlan.purge(entityClass, entityId, null);
            resultMap.put(id, false);
        }
    }
    return resultMap;
}
/**
 * Re-synchronises, for a single customer, the search documents of each requested entity.
 *
 * <p>For every entity the method resolves the backing table, collects the ids that belong to the
 * customer from the database, deletes the customer's documents of that entity from the index
 * behind the {@code -write} alias with a delete-by-query, and queues an add-or-update for every
 * collected id on the indexing plan. Cross-customer entities are rejected.
 *
 * @param entities   keys of {@code classNameMap} identifying the entities to reindex
 * @param customerId identifier of the customer, bound as an INTEGER query parameter
 * @return for each requested entity, {@code true} when the delete and the reindex requests were
 *         issued and {@code false} when the entity was skipped because of an error
 */
@Transactional
public Map<String, Boolean> reindexEntitiesForCustomerId(List<String> entities, String customerId) {
    SearchSession searchSession = Search.session(entityManager);
    SearchIndexingPlan searchIndexingPlan = searchSession.indexingPlan();
    SearchMapping searchMapping = Search.mapping(entityManager.getEntityManagerFactory());
    Map<String, Boolean> resultMap = new HashMap<>();
    for (String entity : entities) {
        Class<?> entityClass = classNameMap.get(entity);
        if (Objects.isNull(entityClass)) {
            resultMap.put(entity, false);
            continue;
        }

        // A throw-away instance is only needed to look up the entity persister below.
        Object entityObject;
        try {
            entityObject = entityClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            log.error("Error while creating entity instance for entity: {}", entity, e);
            resultMap.put(entity, false);
            // Without an instance the persister lookup cannot work; skip this entity.
            continue;
        }

        String tableName;
        SessionImpl session = entityManager.unwrap(SessionImpl.class);
        EntityPersister persister = session.getEntityPersister(null, entityObject);
        if (persister instanceof AbstractEntityPersister) {
            AbstractEntityPersister persisterImpl = (AbstractEntityPersister) persister;
            tableName = persisterImpl.getTableName();
        } else {
            log.error("Unexpected persister type; a subtype of AbstractEntityPersister expected for entity: {}",
                    entity);
            resultMap.put(entity, false);
            continue;
        }

        //get all db object ids for searchkey
        String dbQuery;
        if (crossCustomerEntities.containsKey(entityClass)) {
            log.error("Entity {} is cross-customer type, try reindexing using other endpoints", entity);
            resultMap.put(entity, false);
            continue;
        } else if (customerLevelEntities.containsKey(entityClass)) {
            dbQuery = generateDbQueryForSpecialEntities(tableName);
            if (dbQuery.isEmpty()) {
                log.error("Invalid entity {}", entity);
                resultMap.put(entity, false);
                continue;
            }
        } else {
            dbQuery = "select id from " + tableName + " where customer_id=?";
        }
        Object[] args = {customerId};
        int[] argsType = {java.sql.Types.INTEGER};
        List<Map<String, Object>> dbObjects = jdbcTemplate.queryForList(dbQuery, args, argsType);

        //delete all docs from ES for searchkey
        String entityName = searchMapping.indexedEntity(entityClass).jpaName().toLowerCase();
        String writeAlias = entityName + "-write";
        String index = gsRestHighLevelClient.getIndexForAlias(writeAlias);
        if (StringUtils.isBlank(index)) {
            resultMap.put(entity, false);
            continue;
        }
        BoolQueryBuilder finalBoolQuery = ESUtil.getBoolQuery();
        MatchQueryBuilder customerIdQuery = ESUtil.constructMatchQuery("customerId", customerId);
        finalBoolQuery.must(customerIdQuery);
        MatchQueryBuilder searchKeyQuery = ESUtil.constructMatchQuery("searchKey", entity);
        finalBoolQuery.must(searchKeyQuery);
        DeleteByQueryRequest esQuery = new DeleteByQueryRequest(index);
        esQuery.setQuery(finalBoolQuery);
        try {
            gsRestHighLevelClient.getClient().deleteByQuery(esQuery, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("IOException while deleting docs for entity: {}", entity, e);
            resultMap.put(entity, false);
            continue;
        }

        //reindex db objects to ES
        for (Map<String, Object> idObj : dbObjects) {
            // The JDBC driver decides the numeric type of the id column (Integer, Long,
            // BigDecimal, ...), so convert through Number instead of casting to Long.
            Object rawId = idObj.get("id");
            if (!(rawId instanceof Number)) {
                log.error("Skipping row with non-numeric id {} for entity: {}", rawId, entity);
                continue;
            }
            Long id = ((Number) rawId).longValue();
            Object entityObjectToIndex = entityManager.getReference(entityClass, id);
            searchIndexingPlan.addOrUpdate(entityObjectToIndex);
        }

        //refresh index (optional step)
        RefreshRequest refreshRequest = new RefreshRequest(index);
        try {
            gsRestHighLevelClient.getClient().indices().refresh(refreshRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            // Best effort only: a failed refresh does not fail the entity.
            log.error("IOException while refreshing index for entity: {}", entity, e);
        }
        resultMap.put(entity, true);
    }
    return resultMap;
}
/**
 * Returns the id-selection query for entities that need a dedicated customer filter.
 *
 * <p>Only the {@code rule} table (matched case-insensitively) is supported; its query takes the
 * customer id as its single {@code ?} parameter.
 *
 * @param tableName name of the table backing the entity
 * @return the SQL query for the table, or an empty string when the table has no dedicated query
 */
private String generateDbQueryForSpecialEntities(String tableName) {
    String normalizedTable = tableName.toUpperCase();
    if ("RULE".equals(normalizedTable)) {
        return "select r.id from rule r "
                + "left join sub_rule sr on r.id = sr.rule_id "
                + "left join code_rule cr on cr.id = sr.code_rule_id "
                + "join sub_policy sp on sp.id = r.subpolicy_id "
                + "join policy p on p.id = sp.policy_id "
                + "where ((r.rule_number like 'CR-%' or cr.rule_number like any(array['ARC-%', 'AZC-%'])) "
                + "and r.is_deleted = false and p.customer_id = ?);";
    }
    // Add more cases according to requirements
    return "";
}
/**
 * Queues a purge on the indexing plan for each of the given entities. Used in ITs.
 *
 * <p>Every element is cast to {@code SearchEntity} to read its search key (which selects the
 * entity class from {@code classNameMap}) and to {@code DomainObject} to read its id.
 *
 * @param entities entities whose documents should be purged
 */
@Transactional
public void purgeEntitiesFromES(List<Object> entities) {
    SearchIndexingPlan indexingPlan = Search.session(entityManager).indexingPlan();
    entities.forEach(candidate -> {
        String key = ((SearchEntity) candidate).getSearchKey();
        Class mappedClass = classNameMap.get(key);
        Long documentId = ((DomainObject) candidate).getId();
        indexingPlan.purge(mappedClass, documentId, null);
    });
}
/**
 * Queues an add-or-update on the indexing plan for the given entity, provided its runtime class
 * carries the {@code Indexed} annotation; other objects are ignored.
 *
 * @param entity the entity to reindex
 */
@Transactional
public void reindexEntity(Object entity) {
    // NOTE(review): the check uses the runtime class, so a proxy subclass may not expose the
    // annotation - confirm callers always pass initialized entity instances.
    boolean indexed = entity.getClass().isAnnotationPresent(Indexed.class);
    if (indexed) {
        Search.session(entityManager).indexingPlan().addOrUpdate(entity);
    }
}