Traitement des données en temps réel avec MongoDB Change Streams et Python

PHPz
Libérer: 2024-09-12 16:15:12
original
728 Les gens l'ont consulté

Real-Time Data Processing with MongoDB Change Streams and Python

Introduction

Les flux de modifications dans MongoDB permettent à votre application de réagir instantanément aux modifications de données en temps réel. Dans cet article de blog, je vais vous montrer comment configurer et utiliser des flux de modifications avec Python, sans trop plonger dans la théorie. Nous allons créer un programme simple qui écoute les événements de la base de données, en nous concentrant d'abord sur les insertions, puis en l'étendant à d'autres types d'événements.

Premiers pas avec les flux de modifications

Les flux de modifications permettent à votre application d'écouter des événements de base de données spécifiques, tels que des insertions ou des mises à jour, et de répondre immédiatement. Imaginez un scénario dans lequel un utilisateur met à jour son profil ; avec les flux de modifications, vous pouvez refléter instantanément ce changement dans votre application sans que l'utilisateur ait besoin d'actualiser la page. Avant cette fonctionnalité, vous deviez constamment interroger la base de données ou utiliser des méthodes complexes comme suivre MongoDB Oplog. Les flux de modifications simplifient cela en fournissant une API plus conviviale.

Que se passe-t-il sans flux de changement

Disons que j'ai une API pour télécharger des factures. Le flux est que les clients téléchargent une image de la facture sur MongoDB, puis nous extrayons les informations avec l'IA et mettons à jour la facture. Voici un exemple de code pour télécharger une facture :

from pymongo import MongoClient

class MongoDatabase:
    def __init__(self, config_path: str):
        # Load the YAML configuration file using the provided utility function
        self.config_path = config_path
        self.config = read_config(path=self.config_path)

        # Initialize MongoDB connection
        self.client = MongoClient(self.config['mongodb']['uri'])
        self.db = self.client[self.config['mongodb']['database']]
        self.collection = self.db[self.config['mongodb']['collection']]

    def create_document(self, data: Dict[str, Any]) -> str:
        # Insert a new document and return the automatically generated document ID
        result = self.collection.insert_one(data)
        return str(result.inserted_id)
    def update_document_by_id(self, document_id: str, data: Dict[str, Any]):
        try:
            self.collection.update_one({"_id": document_id}, {"$set": data})
        except PyMongoError as e:
            print(f"Error updating document: {e}")
Copier après la connexion

Je vais d'abord envelopper le pymongo dans une classe, juste au cas où :))

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        # Generate invoice UUID
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "last_modified_at": None,
            "last_modified_by": None,
            "status": "not extracted",
            "invoice_image_base64": img,
            "invoice_info": {}
        }

        invoice_uuid = mongo_db.create_document(invoice_document)
        print('Result saved to MongoDB:', invoice_uuid)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"invoice_uuid": invoice_uuid, "message": "Upload successful"}
        )
    except Exception as e:
        # Handle errors
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )
Copier après la connexion

Une question raisonnable pourrait être : pourquoi ne pas attendre que le modèle d'IA traite l'image avant de la mettre à jour ? Le problème est que le traitement prend environ 4 à 5 minutes et nous ne voulons pas affecter l'expérience utilisateur.

Et Kafka ?

Une autre option pourrait être d'utiliser Kafka. Nous pourrions publier l'image sur un sujet Kafka et un autre service traiterait les données.

Avantages :

  • Dissocie les services de téléchargement et de traitement.
  • Efficace pour le traitement de données à grande échelle en temps réel.
  • Expérience utilisateur améliorée : les utilisateurs reçoivent une réponse immédiate après avoir téléchargé la facture. Le traitement est géré de manière asynchrone.

Inconvénients :

  • Introduit une complexité supplémentaire.
  • Nécessite la configuration et la maintenance de l'infrastructure Kafka.
  • Peut être excessif pour les applications à petite échelle.

Voici une implémentation de base pour démontrer l'utilisation de Kafka pour gérer le processus de téléchargement des factures.

L'utilisateur télécharge une facture via un point de terminaison API. L'image de la facture est enregistrée dans MongoDB et un message est envoyé à un sujet Kafka pour un traitement ultérieur.

from kafka import KafkaProducer
import json
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from datetime import datetime, timezone

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "status": "not extracted",
            "invoice_image_base64": img,
        }

        # Save the document to MongoDB
        invoice_uuid = mongo_db.create_document(invoice_document)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        # Send a message to Kafka topic
        producer.send('invoice_topic', invoice_document)
        producer.flush()

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"message": "Invoice upload received and will be processed"}
        )
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )
Copier après la connexion

Le consommateur Kafka écoute le bill_topic. Lorsqu'il reçoit un message, il traite la facture (par exemple en extrayant les informations de l'image) et met à jour le document correspondant dans MongoDB.

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'invoice_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    invoice_document = message.value

    # Process the invoice, extract information, and update the document in MongoDB
    invoice_uuid = invoice_document["_id"]
    extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])

    mongo_db.update_document_by_id(invoice_uuid, {
        "invoice_info": extracted_data, 
        "status": "extracted"
    })

    print(f"Processed and updated invoice: {invoice_uuid}")
Copier après la connexion

Résumé du flux :

  1. Télécharger la facture : L'utilisateur télécharge une facture via l'API.
  2. Enregistrer dans MongoDB : Le document de facturation est enregistré dans MongoDB.
  3. Envoyer un message à Kafka : Un message contenant les détails de la facture est envoyé à un sujet Kafka (invoice_topic).
  4. Kafka Consumer Processes Invoice : Un consommateur Kafka écoute bill_topic, traite la facture et met à jour le document correspondant dans MongoDB avec les informations extraites.

Wow, je n'arrive pas à croire que j'ai réussi à écrire ça tout seul ! Cela met vraiment en valeur l’effort impliqué. Et cela sans même tenir compte de la complexité de la gestion et de la configuration des trois services : MongoDB, Kafka et le service Invoice.

Traitement des factures avec MongoDB Change Streams

Voici le code complet réécrit dans Markdown pour démontrer les flux de modifications MongoDB, y compris des méthodes et fonctions supplémentaires pour gérer le traitement des factures déclenché par le flux de modifications.

Nous allons commencer par créer une classe wrapper MongoDB qui gère les opérations de base de données telles que la création de documents et l'écoute des flux de modifications.

from pymongo import MongoClient
from pymongo.errors import PyMongoError
from typing import Dict, Any
import threading
import yaml

class MongoDatabase:
    # Same code as before #

    def process_invoice(self, invoice_document: Dict[str, Any]):
        """Process the invoice by extracting data and updating the document in MongoDB."""
        try:
            # Simulate extracting information from the invoice image
            extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
            invoice_uuid = invoice_document["_id"]

            # Update the invoice document with the extracted data
            self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"})
            print(f"Processed and updated invoice: {invoice_uuid}")
        except Exception as e:
            print(f"Error processing invoice: {str(e)}")

    def start_change_stream_listener(self):
        """Start listening to the change stream for the collection."""
        def listen():
            try:
                with self.collection.watch() as stream:
                    for change in stream:
                        if change['operationType'] == 'insert':
                            invoice_document = change['fullDocument']
                            print(f"New invoice detected: {invoice_document['_id']}")
                            self.process_invoice(invoice_document)
            except PyMongoError as e:
                print(f"Change stream error: {str(e)}")

        # Start the change stream listener in a separate thread
        listener_thread = threading.Thread(target=listen, daemon=True)
        listener_thread.start()
Copier après la connexion

Pour faciliter les choses, j'ajoute process_invoice dans la classe MongoDatabase. Mais tu devrais le laisser ailleurs

L'API de téléchargement doit être comme celle d'origine.

mongo_db = MongoDatabase(config_path='path_to_your_config.yaml')
mongo_db.start_change_stream_listener()

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json() 
        # same code as before
Copier après la connexion

Résumé du flux :

  1. L'utilisateur télécharge une facture : L'utilisateur télécharge une facture via l'API.
  2. Enregistrer dans MongoDB : Le document de facturation est enregistré dans MongoDB.
  3. MongoDB Change Stream Déclenché : Le flux de modifications MongoDB détecte l'insertion du nouveau document.
  4. Traitement des factures : Le flux de modifications déclenche la fonction process_invoice, qui traite la facture et met à jour le document dans MongoDB avec les informations extraites.

Conclusion

Avec les flux de modifications MongoDB, vous pouvez traiter efficacement les modifications en temps réel dans votre base de données. En étendant cet exemple, vous pouvez gérer divers événements tels que les mises à jour et les suppressions, rendant votre application plus réactive et réactive.

Référence:

  • https://www.mongodb.com/developer/linguals/python/python-change-streams/#listen-to-inserts-from-an-application

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:dev.to
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal