随着生成式人工智能 (GenAI) 在各行业的应用激增,组织越来越多地利用检索增强生成 (RAG) 技术通过实时、上下文丰富的内容来支持其人工智能模型数据。管理此类应用程序中复杂的信息流带来了重大挑战,特别是在处理大规模连续生成的数据时。 KubeMQ 是一个强大的消息代理,作为简化多个 RAG 进程的路由的解决方案而出现,确保 GenAI 应用程序中的高效数据处理。
为了进一步提高 RAG 工作流程的效率和可扩展性,集成 FalkorDB 这样的高性能数据库至关重要。 FalkorDB 为 RAG 系统所依赖的动态知识库提供可靠且可扩展的存储解决方案,确保快速数据检索以及与 KubeMQ 等消息传递系统的无缝集成。
RAG 是一种通过集成检索机制来增强生成式 AI 模型的范例,允许模型在推理过程中访问外部知识库。这种方法通过将生成的响应基于可用的最新相关信息,显着提高了生成响应的准确性、相关性和及时性。
在使用 RAG 的典型 GenAI 工作流程中,该过程涉及多个步骤:
查询处理:解释用户的输入以了解意图和上下文
检索:从动态知识库(例如 FalkorDB)中获取相关文档或数据,确保快速高效地访问最新且相关的信息。
生成:使用输入和检索到的数据生成响应
响应交付:向用户提供最终的、丰富的输出
扩展这些步骤,尤其是在数据不断生成和更新的环境中,需要一种高效可靠的机制来在 RAG 管道的各个组件之间传输数据。
在物联网网络、社交媒体平台或实时分析系统等场景中,不断产生新数据,人工智能模型必须迅速适应以合并这些信息。传统的请求-响应架构在高吞吐量条件下可能成为瓶颈,导致延迟问题和性能下降。
KubeMQ 通过提供可扩展且强大的基础设施来管理高吞吐量消息传递场景,以实现服务之间的高效数据路由。通过将 KubeMQ 集成到 RAG 管道中,每个新数据点都会发布到消息队列或流中,确保检索组件可以立即访问最新信息,而不会压垮系统。这种实时数据处理能力对于维持 GenAI 输出的相关性和准确性至关重要。
KubeMQ 提供各种消息传递模式 - 包括队列、流、发布-订阅 (pub/sub) 和远程过程调用 (RPC) - 使其成为 RAG 管道中多功能且功能强大的路由器。其低延迟和高性能特性可确保及时的消息传递,这对于实时 GenAI 应用程序至关重要,因为延迟会严重影响用户体验和系统效率。
此外,KubeMQ 处理复杂路由逻辑的能力允许复杂的数据分发策略。这确保了人工智能系统的不同组件在需要时准确接收所需的数据,而不会出现不必要的重复或延迟。
虽然 KubeMQ 在服务之间有效地路由消息,FalkorDB 通过提供可扩展且高性能的图形数据库解决方案来存储和检索 RAG 流程所需的大量数据来补充这一点。这种集成确保当新数据流经 KubeMQ 时,它会无缝存储在 FalkorDB 中,使其可随时用于检索操作,而不会引入延迟或瓶颈。
随着 GenAI 应用程序的用户群和数据量不断增长,可扩展性成为最重要的问题。 KubeMQ 具有可扩展性,支持水平扩展以无缝适应增加的负载。它确保随着 RAG 进程数量的增加或数据生成的加速,消息传递基础设施保持稳健和响应能力。
此外,KubeMQ 还提供消息持久化和容错能力。当发生系统故障或网络中断时,KubeMQ 可确保消息不会丢失并且系统可以正常恢复。这种可靠性对于维护人工智能应用程序的完整性至关重要,用户依赖这些应用程序来获取及时、准确的信息。
在 RAG 管道中实现用于数据处理的自定义路由服务可能会占用大量资源且复杂。通常需要大量的开发工作来构建、维护和扩展这些服务,从而分散了核心人工智能应用程序开发的注意力。
通过采用 KubeMQ,组织无需创建定制路由解决方案。 KubeMQ 提供开箱即用的功能,可满足 RAG 进程的路由需求,包括复杂的路由模式、消息过滤和优先级处理。这不仅减少了开发和维护开销,还加快了 GenAI 解决方案的上市时间。
KubeMQ 提供了多个与其消息代理功能交互的接口:
REST API:支持与语言无关的集成,允许以任何编程语言编写的服务通过 HTTP 发送和接收消息
SDK:为各种编程语言(例如 Python、Java、Go 和 .NET)提供客户端库,通过本机集成促进更高效的通信模式和更好的性能
这种灵活性允许开发人员为其特定用例选择最合适的方法,从而简化架构并加快开发周期。数据路由的单一接触点简化了 RAG 管道不同组件之间的通信,从而增强了整体系统的一致性。
代码示例展示了如何通过将 KubeMQ 集成到 RAG 管道来构建电影信息检索系统。它设置了一个服务器,从烂番茄中提取电影 URL,以使用 GPT-4 构建知识图谱。用户可以通过聊天客户端与该系统交互,发送与电影相关的查询并接收人工智能生成的响应。此用例演示了如何在实际应用程序中处理连续数据摄取和实时查询处理,利用 KubeMQ 在电影上下文中进行高效的消息处理和服务间通信。
数据摄取服务:捕获新数据并将其发布到可用的 KubeMQ 流
检索服务:订阅KubeMQ流以接收更新并刷新知识库
生成服务:监听查询请求,与AI模型交互,并生成响应
响应服务:将生成的响应通过适当的渠道发送回用户
确保 KubeMQ 可以运行,这可以通过使用 Docker 部署来实现:
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
此命令启动 KubeMQ,并为 REST 和 gRPC 通信公开必要的端口。
此代码(GitHub 存储库)实现了一个 RAG 服务器,该服务器处理聊天查询并使用 KubeMQ 进行消息处理来管理知识源。
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
服务器运行两个主线程:一个通过名为“rag-chat-query”的通道订阅聊天查询,并使用 GPT-4 的知识图来处理它们,另一个从名为“rag”的队列中持续拉取-sources-queue”将新源添加到知识图谱中。知识图谱使用从 JSON 文件加载的自定义本体进行初始化,并使用 OpenAI 的 GPT-4 模型进行处理。服务器实现了优雅的关闭处理和错误管理,确保服务器停止时所有线程都正确终止。
# server.py import json import threading from typing import List from dotenv import load_dotenv load_dotenv() import time from kubemq.common import CancellationToken from kubemq.cq import Client as CQClient, QueryMessageReceived, QueryResponseMessage, QueriesSubscription from kubemq.queues import Client as QueuesClient from graphrag_sdk.models.openai import OpenAiGenerativeModel from graphrag_sdk.model_config import KnowledgeGraphModelConfig from graphrag_sdk import KnowledgeGraph, Ontology from graphrag_sdk.source import URL class RAGServer: def __init__(self): self.cq_client = CQClient(address="localhost:50000") self.queues_client = QueuesClient(address="localhost:50000") model = OpenAiGenerativeModel(model_name="gpt-4o") with open("ontology.json", "r") as f: ontology = json.load(f) ontology = Ontology.from_json(ontology) self.kg = KnowledgeGraph( name="movies", model_config=KnowledgeGraphModelConfig.with_model(model), ontology=ontology) self.chat = self.kg.chat_session() self.shutdown_event = threading.Event() self.threads: List[threading.Thread] = [] def handle_chat(self, request: QueryMessageReceived): try: message = request.body.decode('utf-8') print(f"Received chat message: {message}") result= self.chat.send_message(message) answer = result.get("response","No answer") print(f"Chat response: {answer}") response = QueryResponseMessage( query_received=request, is_executed=True, body=answer.encode('utf-8') ) self.cq_client.send_response_message(response) except Exception as e: print(f"Error processing chat message: {str(e)}") self.cq_client.send_response_message(QueryResponseMessage( query_received=request, is_executed=False, error=str(e) )) def pull_from_queue(self): while not self.shutdown_event.is_set(): try: result = self.queues_client.pull("rag-sources-queue", 10, 1) if result.is_error: print(f"Error pulling message from queue: {result.error}") continue sources = [] for message in result.messages: source = message.body.decode('utf-8') print(f"Received source: {source}, adding to knowledge graph") sources.append(URL(message.body.decode('utf-8'))) if sources: self.kg.process_sources(sources) except Exception as e: if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error processing sources: {str(e)}") def subscribe_to_chat_queries(self): def on_error(err: str): if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error: {err}") cancellation_token = CancellationToken() try: self.cq_client.subscribe_to_queries( subscription=QueriesSubscription( channel="rag-chat-query", on_receive_query_callback=self.handle_chat, on_error_callback=on_error, ), cancel=cancellation_token ) # Wait for shutdown signal while not self.shutdown_event.is_set(): time.sleep(0.1) # Cancel subscription when shutdown is requested cancellation_token.cancel() except Exception as e: if not self.shutdown_event.is_set(): print(f"Error in subscription thread: {str(e)}") def run(self): chat_thread = threading.Thread(target=self.subscribe_to_chat_queries) queue_thread = threading.Thread(target=self.pull_from_queue) self.threads.extend([chat_thread, queue_thread]) for thread in self.threads: thread.daemon = True # Make threads daemon so they exit when main thread exits thread.start() print("RAG server started") try: while True: time.sleep(1) except KeyboardInterrupt: print("\nShutting down gracefully...") self.shutdown() self.cq_client.close() self.queues_client.close() def shutdown(self): print("Initiating shutdown sequence...") self.shutdown_event.set() # Signal all threads to stop for thread in self.threads: thread.join(timeout=5.0) # Wait up to 5 seconds for each thread if thread.is_alive(): print(f"Warning: Thread {thread.name} did not shutdown cleanly") print("Shutdown complete") if __name__ == "__main__": rag_server = RAGServer() rag_server.run()
此代码实现了一个简单的客户端,通过 KubeMQ 的队列系统将电影 URL 发送到 RAG 服务器。具体来说,它创建一个连接到 KubeMQ 的 SourceClient 类,并将消息发送到“rag-sources-queue”通道,该通道与 RAG 服务器监控的队列相同。当作为主程序运行时,它会发送一个烂番茄电影 URL 列表(包括《黑客帝国》电影、《疾速追杀》和《生死时速》),由 RAG 服务器处理并添加到知识图谱中。
# sources_client.py from kubemq.queues import * class SourceClient: def __init__(self, address="localhost:50000"): self.client = Client(address=address) def send_source(self, message: str) : send_result = self.client.send_queues_message( QueueMessage( channel="rag-sources-queue", body=message.encode("utf-8"), ) ) if send_result.is_error: print(f"message send error, error:{send_result.error}") if __name__ == "__main__": client = SourceClient() urls = ["https://www.rottentomatoes.com/m/side_by_side_2012", "https://www.rottentomatoes.com/m/matrix", "https://www.rottentomatoes.com/m/matrix_revolutions", "https://www.rottentomatoes.com/m/matrix_reloaded", "https://www.rottentomatoes.com/m/speed_1994", "https://www.rottentomatoes.com/m/john_wick_chapter_4"] for url in urls: client.send_source(url) print("done")
此代码实现了一个聊天客户端,通过 KubeMQ 的查询系统与 RAG 服务器进行通信。 ChatClient 类将消息发送到“rag-chat-query”通道并等待响应,每个查询有 30 秒的超时时间。当作为主程序运行时,它通过发送两个有关《黑客帝国》导演及其与基努·里维斯的联系的相关问题来演示客户端的功能,并在收到问题时打印每个响应。
所有代码示例都可以在我的原始 GitHub 存储库的分支中找到。
将 KubeMQ 集成到 GenAI 应用程序的 RAG 管道中,为处理连续数据流和复杂的进程间通信提供了可扩展、可靠且高效的机制。通过充当具有多种消息传递模式的统一路由器,KubeMQ 简化了整体架构,减少了对自定义路由解决方案的需求,并加快了开发周期。
此外,合并 FalkorDB 通过提供与 KubeMQ 无缝集成的高性能知识库来增强数据管理。这种组合可确保优化数据检索和存储,支持 RAG 流程的动态要求。
处理高吞吐量场景的能力,与持久性和容错等功能相结合,确保 GenAI 应用程序即使在重负载或面临系统中断的情况下也能保持响应能力和可靠性。
通过利用 KubeMQ 和 FalkorDB,组织可以专注于增强其 AI 模型并提供有价值的见解和服务,并确信其数据路由基础设施强大且能够满足现代 AI 工作流程的需求。
以上是使用 KubeMQ 增强 GenAI 应用程序:有效扩展检索增强生成 (RAG)的详细内容。更多信息请关注PHP中文网其他相关文章!