Table of Contents
1. Understand the challenges of asynchronous messaging
2. Ensure message delivery: application of callback mechanism
3. Improve message durability: producer acks and Broker min.insync.replicas
3.1 Producer configuration: acks
3.2 Broker configuration: min.insync.replicas
4. The ultimate solution: Outbox Pattern
5. Advanced considerations and integration solutions: Kafka Connect
Summary

Strategies and practices to ensure that database data is deleted after Kafka messages are sent successfully

Dec 31, 2025 am 09:18 AM

This article explores how to handle the data consistency challenges caused by asynchronous message sending when database data is sent to Kafka and subsequently deleted. We analyze the asynchronous behavior of kafkaTemplate.send and present solutions based on the callback mechanism, Kafka producer configuration (such as acks), and cluster settings (such as min.insync.replicas). The article also introduces the Outbox Pattern, a transactional pattern that ensures the atomicity and reliability of data operations, and briefly covers Kafka Connect as an integration solution.

1. Understand the challenges of asynchronous messaging

When using Spring Boot to synchronize data with Kafka, a common pattern is to read data from the database, send it to Kafka, and then delete the sent data. However, running these steps back to back, as in the following example, creates a data consistency risk:

public void syncData() {
    List<T> data = repository.findAll();
    data.forEach(value -> kafkaTemplate.send(topicName, value));
    repository.deleteAll(data);
}

The kafkaTemplate.send method returns a ListenableFuture (a CompletableFuture as of Spring for Apache Kafka 3.0), which means that sending a message to the Kafka Broker is an asynchronous operation. The data.forEach loop may finish before all messages have actually been received and persisted by the Broker. Calling repository.deleteAll(data) immediately afterwards can therefore delete data that was never successfully delivered, resulting in data loss. For example, if the Kafka Broker fails while the 7th message is being sent, that message and the ones after it may fail, yet the original rows in the database have already been deleted.

Therefore, the core questions are:

  • Does Kafka throw an exception when message delivery fails?
  • Do we need to introduce additional logic to ensure that all messages are sent successfully before deleting the database records?

Delivery failures are generally reported through the returned future rather than thrown by the send call itself, so the answer to the second question is yes: we need a rigorous strategy to ensure data consistency and reliability.
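To make the risk concrete, here is a minimal sketch that uses only the JDK. The send method is a hypothetical stand-in for an asynchronous producer (it is not the Spring Kafka API), and the item names are made up. It shows that a failed send surfaces only through the returned future, so a fire-and-forget loop never notices it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an asynchronous producer; not the Spring Kafka API.
class AsyncSendDemo {

    // Simulates a send that fails for one specific value. The failure is
    // carried by the returned future; the method itself returns normally.
    static CompletableFuture<String> send(String value) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (value.equals("item-7")) {
            future.completeExceptionally(new IllegalStateException("broker unavailable"));
        } else {
            future.complete(value);
        }
        return future;
    }

    // Mirrors the naive syncData() loop: every send returns without throwing.
    // Returns the items whose send failed and that a following deleteAll
    // would remove from the database anyway.
    static List<String> lostByNaiveDelete(List<String> items) {
        List<String> lost = new ArrayList<>();
        for (String item : items) {
            CompletableFuture<String> future = send(item); // no exception here
            if (future.isCompletedExceptionally()) {
                lost.add(item);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        System.out.println(lostByNaiveDelete(List.of("item-6", "item-7", "item-8"))); // [item-7]
    }
}
```

In this simulation the futures complete immediately; with a real broker they complete later, which makes the naive delete even harder to reason about.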

2. Ensure message delivery: application of callback mechanism

Since kafkaTemplate.send is asynchronous, we need to use the callback mechanism provided by the ListenableFuture it returns to monitor the results of message sending. By registering success and failure callbacks, we can perform subsequent database deletion operations after the message is confirmed to be received by Kafka Broker.

The following is an example using ListenableFutureCallback, showing how to handle the success and failure of message sending:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class KafkaDataSynchronizer<T> {

    private final KafkaTemplate<String, T> kafkaTemplate;
    private final DataRepository<T> repository; // Assume there is a data repository interface

    public KafkaDataSynchronizer(KafkaTemplate<String, T> kafkaTemplate, DataRepository<T> repository) {
        this.kafkaTemplate = kafkaTemplate;
        this.repository = repository;
    }

    public void syncDataReliably() {
        List<T> dataToSync = repository.findAllPendingData(); // Assume that only the data to be synchronized is queried
        if (dataToSync.isEmpty()) {
            return;
        }

        AtomicInteger successfulSends = new AtomicInteger(0);
        AtomicInteger failedSends = new AtomicInteger(0);
        int totalMessages = dataToSync.size();

        for (T item : dataToSync) {
            ListenableFuture<SendResult<String, T>> future = kafkaTemplate.send("your-topic", item);
            future.addCallback(new ListenableFutureCallback<SendResult<String, T>>() {
                @Override
                public void onSuccess(SendResult<String, T> result) {
                    successfulSends.incrementAndGet();
                    System.out.println("Message sent successfully: " + result.getProducerRecord().value());
                    // Only consider deleting database data after all messages are successfully sent
                    if (successfulSends.get() + failedSends.get() == totalMessages && failedSends.get() == 0) {
                        // Perform database deletion here to ensure that all messages have been sent successfully
                        // Note: In actual applications, more complex transaction management or the Outbox Pattern may be required here
                        // repository.delete(item); // Delete items one by one or in batches
                        System.out.println("All messages processed. Considering database deletion.");
                        // Trigger batch deletion or mark processed
                        // repository.deleteAll(dataToSync); // This is a simplified example, more detailed control is actually required
                    }
                }

                @Override
                public void onFailure(Throwable ex) {
                    failedSends.incrementAndGet();
                    System.err.println("Failed to send message: " + item + ", Error: " + ex.getMessage());
                    // Record the failure; a retry mechanism or rollback of the database operation may be required
                    // If any message fails to be sent, the corresponding data in the database should not be deleted
                    // In actual applications, the failed message should be marked as "to be retried" or recorded in the dead letter queue
                }
            });
        }
        // Note: repository.deleteAll(dataToSync); cannot be called directly outside the loop.
        // Because the callbacks run asynchronously, they may not have finished executing after the loop ends.
        // The actual deletion logic needs to be triggered after all callbacks are completed and there is no failure.
        // This is usually achieved through counters, latches, or higher-level coordination mechanisms.
    }
}

interface DataRepository<T> {
    List<T> findAllPendingData();
    void delete(T item);
    void deleteAll(List<T> items);
}

Things to note:

  • The deletion logic in the above example is simplified. In practical applications, relying solely on counters to determine whether all messages have been sent and delete them in batches may still not be robust enough. Data consistency may still be compromised if the application crashes before all callbacks complete, or if an exception occurs during deletion.
  • For messages that fail to be sent, there needs to be a complete retry mechanism or send them to the Dead Letter Queue (DLQ) for subsequent processing instead of simply ignoring them.
  • To achieve truly atomic operations (either both succeed or both fail), the Outbox Pattern is the recommended solution.
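One possible way to coordinate the callbacks is to collect every future, wait for all of them, and then delete only the items whose send was confirmed. The sketch below uses only the JDK; the sender parameter is a hypothetical stand-in for an asynchronous send such as kafkaTemplate.send, and the class and method names are illustrative. It does not address a crash between sending and deleting, which is where the Outbox Pattern comes in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Sketch only: `sender` stands in for an asynchronous send; it is a
// hypothetical parameter, not a Spring API.
class ConfirmedDelete {

    // Sends every item, waits for all results, and returns only the items
    // whose send completed successfully (the ones that are safe to delete).
    static <T> List<T> sendAndCollectConfirmed(List<T> items,
                                               Function<T, CompletableFuture<?>> sender) {
        List<CompletableFuture<?>> futures = new ArrayList<>();
        for (T item : items) {
            futures.add(sender.apply(item));
        }
        // allOf completes exceptionally if any send failed; handle() absorbs
        // that so each future can be inspected individually afterwards.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .handle((ok, err) -> null)
                .join();

        List<T> confirmed = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            if (!futures.get(i).isCompletedExceptionally()) {
                confirmed.add(items.get(i));
            }
        }
        return confirmed;
    }

    public static void main(String[] args) {
        // Hypothetical sender: fails for "b", succeeds for everything else.
        Function<String, CompletableFuture<?>> sender = v -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (v.equals("b")) {
                f.completeExceptionally(new RuntimeException("send failed"));
            } else {
                f.complete(v);
            }
            return f;
        };
        System.out.println(sendAndCollectConfirmed(List.of("a", "b", "c"), sender)); // [a, c]
    }
}
```

The returned list is what would be passed to a delete call; failed items stay in the database for a later retry. Note that join() blocks the calling thread until every send has completed or failed.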

3. Improve message durability: producer acks and Broker min.insync.replicas

In addition to the callback mechanism, Kafka also provides powerful configuration options to ensure message persistence and availability.

3.1 Producer configuration: acks

acks is a key configuration on the producer side. It determines how many confirmations from the Broker the producer needs to receive after sending a message before it considers the message sent successfully.

  • acks=0: The producer returns immediately after sending the message without waiting for any confirmation from the Broker. Highest performance, but lowest reliability and possible data loss.
  • acks=1: The producer waits for confirmation from the Leader Broker only. Once the Leader has written the message, the producer considers the send successful. If the Leader fails before the followers have replicated the message, the data may be lost.
  • acks=all (or acks=-1): The producer waits for confirmation from all ISRs (In-Sync Replicas). This is the highest level of reliability guarantee, ensuring that messages are written to the Leader and all ISRs.

In scenarios where it is necessary to ensure that data is not lost, it is strongly recommended to set acks to all.
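As a rough illustration, producer settings aimed at durability might be assembled like this. The keys are standard Kafka producer configuration names; the values, the class name, and the build method are illustrative assumptions rather than recommendations from the original article.

```java
import java.util.Properties;

class ReliableProducerConfig {

    // Builds producer settings aimed at durability. The keys are standard
    // Kafka producer configuration names; the values are illustrative.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");                   // wait for all in-sync replicas
        props.put("enable.idempotence", "true");    // avoid duplicates introduced by retries
        props.put("retries", String.valueOf(Integer.MAX_VALUE));
        props.put("delivery.timeout.ms", "120000"); // upper bound on a send including retries
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:9092").getProperty("acks")); // all
    }
}
```

In a Spring Boot application the same setting can usually be expressed as the property spring.kafka.producer.acks=all.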

3.2 Broker configuration: min.insync.replicas

min.insync.replicas is a configuration on the Broker side (topic level or cluster level) that defines at least how many ISR replicas must be available in order for the Leader Broker to accept write requests.

acks=all works together with min.insync.replicas to determine the persistence of the message:

  • If acks=all and min.insync.replicas is set to N, then the leader will only send an acknowledgment to the producer when the leader and at least N-1 follower replicas have synchronized the message.
  • If the number of available ISRs is less than min.insync.replicas, the Leader will not accept new write requests and the producer will receive a NotEnoughReplicasException.

Important: acks=all does not mean that all assigned replicas acknowledged the message, but that all currently synchronized replicas acknowledged the message. If your min.insync.replicas is set to 1 (the default value), even if acks=all, it is only guaranteed that at least one Broker (i.e. Leader) sees the write. In this case, if this only ISR (Leader) subsequently fails, messages may still be lost.

To achieve high reliability, it is recommended to configure:

  • The topic's replication.factor is at least 3.
  • min.insync.replicas is at least 2 (or replication.factor - 1) to ensure that data is not lost when a single Broker fails.
  • The producer's acks is set to all.
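The interaction between these settings can be expressed as a small model. This is a simplified sketch, not broker code: the class and method names are hypothetical, and toleratedFailures assumes every replica was in sync before the failures occurred.

```java
// Simplified model of how acks=all interacts with min.insync.replicas.
class IsrModel {

    // With acks=all, the leader rejects a write when the in-sync replica
    // count (leader included) is below min.insync.replicas.
    static boolean acceptsWrite(int inSyncReplicas, int minInsyncReplicas) {
        return inSyncReplicas >= minInsyncReplicas;
    }

    // Number of brokers that can fail while acks=all writes are still
    // accepted, assuming all replicas were in sync beforehand.
    static int toleratedFailures(int replicationFactor, int minInsyncReplicas) {
        return replicationFactor - minInsyncReplicas;
    }

    public static void main(String[] args) {
        System.out.println(toleratedFailures(3, 2)); // 1
        System.out.println(acceptsWrite(2, 2));      // true
        System.out.println(acceptsWrite(1, 2));      // false
    }
}
```

With replication.factor=3 and min.insync.replicas=2, one broker can be down and writes continue; once a second broker drops out, the producer starts seeing NotEnoughReplicasException instead of silently losing durability.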

4. The ultimate solution: Outbox Pattern

For scenarios that must strictly guarantee the atomicity of database operations and message sending, the Outbox Pattern is a widely recognized best practice. It makes recording the outgoing message part of the database transaction, so that the database update and the message record either both succeed or both fail.

The core idea of the Outbox Pattern:

  1. Transactional operations: When an application needs to update the database and send a message, it first performs the following two steps in the same database transaction:
    • Update business data.
    • Insert the message to be sent into a dedicated "Outbox Table".
  2. Message Relay: A separate service (or process) continuously polls this outbox table. Once a new pending message is detected, it:
    • Read messages from the outbox table.
    • Send to Kafka.
    • After successfully sending, mark the corresponding message in the outbox table as sent, or delete it directly.
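The two steps above can be sketched in memory as follows. This is an illustration only: a real implementation would use database tables and a real transaction, and every name here (OutboxSketch, saveOrder, relayOnce, the publisher predicate) is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// In-memory sketch of the outbox flow; all names are hypothetical.
class OutboxSketch {

    static final class OutboxRow {
        final String payload;
        boolean sent;
        OutboxRow(String payload) { this.payload = payload; }
    }

    final List<String> businessTable = new ArrayList<>();
    final List<OutboxRow> outboxTable = new ArrayList<>();

    // Step 1: the business change and the outbox row are written together.
    // synchronized stands in for a single database transaction.
    synchronized void saveOrder(String order) {
        businessTable.add(order);
        outboxTable.add(new OutboxRow("OrderCreated:" + order));
    }

    // Step 2: the relay reads unsent rows, publishes them, and marks a row
    // as sent only when the publisher reports success. Failed rows stay
    // pending and are retried on the next poll. Returns the number sent.
    synchronized int relayOnce(Predicate<String> publisher) {
        int sentNow = 0;
        for (OutboxRow row : outboxTable) {
            if (!row.sent && publisher.test(row.payload)) {
                row.sent = true;
                sentNow++;
            }
        }
        return sentNow;
    }

    synchronized long pendingCount() {
        return outboxTable.stream().filter(r -> !r.sent).count();
    }
}
```

If the relay crashes after publishing a row but before marking it as sent, the row is published again on the next poll, so consumers of such a relay should tolerate duplicates.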

Advantages of Outbox Pattern:

  • Atomicity: Database updates and messages are recorded in the same transaction, ensuring "all or nothing" atomicity. Even if the system crashes after the database transaction is committed but before the message is sent to Kafka, the message still remains in the outbox table, waiting to be sent on the next poll.
  • Decoupling: Business logic does not need to directly deal with the complexity of Kafka sending (such as retries, error handling).
  • Reliability: Even if the Kafka Broker is temporarily unavailable, messages remain safely in the outbox table until Kafka comes back up and they are sent successfully.

How to implement Outbox Pattern:

  • Polling the outbox table: As described above, query the outbox table periodically through a scheduled task or an independent microservice.
  • Transaction Log Tailing: Use the transaction log of the database (such as PostgreSQL's WAL or MySQL's Binlog) to capture data changes and convert them into Kafka messages. This is usually achieved through CDC (Change Data Capture) tools such as Debezium, which runs as a set of Kafka Connect source connectors and can reliably stream database changes to Kafka.
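For the log-tailing variant, a Debezium connector is typically described by a set of key/value settings submitted to Kafka Connect. The sketch below builds such a configuration as a Java map, assuming a PostgreSQL database and Debezium 2.x property names; the host, credentials, database name, and outbox table name are placeholders, and the class name is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DebeziumOutboxConfig {

    // Connector settings as they might be submitted to Kafka Connect.
    // Host, credentials, database, and table names are placeholders.
    static Map<String, String> build() {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        config.put("database.hostname", "localhost");
        config.put("database.port", "5432");
        config.put("database.user", "app");
        config.put("database.password", "secret");
        config.put("database.dbname", "appdb");
        config.put("topic.prefix", "app");                 // property name used by Debezium 2.x
        config.put("table.include.list", "public.outbox"); // capture only the outbox table
        // Route each captured outbox row to a topic via Debezium's outbox event router.
        config.put("transforms", "outbox");
        config.put("transforms.outbox.type", "io.debezium.transforms.outbox.EventRouter");
        return config;
    }
}
```

With this approach the application only writes to the outbox table inside its transaction; no polling relay has to be written by hand.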

5. Advanced considerations and integration solutions: Kafka Connect

For complex database integration scenarios, especially when database changes need to be continuously synchronized to Kafka, Kafka Connect is a very powerful and reliable tool.

Advantages of Kafka Connect:

  • Out of the box: A rich ecosystem of connectors is available, such as the JDBC Source Connector, which can be configured to poll data from the database and send it to Kafka.
  • Fault tolerance: Built-in fault tolerance mechanism can handle problems such as connection interruption and data conversion failure.
  • Scalability: Can be deployed as a distributed service to handle high-throughput data integration tasks.
  • Offset management: Automatically tracks the reading position (offset) so that a restarted connector resumes where it left off, which helps avoid lost data and limits duplicates.

If you need an independent, reliable database polling service and don't want to couple too much integration logic in your business application, consider using Kafka Connect. For example, using a CDC connector such as Debezium, incremental changes to the database can be directly captured and published to Kafka in real time, which is essentially a more automated implementation of the Outbox Pattern.

Summary

In scenarios where database data is sent to Kafka and subsequently deleted, ensuring data consistency and reliability is crucial. We can gradually improve the robustness of the system through the following strategies:

  1. Use callback mechanism: Use ListenableFutureCallback to monitor the asynchronous results of kafkaTemplate.send and ensure that the message is successfully sent before performing the database deletion operation.
  2. Configure Kafka producer acks and Broker min.insync.replicas: Set acks to all, and configure min.insync.replicas appropriately (for example, replication.factor is 3, min.insync.replicas is 2) to maximize message durability.
  3. Adopt Outbox Pattern: For scenarios that require strict transaction guarantees, the Outbox pattern is the best choice. It incorporates the persistence of messages into database transactions and sends them to Kafka through an independent message relay service to ensure atomicity.
  4. Consider Kafka Connect: For large-scale, complex database integration needs, Kafka Connect (especially in combination with CDC tools such as Debezium) provides a mature, reliable, and scalable solution that can automatically synchronize database changes to Kafka.

Which strategy to choose depends on your business's requirements for data consistency, latency, and system complexity. In most production environments, a highly reliable data synchronization system can be built using a combination of callbacks, strong acks configuration, and the Outbox Pattern.
