search
  • Sign In
  • Sign Up
Password reset successful

Follow the proiects vou are interested in andi aet the latestnews about them taster

Table of Contents
1. Problem background and common misunderstandings
2. Mutiny asynchronous stream processing core: Uni and Multi
3. Solution 1: Handle asynchronous streams gracefully in a test environment (combined with Vert.x Unit)
4. Solution 2: Blocking wait for all asynchronous results
5. Precautions and best practices
6. Summary
Home Java javaTutorial Mutiny asynchronous stream processing: efficient concurrent processing of elements in Uni

Mutiny asynchronous stream processing: efficient concurrent processing of elements in Uni

Dec 01, 2025 pm 01:18 PM

Mutiny asynchronous stream processing: efficient concurrent processing of elements in Uni

This article takes an in-depth look at how to asynchronously process each element in a Uni in the Mutiny framework. By converting Uni to a Multi stream and using the onItem().transformToUniAndMerge() operator, concurrent and asynchronous processing of elements in the list can be achieved. The article provides two main solutions: combining Vert.x Unit for non-blocking testing, and using collect().asList().await().indefinitely() for blocking result collection, and highlights relevant considerations and best practices.

1. Problem background and common misunderstandings

In reactive programming, we often encounter the need to handle asynchronous operations involving multiple elements. For example, given a Uni>, we want to perform a time-consuming asynchronous task on each string in the list and eventually collect or process the results of all tasks.

A common attempt is to use map to convert List to List>, and then merge these Unis via Uni.join().all(unis).andCollectFailures(). However, this method may not achieve the expected concurrent processing effect, or in a short-life cycle program (such as a unit test), the asynchronous task will be terminated before being completed due to the premature exit of the main thread, thus giving the illusion that "only the first element has been processed."

The core of the problem is that Uni> itself represents a single value stream, and its value is a complete list. If you want to operate asynchronously on each element in the list and treat it as an independent reactive event, you need to "unroll" the list into a stream that can be processed one by one. Mutiny provides the Multi type to handle streams of zero to N elements, which is the key to solving such problems.

2. Mutiny asynchronous stream processing core: Uni and Multi

Mutiny is a widely used reactive programming library in frameworks such as Quarkus. It provides two core types:

  • Uni : Represents an asynchronous operation that will eventually emit 0 or 1 elements, or a failure event.
  • Multi : Represents an asynchronous operation stream that can emit 0 to N elements, or a failure event, and will eventually emit a completion event.

To achieve asynchronous and concurrent processing of each element in Uni>, we need to first convert Uni> into a Multi, so that each string in the list becomes an independent event in the Multi stream. We can then apply an asynchronous transformation to each event in this Multi stream.

3. Solution 1: Handle asynchronous streams gracefully in a test environment (combined with Vert.x Unit)

In unit testing or scenarios that require non-blocking waiting for all asynchronous operations to be completed, we can use the Multi feature and the onTermination().invoke() callback to ensure that all tasks are completed. The following example incorporates a Vert.x Unit, which provides an Async mechanism to manage the lifecycle of asynchronous tests.

 import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@ExtendWith(VertxExtension.class)
public class AsyncListProcessingTest {

    // Simulate an asynchronous operation and return a Uni
    private Uni<string> processItemAsync(String item, Random random) {
        final int duration = (random.nextInt(5) 1) * 1000; // Random delay of 1-5 seconds System.out.println("Starting process for: " item ", duration: " duration "ms");
        return Uni.createFrom().item(item)
                .onItem().delayIt().by(Duration.ofMillis(duration))
                .invoke(() -&gt; System.out.println("Finished process for: " item));
    }

    @Test
    public void testAsyncProcessingWithVertxUnit(VertxTestContext context) {
        Random random = new Random();
        // The Async object of Vert.x Unit, used to notify the test framework when the asynchronous operation is completed context.verify(() -&gt; { // Ensure that Uni.createFrom() is executed in the context of VertxTestContext
                    .item(List.of("a", "b", "c")) // Initial Uni<list>&gt;
                    // 1. Convert Uni<list>&gt; to Multi<string>
                    .onItem().transformToMulti(Multi.createFrom()::iterable)
                    // 2. Apply an asynchronous transformation to each element in the Multi and merge the results back into the Multi
                    .onItem().transformToUniAndMerge(s -&gt; processItemAsync(s, random))
                    // 3. Subscribe to the Multi stream and process each completed element.subscribe()
                    .with(
                            s -&gt; System.out.println("Printing result: " s), // Successfully process each element context::failNow, // Any error causes the stream to fail context::completeNow // The stream is completed, notifying VertxTestContext that the test is over);
        });
    }
}</string></list></list></string>

Code explanation:

  1. Uni.createFrom().item(List.of("a", "b", "c")) : Create an initial Uni containing a list of strings.
  2. .onItem().transformToMulti(Multi.createFrom()::iterable) : This is the key step to convert Uni> to Multi. It expands the contents of the list emitted by Uni so that each element in the list becomes a new Multi event.
  3. .onItem().transformToUniAndMerge(s -> processItemAsync(s, random)) : This is the core of implementing concurrent asynchronous processing.
    • transformToUni: For each element s in Multi, the processItemAsync(s, random) method is called, which returns a Uni.
    • andMerge: Mutiny will concurrently subscribe to and execute these Unis created by transformToUni. When any Uni completes, its results are immediately merged into the output Multi stream. This means that the order of the results may not be the same as the original list, but instead depends on which asynchronous operation completes first.
  4. .subscribe().with(...) : Subscribe to the final Multi stream.
    • The first Lambda expression handles each successful result emitted in the Multi.
    • The second Lambda expression handles any errors in the stream.
    • The third Lambda expression is called when the stream completes successfully, here we use context::completeNow to notify Vert.x Unit that the test has successfully completed all asynchronous operations.

4. Solution 2: Blocking wait for all asynchronous results

In some scenarios, such as command line tools or when we need to wait for all asynchronous operations to complete before continuing the main program execution, we can choose to block the current thread until all results are collected.

 import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

import java.time.Duration;
import java.util.List;
import java.util.Random;

public class BlockingAsyncListProcessing {

    private static Uni<string> processItemAsync(String item, Random random) {
        final int duration = (random.nextInt(5) 1) * 1000; // Random delay of 1-5 seconds return Uni.createFrom().item(item)
                .onItem().delayIt().by(Duration.ofMillis(duration))
                .invoke(() -&gt; System.out.println("Letter: " item ", duration in ms: " duration));
    }

    public static void main(String[] args) {
        Random random = new Random();

        System.out.println("Starting blocking asynchronous processing...");

        List<string> results = Uni.createFrom()
                .item(List.of("a", "b", "c")) // Initial Uni<list>&gt;
                // 1. Convert Uni<list>&gt; to Multi<string>
                .onItem().transformToMulti(Multi.createFrom()::iterable)
                // 2. Apply an asynchronous transformation to each element in the Multi and merge the results back into the Multi
                .onItem().transformToUniAndMerge(s -&gt; processItemAsync(s, random))
                // 3. Optional: handle each completed element.onItem().invoke(s -&gt; System.out.println("Printing collected item: " s))
                // 4. Collect all elements in Multi into a list.collect().asList()
                // 5. Block the current thread until Uni<list>&gt; completes and returns the result.await().indefinitely();

        System.out.println("All items processed. Collected results: " results);
    }
}</list></string></list></list></string></string>

Code explanation:

  1. The first two steps are the same as solution one : convert Uni> to Multi, and then use onItem().transformToUniAndMerge() to process each element concurrently.
  2. .collect().asList() : This operator collects all emitted elements in the Multi stream into a List, and finally returns a Uni>. This Uni will emit a list containing all collected elements when the source Multi completes.
  3. .await().indefinitely() : This is a blocking operation. It blocks the current thread until the upstream Uni> emits its result (i.e. all asynchronous operations complete and the results are collected into the list). indefinitely() means waiting indefinitely.

5. Precautions and best practices

  • Non-blocking first : Use non-blocking reactive mode as much as possible (such as solution 1). The await() operation blocks the current thread and should be used with caution in production environments, especially in I/O-intensive or web applications, as it may cause thread starvation and performance issues. It is more suitable for startup code, testing, or specific scenarios where you need to wait synchronously for all asynchronous tasks to complete.
  • Error handling : There should be complete error handling logic in the Uni created inside transformToUniAndMerge and in the final subscribe().with() method. Mutiny provides a wealth of error handling operators, such as onFailure().recoverWith(), onFailure().retry(), etc.
  • Concurrency : transformToUniAndMerge will process tasks concurrently, but the actual concurrency may be limited by the underlying thread pool configuration, system resources, and the implementation of specific asynchronous operations. If you need fine control over concurrency, consider using the transformToUniAndMerge(concurrency, ...) variant.
  • Order guarantee : transformToUniAndMerge does not guarantee that the order of the results is consistent with the order of the original list. If you need to maintain order, consider using transformToUniAndConcatenate or sorting manually after collection.
  • Resource management : Ensure that any external resources (such as database connections, file handles) used in asynchronous operations are properly managed and released.

6. Summary

Through Mutiny's Multi type and onItem().transformToUniAndMerge() operator, we can effectively convert each element in Uni> into an independent asynchronous task and process it concurrently. Depending on the application scenario, we can choose the non-blocking subscription mode (suitable for reactive systems and testing) or the blocking await() mode (suitable for specific scenarios that require synchronous waiting for results). Understanding and correctly using these Mutiny operators is key to building efficient and robust reactive applications.

The above is the detailed content of Mutiny asynchronous stream processing: efficient concurrent processing of elements in Uni. For more information, please follow other related articles on the PHP Chinese website!

Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn

Hot AI Tools

Undress AI Tool

Undress AI Tool

Undress images for free

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

ArtGPT

ArtGPT

AI image generator for creative art from text prompts.

Stock Market GPT

Stock Market GPT

AI powered investment research for smarter decisions

Popular tool

Notepad++7.3.1

Notepad++7.3.1

Easy-to-use and free code editor

SublimeText3 Chinese version

SublimeText3 Chinese version

Chinese version, very easy to use

Zend Studio 13.0.1

Zend Studio 13.0.1

Powerful PHP integrated development environment

Dreamweaver CS6

Dreamweaver CS6

Visual web development tools

SublimeText3 Mac version

SublimeText3 Mac version

God-level code editing software (SublimeText3)

How to configure Spark distributed computing environment in Java_Java big data processing How to configure Spark distributed computing environment in Java_Java big data processing Mar 09, 2026 pm 08:45 PM

Spark cannot run in local mode, ClassNotFoundException: org.apache.spark.sql.SparkSession. This is the most common first step of getting stuck: even the dependencies are not correct. Only spark-core_2.12 is written in Maven, but spark-sql_2.12 is not added. SparkSession crashes as soon as it is built. The Scala version must strictly match the official Spark compiled version - Spark3.4.x uses Scala2.12 by default. If you use spark-sqljar of 2.13, the class loader cannot directly find the main class. Practical advice: Go to mvnre

The correct way to send emails in batches using JavaMail API in Java The correct way to send emails in batches using JavaMail API in Java Mar 04, 2026 am 10:33 AM

This article explains in detail how to correctly set multiple recipients (BCC/CC/TO) through javax.mail in Java, solves common misunderstandings - repeatedly calling setRecipients() causes only the first/last address to take effect, and provides a safe and reusable code implementation.

Elementary practice: How to write a simple console blog searcher in Java_String matching Elementary practice: How to write a simple console blog searcher in Java_String matching Mar 04, 2026 am 10:39 AM

String.contains() is not suitable for blog search because it only supports strict substring matching and cannot handle case, spaces, punctuation, spelling errors, synonyms and fuzzy queries; preprocessing toLowerCase() indexOf() or escaped wildcard regular matching (such as .*java.*config.*) is a more practical lightweight alternative.

How to safely map user-entered weekday string to integer value and implement date offset operation in Java How to safely map user-entered weekday string to integer value and implement date offset operation in Java Mar 09, 2026 pm 09:43 PM

This article introduces a concise and maintainable way to map the weekday string (such as "Monday") to the corresponding serial number (1-7), and use the modulo operation to realize the forward and backward offset of any number of days (such as Monday plus 4 days to get Friday), avoiding lengthy if chains and hard-coded logic.

How to generate a list of duplicate elements using Java's Collections.nCopies_Initialization tips How to generate a list of duplicate elements using Java's Collections.nCopies_Initialization tips Mar 06, 2026 am 06:24 AM

Collections.nCopies returns an immutable view. Calling add/remove will throw UnsupportedOperationException; it needs to be wrapped with newArrayList() to modify it, and it is disabled for mutable objects.

How to correctly implement runtime file writing in Java applications (avoiding JAR internal write failures) How to correctly implement runtime file writing in Java applications (avoiding JAR internal write failures) Mar 09, 2026 pm 07:57 PM

After a Java application is packaged as a JAR, data cannot be written directly to the resources in the JAR package (such as test.txt) because the JAR is essentially a read-only ZIP archive; the correct approach is to write variable data to an external path (such as a user directory, a temporary directory, or a configuration-specified path).

What is exception masking (Suppressed Exceptions) in Java_Multiple resource shutdown exception handling What is exception masking (Suppressed Exceptions) in Java_Multiple resource shutdown exception handling Mar 10, 2026 pm 06:57 PM

What is SuppressedException: It is not "swallowed", but actively archived by the JVM. SuppressedException is not an exception loss, but the JVM quietly attaches the secondary exception to the main exception under the premise that "only one exception must be thrown" for you to verify afterwards. It is automatically triggered by the JVM in only two scenarios: one is that the resource closure in try-with-resources fails, and the other is that you manually call addSuppressed() in finally. The key difference is: the former is fully automatic and safe; the latter requires you to keep it to yourself, and it can be written as shadowing if you are not careful. try-

How to use Homebrew to install Java on Mac_A must-have Java tool chain for developers How to use Homebrew to install Java on Mac_A must-have Java tool chain for developers Mar 09, 2026 pm 09:48 PM

Homebrew installs the latest stable version of openjdk (such as JDK22) by default, not the LTS version; you need to explicitly execute brewinstallopenjdk@17 or brewinstallopenjdk@21 to install the LTS version, and manually configure PATH and JAVA_HOME to be correctly recognized by the system and IDE.

Related articles