Mutiny asynchronous stream processing: efficient concurrent processing of elements in Uni<List<String>>

This article covers efficient concurrent processing of the elements of a Uni<List<String>> in the Mutiny framework. By converting the Uni<List<String>> to a Multi stream and using the onItem().transformToUniAndMerge() operator, the elements of the list can be processed concurrently and asynchronously. The article provides two main solutions: non-blocking testing combined with the Vert.x JUnit 5 extension, and blocking result collection using collect().asList().await().indefinitely(). It also highlights relevant considerations and best practices.

Core types (section 2):
- Uni: Represents an asynchronous operation that eventually emits either a single item (which may be null) or a failure.
- Multi: Represents an asynchronous stream that emits 0 to N items and then terminates with either a completion event or a failure event.

Code explanation for solution 1 (section 3):
- Uni.createFrom().item(List.of("a", "b", "c")): Creates the initial Uni<List<String>>.
- .onItem().transformToMulti(Multi.createFrom()::iterable): This is the key step that converts the Uni<List<String>> to a Multi<String>. It expands the list emitted by the Uni so that each element of the list becomes a separate Multi item.
- .onItem().transformToUniAndMerge(s -> processItemAsync(s, random)): This is the core of the concurrent asynchronous processing.
  - transformToUni: For each item s in the Multi, processItemAsync(s, random) is called, which returns a Uni<String>.
  - andMerge: Mutiny subscribes concurrently to the Unis created by transformToUni. As soon as any Uni completes, its result is merged into the output Multi. The order of the results may therefore differ from the order of the original list, because it depends on which asynchronous operation completes first.
- .subscribe().with(...): Subscribes to the final Multi stream.
  - The first lambda expression handles each result emitted by the Multi.
  - The second lambda expression handles any failure in the stream.
  - The third lambda expression is called when the stream completes successfully. Here context::completeNow notifies the VertxTestContext that all asynchronous operations of the test have finished.

Code explanation for solution 2 (section 4):
- The first two steps are the same as in solution 1: convert the Uni<List<String>> to a Multi<String>, then use onItem().transformToUniAndMerge() to process each element concurrently.
- .collect().asList(): Collects all items emitted by the Multi into a List and returns a Uni<List<String>>. This Uni emits the list of collected items when the source Multi completes.
- .await().indefinitely(): This is a blocking operation. It blocks the current thread until the upstream Uni<List<String>> emits its result, that is, until all asynchronous operations have completed and their results have been collected into the list. indefinitely() means there is no timeout.

Precautions and best practices (section 5):
- Non-blocking first: Prefer the non-blocking reactive style (as in solution 1) wherever possible. await() blocks the current thread and should be used with caution in production, especially in I/O-intensive or web applications, where it can cause thread starvation and performance problems. It is better suited to startup code, tests, or specific scenarios that must wait synchronously for all asynchronous tasks to complete.
- Error handling: Handle failures both in the Uni created inside transformToUniAndMerge and in the final subscribe().with() call. Mutiny provides several failure-handling operators, such as onFailure().recoverWithItem() and onFailure().retry().
- Concurrency: transformToUniAndMerge processes tasks concurrently, but the actual concurrency may be limited by the underlying thread pool configuration, system resources, and the implementation of the asynchronous operations themselves. For fine control over concurrency, consider onItem().transformToUni(...).merge(concurrency), which caps the number of Unis subscribed to at the same time.
- Order guarantee: transformToUniAndMerge does not guarantee that results arrive in the order of the original list. To preserve order, consider transformToUniAndConcatenate (which processes the elements one at a time) or sort the results after collection.
- Resource management: Ensure that any external resources used by the asynchronous operations (such as database connections and file handles) are properly managed and released.
1. Problem background and common misunderstandings
In reactive programming, we often need to handle asynchronous operations that involve multiple elements. For example, given a Uni<List<String>>, we want to run a time-consuming asynchronous task on each string in the list and eventually collect or process the results of all tasks.
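A common attempt is to use map to convert the List<String> inside the Uni into a list of Unis. This only produces a Uni<List<Uni<String>>>: the inner Unis are lazy, and because nothing subscribes to them, the asynchronous tasks never run.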
The core of the problem is that a Uni<List<String>> represents a single-value stream whose value is the complete list. To operate asynchronously on each element of the list and treat it as an independent reactive event, the list must be "unrolled" into a stream whose items can be processed one by one. Mutiny provides the Multi type for streams of zero to N items, which is the key to solving this kind of problem.
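The difference between mapping the list and unrolling it can be illustrated with a minimal sketch. The class name MapVersusUnroll, the subscriptions counter, and the simplified one-argument processItemAsync are hypothetical and exist only for this illustration. The sketch assumes Mutiny is on the classpath and blocks with await() purely to keep it short.

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

final class MapVersusUnroll {

    // Hypothetical counter: how many per-item tasks have actually run.
    static final AtomicInteger subscriptions = new AtomicInteger();

    // Hypothetical per-item task. The supplier only runs when the Uni is subscribed to.
    static Uni<String> processItemAsync(String item) {
        return Uni.createFrom().item(() -> {
            subscriptions.incrementAndGet();
            return item.toUpperCase();
        });
    }

    // map transforms the list value only: the inner Unis are created, but never subscribed to.
    static List<Uni<String>> mapOnly() {
        return Uni.createFrom().item(List.of("a", "b", "c"))
                .map(list -> list.stream()
                        .map(MapVersusUnroll::processItemAsync)
                        .collect(Collectors.toList()))
                .await().indefinitely();
    }

    // Unrolling into a Multi lets Mutiny subscribe to one Uni per element.
    static List<String> unroll() {
        return Uni.createFrom().item(List.of("a", "b", "c"))
                .onItem().transformToMulti(Multi.createFrom()::iterable)
                .onItem().transformToUniAndMerge(MapVersusUnroll::processItemAsync)
                .collect().asList()
                .await().indefinitely();
    }

    public static void main(String[] args) {
        System.out.println("Inner Unis created by map: " + mapOnly().size());
        System.out.println("Tasks run after map: " + subscriptions.get());
        System.out.println("Results after unrolling: " + unroll());
        System.out.println("Tasks run after unrolling: " + subscriptions.get());
    }
}
```

After mapOnly() the counter is still zero, because the three inner Unis were built but never subscribed to. Only the unrolled pipeline runs the tasks.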
2. Mutiny asynchronous stream processing core: Uni and Multi
Mutiny is a reactive programming library that is widely used in frameworks such as Quarkus. It provides two core types, Uni and Multi.
To process each element of a Uni<List<String>> asynchronously and concurrently, we first need to convert the Uni<List<String>> into a Multi<String>.
3. Solution 1: Handle asynchronous streams gracefully in a test environment (combined with Vert.x JUnit 5)
In unit tests, or in other scenarios that must wait without blocking until all asynchronous operations have finished, we can subscribe to the Multi and use the completion callback of subscribe().with(...) to signal that all tasks are done. The following example uses the Vert.x JUnit 5 extension, whose VertxTestContext manages the lifecycle of asynchronous tests.
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.time.Duration;
import java.util.List;
import java.util.Random;

@ExtendWith(VertxExtension.class)
public class AsyncListProcessingTest {

    // Simulate an asynchronous operation and return a Uni<String>
    private Uni<String> processItemAsync(String item, Random random) {
        final int duration = (random.nextInt(5) + 1) * 1000; // Random delay of 1-5 seconds
        System.out.println("Starting process for: " + item + ", duration: " + duration + "ms");
        return Uni.createFrom().item(item)
                .onItem().delayIt().by(Duration.ofMillis(duration))
                .invoke(() -> System.out.println("Finished process for: " + item));
    }

    @Test
    public void testAsyncProcessingWithVertxUnit(VertxTestContext context) {
        Random random = new Random();
        // The VertxTestContext is used to notify the test framework when the asynchronous work is done
        context.verify(() -> {
            Uni.createFrom()
                    .item(List.of("a", "b", "c")) // Initial Uni<List<String>>
                    // 1. Convert Uni<List<String>> to Multi<String>
                    .onItem().transformToMulti(Multi.createFrom()::iterable)
                    // 2. Apply an asynchronous transformation to each item and merge the results back into the Multi
                    .onItem().transformToUniAndMerge(s -> processItemAsync(s, random))
                    // 3. Subscribe to the Multi stream and process each completed item
                    .subscribe().with(
                            s -> System.out.println("Printing result: " + s), // Handle each successful result
                            context::failNow,     // Any failure in the stream fails the test
                            context::completeNow  // The stream completed: tell VertxTestContext the test is over
                    );
        });
    }
}
Code explanation:
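Because the merge step in the test above emits results in completion order, it can help to compare it with the concatenating variant. The following is a minimal sketch rather than part of the original test: the class name MergeVersusConcatenate, the fixed DELAY_MS table, and the one-argument processItemAsync are hypothetical, chosen so that later elements finish first.

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

import java.time.Duration;
import java.util.List;
import java.util.Map;

final class MergeVersusConcatenate {

    // Hypothetical fixed delays: "c" finishes first and "a" finishes last.
    static final Map<String, Integer> DELAY_MS = Map.of("a", 300, "b", 200, "c", 100);

    // Hypothetical per-item task that emits the item after its configured delay.
    static Uni<String> processItemAsync(String item) {
        return Uni.createFrom().item(item)
                .onItem().delayIt().by(Duration.ofMillis(DELAY_MS.get(item)));
    }

    // Merge: all Unis are subscribed to concurrently, results arrive in completion order.
    static List<String> merged() {
        return Multi.createFrom().items("a", "b", "c")
                .onItem().transformToUniAndMerge(MergeVersusConcatenate::processItemAsync)
                .collect().asList()
                .await().indefinitely();
    }

    // Concatenate: Unis are subscribed to one after another, results keep the source order.
    static List<String> concatenated() {
        return Multi.createFrom().items("a", "b", "c")
                .onItem().transformToUniAndConcatenate(MergeVersusConcatenate::processItemAsync)
                .collect().asList()
                .await().indefinitely();
    }

    public static void main(String[] args) {
        System.out.println("merged:       " + merged());
        System.out.println("concatenated: " + concatenated());
    }
}
```

With these delays the merged list usually comes back reversed, while the concatenated list always matches the source order. The price of that ordering is that the delays add up instead of overlapping.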
4. Solution 2: Blocking wait for all asynchronous results
In some scenarios, such as command line tools or when we need to wait for all asynchronous operations to complete before continuing the main program execution, we can choose to block the current thread until all results are collected.
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

import java.time.Duration;
import java.util.List;
import java.util.Random;

public class BlockingAsyncListProcessing {

    private static Uni<String> processItemAsync(String item, Random random) {
        final int duration = (random.nextInt(5) + 1) * 1000; // Random delay of 1-5 seconds
        return Uni.createFrom().item(item)
                .onItem().delayIt().by(Duration.ofMillis(duration))
                .invoke(() -> System.out.println("Letter: " + item + ", duration in ms: " + duration));
    }

    public static void main(String[] args) {
        Random random = new Random();
        System.out.println("Starting blocking asynchronous processing...");
        List<String> results = Uni.createFrom()
                .item(List.of("a", "b", "c")) // Initial Uni<List<String>>
                // 1. Convert Uni<List<String>> to Multi<String>
                .onItem().transformToMulti(Multi.createFrom()::iterable)
                // 2. Apply an asynchronous transformation to each item and merge the results back into the Multi
                .onItem().transformToUniAndMerge(s -> processItemAsync(s, random))
                // 3. Optional: handle each completed item
                .onItem().invoke(s -> System.out.println("Printing collected item: " + s))
                // 4. Collect all items of the Multi into a list
                .collect().asList()
                // 5. Block the current thread until the Uni<List<String>> completes and returns the result
                .await().indefinitely();
        System.out.println("All items processed. Collected results: " + results);
    }
}
Code explanation:
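When blocking is acceptable but an unbounded wait is not, the same pipeline can end in await().atMost(...) instead of await().indefinitely(). The sketch below is an illustration under assumptions, not part of the original program: the class name BoundedAwait, the collectWithin helper, and the explicit delayMs parameter are hypothetical.

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.TimeoutException;
import io.smallrye.mutiny.Uni;

import java.time.Duration;
import java.util.List;

final class BoundedAwait {

    // Hypothetical per-item task that emits the item after delayMs milliseconds.
    static Uni<String> processItemAsync(String item, long delayMs) {
        return Uni.createFrom().item(item)
                .onItem().delayIt().by(Duration.ofMillis(delayMs));
    }

    // Blocks the calling thread for at most `limit`, then gives up with a TimeoutException.
    static List<String> collectWithin(Duration limit, long delayMs) {
        return Multi.createFrom().items("a", "b", "c")
                .onItem().transformToUniAndMerge(s -> processItemAsync(s, delayMs))
                .collect().asList()
                .await().atMost(limit);
    }

    public static void main(String[] args) {
        System.out.println("Collected in time: " + collectWithin(Duration.ofSeconds(5), 50));
        try {
            collectWithin(Duration.ofMillis(100), 2_000);
        } catch (TimeoutException e) {
            System.out.println("Gave up waiting after 100 ms");
        }
    }
}
```

A bounded wait keeps a stuck task from blocking the caller forever, which matters most in the startup code and command line tools where this blocking style is typically used.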
5. Precautions and best practices
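Two of the precautions in this article, per-item error handling and controlled concurrency, can be combined in one pipeline. The following is a minimal sketch under assumptions: the class name BoundedMergeWithRecovery, the inFlight and maxInFlight counters, the deliberately failing item "b", and the "fallback-" prefix are all hypothetical. It uses onItem().transformToUni(...).merge(concurrency) to cap the number of concurrently subscribed Unis, and onFailure().recoverWithItem(...) so that one failing element does not fail the whole stream.

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedMergeWithRecovery {

    // Hypothetical counters used only to observe how many tasks overlap.
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    // Hypothetical per-item task: "b" fails, every other item is upper-cased after 100 ms.
    static Uni<String> processItemAsync(String item) {
        return Uni.createFrom().item(() -> {
                    if (item.equals("b")) {
                        throw new IllegalStateException("cannot process " + item);
                    }
                    int now = inFlight.incrementAndGet();
                    maxInFlight.accumulateAndGet(now, Math::max);
                    return item.toUpperCase();
                })
                .onItem().delayIt().by(Duration.ofMillis(100))
                .onItem().invoke(() -> inFlight.decrementAndGet())
                // Recover inside the inner Uni so the outer Multi keeps running.
                .onFailure().recoverWithItem(failure -> "fallback-" + item);
    }

    // Processes four items with at most `concurrency` Unis subscribed at the same time.
    static List<String> run(int concurrency) {
        inFlight.set(0);
        maxInFlight.set(0);
        return Multi.createFrom().items("a", "b", "c", "d")
                .onItem().transformToUni(BoundedMergeWithRecovery::processItemAsync)
                .merge(concurrency)
                .collect().asList()
                .await().indefinitely();
    }

    public static void main(String[] args) {
        System.out.println("Results: " + run(2));
        System.out.println("Max tasks in flight: " + maxInFlight.get());
    }
}
```

Placing the recovery on the inner Uni is a design choice: a recovery on the outer Multi would replace the failure only after the stream had already stopped processing the remaining elements.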
6. Summary
With Mutiny's Multi type and the onItem().transformToUniAndMerge() operator, we can turn each element of a Uni<List<String>> into an independent asynchronous task and process the tasks concurrently. Depending on the application scenario, we can choose the non-blocking subscription mode (suitable for reactive systems and tests) or the blocking await() mode (suitable for specific scenarios that must wait synchronously for results). Understanding these Mutiny operators and using them correctly is key to building efficient and robust reactive applications.