Heim > Backend-Entwicklung > Python-Tutorial > Echtzeit-Datenverarbeitung mit MongoDB Change Streams und Python

Echtzeit-Datenverarbeitung mit MongoDB Change Streams und Python

PHPz
Freigeben: 2024-09-12 16:15:12
Original
757 Leute haben es durchsucht

Real-Time Data Processing with MongoDB Change Streams and Python

pengenalan

Tukar strim dalam MongoDB membolehkan aplikasi anda bertindak balas terhadap perubahan data masa nyata serta-merta. Dalam catatan blog ini, saya akan menunjukkan kepada anda cara menyediakan dan menggunakan strim perubahan dengan Python, tanpa mendalami teori. Kami akan mencipta atur cara mudah yang mendengar acara pangkalan data, memfokuskan pada sisipan dahulu, kemudian memanjangkannya kepada jenis acara lain.

Bermula dengan Tukar Strim

Tukar strim membenarkan apl anda mendengar acara pangkalan data tertentu, seperti sisipan atau kemas kini dan bertindak balas dengan segera. Bayangkan senario di mana pengguna mengemas kini profil mereka; dengan strim perubahan, anda boleh mencerminkan serta-merta perubahan ini merentas apl anda tanpa memerlukan pengguna memuat semula halaman. Sebelum ciri ini, anda perlu sentiasa meninjau pangkalan data atau menggunakan kaedah yang kompleks seperti mengikuti Oplog MongoDB. Tukar strim memudahkan perkara ini dengan menyediakan API yang lebih mesra pengguna.

Apa yang Berlaku Tanpa Perubahan Strim

Katakan saya mempunyai API untuk memuat naik invois. Alirannya ialah pelanggan akan memuat naik imej invois ke MongoDB, kemudian kami mengekstrak maklumat dengan AI dan mengemas kini invois. Berikut ialah contoh kod untuk memuat naik invois:

from pymongo import MongoClient

class MongoDatabase:
    def __init__(self, config_path: str):
        # Load the YAML configuration file using the provided utility function
        self.config_path = config_path
        self.config = read_config(path=self.config_path)

        # Initialize MongoDB connection
        self.client = MongoClient(self.config['mongodb']['uri'])
        self.db = self.client[self.config['mongodb']['database']]
        self.collection = self.db[self.config['mongodb']['collection']]

    def create_document(self, data: Dict[str, Any]) -> str:
        # Insert a new document and return the automatically generated document ID
        result = self.collection.insert_one(data)
        return str(result.inserted_id)
    def update_document_by_id(self, document_id: str, data: Dict[str, Any]):
        try:
            self.collection.update_one({"_id": document_id}, {"$set": data})
        except PyMongoError as e:
            print(f"Error updating document: {e}")
Nach dem Login kopieren

Mula-mula saya akan membungkus pymongo di dalam kelas, untuk berjaga-jaga :))

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        # Generate invoice UUID
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "last_modified_at": None,
            "last_modified_by": None,
            "status": "not extracted",
            "invoice_image_base64": img,
            "invoice_info": {}
        }

        invoice_uuid = mongo_db.create_document(invoice_document)
        print('Result saved to MongoDB:', invoice_uuid)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"invoice_uuid": invoice_uuid, "message": "Upload successful"}
        )
    except Exception as e:
        # Handle errors
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )
Nach dem Login kopieren

Persoalan munasabah mungkin: mengapa tidak menunggu sehingga model AI memproses imej sebelum mengemas kini? Masalahnya ialah proses itu mengambil masa sekitar 4-5 minit dan kami tidak mahu menjejaskan pengalaman pengguna.

Bagaimana dengan Kafka?

Pilihan lain boleh menggunakan Kafka. Kami boleh menerbitkan imej ke topik Kafka, dan perkhidmatan lain akan memproses data.

Kebaikan:

  • Mengasingkan perkhidmatan muat naik dan pemprosesan.
  • Cekap untuk pemprosesan data masa nyata berskala besar.
  • Pengalaman pengguna yang dipertingkatkan: Pengguna mendapat respons segera selepas memuat naik invois. Pemprosesan dikendalikan secara tidak segerak.

Keburukan:

  • Memperkenalkan kerumitan tambahan.
  • Memerlukan persediaan dan penyelenggaraan infrastruktur Kafka.
  • Mungkin berlebihan untuk aplikasi berskala kecil.

Berikut ialah pelaksanaan asas untuk menunjukkan penggunaan Kafka untuk mengendalikan proses muat naik invois.

Pengguna memuat naik invois melalui titik akhir API. Imej invois disimpan dalam MongoDB dan mesej dihantar ke topik Kafka untuk diproses selanjutnya.

from kafka import KafkaProducer
import json
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from datetime import datetime, timezone

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "status": "not extracted",
            "invoice_image_base64": img,
        }

        # Save the document to MongoDB
        invoice_uuid = mongo_db.create_document(invoice_document)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        # Send a message to Kafka topic
        producer.send('invoice_topic', invoice_document)
        producer.flush()

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"message": "Invoice upload received and will be processed"}
        )
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )
Nach dem Login kopieren

Pengguna Kafka mendengar topik_invois. Apabila ia menerima mesej, ia memproses invois (cth., mengekstrak maklumat daripada imej) dan mengemas kini dokumen yang sepadan dalam MongoDB.

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'invoice_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    invoice_document = message.value

    # Process the invoice, extract information, and update the document in MongoDB
    invoice_uuid = invoice_document["_id"]
    extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])

    mongo_db.update_document_by_id(invoice_uuid, {
        "invoice_info": extracted_data, 
        "status": "extracted"
    })

    print(f"Processed and updated invoice: {invoice_uuid}")
Nach dem Login kopieren

Ringkasan Aliran:

  1. Muat Naik Invois: Pengguna memuat naik invois melalui API.
  2. Simpan ke MongoDB: Dokumen invois disimpan dalam MongoDB.
  3. Hantar Mesej kepada Kafka: Mesej yang mengandungi butiran invois dihantar ke topik Kafka (topik_invois).
  4. Invois Proses Pengguna Kafka: Pengguna Kafka mendengar invois_topic, memproses invois dan mengemas kini dokumen yang sepadan dalam MongoDB dengan maklumat yang diekstrak.

Wah, saya tidak percaya saya berjaya menulis ini sendiri! Ia benar-benar menyerlahkan usaha yang terlibat. Dan itu tidak mengambil kira kerumitan mengurus dan mengkonfigurasi tiga perkhidmatan: MongoDB, Kafka dan perkhidmatan Invois.

Pemprosesan Invois dengan MongoDB Change Streams

Berikut ialah kod lengkap yang ditulis semula dalam Markdown untuk menunjukkan aliran perubahan MongoDB, termasuk kaedah dan fungsi tambahan untuk mengendalikan pemprosesan invois yang dicetuskan oleh aliran perubahan.

Kami akan mulakan dengan mencipta kelas pembalut MongoDB yang mengendalikan operasi pangkalan data seperti mencipta dokumen dan mendengar menukar strim.

from pymongo import MongoClient
from pymongo.errors import PyMongoError
from typing import Dict, Any
import threading
import yaml

class MongoDatabase:
    # Same code as before #

    def process_invoice(self, invoice_document: Dict[str, Any]):
        """Process the invoice by extracting data and updating the document in MongoDB."""
        try:
            # Simulate extracting information from the invoice image
            extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
            invoice_uuid = invoice_document["_id"]

            # Update the invoice document with the extracted data
            self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"})
            print(f"Processed and updated invoice: {invoice_uuid}")
        except Exception as e:
            print(f"Error processing invoice: {str(e)}")

    def start_change_stream_listener(self):
        """Start listening to the change stream for the collection."""
        def listen():
            try:
                with self.collection.watch() as stream:
                    for change in stream:
                        if change['operationType'] == 'insert':
                            invoice_document = change['fullDocument']
                            print(f"New invoice detected: {invoice_document['_id']}")
                            self.process_invoice(invoice_document)
            except PyMongoError as e:
                print(f"Change stream error: {str(e)}")

        # Start the change stream listener in a separate thread
        listener_thread = threading.Thread(target=listen, daemon=True)
        listener_thread.start()
Nach dem Login kopieren

Untuk memudahkan saya menambah process_invoice di dalam kelas MongoDatabase. Tetapi anda harus meninggalkannya di tempat lain

API muat naik hendaklah seperti yang asal.

mongo_db = MongoDatabase(config_path='path_to_your_config.yaml')
mongo_db.start_change_stream_listener()

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json() 
        # same code as before
Nach dem Login kopieren

Ringkasan Aliran:

  1. Benutzer lädt Rechnung hoch: Der Benutzer lädt eine Rechnung über die API hoch.
  2. In MongoDB speichern: Das Rechnungsdokument wird in MongoDB gespeichert.
  3. MongoDB-Änderungsstream ausgelöst: Der MongoDB-Änderungsstream erkennt das Einfügen des neuen Dokuments.
  4. Rechnungsverarbeitung: Der Änderungsstrom löst die Funktion „process_invoice“ aus, die die Rechnung verarbeitet und das Dokument in MongoDB mit den extrahierten Informationen aktualisiert.

Abschluss

Mit MongoDB-Änderungsströmen können Sie Echtzeitänderungen in Ihrer Datenbank effizient verarbeiten. Wenn Sie dieses Beispiel erweitern, können Sie verschiedene Ereignisse wie Aktualisierungen und Löschungen verarbeiten, wodurch Ihre Anwendung reaktiver und reaktionsfähiger wird.

Referenz:

  • https://www.mongodb.com/developer/linguals/python/python-change-streams/#listen-to-inserts-from-an-application

Das obige ist der detaillierte Inhalt vonEchtzeit-Datenverarbeitung mit MongoDB Change Streams und Python. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:dev.to
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage