
Change streams in MongoDB allow your application to react to real-time data changes instantly. In this blog post, I’ll show you how to set up and use change streams with Python, without diving too deep into theory. We'll create a simple program that listens to database events, focusing on inserts first, then extending it to other event types.
Change streams let your app listen to specific database events, like inserts or updates, and respond immediately. Imagine a scenario where a user updates their profile; with change streams, you can instantly reflect this change across your app without needing the user to refresh the page. Before this feature, you had to constantly poll the database or use complex methods like tailing the MongoDB Oplog. Change streams simplify this by providing a more user-friendly API.
Let's say I have an API to upload invoices. The flow is that customers will upload an image of the invoice to MongoDB, then we extract the information with AI and update the invoice. Here's an example of the code for uploading an invoice:
from pymongo import MongoClient
class MongoDatabase:
    """Thin wrapper around a single pymongo collection, configured from YAML."""

    def __init__(self, config_path: str):
        # Load the YAML configuration file using the provided utility function.
        self.config_path = config_path
        self.config = read_config(path=self.config_path)
        # Initialize the MongoDB connection from the config values.
        self.client = MongoClient(self.config['mongodb']['uri'])
        self.db = self.client[self.config['mongodb']['database']]
        self.collection = self.db[self.config['mongodb']['collection']]

    def create_document(self, data: Dict[str, Any]) -> str:
        """Insert a new document and return its auto-generated _id as a string."""
        result = self.collection.insert_one(data)
        return str(result.inserted_id)

    def update_document_by_id(self, document_id: str, data: Dict[str, Any]) -> None:
        """Apply a $set update to the document whose _id is document_id.

        BUG FIX: create_document returns str(ObjectId), so the string must be
        converted back to an ObjectId before filtering — the original filter
        {"_id": document_id} compared a str to an ObjectId, never matched,
        and every update was a silent no-op.
        """
        from bson import ObjectId  # ships with pymongo; local to keep the edit self-contained
        try:
            self.collection.update_one({"_id": ObjectId(document_id)}, {"$set": data})
        except PyMongoError as e:
            print(f"Error updating document: {e}")
First, I wrap pymongo inside a small class so that all database access goes through one place.
@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    """Accept a base64-encoded invoice image and persist it as a new document."""
    try:
        payload = await request.json()
        image_b64 = payload.get("img")
        uploader = payload.get("user_uuid")

        # Guard clause: refuse anything that is not a decodable base64 image.
        if not image_b64 or not is_base64(image_b64):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        now = datetime.now(timezone.utc)
        image_b64 = valid_base64_image(image_b64)

        document = {
            "invoice_type": None,
            "created_at": now,
            "created_by": uploader,
            "last_modified_at": None,
            "last_modified_by": None,
            "status": "not extracted",
            "invoice_image_base64": image_b64,
            "invoice_info": {},
        }

        new_id = mongo_db.create_document(document)
        print('Result saved to MongoDB:', new_id)
        # Store the generated id on the document itself for easy lookups later.
        mongo_db.update_document_by_id(new_id, {"invoice_uuid": new_id})

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"invoice_uuid": new_id, "message": "Upload successful"},
        )
    except Exception as e:
        # Boundary handler: surface any unexpected failure as a 500.
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)},
        )
A reasonable question might be: why not wait until the AI model processes the image before updating? The problem is that it takes around 4-5 minutes to process, and we don't want to affect the user experience.
Another option could be using Kafka. We could publish the image to a Kafka topic, and another service would process the data.
Pros: the upload API stays fast, processing is fully decoupled from the web service, and Kafka durably buffers the work so consumers can be scaled out or retried independently.
Cons: it adds an entire extra piece of infrastructure to deploy, configure, and monitor, plus producer and consumer code to maintain.
Here’s a basic implementation to demonstrate using Kafka to handle the invoice upload process.
The user uploads an invoice through an API endpoint. The invoice image is saved in MongoDB, and a message is sent to a Kafka topic for further processing.
from kafka import KafkaProducer
import json
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from datetime import datetime, timezone

app = FastAPI()

# BUG FIX: the invoice document holds values json.dumps cannot serialize —
# a datetime ("created_at"), and the bson ObjectId that insert_one() adds
# under "_id" when the dict is inserted. default=str stringifies them
# instead of raising TypeError inside the serializer.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8')
)
@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    """Save the uploaded invoice image and enqueue it on Kafka for extraction."""
    try:
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")
        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "status": "not extracted",
            "invoice_image_base64": img,
        }
        # Save the document to MongoDB.
        invoice_uuid = mongo_db.create_document(invoice_document)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})
        # BUG FIX: send only JSON-serializable fields to Kafka. Passing
        # invoice_document directly crashes the JSON serializer: it contains a
        # datetime, and insert_one() mutates the dict in place, adding a bson
        # ObjectId under "_id". The consumer only needs the id (as a string,
        # matching the "_id" key it reads) and the image to process.
        producer.send('invoice_topic', {
            "_id": invoice_uuid,
            "invoice_image_base64": img,
        })
        producer.flush()
        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"message": "Invoice upload received and will be processed"}
        )
    except Exception as e:
        # Boundary handler: surface any unexpected failure as a 500.
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )
The Kafka consumer listens to the invoice_topic. When it receives a message, it processes the invoice (e.g., extracting information from the image) and updates the corresponding document in MongoDB.
from kafka import KafkaConsumer
import json

# Consume invoice upload events and enrich the stored documents with the
# data extracted from each invoice image.
consumer = KafkaConsumer(
    'invoice_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for record in consumer:
    doc = record.value
    doc_id = doc["_id"]
    # Run extraction on the image, then persist result and status together.
    info = extract_invoice_data(doc["invoice_image_base64"])
    mongo_db.update_document_by_id(doc_id, {
        "invoice_info": info,
        "status": "extracted"
    })
    print(f"Processed and updated invoice: {doc_id}")
Flow Summary:
That is a lot of code for such a simple flow — and it doesn't even account for the complexity of managing and configuring three separate services: MongoDB, Kafka, and the invoice service.
Here's the complete code rewritten to use MongoDB change streams, including the additional methods and functions that handle invoice processing triggered by the change stream.
We'll start by creating a MongoDB wrapper class that handles database operations such as creating documents and listening to change streams.
import threading
import time
from typing import Dict, Any

import yaml
from pymongo import MongoClient
from pymongo.errors import PyMongoError
class MongoDatabase:
    # __init__, create_document, update_document_by_id: same code as before.

    def process_invoice(self, invoice_document: Dict[str, Any]) -> None:
        """Extract data from the invoice image and persist it on the document."""
        try:
            # Extract information from the invoice image (AI model call).
            extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
            invoice_uuid = invoice_document["_id"]
            # Update the invoice document with the extracted data.
            self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"})
            print(f"Processed and updated invoice: {invoice_uuid}")
        except Exception as e:
            # Best-effort worker: log and keep listening rather than crash.
            print(f"Error processing invoice: {str(e)}")

    def start_change_stream_listener(self) -> None:
        """Watch the collection for inserts on a daemon thread.

        FIX: the original listener thread died permanently on the first
        PyMongoError. Change streams are resumable by design — we keep the
        last resume token and reopen the stream after transient errors so
        no inserts are missed while we were connected.
        """
        def listen():
            # Server-side filter: only insert events reach the cursor, so the
            # per-event operationType check is no longer needed.
            pipeline = [{'$match': {'operationType': 'insert'}}]
            resume_token = None
            while True:
                try:
                    with self.collection.watch(pipeline, resume_after=resume_token) as stream:
                        for change in stream:
                            resume_token = stream.resume_token
                            invoice_document = change['fullDocument']
                            print(f"New invoice detected: {invoice_document['_id']}")
                            self.process_invoice(invoice_document)
                except PyMongoError as e:
                    print(f"Change stream error: {str(e)}")
                    # Brief back-off before resuming from the saved token.
                    time.sleep(1)

        # Start the change stream listener in a separate thread.
        listener_thread = threading.Thread(target=listen, daemon=True)
        listener_thread.start()
For simplicity, I've added process_invoice inside the MongoDatabase class, but in a real project it belongs in a separate service layer.
The upload API should be like the original one.
# Wire everything together: build the database wrapper from the YAML config
# and start the background change-stream listener before serving requests.
mongo_db = MongoDatabase(config_path='path_to_your_config.yaml')
mongo_db.start_change_stream_listener()

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        # same code as before — validation, document build, and insert; the
        # change-stream listener now reacts to the insert automatically.
Flow Summary:
With MongoDB change streams, you can efficiently process real-time changes in your database. By extending this example, you can handle various events such as updates and deletes, making your application more responsive.
The above is the detailed content of Real-Time Data Processing with MongoDB Change Streams and Python. For more information, please follow other related articles on the PHP Chinese website!