Hibernate Search Scroller-API as Flow.Publisher

Hello,

are there any plans to expose the Hibernate Search .scroll() API as a java.util.concurrent.Flow.Publisher, so that a user-defined java.util.concurrent.Flow.Subscriber can process the search results?

I currently wrap it myself, and it's hard (at least for me) because of transactions. The question is whether this is something the Hibernate team might consider in the future, or has perhaps already decided against.

Reactive APIs have been considered, but they are not a priority in Hibernate Search right now, as we're working on asynchronous indexing ([HSEARCH-3280] Simple, asynchronous, distributed reindexing relying on the database only (without any extra infrastructure) - Hibernate JIRA).

There is a ticket, though I intended to start with simpler things than scroll (such as a fetchAsync() that returns a CompletableFuture<SearchResult<T>>): [HSEARCH-3322] Async, reactive-friendly methods for simple search and indexing - Hibernate JIRA
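Until such a fetchAsync() exists, a similar shape can be approximated by offloading the blocking fetch to a worker pool. A minimal sketch, where blockingFetch() is a stand-in for a real searchSession.search(...).fetch(...) call, not actual Hibernate Search API:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncSearchSketch {
    // Stand-in for a blocking Hibernate Search query; a real implementation
    // would call searchSession.search(...).fetch(...) here.
    static List<String> blockingFetch(String query) {
        return List.of(query + "-hit1", query + "-hit2");
    }

    // Hypothetical fetchAsync(): run the blocking call on an executor and
    // hand the caller a CompletableFuture instead.
    static CompletableFuture<List<String>> fetchAsync(String query, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> blockingFetch(query), executor);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<String> hits = fetchAsync("foo", pool).join();
            System.out.println(hits);
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that this only hides the blocking behind a future; the underlying session and JDBC connection are still used in a blocking fashion on the pool thread.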

That being said, I think this would mostly make sense for queries that do not load entities (projections only). When it comes to loading entities, to be honest I have no idea whether a Publisher can be implemented correctly with Hibernate ORM, because ORM is blocking by nature. There’s a whole new project to do reactive programming with ORM, called Hibernate Reactive. Maybe we would need an integration between Hibernate Reactive and Hibernate Search.

So, yeah. One day, but definitely not right now :slight_smile:

I’m curious though, are you sure you need the full complexity of Publisher/Subscriber? Any reason a simple loop, or a callback passed to a method implementing the loop, is not enough?
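For context, the "callback passed to a method implementing the loop" alternative could look like the following sketch. All names are invented, and the chunked iterator is a generic stand-in for a scroll (an empty chunk signals the end, mirroring SearchScrollResult.hasHits()):

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

class ScrollLoopSketch {
    // Drive a chunked source to completion, invoking the given callbacks.
    static <T> void forEachHit(Iterator<List<T>> chunks,
                               Consumer<T> onNext,
                               Consumer<Throwable> onError,
                               Runnable onSuccess) {
        try {
            while (chunks.hasNext()) {
                List<T> chunk = chunks.next();
                if (chunk.isEmpty()) {
                    break;                 // no more hits
                }
                chunk.forEach(onNext);     // process each hit in order
            }
            onSuccess.run();
        } catch (RuntimeException e) {
            onError.accept(e);
        }
    }
}
```

This covers onNext/onError/onSuccess, but unlike a Publisher it offers no backpressure: the loop decides the pace, not the consumer.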


I like the API with onError, onNext and onSuccess, and I like the backpressure request(amount) from Reactor. I think the error handling of the whole chain would be clearer to other developers.

Yes, a callback would also work, but I think I'd have to add onError, onSuccess and onFinally callbacks anyway.

I don't need the fully reactive (never-blocking) features. I just want to run async jobs so I can return to the user early in the places where I want that; in other places I'd rather wait for the result. I think it would be quite readable with subscribeOn() in my code.

OK, so here is some prose-code I have in mind for my processing/exporting code, which will be used in different contexts (async, synchronous, export as Excel, export as CSV). Please, anybody reading this, do not copy it! I'm new to Flux/Reactive/Publishers and I heavily reduced/changed my working prototype code down to this:

// Create REST entity for API users that shows the percentage
// create a temp file to write to
Flux.of(hibernateSearchScroller)
                // todo: wrap all of the following in some kind of second subscriber
                .doOnNext(v -> /* update user-facing percentage */)
                .doOnSuccess(v -> /* transfer exported file to database */)
                .doOnError(e -> /* set export job as failed */)
                .doFinally(type -> /* delete temp files */)
                .subscribeOn(Schedulers.boundedElastic()) // run async
                .subscribe(new WriteToExcelWithBackpressureSubscriber(MyExcelFormat.exampleFormat()));

Maybe at some other place in the code, without the user-facing percentage info:

Flux.of(hibernateSearchScrollerOnlyIDs)
                .map(HibernateReactive::load) // edit: this is how I'd imagine a high-level Hibernate Reactive integration, ignoring all the problems of batch fetching; maybe it could handle that internally by grouping 10 IDs or so
                .subscribe(new WriteToCSVWithBackpressureSubscriber(MyCSVFormat.exampleFormat()));
                ...

At a third place with jpa (or rest client or whatever) but not Hibernate Search:

Flux.of(jpaScroller)
                .subscribe(new WriteToCSVWithBackpressureSubscriber(MyCSVFormat.exampleFormat()));
                ...

I see. Well yes that looks useful, though as always the devil is in the details :slight_smile:

If you’re not interested in non-blocking and are fine with using a background thread-pool for loading (as well as using a connection from the pool for the whole duration of your batch process), then yes it should be doable, even without Hibernate Reactive.

Note that request(amount) could be a problem, as Elasticsearch scrolls set the size of each chunk explicitly at the start of the scroll, and it cannot be changed afterwards. So there will be a need for a buffering adapter between the fixed-size chunks and the subscriber's requests. It's doable though; in fact we had to do something similar in org.hibernate.search.mapper.orm.search.query.spi.HibernateOrmSearchScrollableResultsAdapter.
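To make the buffering idea concrete, here is a hedged, self-contained sketch (not Hibernate Search code): the upstream "scroll" always produces fixed-size chunks, so leftover hits are parked in a queue between requests. It is synchronous, single-subscriber and not Reactive Streams TCK-compliant; a real implementation would need considerably more care:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Supplier;

// Adapts a fixed-chunk-size source to arbitrary request(n) amounts by
// buffering the remainder of each chunk. An empty chunk means end of scroll.
class ChunkBufferingPublisher<T> implements Flow.Publisher<T> {
    private final Supplier<List<T>> nextChunk;

    ChunkBufferingPublisher(Supplier<List<T>> nextChunk) {
        this.nextChunk = nextChunk;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private final ArrayDeque<T> buffer = new ArrayDeque<>();
            private boolean done;

            @Override
            public void request(long n) {
                while (n-- > 0 && !done) {
                    if (buffer.isEmpty()) {            // refill from the scroll
                        List<T> chunk = nextChunk.get();
                        if (chunk.isEmpty()) {
                            done = true;
                            subscriber.onComplete();
                            return;
                        }
                        buffer.addAll(chunk);
                    }
                    subscriber.onNext(buffer.poll());
                }
            }

            @Override
            public void cancel() {
                done = true;                           // real code: also close the scroll
            }
        });
    }

    // Demo helper: subscribe and pull everything in small requests.
    static <T> List<T> drain(Flow.Publisher<T> publisher, long batch) {
        var out = new java.util.ArrayList<T>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            Flow.Subscription sub;
            public void onSubscribe(Flow.Subscription s) { sub = s; s.request(batch); }
            public void onNext(T item) { out.add(item); sub.request(1); }
            public void onError(Throwable t) { throw new RuntimeException(t); }
            public void onComplete() { }
        });
        return out;
    }
}
```

The subscriber can request 2 at a time against a source that yields chunks of 3, and the buffer smooths out the mismatch.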

Using List<MyEntity> as the T type in Publisher<T> could prove much easier, especially if you intend to clear the session regularly to avoid memory problems.

In any case… feel free to update us on your progress! It would be great if your code could eventually become the basis for an implementation in Hibernate Search :slight_smile:

OK, what I can already share is this (but please, dear reader, do not copy it; it is not tested at all):

    public Flux<MyEntity> searchScroll(Pageable pageable, MySearchRequest mysearch) {
        final int MAX_BATCH_LOAD_SIZE = 50;

        // TODO: Test with TCK & fulfill the Publisher spec - Do not use in production
        final Flux<SearchScrollResult<MyEntity>> flux = Flux.push(sink -> {
            // TODO transactions not tested at all
            final DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            def.setReadOnly(true);
            final TransactionStatus tx = transactionManager.getTransaction(def);
            sink.onDispose(() -> {
                // note: the SearchScroll should also be closed here to release
                // the underlying cursor; omitted in this untested sketch
                transactionManager.commit(tx);
            });

            final SearchSession searchSession = Search.session(entityManager);

            final SearchScroll<MyEntity> scroll = searchSession.search(MyEntity.class)
                    .where(f -> whereSuche(f, mysearch))
                    .sort(f -> sort(f, pageable))
                    .scroll(Math.min(pageable.getPageSize(), MAX_BATCH_LOAD_SIZE));
            sink.onRequest(n -> {
                synchronized (searchSession) {
                    for (int i = 0; i < n; i++) {
                        final SearchScrollResult<MyEntity> next = scroll.next();
                        if (!next.hasHits()) {
                            sink.complete();
                            return;
                        }
                        sink.next(next);
                        entityManager.flush();
                        entityManager.clear();
                    }
                }
            });
        }, FluxSink.OverflowStrategy.BUFFER);

        return flux
                .publishOn(Schedulers.boundedElastic()) // todo: I'm not sure whether this is the correct place in the code for this
                .flatMapSequential(r -> Flux.fromIterable(r.hits()))
                .limitRate(100);
    }

Seems interesting. It’s using Reactor though, which for various reasons we cannot add as a dependency in Hibernate Search (at least not in the core modules).

If we ever implement this in Hibernate Search, we will preferably use JDK APIs (Flow, CompletableFuture), or if we really need a library, Mutiny. Both of them should integrate just fine with Reactor.
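For illustration, the JDK's Flow API alone already covers the Publisher/Subscriber contract that such an implementation could build on. A minimal, self-contained round trip through the JDK's built-in SubmissionPublisher (plain JDK code, not Hibernate Search API):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

class FlowBridgeSketch {
    // Publish items through the JDK's built-in Flow.Publisher implementation
    // and collect them on the subscriber side.
    static List<Integer> roundTrip(List<Integer> items) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            var done = publisher.consume(received::add); // CompletableFuture<Void>
            items.forEach(publisher::submit);
            publisher.close();
            done.join(); // wait until the subscriber has seen every item
        }
        return received;
    }
}
```

Since SubmissionPublisher implements Flow.Publisher, anything built on it should also be adaptable to Reactor or Mutiny pipelines.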