Menu

Official website

Parallel execution: A Hibernate Reactive Gotcha


22 May 2023

min read

Hibernate Reactive is a reactive API for Hibernate, enabling you to create applications that use Hibernate for persistence, but that don’t rely on JDBC and that - unlike regular Hibernate and JDBC - work without blocking any threads.

Hibernate Reactive gives you a reactive Session, with many of the operations that are familiar for Hibernate users, but this time they don’t perform their work in a blocking way, but rather return an asynchronous type.

The reactive Session is available in two flavours: the Stage.Session which uses Java’s CompletionStage as the async type, and a Mutiny.Session, which uses Red Hat’s Mutiny asynchronous type Uni.

In this blog post, I’m using the Mutiny flavour, which is what you’d typically use in a Quarkus application.

Evaluating Unis in parallel

When dealing with Unis, a common need is to evaluate them in parallel. For example, when we call two web services, whose responses are returned in a Uni, we can combine the result using Uni.combine().all():

Uni<String> one = service1.call();
Uni<String> two = service2.call();

Uni.combine()
        .all()
        .unis(one, two)
        .combinedWith((first, second) -> first + " " + second);

This will subscribe to both Unis in parallel.

In fact, it’s quite common to subscribe to multiple Unis in parallel. A consumer of Unis would typically expect that it can do so.

Hibernate Reactive Unis

The Unis that you get back from Hibernate Reactive are different though! The Hibernate documentation has a pretty clear warning:

The session is not thread-safe (or "stream-safe"), so using it across different threads (or reactive streams) may cause bugs that are extremely hard to detect. Don’t say we didn’t warn you!
— Hibernate reactive docs

If you’re an unsuspecting person thinking I don’t need to read the Hibernate docs, because I already know Unis, you might be inclined to evaluate multiple Unis that you get from Hibernate reactive in parallel, for example using Uni.join() or Uni.combine().

If you do that, you’ll run into "wonderful" errors such as:

2023-05-19 14:39:10,359 ERROR [io.sma.graphql] (vert.x-eventloop-thread-2) SRGQL012000: Data Fetching Error: java.lang.IllegalStateException: Session/EntityManager is closed
	at org.hibernate.internal.AbstractSharedSessionContract.checkOpen(AbstractSharedSessionContract.java:429)

or

2023-05-19 14:42:48,145 ERROR [io.sma.graphql] (vert.x-eventloop-thread-1) SRGQL012000: Data Fetching Error: java.lang.IllegalStateException: Illegal pop() with non-matching JdbcValuesSourceProcessingState

What’s going on exactly?

As mentioned in the docs, Hibernate Reactive doesn’t support using the same Session from multiple reactive streams. This is effectively what you do when you evaluate multiple Unis that use the same Session in parallel.

Concurrent opening of a Session with Quarkus SessionOperations is also not supported, which is something that could happen if you evaluable multiple Unis that use the SessionOperations.withSession or SessionOperations.withTransaction methods or the @WithSession or @WithTransaction annotations.

It’s interesting to realize that the problem is not only parallel execution (on multiple threads) that’s problematic here, but also concurrent execution of two streams by the same thread. The latter is what typically happens, because Quarkus runs everything belonging to the same Vertx context on the same Vertx eventloop thread.

Conceptually, what happens is that the first stream makes a query to the DB and stores some state in the Vertx context. Then it hits an async boundary, because it needs to wait for the DB to return a response. The scheduler then runs the second stream (on the same thread) which finds unexpected state in the Vertx context, and it throws an exception.

How to prevent this?

If your own code evaluates Unis in parallel

If your own code evaluates Unis in parallel, you have several options:

The most obvious option is to use .usingConcurrencyOf(1) when using Uni.join() or Uni.combine(), for example:

Uni.join().all(one, two).usingConcurrencyOf(1); // Will evaluate `one` and `two` sequentially.

Another way is to use flatMap for more explicit sequential evaluation:

one.flatMap(x -> two.map(y -> x + " " + y)); // Will evaluate `one` and `two` sequentially.

If someone else’s code evaluates Uni’s in parallel

Sometimes you pass Unis from Hibernate Reactive to some library for further processing. In that case, you may not have control over how this library evaluates the Unis.

This bit us in the past, when using Hibernate Reactive together with Smallrye GraphQL: https://github.com/quarkusio/quarkus/issues/32870

In this scenario, the GraphQL engine decides to evaluate Unis in parallel (which it should, according to the GraphQL spec!), which is not allowed for Hibernate Unis.

In the next sections we’ll dive a little deeper into the fundamental problem here, solutions of other libraries, and what we can do in our situation.

What’s the problem here?

In my opinion, the fundamental problem is that Hibernate Reactive doesn’t return 'regular' Unis, but Unis with an additional instruction manual. The instruction manual (don’t evaluate them in parallel!) doesn’t show up in the type, so developers won’t know about them until they read the manual. Worse, other libraries won’t know about them either and violate their rules.

Another approach: Doobie

Doobie is a popular database access library for Scala, which has solved this by using a specific type ConnectionIO, which does not allow parallel composition, but only sequential composition. You can convert from this type to a more general type (somewhat like Uni), but at that point you define the transaction boundary.

So the entire instruction manual of you can not run database work on the same transaction in parallel, which is similar to the constraint of Hibernate, is encoded in the type system. Beautiful.

Making your Unis safe

So, the problem is that Hibernate Unis should not be evaluated in parallel, and that libraries that consume the Unis are not aware of this restriction.

So naturally, we could ask: can we remove this constraint, and make our Unis safe for parallel execution?

The answer is that we can, and it’s not terribly complicated.

What we want to do is to shift the responsibility for sequential execution from outside the Unis to inside the Unis. So that if two Hibernate Reactive Unis are run in parallel, one of them will wait with executing the actual Hibernate work until the other is done.

We will make a mutex for Unis that doesn’t block threads, allowing multiple Unis evaluated in parallel to sequence themselves, without blocking threads.

A Uni Mutex

We want to create a semaphore that we can use to protect a critical section:

public interface UniSemaphore {
    <T> Uni<T> protect(Supplier<Uni<T>> inner);
}

If we have an instance of this UniSemaphore, we can protect critical sections by calling the protect method. The returned Uni<T> will acquire a permit from the semaphore before executing, and return it when it’s completed (either with a value or with an error).

Here’s an implementation:

class UniSemaphoreImpl implements UniSemaphore {

    private int permits;
    private final Queue<UniEmitter<Void>> queue;

    public UniSemaphoreImpl(int permits) {
        assert(permits > 0);
        this.permits = permits;
        queue = new LinkedBlockingDeque<>();
    }

    @Override
    public <T> Uni<T> protect(Uni<T> uni) {
        return acquire().replaceWith(uni).eventually(this::release);
    }

    private Uni<Void> release() {
        return Uni.createFrom().item(() -> {
            synchronized (this) {
                UniEmitter<Void> next = queue.poll();
                if (next == null) {
                    permits++;
                } else {
                    next.complete(null);
                }
                return null;
            }
        });
    }

    private Uni<Void> acquire() {
        return Uni.createFrom().deferred(() -> {
            synchronized (this) {
                if (permits >= 1) {
                    permits--;
                    return Uni.createFrom().voidItem();
                } else {
                    return Uni.createFrom().emitter(emitter -> queue.add((UniEmitter<Void>) emitter));
                }
            }
        });
    }
}

The protect method will wrap the Uni with work (typically the Uni doing Hibernate reactive stuff), between an acquire and a release.

acquire will see if there are permits, if so it’ll take one and perform the work. If not, it will schedule the work in a queue and immediately return a Uni. This way, it doesn’t block the thread if there’s no permit available.

release will run when the work completes or when it fails and will either start queued work, or return the permit.

Now we can convert 'unsafe' Unis to 'safe' Unis, that a user can safely run in parallel:

Uni<String> unsafe1 = repo.getFoo();
Uni<String> unsafe2 = repo.getBar();

Uni.join().all(unsafe1, unsafe2).andFailFast(); // This will cause Hibernate exceptions

// Now make them 'safe':
UniSemaphore mutex = new UniSemaphoreImpl(1);
Uni<String> safe1 = mutex.protect(() -> unsafe1);
Uni<String> safe2 = mutex.protect(() -> unsafe2);

Uni.join().all(safe1, safe2).andFailFast(); // This will work properly

Alternative approach

An alternative approach to executing Unis in parallel, is making sure each Uni runs on their own Vertx context, with each their own Hibernate Reactive Session:

public static <T> Uni<T> runOnDuplicateContex(Supplier<Uni<T>> uni) {
    Context ctx = VertxContext.createNewDuplicatedContext();
    VertxContextSafetyToggle.setContextSafe(ctx, true);
    return Uni.createFrom().emitter(e ->
        ctx.runOnContext(ignore -> uni.get().subscribe().with(e::complete, e::fail)));
}

This allows true parallel execution of Unis, at the expense of losing transactionality.

Next steps

In a follow-up blog post we’ll show how to use CDI Interceptors to do the wrapping of Uni. This makes the conversion from a Uni you can’t run in parallel to one that you can, a little nicer.

expand_less