Echtzeit-Datenverarbeitung mit MongoDB Change Streams und Python

PHPz
Freigeben: 2024-09-12 16:15:12
Original
494 Leute haben es durchsucht

Real-Time Data Processing with MongoDB Change Streams and Python

Einführung

Änderungsströme in MongoDB ermöglichen es Ihrer Anwendung, sofort auf Datenänderungen in Echtzeit zu reagieren. In diesem Blogbeitrag zeige ich Ihnen, wie Sie Änderungsströme mit Python einrichten und nutzen, ohne zu tief in die Theorie einzutauchen. Wir erstellen ein einfaches Programm, das auf Datenbankereignisse lauscht, wobei wir uns zunächst auf Einfügungen konzentrieren und es dann auf andere Ereignistypen erweitern.

Erste Schritte mit Change Streams

Änderungsströme ermöglichen es Ihrer App, bestimmte Datenbankereignisse wie Einfügungen oder Aktualisierungen abzuhören und sofort zu reagieren. Stellen Sie sich ein Szenario vor, in dem ein Benutzer sein Profil aktualisiert; Mit Änderungsströmen können Sie diese Änderung sofort in Ihrer App widerspiegeln, ohne dass der Benutzer die Seite aktualisieren muss. Vor dieser Funktion mussten Sie ständig die Datenbank abfragen oder komplexe Methoden wie das Tailing des MongoDB-Oplogs verwenden. Änderungsströme vereinfachen dies, indem sie eine benutzerfreundlichere API bereitstellen.

Was passiert ohne Change Streams?

Angenommen, ich habe eine API zum Hochladen von Rechnungen. Der Ablauf besteht darin, dass Kunden ein Bild der Rechnung in MongoDB hochladen, dann extrahieren wir die Informationen mit KI und aktualisieren die Rechnung. Hier ist ein Beispiel für den Code zum Hochladen einer Rechnung:

from pymongo import MongoClient

class MongoDatabase:
    def __init__(self, config_path: str):
        # Load the YAML configuration file using the provided utility function
        self.config_path = config_path
        self.config = read_config(path=self.config_path)

        # Initialize MongoDB connection
        self.client = MongoClient(self.config['mongodb']['uri'])
        self.db = self.client[self.config['mongodb']['database']]
        self.collection = self.db[self.config['mongodb']['collection']]

    def create_document(self, data: Dict[str, Any]) -> str:
        # Insert a new document and return the automatically generated document ID
        result = self.collection.insert_one(data)
        return str(result.inserted_id)
    def update_document_by_id(self, document_id: str, data: Dict[str, Any]):
        try:
            self.collection.update_one({"_id": document_id}, {"$set": data})
        except PyMongoError as e:
            print(f"Error updating document: {e}")
Nach dem Login kopieren

Zuerst werde ich den Pymongo für alle Fälle in eine Klasse einpacken :))

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        # Generate invoice UUID
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "last_modified_at": None,
            "last_modified_by": None,
            "status": "not extracted",
            "invoice_image_base64": img,
            "invoice_info": {}
        }

        invoice_uuid = mongo_db.create_document(invoice_document)
        print('Result saved to MongoDB:', invoice_uuid)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"invoice_uuid": invoice_uuid, "message": "Upload successful"}
        )
    except Exception as e:
        # Handle errors
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )
Nach dem Login kopieren

Eine berechtigte Frage könnte sein: Warum nicht mit der Aktualisierung warten, bis das KI-Modell das Bild verarbeitet? Das Problem besteht darin, dass die Verarbeitung etwa 4 bis 5 Minuten dauert und wir die Benutzererfahrung nicht beeinträchtigen möchten.

Wie wäre es mit Kafka?

Eine andere Option könnte die Verwendung von Kafka sein. Wir könnten das Bild in einem Kafka-Thema veröffentlichen und ein anderer Dienst würde die Daten verarbeiten.

Vorteile:

  • Entkoppelt die Upload- und Verarbeitungsdienste.
  • Effizient für die Datenverarbeitung in großem Maßstab in Echtzeit.
  • Verbesserte Benutzererfahrung: Benutzer erhalten nach dem Hochladen der Rechnung sofort eine Antwort. Die Verarbeitung erfolgt asynchron.

Nachteile:

  • Führt zusätzliche Komplexität ein.
  • Erfordert die Einrichtung und Wartung der Kafka-Infrastruktur.
  • Kann für kleine Anwendungen übertrieben sein.

Hier ist eine grundlegende Implementierung, um die Verwendung von Kafka für den Rechnungs-Upload-Prozess zu demonstrieren.

Der Benutzer lädt eine Rechnung über einen API-Endpunkt hoch. Das Rechnungsbild wird in MongoDB gespeichert und eine Nachricht wird zur weiteren Verarbeitung an ein Kafka-Thema gesendet.

from kafka import KafkaProducer
import json
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from datetime import datetime, timezone

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "status": "not extracted",
            "invoice_image_base64": img,
        }

        # Save the document to MongoDB
        invoice_uuid = mongo_db.create_document(invoice_document)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        # Send a message to Kafka topic
        producer.send('invoice_topic', invoice_document)
        producer.flush()

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"message": "Invoice upload received and will be processed"}
        )
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )
Nach dem Login kopieren

Der Kafka-Verbraucher hört sich das „voice_topic“ an. Wenn es eine Nachricht empfängt, verarbeitet es die Rechnung (z. B. extrahiert Informationen aus dem Bild) und aktualisiert das entsprechende Dokument in MongoDB.

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'invoice_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    invoice_document = message.value

    # Process the invoice, extract information, and update the document in MongoDB
    invoice_uuid = invoice_document["_id"]
    extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])

    mongo_db.update_document_by_id(invoice_uuid, {
        "invoice_info": extracted_data, 
        "status": "extracted"
    })

    print(f"Processed and updated invoice: {invoice_uuid}")
Nach dem Login kopieren

Ablaufzusammenfassung:

  1. Rechnung hochladen: Der Benutzer lädt eine Rechnung über die API hoch.
  2. In MongoDB speichern: Das Rechnungsdokument wird in MongoDB gespeichert.
  3. Nachricht an Kafka senden: Eine Nachricht mit den Rechnungsdetails wird an ein Kafka-Thema (invoice_topic) gesendet.
  4. Kafka-Verbraucher verarbeitet Rechnung: Ein Kafka-Verbraucher hört sich „voice_topic“ an, verarbeitet die Rechnung und aktualisiert das entsprechende Dokument in MongoDB mit den extrahierten Informationen.

Wow, ich kann nicht glauben, dass ich es geschafft habe, das alleine zu schreiben! Es unterstreicht wirklich den damit verbundenen Aufwand. Dabei ist die Komplexität der Verwaltung und Konfiguration der drei Dienste noch nicht einmal berücksichtigt: MongoDB, Kafka und der Invoice-Dienst.

Rechnungsverarbeitung mit MongoDB Change Streams

Hier ist der vollständige Code, der in Markdown neu geschrieben wurde, um MongoDB-Änderungsströme zu demonstrieren, einschließlich zusätzlicher Methoden und Funktionen zur Abwicklung der durch den Änderungsstrom ausgelösten Rechnungsverarbeitung.

Wir beginnen mit der Erstellung einer MongoDB-Wrapper-Klasse, die Datenbankoperationen wie das Erstellen von Dokumenten und das Abhören von Änderungsströmen abwickelt.

from pymongo import MongoClient
from pymongo.errors import PyMongoError
from typing import Dict, Any
import threading
import yaml

class MongoDatabase:
    # Same code as before #

    def process_invoice(self, invoice_document: Dict[str, Any]):
        """Process the invoice by extracting data and updating the document in MongoDB."""
        try:
            # Simulate extracting information from the invoice image
            extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
            invoice_uuid = invoice_document["_id"]

            # Update the invoice document with the extracted data
            self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"})
            print(f"Processed and updated invoice: {invoice_uuid}")
        except Exception as e:
            print(f"Error processing invoice: {str(e)}")

    def start_change_stream_listener(self):
        """Start listening to the change stream for the collection."""
        def listen():
            try:
                with self.collection.watch() as stream:
                    for change in stream:
                        if change['operationType'] == 'insert':
                            invoice_document = change['fullDocument']
                            print(f"New invoice detected: {invoice_document['_id']}")
                            self.process_invoice(invoice_document)
            except PyMongoError as e:
                print(f"Change stream error: {str(e)}")

        # Start the change stream listener in a separate thread
        listener_thread = threading.Thread(target=listen, daemon=True)
        listener_thread.start()
Nach dem Login kopieren

Um es einfacher zu machen, füge ich „process_invoice“ in die MongoDatabase-Klasse ein. Aber du solltest es woanders lassen

Die Upload-API sollte wie das Original sein.

mongo_db = MongoDatabase(config_path='path_to_your_config.yaml')
mongo_db.start_change_stream_listener()

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json() 
        # same code as before
Nach dem Login kopieren

Ablaufzusammenfassung:

  1. User Uploads Invoice: The user uploads an invoice through the API.
  2. Save to MongoDB: The invoice document is saved in MongoDB.
  3. MongoDB Change Stream Triggered: The MongoDB change stream detects the insertion of the new document.
  4. Invoice Processing: The change stream triggers the process_invoice function, which processes the invoice and updates the document in MongoDB with the extracted information.

Conclusion

With MongoDB change streams, you can efficiently process real-time changes in your database. Extending this example, you can handle various events such as updates and deletes, making your application more reactive and responsive.

Reference:

  • https://www.mongodb.com/developer/languages/python/python-change-streams/#listen-to-inserts-from-an-application

Das obige ist der detaillierte Inhalt vonEchtzeit-Datenverarbeitung mit MongoDB Change Streams und Python. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:dev.to
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage
Über uns Haftungsausschluss Sitemap
Chinesische PHP-Website:Online-PHP-Schulung für das Gemeinwohl,Helfen Sie PHP-Lernenden, sich schnell weiterzuentwickeln!