Hibernate Search Scroller-API as Flow.Publisher

Hello,

are there any plans to expose the Hibernate Search .scroll() API as a java.util.concurrent.Flow.Publisher for a user-defined java.util.concurrent.Flow.Subscriber that processes the search results?

I'm wrapping it myself, and it's hard because of the transactions (at least for me), but the question is whether this is something the Hibernate team might consider in the future, or has maybe already decided against.

Reactive APIs have been considered, but they are not a priority in Hibernate Search right now, as we're working on asynchronous indexing ([HSEARCH-3280] Simple, asynchronous, distributed reindexing relying on the database only (without any extra infrastructure) - Hibernate JIRA).

There is a ticket, though I intend to start with simpler things than scroll (such as a fetchAsync() that returns a CompletableFuture<SearchResult<T>>): [HSEARCH-3322] Async, reactive-friendly methods for simple search and indexing - Hibernate JIRA
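Until something like that lands, the general shape of such a fetchAsync() can be approximated by offloading the blocking fetch to an executor. A minimal JDK-only sketch (blockingFetch() is a stand-in for the real blocking Hibernate Search call, not actual API):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FetchAsyncSketch {
    // Stand-in for a blocking Hibernate Search fetch; the real call would be
    // something like searchSession.search(...).where(...).fetch(20).
    static List<String> blockingFetch() {
        return List.of("hit-1", "hit-2");
    }

    // Approximation of a fetchAsync(): run the blocking fetch on a dedicated
    // executor and expose the result as a CompletableFuture.
    static CompletableFuture<List<String>> fetchAsync(ExecutorService executor) {
        return CompletableFuture.supplyAsync(FetchAsyncSketch::blockingFetch, executor);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            List<String> hits = fetchAsync(executor).join();
            if (!hits.equals(List.of("hit-1", "hit-2"))) {
                throw new AssertionError("unexpected hits: " + hits);
            }
        } finally {
            executor.shutdown();
        }
    }
}
```

Note that this only hides the blocking call behind a future; the thread in the executor still blocks, which is exactly why a real non-blocking variant would need Hibernate Reactive.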

That being said, I think this would mostly make sense for queries that do not load entities (projections only). When it comes to loading entities, to be honest I have no idea whether a Publisher can be implemented correctly with Hibernate ORM, because ORM is blocking by nature. There’s a whole new project to do reactive programming with ORM, called Hibernate Reactive. Maybe we would need an integration between Hibernate Reactive and Hibernate Search.

So, yeah. One day, but definitely not right now :slight_smile:

I’m curious though, are you sure you need the full complexity of Publisher/Subscriber? Any reason a simple loop, or a callback passed to a method implementing the loop, is not enough?

I like the API with onError, onNext and onSuccess, and I like the backpressure request(amount) from Reactor. I think it would make the error handling of the whole chain clearer to other developers.
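For reference, that shape also exists in the JDK itself. Here is a minimal JDK-only sketch (no Reactor) of a Flow.Subscriber with onNext/onError/onComplete and request(n) backpressure; note the JDK calls the success signal onComplete rather than onSuccess:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class BackpressureDemo {
    static class BatchSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        Flow.Subscription subscription;

        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(2); // backpressure: ask for a first small batch only
        }
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // pull one more item per processed item
        }
        public void onError(Throwable t) { done.countDown(); }
        public void onComplete() { done.countDown(); }
    }

    public static void main(String[] args) throws Exception {
        BatchSubscriber subscriber = new BatchSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete once the submitted items are delivered
        if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
            throw new AssertionError("did not complete");
        }
        if (!subscriber.received.equals(List.of(1, 2, 3, 4, 5))) {
            throw new AssertionError("unexpected: " + subscriber.received);
        }
    }
}
```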

Yes, a callback will also work, but I think I'd have to add onError, onSuccess and onFinally callbacks anyway.

I don't need the fully reactive (never-blocking) features. I just want to start async jobs so I can return to the user early in the places where I want that; in other places I'd rather wait for the result. I think it would be quite readable with subscribeOn() in my code.

OK, so here is some prose-code for what I have in mind to build my processing/exporting code, which will be used in different contexts (async, synchronous, export as Excel, export as CSV). Please, anybody reading this, do not copy it! I'm new to Flux/Reactive/Publishers and I heavily reduced/changed my working prototype code into this:

// create a REST entity for API users that shows the percentage
// create a temp file to write to
Flux.of(hibernateSearchScroller)
                // TODO: wrap all the following in some kind of second subscriber
                .doOnNext(v -> { /* update user-facing percentage */ })
                .doOnSuccess(v -> { /* transfer exported file to database */ })
                .doOnError(e -> { /* mark export job as failed */ })
                .doFinally(type -> { /* delete temp files */ })
                .subscribeOn(Schedulers.boundedElastic()) // run async
                .subscribe(new WriteToExcelWithBackpressureSubscriber(MyExcelFormat.exampleFormat()));

Maybe at some other place in the code, without the user-facing percentage info:

Flux.of(hibernateSearchScrollerOnlyIDs)
                // edit: this is how I'd imagine a high-level Hibernate Reactive integration,
                // ignoring all the problems of batch fetching; maybe it could handle that
                // internally by grouping 10 IDs or something like that
                .map(HibernateReactive::load)
                .subscribe(new WriteToCSVWithBackpressureSubscriber(MyCSVFormat.exampleFormat()));
                ...

At a third place, with JPA (or a REST client or whatever) but not Hibernate Search:

Flux.of(jpaScroller)
                .subscribe(new WriteToCSVWithBackpressureSubscriber(MyCSVFormat.exampleFormat()));
                ...

I see. Well yes that looks useful, though as always the devil is in the details :slight_smile:

If you’re not interested in non-blocking and are fine with using a background thread-pool for loading (as well as using a connection from the pool for the whole duration of your batch process), then yes it should be doable, even without Hibernate Reactive.

Note that request(amount) could be a problem: Elasticsearch scrolls set the size of each chunk explicitly at the start of the scroll, and it cannot be changed afterwards. So there will be a need for an adapter that caches. It's doable, though; in fact we had to do something similar in org.hibernate.search.mapper.orm.search.query.spi.HibernateOrmSearchScrollableResultsAdapter.
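The core of such an adapter could look roughly like this JDK-only sketch, where the chunk list stands in for the scroll (this is not the actual HibernateOrmSearchScrollableResultsAdapter code): fixed-size chunks are pulled into a buffer only when the subscriber's demand outstrips what is already cached.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class ChunkBufferSketch {
    // Stand-in for SearchScroll.next(): each call returns a fixed-size chunk,
    // mirroring how Elasticsearch scrolls fix their chunk size at creation time.
    final Iterator<List<Integer>> scroll;
    final Deque<Integer> buffer = new ArrayDeque<>();

    ChunkBufferSketch(List<List<Integer>> chunks) {
        this.scroll = chunks.iterator();
    }

    // Serve exactly n items (or fewer once the scroll is exhausted), pulling
    // new fixed-size chunks into the buffer only when needed.
    List<Integer> request(int n) {
        List<Integer> out = new ArrayList<>();
        while (out.size() < n) {
            if (buffer.isEmpty()) {
                if (!scroll.hasNext()) break; // scroll exhausted
                buffer.addAll(scroll.next());
            }
            out.add(buffer.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        // Chunks of size 3, but the subscriber requests 2, then 4, then 1.
        ChunkBufferSketch adapter = new ChunkBufferSketch(
                List.of(List.of(1, 2, 3), List.of(4, 5, 6)));
        if (!adapter.request(2).equals(List.of(1, 2))) throw new AssertionError();
        if (!adapter.request(4).equals(List.of(3, 4, 5, 6))) throw new AssertionError();
        if (!adapter.request(1).isEmpty()) throw new AssertionError();
    }
}
```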

Using List<MyEntity> as the T type in Publisher<T> could prove much easier, especially if you intend to clear the session regularly to avoid memory problems.

In any case… feel free to update us on your progress! It would be great if your code could eventually become the basis for an implementation in Hibernate Search :slight_smile:

OK, what I can already share is this (but please, dear reader, do not copy it; it is not tested at all):

    public Flux<MyEntity> searchScroll(Pageable pageable, MySearchRequest mysearch) {
        final int MAX_BATCH_LOAD_SIZE = 50;

        // TODO: test with the TCK & fulfill the Publisher spec - do not use in production
        final Flux<SearchScrollResult<MyEntity>> flux = Flux.push(sink -> {
            // TODO transactions not tested at all
            final DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            def.setReadOnly(true);
            final TransactionStatus tx = transactionManager.getTransaction(def);
            sink.onDispose(() -> {
                transactionManager.commit(tx);
            });

            final SearchSession searchSession = Search.session(entityManager);

            final SearchScroll<MyEntity> scroll = searchSession.search(MyEntity.class)
                    .where(f -> whereSuche(f, mysearch))
                    .sort(f -> sort(f, pageable))
                    .scroll(Math.min(pageable.getPageSize(), MAX_BATCH_LOAD_SIZE));
            sink.onRequest(n -> {
                synchronized (searchSession) {
                    for (int i = 0; i < n; i++) {
                        final SearchScrollResult<MyEntity> next = scroll.next();
                        if (!next.hasHits()) {
                            sink.complete();
                            return;
                        }
                        sink.next(next);
                        entityManager.flush();
                        entityManager.clear();
                    }
                }
            });
        }, FluxSink.OverflowStrategy.BUFFER);

        return flux
                .publishOn(Schedulers.boundedElastic()) // TODO: I'm not sure whether this is the correct place in the code for this
                .flatMapSequential(r -> Flux.fromIterable(r.hits()))
                .limitRate(100);
    }

Seems interesting. It’s using Reactor though, which for various reasons we cannot add as a dependency in Hibernate Search (at least not in the core modules).

If we ever implement this in Hibernate Search, we will preferably use JDK APIs (Flow, CompletableFuture) or, if we really need a library, Mutiny. Both of these should integrate just fine with Reactor.

Update with Mutiny variant:

  • (1) I can't move the scrolling for-loop out of the transaction (or into tiny new transactions per .next()), because Hibernate Search fails (as you already said, of course). I think getting the EntityReference without an ORM session/transaction is something you might consider, but I understand it's a really advanced case. Maybe I just used something wrong and there is a way?
    I get
    java.util.concurrent.ExecutionException: org.hibernate.search.util.common.SearchException: HSEARCH800017: Underlying Hibernate ORM Session is closed.
    
    at scroll.next()
  • (2) The example below runs through a simple test with a subscription, but I haven't gotten the Reactive TCK to work yet ( :grinning: all green now). But please, reader, still do not use this code; it is not tested in production and the transaction runs very long!!!

Full code here: Draft: add publisher tryout with mutiny (!1) · Merge requests · Peter Müller / spring-hibernate-search-6-demo · GitLab

    @Autowired
    private PlatformTransactionManager transactionManager;


    /** not tested against the reactive spec and TCK, will probably not work in many cases */
    public Publisher<Integer> searchScroll(Pageable pageable, String query) {
        return Multi.createFrom().<SearchScrollResult<EntityReference>>emitter(
                multiEmitter -> {
                    TransactionTemplate tx = new TransactionTemplate(transactionManager);
                    tx.setReadOnly(true);
                    tx.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);

                    tx.executeWithoutResult((status) -> {
                        SearchSession session = Search.session(entityManager);
                        SearchScroll<EntityReference> scroll = session.search(Person.class)
                                .select(SearchProjectionFactory::entityReference)
                                .where(
                                        f -> f.match().fields(Person_.NAME, Person_.ADDRESS)
                                                .matching(query)
                                                .fuzzy(1)
                                ).scroll(pageable.getPageSize());


                        for ( SearchScrollResult<EntityReference> chunk = scroll.next();
                              chunk.hasHits(); chunk = scroll.next() ) {

                            while (multiEmitter.requested() == 0) {
                                try {
                                    // crude polling wait for downstream demand
                                    TimeUnit.MILLISECONDS.sleep(100);
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                    multiEmitter.fail(e);
                                    return;
                                }
                            }
                            multiEmitter.emit(chunk);
                            entityManager.flush();
                            entityManager.clear();
                        }
                    });

                    multiEmitter.complete();
                },
                BackPressureStrategy.BUFFER
        )
                .flatMap(r -> Multi.createFrom().items(r.hits().stream().map(ref -> (Integer) ref.id())))
                .convert().toPublisher();
    }

About the entity managers: this is more about how Spring's injected EntityManagers work than about Hibernate Search. Basically, the injected EntityManager is just a proxy, not a real one. The real one is created when you start a transaction, and the proxy just delegates to it.

So I think your options are:

  1. Open a large transaction like you did. The transaction may time out depending on your underlying stack, though, so it may not work great in all situations. Also this means holding onto a DB connection even though you don’t load anything.
  2. Open small transactions and entity managers for each iteration of the loop, as you mentioned. It feels a bit wasteful if you don’t actually load anything, though.
  3. Work without transactions, creating an entity manager explicitly from an injected EntityManagerFactory.
  4. Fix HSEARCH-3519, which should allow you to run search without a session.

I believe solution 1 is the way to go, though YMMV.

The example might be improved to load the entities directly instead of emitting entity references, but there are several challenges.

First, Spring’s transaction-scoped entity managers are based on thread-locals, so the consumer threads (if different from the publishing thread) probably cannot use that. You may end up creating the entity manager manually.
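As a toy model of that thread-local binding (greatly simplified; this is not Spring's actual code, just an illustration of why a consumer thread can't use the transaction-scoped proxy):

```java
public class ProxyModel {
    // Simplified model of Spring's shared EntityManager proxy: the "real"
    // resource lives in a ThreadLocal that is only populated while a
    // transaction is active on that particular thread.
    static final ThreadLocal<String> currentSession = new ThreadLocal<>();

    // What the proxy does on every call: look up the thread-bound resource.
    static String proxyCall() {
        String real = currentSession.get();
        if (real == null) {
            throw new IllegalStateException("no transactional session bound to this thread");
        }
        return "delegated to " + real;
    }

    public static void main(String[] args) {
        // Inside a "transaction": the proxy finds the thread-bound session.
        currentSession.set("session-1");
        if (!proxyCall().equals("delegated to session-1")) throw new AssertionError();
        currentSession.remove();

        // Outside a transaction (or on a different thread, e.g. a reactive
        // consumer thread): the proxy has nothing to delegate to.
        boolean failed = false;
        try {
            proxyCall();
        } catch (IllegalStateException expected) {
            failed = true;
        }
        if (!failed) throw new AssertionError();
    }
}
```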

Second, I don't think the call to multiEmitter.emit() is synchronous, so the objects you emit may be used after the call to emit() returns… which means during or after:

  • the call to flush()
  • the call to clear()
  • the transaction commit
  • the call to entityManager.close() (done automatically by Spring, on commit)

Each of the above could just wreck the state of your session. If the publisher pattern is meant to be completely asynchronous, then I believe it’s not possible to do Hibernate ORM loading here; either you have to use EntityReference (like you did) or we’ll have to work on some support for Hibernate Reactive (which is designed to handle such use cases). Support for Hibernate Reactive would be a lot of work, obviously :slight_smile:


OK, thanks! HSEARCH-3519 is the thing for me, I think.

Thanks for the Spring pointer that it might close the EntityManager at the end of the transaction!

To clarify my use case: in the next steps I actually want to fetch only selected data fields for each ID from the database, via JPQL or an entity graph, so I don't care about the closed session.

        CompletableFuture<Long> done = Multi.createFrom()
                .publisher(personSearchService.searchScroll(PageRequest.of(0, 100), "hans"))
                .group().intoLists().of(100)
                .onItem().transformToMultiAndConcatenate(ids -> Multi.createFrom().iterable(personRepository.findAllByIdOnlyReducedFields(ids)))
                .invoke((Consumer<PersonReducedFields>) System.out::println)
                // do something random with the result
                .call(a -> Uni.createFrom().item(a).onItem().delayIt().by(Duration.ofMillis(200)))
                // some random aggregate operation
                .collect().with(Collectors.counting())
                .runSubscriptionOn(Infrastructure.getDefaultExecutor())
                .subscribe().asCompletionStage();

        Long count = done.get(10, TimeUnit.SECONDS);
        Assertions.assertEquals(PERSON_COUNT, count, "All persons should be found");