使用 MongoDB Change Streams 和 Python 进行实时数据处理

PHPz
发布: 2024-09-12 16:15:12
原创
728 人浏览过

Real-Time Data Processing with MongoDB Change Streams and Python

介绍

MongoDB 中的更改流允许您的应用程序立即对实时数据更改做出反应。在这篇博文中,我将向您展示如何使用 Python 设置和使用变更流,而无需深入研究理论。我们将创建一个简单的程序来监听数据库事件,首先关注插入,然后将其扩展到其他事件类型。

变革流入门

更改流让您的应用程序能够侦听特定的数据库事件,例如插入或更新,并立即响应。想象一下用户更新其个人资料的场景;通过更改流,您可以立即在应用程序中反映此更改,而无需用户刷新页面。在此功能之前,您必须不断轮询数据库或使用复杂的方法,例如跟踪 MongoDB Oplog。变更流通过提供更加用户友好的 API 来简化这一过程。

如果没有变更流会发生什么

假设我有一个上传发票的 API。流程是客户将发票图像上传到 MongoDB,然后我们使用 AI 提取信息并更新发票。以下是上传发票的代码示例:

from pymongo import MongoClient

class MongoDatabase:
    def __init__(self, config_path: str):
        # Load the YAML configuration file using the provided utility function
        self.config_path = config_path
        self.config = read_config(path=self.config_path)

        # Initialize MongoDB connection
        self.client = MongoClient(self.config['mongodb']['uri'])
        self.db = self.client[self.config['mongodb']['database']]
        self.collection = self.db[self.config['mongodb']['collection']]

    def create_document(self, data: Dict[str, Any]) -> str:
        # Insert a new document and return the automatically generated document ID
        result = self.collection.insert_one(data)
        return str(result.inserted_id)
    def update_document_by_id(self, document_id: str, data: Dict[str, Any]):
        try:
            self.collection.update_one({"_id": document_id}, {"$set": data})
        except PyMongoError as e:
            print(f"Error updating document: {e}")
登录后复制

首先,我将把 pymongo 包装在一个类中,以防万一:))

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        # Generate invoice UUID
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "last_modified_at": None,
            "last_modified_by": None,
            "status": "not extracted",
            "invoice_image_base64": img,
            "invoice_info": {}
        }

        invoice_uuid = mongo_db.create_document(invoice_document)
        print('Result saved to MongoDB:', invoice_uuid)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"invoice_uuid": invoice_uuid, "message": "Upload successful"}
        )
    except Exception as e:
        # Handle errors
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )
登录后复制

一个合理的问题可能是:为什么不等到 AI 模型处理完图像后再更新?问题是处理时间大约需要 4-5 分钟,我们不想影响用户体验。

卡夫卡怎么样?

另一种选择是使用 Kafka。我们可以将图像发布到 Kafka 主题,然后另一个服务将处理数据。

优点:

  • 解耦上传和处理服务。
  • 高效的大规模实时数据处理。
  • 改进的用户体验:用户上传发票后立即得到回复。处理是异步处理的。

缺点:

  • 引入了额外的复杂性。
  • 需要设置和维护 Kafka 基础设施。
  • 对于小型应用程序来说可能有点过分了。

这是一个基本实现,演示如何使用 Kafka 处理发票上传过程。

用户通过 API 端点上传发票。发票图像保存在 MongoDB 中,并向 Kafka 主题发送一条消息以进行进一步处理。

from kafka import KafkaProducer
import json
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from datetime import datetime, timezone

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "status": "not extracted",
            "invoice_image_base64": img,
        }

        # Save the document to MongoDB
        invoice_uuid = mongo_db.create_document(invoice_document)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        # Send a message to Kafka topic
        producer.send('invoice_topic', invoice_document)
        producer.flush()

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"message": "Invoice upload received and will be processed"}
        )
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )
登录后复制

Kafka消费者监听invoice_topic。当它收到消息时,它会处理发票(例如,从图像中提取信息)并更新 MongoDB 中的相应文档。

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'invoice_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    invoice_document = message.value

    # Process the invoice, extract information, and update the document in MongoDB
    invoice_uuid = invoice_document["_id"]
    extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])

    mongo_db.update_document_by_id(invoice_uuid, {
        "invoice_info": extracted_data, 
        "status": "extracted"
    })

    print(f"Processed and updated invoice: {invoice_uuid}")
登录后复制

流程摘要:

  1. 上传发票:用户通过API上传发票。
  2. 保存到 MongoDB:发票文档保存在 MongoDB 中。
  3. 发送消息到 Kafka: 包含发票详细信息的消息被发送到 Kafka 主题 (invoice_topic)。
  4. Kafka 消费者处理发票: Kafka 消费者监听发票主题,处理发票,并使用提取的信息更新 MongoDB 中的相应文档。

哇,我不敢相信我自己写了这个!它确实凸显了所涉及的努力。这甚至没有考虑管理和配置三个服务的复杂性:MongoDB、Kafka 和 Invoice 服务。

使用 MongoDB Change Streams 进行发票处理

这是用 Markdown 重写的完整代码,用于演示 MongoDB 变更流,包括用于处理由变更流触发的发票处理的其他方法和函数。

我们将首先创建一个 MongoDB 包装类来处理数据库操作,例如创建文档和监听更改流。

from pymongo import MongoClient
from pymongo.errors import PyMongoError
from typing import Dict, Any
import threading
import yaml

class MongoDatabase:
    # Same code as before #

    def process_invoice(self, invoice_document: Dict[str, Any]):
        """Process the invoice by extracting data and updating the document in MongoDB."""
        try:
            # Simulate extracting information from the invoice image
            extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
            invoice_uuid = invoice_document["_id"]

            # Update the invoice document with the extracted data
            self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"})
            print(f"Processed and updated invoice: {invoice_uuid}")
        except Exception as e:
            print(f"Error processing invoice: {str(e)}")

    def start_change_stream_listener(self):
        """Start listening to the change stream for the collection."""
        def listen():
            try:
                with self.collection.watch() as stream:
                    for change in stream:
                        if change['operationType'] == 'insert':
                            invoice_document = change['fullDocument']
                            print(f"New invoice detected: {invoice_document['_id']}")
                            self.process_invoice(invoice_document)
            except PyMongoError as e:
                print(f"Change stream error: {str(e)}")

        # Start the change stream listener in a separate thread
        listener_thread = threading.Thread(target=listen, daemon=True)
        listener_thread.start()
登录后复制

为了方便起见,我在 MongoDatabase 类中添加了 process_invoice。但你应该把它留在其他地方

上传API应该和原来的一样。

mongo_db = MongoDatabase(config_path='path_to_your_config.yaml')
mongo_db.start_change_stream_listener()

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json() 
        # same code as before
登录后复制

流程摘要:

  1. 用户上传发票:用户通过API上传发票。
  2. 保存到 MongoDB:发票文档保存在 MongoDB 中。
  3. 触发 MongoDB 更改流: MongoDB 更改流检测到新文档的插入。
  4. 发票处理: 变更流触发 process_invoice 函数,该函数处理发票并使用提取的信息更新 MongoDB 中的文档。

结论

借助 MongoDB 更改流,您可以高效地处理数据库中的实时更改。扩展此示例,您可以处理各种事件,例如更新和删除,使您的应用程序更具反应性和响应能力。

参考:

  • https://www.mongodb.com/developer/languages/python/python-change-streams/#listen-to-inserts-from-an-application

以上是使用 MongoDB Change Streams 和 Python 进行实时数据处理的详细内容。更多信息请关注PHP中文网其他相关文章!

来源:dev.to
本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板