Da die Einführung von Generativer KI (GenAI) in allen Branchen zunimmt, nutzen Unternehmen zunehmend Retrieval-Augmented Generation (RAG)-Techniken, um ihre KI-Modelle mit kontextreichen Echtzeitdaten zu ergänzen Daten. Die Verwaltung des komplexen Informationsflusses in solchen Anwendungen stellt erhebliche Herausforderungen dar, insbesondere beim Umgang mit kontinuierlich generierten Daten in großem Maßstab. KubeMQ, ein robuster Nachrichtenbroker, erweist sich als Lösung zur Rationalisierung des Routings mehrerer RAG-Prozesse und sorgt so für eine effiziente Datenverarbeitung in GenAI-Anwendungen.
Um die Effizienz und Skalierbarkeit von RAG-Workflows weiter zu verbessern, ist die Integration einer Hochleistungsdatenbank wie FalkorDB unerlässlich. FalkorDB bietet eine zuverlässige und skalierbare Speicherlösung für die dynamischen Wissensdatenbanken, auf die RAG-Systeme angewiesen sind, und gewährleistet einen schnellen Datenabruf und eine nahtlose Integration mit Messaging-Systemen wie KubeMQ.
RAG ist ein Paradigma, das generative KI-Modelle durch die Integration eines Abrufmechanismus verbessert, der es Modellen ermöglicht, während der Inferenz auf externe Wissensdatenbanken zuzugreifen. Dieser Ansatz verbessert die Genauigkeit, Relevanz und Aktualität der generierten Antworten erheblich, indem er sie auf den neuesten und relevantesten verfügbaren Informationen basiert.
In typischen GenAI-Workflows, die RAG verwenden, umfasst der Prozess mehrere Schritte:
Abfrageverarbeitung: Interpretation der Benutzereingaben, um Absicht und Kontext zu verstehen
Abruf: Abrufen relevanter Dokumente oder Daten aus einer dynamischen Wissensdatenbank wie FalkorDB, was einen schnellen und effizienten Zugriff auf die aktuellsten und relevantesten Informationen gewährleistet.
Generierung: Erstellen einer Antwort unter Verwendung sowohl der Eingabe als auch der abgerufenen Daten
Antwortübermittlung: Bereitstellung der endgültigen, angereicherten Ausgabe an den Benutzer
Die Skalierung dieser Schritte, insbesondere in Umgebungen, in denen Daten kontinuierlich generiert und aktualisiert werden, erfordert einen effizienten und zuverlässigen Mechanismus für den Datenfluss zwischen den verschiedenen Komponenten der RAG-Pipeline.
In Szenarien wie IoT-Netzwerken, Social-Media-Plattformen oder Echtzeit-Analysesystemen werden ständig neue Daten produziert und KI-Modelle müssen sich schnell anpassen, um diese Informationen zu integrieren. Herkömmliche Request-Response-Architekturen können unter Bedingungen mit hohem Durchsatz zu Engpässen führen, was zu Latenzproblemen und Leistungseinbußen führt.
KubeMQ verwaltet Messaging-Szenarien mit hohem Durchsatz, indem es eine skalierbare und robuste Infrastruktur für eine effiziente Datenweiterleitung zwischen Diensten bereitstellt. Durch die Integration von KubeMQ in die RAG-Pipeline wird jeder neue Datenpunkt in einer Nachrichtenwarteschlange oder einem Nachrichtenstream veröffentlicht, wodurch sichergestellt wird, dass Abrufkomponenten sofortigen Zugriff auf die neuesten Informationen haben, ohne das System zu überlasten. Diese Fähigkeit zur Datenverarbeitung in Echtzeit ist entscheidend für die Aufrechterhaltung der Relevanz und Genauigkeit der GenAI-Ausgaben.
KubeMQ bietet eine Vielzahl von Messaging-Mustern – darunter Warteschlangen, Streams, Publish-Subscribe (Pub/Sub) und Remote Procedure Calls (RPC) – und ist damit ein vielseitiger und leistungsstarker Router innerhalb einer RAG-Pipeline. Seine geringe Latenz und seine Hochleistungseigenschaften sorgen für eine schnelle Nachrichtenübermittlung, was für Echtzeit-GenAI-Anwendungen unerlässlich ist, bei denen Verzögerungen die Benutzererfahrung und die Systemeffizienz erheblich beeinträchtigen können.
Darüber hinaus ermöglicht die Fähigkeit von KubeMQ, komplexe Routing-Logik zu verarbeiten, anspruchsvolle Datenverteilungsstrategien. Dadurch wird sichergestellt, dass verschiedene Komponenten des KI-Systems genau die Daten erhalten, die sie benötigen, und zwar zum richtigen Zeitpunkt, ohne unnötige Duplikate oder Verzögerungen.
Während KubeMQ Nachrichten effizient zwischen Diensten weiterleitet, ergänzt FalkorDB dies durch die Bereitstellung einer skalierbaren und leistungsstarken Diagrammdatenbanklösung zum Speichern und Abrufen der riesigen Datenmengen, die für RAG-Prozesse erforderlich sind. Diese Integration stellt sicher, dass neue Daten, die durch KubeMQ fließen, nahtlos in FalkorDB gespeichert werden und so für Abrufvorgänge ohne Verzögerungen oder Engpässe verfügbar sind.
Da GenAI-Anwendungen sowohl an Benutzerbasis als auch an Datenvolumen wachsen, wird Skalierbarkeit zu einem vorrangigen Anliegen. KubeMQ ist skalierbar und unterstützt die horizontale Skalierung, um eine erhöhte Last nahtlos zu bewältigen. Es stellt sicher, dass die Messaging-Infrastruktur robust und reaktionsfähig bleibt, wenn die Anzahl der RAG-Prozesse steigt oder die Datengenerierung beschleunigt wird.
Darüber hinaus bietet KubeMQ Nachrichtenpersistenz und Fehlertoleranz. Bei Systemausfällen oder Netzwerkunterbrechungen stellt KubeMQ sicher, dass Nachrichten nicht verloren gehen und das System ordnungsgemäß wiederhergestellt werden kann. Diese Zuverlässigkeit ist entscheidend für die Aufrechterhaltung der Integrität von KI-Anwendungen, auf die Benutzer für zeitnahe und genaue Informationen angewiesen sind.
Die Implementierung benutzerdefinierter Routing-Dienste für die Datenverarbeitung in RAG-Pipelines kann ressourcenintensiv und komplex sein. Der Aufbau, die Wartung und die Skalierung dieser Dienste erfordern oft einen erheblichen Entwicklungsaufwand, wodurch der Fokus von der Kernentwicklung von KI-Anwendungen abgelenkt wird.
Durch die Einführung von KubeMQ entfällt für Unternehmen die Notwendigkeit, maßgeschneiderte Routing-Lösungen zu erstellen. KubeMQ bietet sofort einsatzbereite Funktionen, die die Routing-Anforderungen von RAG-Prozessen erfüllen, einschließlich komplexer Routing-Muster, Nachrichtenfilterung und Prioritätsbehandlung. Dies reduziert nicht nur den Entwicklungs- und Wartungsaufwand, sondern beschleunigt auch die Markteinführung von GenAI-Lösungen.
KubeMQ bietet mehrere Schnittstellen für die Interaktion mit seinen Message-Broker-Funktionen:
REST API: Ermöglicht sprachunabhängige Integration, sodass in jeder Programmiersprache geschriebene Dienste Nachrichten über HTTP senden und empfangen können
SDKs: Stellt Client-Bibliotheken für verschiedene Programmiersprachen (wie Python, Java, Go und .NET) bereit und ermöglicht effizientere Kommunikationsmuster und bessere Leistung durch native Integrationen
Diese Flexibilität ermöglicht es Entwicklern, die für ihren spezifischen Anwendungsfall am besten geeignete Methode auszuwählen, wodurch die Architektur vereinfacht und Entwicklungszyklen beschleunigt werden. Ein einziger Berührungspunkt für die Datenweiterleitung optimiert die Kommunikation zwischen verschiedenen Komponenten der RAG-Pipeline und verbessert so die Gesamtsystemkohärenz.
Das Codebeispiel zeigt, wie man durch die Integration von KubeMQ in eine RAG-Pipeline ein System zum Abrufen von Filminformationen aufbaut. Es richtet einen Server ein, der Film-URLs von Rotten Tomatoes aufnimmt, um mithilfe von GPT-4 einen Wissensgraphen zu erstellen. Benutzer können über einen Chat-Client mit diesem System interagieren, filmbezogene Anfragen senden und von der KI generierte Antworten erhalten. Dieser Anwendungsfall zeigt, wie man in einer praktischen Anwendung mit der kontinuierlichen Datenaufnahme und Echtzeit-Abfrageverarbeitung umgeht und KubeMQ für eine effiziente Nachrichtenverarbeitung und Kommunikation zwischen Diensten im Kontext von Filmen nutzt.
Datenaufnahmedienst: Erfasst und veröffentlicht neue Daten in KubeMQ-Streams, sobald sie verfügbar sind
Abrufdienst: Abonnieren Sie den KubeMQ-Stream, um Updates zu erhalten und die Wissensdatenbank zu aktualisieren
Generierungsdienst: Hört auf Abfrageanfragen, interagiert mit dem KI-Modell und generiert Antworten
Antwortdienst: Sendet die generierten Antworten über geeignete Kanäle an Benutzer zurück
Stellen Sie sicher, dass KubeMQ betriebsbereit ist. Dies kann durch die Bereitstellung mit Docker erreicht werden:
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
Dieser Befehl startet KubeMQ mit den erforderlichen Ports, die für die REST- und gRPC-Kommunikation verfügbar gemacht werden.
Dieser Code (GitHub-Repo) implementiert einen RAG-Server, der Chat-Anfragen verarbeitet und Wissensquellen mithilfe von KubeMQ für die Nachrichtenverarbeitung verwaltet.
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
Der Server führt zwei Hauptthreads aus: einen, der Chat-Anfragen über einen Kanal namens „rag-chat-query“ abonniert und sie mithilfe eines Wissensgraphen mit GPT-4 verarbeitet, und einen anderen, der kontinuierlich Daten aus einer Warteschlange namens „rag“ abruft -sources-queue“, um dem Wissensgraphen neue Quellen hinzuzufügen. Der Wissensgraph wird mit einer benutzerdefinierten Ontologie initialisiert, die aus einer JSON-Datei geladen wird, und verwendet für die Verarbeitung das GPT-4-Modell von OpenAI. Der Server implementiert eine ordnungsgemäße Herunterfahrbehandlung und Fehlerverwaltung und stellt sicher, dass alle Threads ordnungsgemäß beendet werden, wenn der Server gestoppt wird.
# server.py import json import threading from typing import List from dotenv import load_dotenv load_dotenv() import time from kubemq.common import CancellationToken from kubemq.cq import Client as CQClient, QueryMessageReceived, QueryResponseMessage, QueriesSubscription from kubemq.queues import Client as QueuesClient from graphrag_sdk.models.openai import OpenAiGenerativeModel from graphrag_sdk.model_config import KnowledgeGraphModelConfig from graphrag_sdk import KnowledgeGraph, Ontology from graphrag_sdk.source import URL class RAGServer: def __init__(self): self.cq_client = CQClient(address="localhost:50000") self.queues_client = QueuesClient(address="localhost:50000") model = OpenAiGenerativeModel(model_name="gpt-4o") with open("ontology.json", "r") as f: ontology = json.load(f) ontology = Ontology.from_json(ontology) self.kg = KnowledgeGraph( name="movies", model_config=KnowledgeGraphModelConfig.with_model(model), ontology=ontology) self.chat = self.kg.chat_session() self.shutdown_event = threading.Event() self.threads: List[threading.Thread] = [] def handle_chat(self, request: QueryMessageReceived): try: message = request.body.decode('utf-8') print(f"Received chat message: {message}") result= self.chat.send_message(message) answer = result.get("response","No answer") print(f"Chat response: {answer}") response = QueryResponseMessage( query_received=request, is_executed=True, body=answer.encode('utf-8') ) self.cq_client.send_response_message(response) except Exception as e: print(f"Error processing chat message: {str(e)}") self.cq_client.send_response_message(QueryResponseMessage( query_received=request, is_executed=False, error=str(e) )) def pull_from_queue(self): while not self.shutdown_event.is_set(): try: result = self.queues_client.pull("rag-sources-queue", 10, 1) if result.is_error: print(f"Error pulling message from queue: {result.error}") continue sources = [] for message in result.messages: source = message.body.decode('utf-8') print(f"Received source: {source}, adding to knowledge graph") sources.append(URL(message.body.decode('utf-8'))) if sources: self.kg.process_sources(sources) except Exception as e: if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error processing sources: {str(e)}") def subscribe_to_chat_queries(self): def on_error(err: str): if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error: {err}") cancellation_token = CancellationToken() try: self.cq_client.subscribe_to_queries( subscription=QueriesSubscription( channel="rag-chat-query", on_receive_query_callback=self.handle_chat, on_error_callback=on_error, ), cancel=cancellation_token ) # Wait for shutdown signal while not self.shutdown_event.is_set(): time.sleep(0.1) # Cancel subscription when shutdown is requested cancellation_token.cancel() except Exception as e: if not self.shutdown_event.is_set(): print(f"Error in subscription thread: {str(e)}") def run(self): chat_thread = threading.Thread(target=self.subscribe_to_chat_queries) queue_thread = threading.Thread(target=self.pull_from_queue) self.threads.extend([chat_thread, queue_thread]) for thread in self.threads: thread.daemon = True # Make threads daemon so they exit when main thread exits thread.start() print("RAG server started") try: while True: time.sleep(1) except KeyboardInterrupt: print("\nShutting down gracefully...") self.shutdown() self.cq_client.close() self.queues_client.close() def shutdown(self): print("Initiating shutdown sequence...") self.shutdown_event.set() # Signal all threads to stop for thread in self.threads: thread.join(timeout=5.0) # Wait up to 5 seconds for each thread if thread.is_alive(): print(f"Warning: Thread {thread.name} did not shutdown cleanly") print("Shutdown complete") if __name__ == "__main__": rag_server = RAGServer() rag_server.run()
Dieser Code implementiert einen einfachen Client, der Film-URLs über das Warteschlangensystem von KubeMQ an den RAG-Server sendet. Konkret wird eine SourceClient-Klasse erstellt, die eine Verbindung zu KubeMQ herstellt und Nachrichten an den Kanal „rag-sources-queue“ sendet, bei dem es sich um dieselbe Warteschlange handelt, die der RAG-Server überwacht. Wenn es als Hauptprogramm ausgeführt wird, sendet es eine Liste von Rotten Tomatoes-Film-URLs (einschließlich Matrix-Filmen, John Wick und Speed), die vom RAG-Server verarbeitet und dem Wissensgraphen hinzugefügt werden.
# sources_client.py from kubemq.queues import * class SourceClient: def __init__(self, address="localhost:50000"): self.client = Client(address=address) def send_source(self, message: str) : send_result = self.client.send_queues_message( QueueMessage( channel="rag-sources-queue", body=message.encode("utf-8"), ) ) if send_result.is_error: print(f"message send error, error:{send_result.error}") if __name__ == "__main__": client = SourceClient() urls = ["https://www.rottentomatoes.com/m/side_by_side_2012", "https://www.rottentomatoes.com/m/matrix", "https://www.rottentomatoes.com/m/matrix_revolutions", "https://www.rottentomatoes.com/m/matrix_reloaded", "https://www.rottentomatoes.com/m/speed_1994", "https://www.rottentomatoes.com/m/john_wick_chapter_4"] for url in urls: client.send_source(url) print("done")
Dieser Code implementiert einen Chat-Client, der über das Abfragesystem von KubeMQ mit dem RAG-Server kommuniziert. Die ChatClient-Klasse sendet Nachrichten an den Kanal „rag-chat-query“ und wartet auf Antworten, wobei für jede Abfrage ein Timeout von 30 Sekunden gilt. Wenn es als Hauptprogramm ausgeführt wird, demonstriert es die Funktionalität des Clients, indem es zwei verwandte Fragen über den Regisseur von „The Matrix“ und seine Verbindung zu Keanu Reeves sendet und jede Antwort ausdruckt, sobald er sie erhält.
Alle Codebeispiele finden Sie in meinem Fork des ursprünglichen GitHub-Repositorys.
Die Integration von KubeMQ in RAG-Pipelines für GenAI-Anwendungen bietet einen skalierbaren, zuverlässigen und effizienten Mechanismus für die Verarbeitung kontinuierlicher Datenströme und komplexer Kommunikation zwischen Prozessen. Indem KubeMQ als einheitlicher Router mit vielseitigen Messaging-Mustern fungiert, vereinfacht es die Gesamtarchitektur, reduziert den Bedarf an benutzerdefinierten Routing-Lösungen und beschleunigt Entwicklungszyklen.
Darüber hinaus verbessert die Integration von FalkorDB das Datenmanagement durch die Bereitstellung einer leistungsstarken Wissensdatenbank, die sich nahtlos in KubeMQ integrieren lässt. Diese Kombination gewährleistet eine optimierte Datenabfrage und -speicherung und unterstützt die dynamischen Anforderungen von RAG-Prozessen.
Die Fähigkeit, Szenarien mit hohem Durchsatz zu bewältigen, kombiniert mit Funktionen wie Persistenz und Fehlertoleranz, stellt sicher, dass GenAI-Anwendungen auch unter hoher Last oder bei Systemunterbrechungen reaktionsfähig und zuverlässig bleiben.
Durch die Nutzung von KubeMQ und FalkorDB können sich Unternehmen auf die Verbesserung ihrer KI-Modelle und die Bereitstellung wertvoller Erkenntnisse und Dienste konzentrieren und dabei darauf vertrauen, dass ihre Datenrouting-Infrastruktur robust und in der Lage ist, die Anforderungen moderner KI-Workflows zu erfüllen.
Das obige ist der detaillierte Inhalt vonVerbesserung von GenAI-Anwendungen mit KubeMQ: Effiziente Skalierung der Retrieval-Augmented Generation (RAG). Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!