Änderungsströme in MongoDB ermöglichen es Ihrer Anwendung, sofort auf Datenänderungen in Echtzeit zu reagieren. In diesem Blogbeitrag zeige ich Ihnen, wie Sie Änderungsströme mit Python einrichten und nutzen, ohne zu tief in die Theorie einzutauchen. Wir erstellen ein einfaches Programm, das auf Datenbankereignisse lauscht, wobei wir uns zunächst auf Einfügungen konzentrieren und es dann auf andere Ereignistypen erweitern.
Änderungsströme ermöglichen es Ihrer App, bestimmte Datenbankereignisse wie Einfügungen oder Aktualisierungen abzuhören und sofort zu reagieren. Stellen Sie sich ein Szenario vor, in dem ein Benutzer sein Profil aktualisiert; Mit Änderungsströmen können Sie diese Änderung sofort in Ihrer App widerspiegeln, ohne dass der Benutzer die Seite aktualisieren muss. Vor dieser Funktion mussten Sie ständig die Datenbank abfragen oder komplexe Methoden wie das Tailing des MongoDB-Oplogs verwenden. Änderungsströme vereinfachen dies, indem sie eine benutzerfreundlichere API bereitstellen.
Angenommen, ich habe eine API zum Hochladen von Rechnungen. Der Ablauf besteht darin, dass Kunden ein Bild der Rechnung in MongoDB hochladen, dann extrahieren wir die Informationen mit KI und aktualisieren die Rechnung. Hier ist ein Beispiel für den Code zum Hochladen einer Rechnung:
from pymongo import MongoClient class MongoDatabase: def __init__(self, config_path: str): # Load the YAML configuration file using the provided utility function self.config_path = config_path self.config = read_config(path=self.config_path) # Initialize MongoDB connection self.client = MongoClient(self.config['mongodb']['uri']) self.db = self.client[self.config['mongodb']['database']] self.collection = self.db[self.config['mongodb']['collection']] def create_document(self, data: Dict[str, Any]) -> str: # Insert a new document and return the automatically generated document ID result = self.collection.insert_one(data) return str(result.inserted_id) def update_document_by_id(self, document_id: str, data: Dict[str, Any]): try: self.collection.update_one({"_id": document_id}, {"$set": data}) except PyMongoError as e: print(f"Error updating document: {e}")
Zuerst werde ich den Pymongo für alle Fälle in eine Klasse einpacken :))
@app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: # Parse JSON body body = await request.json() img = body.get("img") user_uuid = body.get("user_uuid") if not img or not is_base64(img): return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={"status": "error", "message": "Base64 image is required"}, ) # Generate invoice UUID current_time = datetime.now(timezone.utc) img = valid_base64_image(img) invoice_document = { "invoice_type": None, "created_at": current_time, "created_by": user_uuid, "last_modified_at": None, "last_modified_by": None, "status": "not extracted", "invoice_image_base64": img, "invoice_info": {} } invoice_uuid = mongo_db.create_document(invoice_document) print('Result saved to MongoDB:', invoice_uuid) mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid}) return JSONResponse( status_code=status.HTTP_201_CREATED, content={"invoice_uuid": invoice_uuid, "message": "Upload successful"} ) except Exception as e: # Handle errors return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"status": "error", "message": str(e)} )
Eine berechtigte Frage könnte sein: Warum nicht mit der Aktualisierung warten, bis das KI-Modell das Bild verarbeitet? Das Problem besteht darin, dass die Verarbeitung etwa 4 bis 5 Minuten dauert und wir die Benutzererfahrung nicht beeinträchtigen möchten.
Eine andere Option könnte die Verwendung von Kafka sein. Wir könnten das Bild in einem Kafka-Thema veröffentlichen und ein anderer Dienst würde die Daten verarbeiten.
Vorteile:
Nachteile:
Hier ist eine grundlegende Implementierung, um die Verwendung von Kafka für den Rechnungs-Upload-Prozess zu demonstrieren.
Der Benutzer lädt eine Rechnung über einen API-Endpunkt hoch. Das Rechnungsbild wird in MongoDB gespeichert und eine Nachricht wird zur weiteren Verarbeitung an ein Kafka-Thema gesendet.
from kafka import KafkaProducer import json from fastapi import FastAPI, Request, status from fastapi.responses import JSONResponse from datetime import datetime, timezone app = FastAPI() producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') ) @app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: body = await request.json() img = body.get("img") user_uuid = body.get("user_uuid") if not img or not is_base64(img): return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={"status": "error", "message": "Base64 image is required"}, ) current_time = datetime.now(timezone.utc) img = valid_base64_image(img) invoice_document = { "invoice_type": None, "created_at": current_time, "created_by": user_uuid, "status": "not extracted", "invoice_image_base64": img, } # Save the document to MongoDB invoice_uuid = mongo_db.create_document(invoice_document) mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid}) # Send a message to Kafka topic producer.send('invoice_topic', invoice_document) producer.flush() return JSONResponse( status_code=status.HTTP_201_CREATED, content={"message": "Invoice upload received and will be processed"} ) except Exception as e: return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"status": "error", "message": str(e)} )
Der Kafka-Verbraucher hört sich das „voice_topic“ an. Wenn es eine Nachricht empfängt, verarbeitet es die Rechnung (z. B. extrahiert Informationen aus dem Bild) und aktualisiert das entsprechende Dokument in MongoDB.
from kafka import KafkaConsumer import json consumer = KafkaConsumer( 'invoice_topic', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('utf-8')) ) for message in consumer: invoice_document = message.value # Process the invoice, extract information, and update the document in MongoDB invoice_uuid = invoice_document["_id"] extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"]) mongo_db.update_document_by_id(invoice_uuid, { "invoice_info": extracted_data, "status": "extracted" }) print(f"Processed and updated invoice: {invoice_uuid}")
Ablaufzusammenfassung:
Wow, ich kann nicht glauben, dass ich es geschafft habe, das alleine zu schreiben! Es unterstreicht wirklich den damit verbundenen Aufwand. Dabei ist die Komplexität der Verwaltung und Konfiguration der drei Dienste noch nicht einmal berücksichtigt: MongoDB, Kafka und der Invoice-Dienst.
Hier ist der vollständige Code, der in Markdown neu geschrieben wurde, um MongoDB-Änderungsströme zu demonstrieren, einschließlich zusätzlicher Methoden und Funktionen zur Abwicklung der durch den Änderungsstrom ausgelösten Rechnungsverarbeitung.
Wir beginnen mit der Erstellung einer MongoDB-Wrapper-Klasse, die Datenbankoperationen wie das Erstellen von Dokumenten und das Abhören von Änderungsströmen abwickelt.
from pymongo import MongoClient from pymongo.errors import PyMongoError from typing import Dict, Any import threading import yaml class MongoDatabase: # Same code as before # def process_invoice(self, invoice_document: Dict[str, Any]): """Process the invoice by extracting data and updating the document in MongoDB.""" try: # Simulate extracting information from the invoice image extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"]) invoice_uuid = invoice_document["_id"] # Update the invoice document with the extracted data self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"}) print(f"Processed and updated invoice: {invoice_uuid}") except Exception as e: print(f"Error processing invoice: {str(e)}") def start_change_stream_listener(self): """Start listening to the change stream for the collection.""" def listen(): try: with self.collection.watch() as stream: for change in stream: if change['operationType'] == 'insert': invoice_document = change['fullDocument'] print(f"New invoice detected: {invoice_document['_id']}") self.process_invoice(invoice_document) except PyMongoError as e: print(f"Change stream error: {str(e)}") # Start the change stream listener in a separate thread listener_thread = threading.Thread(target=listen, daemon=True) listener_thread.start()
Um es einfacher zu machen, füge ich „process_invoice“ in die MongoDatabase-Klasse ein. Aber du solltest es woanders lassen
Die Upload-API sollte wie das Original sein.
mongo_db = MongoDatabase(config_path='path_to_your_config.yaml') mongo_db.start_change_stream_listener() @app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: # Parse JSON body body = await request.json() # same code as before
Ablaufzusammenfassung:
With MongoDB change streams, you can efficiently process real-time changes in your database. Extending this example, you can handle various events such as updates and deletes, making your application more reactive and responsive.
Das obige ist der detaillierte Inhalt vonEchtzeit-Datenverarbeitung mit MongoDB Change Streams und Python. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!