


Scaling FastAPI with a High-Memory Cache Across Multiple Processes: Event-Driven Architecture in Practice
Challenge: the conflict between a high-memory cache and multi-process scaling
When a FastAPI application holds a huge in-memory cache (e.g. 8GB) and runs under Gunicorn in multi-process mode to handle more requests, a core problem appears: each Gunicorn worker is a separate operating system process, and processes do not share memory. If N worker processes are started, each loads its own copy of the 8GB cache, for a total memory footprint of 8GB * N. Running 4 workers, for example, requires 32GB of RAM, which is unacceptable in resource-constrained environments and severely limits the application's scalability.
The original idea was to share the data through a distributed cache such as Redis, but that usually means extensive modifications to existing third-party libraries that depend on the large in-memory cache, increasing implementation complexity and effort. We therefore need a more elegant, less intrusive solution.
Core strategy: decoupling and asynchronous processing
The best practice for this problem is an event-driven architecture: limit the web server (the FastAPI application) to its core responsibility of receiving requests and responding quickly, and offload time-consuming, CPU-intensive, or memory-intensive data processing into independent, asynchronous processing components. The web server stays lightweight and consumes little memory, so more Gunicorn workers can be started to handle concurrent requests without a memory explosion.
The core idea of this strategy is decoupling: separating request reception from the actual data processing logic. When the web server receives a request that involves heavy data processing, it does not perform the processing immediately. Instead, it publishes the relevant information (task ID, input data, etc.) to a message or task queue and immediately returns a "received" or "processing" response to the client. Independent background worker processes or services then consume the tasks from the queue and process them.
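The shape of this accept-then-process flow can be sketched with FastAPI's built-in BackgroundTasks before any external infrastructure is introduced. Keep in mind that background tasks still run inside the same worker process, so this alone does not solve the memory problem; it only illustrates the pattern that the options below move out of the process entirely. The endpoint and function names are illustrative.

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def heavy_processing(data_id: str) -> None:
    # Placeholder for the expensive work; in the architectures below this
    # runs in a separate worker process or service, not here.
    print(f"Processing {data_id}...")

@app.post("/jobs/{data_id}")
async def submit_job(data_id: str, background_tasks: BackgroundTasks):
    # Schedule the work, then respond immediately with an acknowledgement
    background_tasks.add_task(heavy_processing, data_id)
    return {"message": "received", "data_id": data_id}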
Implementation options
The following are several effective ways to implement an event-driven architecture and offload data processing tasks:
1. Task queue (such as Celery)
Celery is a powerful distributed task queue, well suited to running large numbers of Python tasks asynchronously. It lets the web application hand time-consuming tasks to independent Celery worker processes without blocking the web server.
How it works:
- Producer (FastAPI application): after receiving a request, wraps the task data in a Celery task and sends it to a message broker (such as Redis or RabbitMQ).
- Message broker: stores the pending tasks.
- Celery worker: an independent process that continuously listens to the broker, fetching and executing tasks.
Sample code (conceptual):
First, install Celery and the Redis client library (Redis serves as the message broker):
pip install celery redis
Define the Celery application and task (app/celery_app.py):
from celery import Celery
import time

# Configure Celery, using Redis as both the message broker and the result backend
celery_app = Celery(
    'my_fastapi_tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

# A simulated time-consuming task that may need access to the "cached" data
@celery_app.task
def process_huge_data_task(data_id: str):
    """
    Simulates processing a large amount of data. This task is executed
    by a Celery worker in a separate process. If it needs shared data,
    pass only a data ID and have the worker fetch the data from shared
    storage independent of the web server (e.g. a distributed cache or
    a database).
    """
    print(f"Celery worker is processing data: {data_id}")
    # The logic that accesses and processes the 8GB dataset would go here
    time.sleep(10)  # simulate a time-consuming operation
    result = f"Data {data_id} processing is complete."
    print(result)
    return result
Call the task from the FastAPI application (app/main.py):
from fastapi import FastAPI
from app.celery_app import process_huge_data_task

app = FastAPI()

@app.get("/process_data/{data_id}")
async def trigger_data_processing(data_id: str):
    # Hand the time-consuming task to a Celery worker for asynchronous processing
    task = process_huge_data_task.delay(data_id)
    # Respond immediately with the task ID
    return {"message": "Data processing task has been submitted", "task_id": task.id}

@app.get("/task_status/{task_id}")
async def get_task_status(task_id: str):
    task = process_huge_data_task.AsyncResult(task_id)
    if task.ready():
        # ready() covers both success and failure, so check failure first
        if task.failed():
            return {"status": "failed", "error": str(task.result)}
        return {"status": "complete", "result": task.result}
    elif task.state == "PENDING":
        return {"status": "waiting"}
    else:
        return {"status": "in progress"}
Deployment:
- Start the Redis server.
- Start the FastAPI application (via Gunicorn): gunicorn app.main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
- Start Celery Worker: celery -A app.celery_app worker --loglevel=info
In this mode, the web server can run many worker processes, each occupying only a small amount of memory, while the actual data processing is done by independent Celery workers, which can be deployed on machines with sufficient memory and scaled independently.
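Because workers only need to reach the broker, they can be scaled separately from the web tier. As a deployment sketch, assuming the broker URL in app/celery_app.py points at a Redis instance reachable from a higher-memory machine, that machine could run a worker with a larger process pool (the concurrency value here is illustrative):

celery -A app.celery_app worker --loglevel=info --concurrency=8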
2. Message Queue (such as Apache Kafka/RabbitMQ)
Apache Kafka and RabbitMQ are powerful message brokers for building high-throughput, low-latency event-streaming platforms or reliable messaging systems. They serve as a more general and flexible decoupling mechanism.
How it works:
- Producer (FastAPI application): publishes data processing requests as messages to a specific topic (Kafka) or queue (RabbitMQ).
- Message broker: reliably stores and forwards the messages.
- Consumers (independent services): one or more independent microservices or background processes subscribe to these messages, consume them, and perform the data processing.
Advantages:
- High throughput and scalability: able to handle massive message volumes.
- More thorough decoupling: producers and consumers know very little about each other, making them easy to develop, deploy, and scale independently.
- Persistence: messages can be persisted so they are not lost.
Example (conceptual): FastAPI as producer:
from fastapi import FastAPI
import json  # needed once the Kafka producer lines below are enabled

# Suppose you have a message queue client installed, e.g. for Kafka: pip install confluent-kafka
# from confluent_kafka import Producer

app = FastAPI()

# producer = Producer({'bootstrap.servers': 'localhost:9092'})  # Kafka producer

@app.post("/submit_analysis")
async def submit_analysis(payload: dict):
    # Publish the analysis request to the message queue
    # producer.produce('data_analysis_topic', value=json.dumps(payload).encode('utf-8'))
    # producer.flush()
    print(f"Analysis request has been published to the message queue: {payload}")
    return {"message": "Analysis request has been submitted to the queue"}
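One practical detail worth noting: produce() is asynchronous and only enqueues the message in a local buffer, so a delivery-report callback is the usual way to confirm the broker actually accepted each message. A minimal sketch with confluent-kafka, reusing the topic above (the payload is illustrative):

import json
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    # Invoked once per message from producer.poll() or producer.flush()
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [partition {msg.partition()}]")

payload = {"data_id": "example-123"}  # illustrative payload
producer.produce(
    'data_analysis_topic',
    value=json.dumps(payload).encode('utf-8'),
    callback=delivery_report,
)
producer.poll(0)  # serve queued delivery callbacks
producer.flush()  # block until outstanding messages are delivered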
Independent consumer services:
# This is a standalone Python service running in another process or on another server
import json
from confluent_kafka import Consumer, KafkaError

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_analysis_group',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['data_analysis_topic'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            # End of partition is informational, not a real error
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            print(msg.error())
            break
        data_to_process = json.loads(msg.value().decode('utf-8'))
        print(f"Consumer is processing data: {data_to_process}")
        # Execute the CPU-intensive or memory-intensive processing logic here
        # ...
finally:
    consumer.close()
This approach requires maintaining the message broker and the consumer services separately, but offers very high flexibility and scalability.
3. Serverless cloud functions (such as AWS Lambda)
For applications deployed in the cloud, serverless computing services (such as AWS Lambda, Azure Functions, Google Cloud Functions) can be used to offload data processing tasks.
How it works:
- FastAPI application (behind an API gateway): after receiving a request, it triggers a serverless function via an SDK or API call.
- Serverless function: the cloud platform launches function instances on demand to run the data processing logic. Instances scale independently and are typically billed by actual compute consumption.
Advantages:
- No server management: the cloud platform handles the underlying servers and scaling.
- Pay per use: you pay only for the function's actual run time, which is cost-effective.
- Elastic scaling: capacity automatically grows and shrinks with the load.
Example (conceptual): Calling Lambda in a FastAPI application:
from fastapi import FastAPI
import json  # needed once the Lambda invocation below is enabled

# import boto3  # AWS SDK for Python

app = FastAPI()

# lambda_client = boto3.client('lambda', region_name='your-region')

@app.post("/process_data_with_lambda")
async def process_data_with_lambda(payload: dict):
    # Invoke the AWS Lambda function asynchronously to process the data
    # response = lambda_client.invoke(
    #     FunctionName='your-data-processing-lambda',
    #     InvocationType='Event',  # asynchronous invocation
    #     Payload=json.dumps(payload)
    # )
    print(f"Data processing request has been sent to Lambda: {payload}")
    return {"message": "Data processing task submitted to Lambda"}
The Lambda function (written in Python, for example):
# lambda_function.py
import json

def lambda_handler(event, context):
    # Assumes the event is a POST request arriving via API Gateway
    data_to_process = json.loads(event['body'])
    print(f"Lambda is processing data: {data_to_process}")
    # Execute the CPU-intensive or memory-intensive processing logic here
    # ...
    return {
        'statusCode': 200,
        'body': json.dumps({'message': 'Data processing completed'})
    }
This solution hands management of the compute resources over to the cloud platform entirely, simplifying operations and maintenance.
Choosing a solution
- Celery: the best fit for asynchronous task processing within the Python ecosystem. Deployment is relatively simple, but the broker and workers must be managed.
- Apache Kafka / RabbitMQ: suited to more complex microservice architectures, event-driven systems, or scenarios demanding high throughput and persistence. Requires more specialized operational expertise.
- Serverless cloud functions: the best fit for cloud-native applications; they greatly reduce the operational burden and are billed per use, but cold-start latency and vendor lock-in are possible concerns.
Notes:
- Data sharing strategy: if the offloaded tasks still need the 8GB of "cache" data, the data itself must be externalized. Consider storing it in a distributed file system, object storage (such as S3), a distributed cache (such as Redis, re-evaluating how much third-party library modification that requires), or a database, rather than in the web server's memory. Task processors then load it on demand from the shared storage at execution time (see the first sketch after this list).
- Result notification: if the client needs to know the task's result, a notification mechanism must be designed, for example:
- Push results in real time over a WebSocket.
- Have the client periodically poll the task-status endpoint exposed by FastAPI.
- Have the worker notify FastAPI through a callback API once the task completes (see the second sketch after this list).
- Error handling and monitoring: all asynchronous tasks need robust error handling and thorough monitoring so problems are discovered and resolved promptly.
- Data consistency: with decoupled, asynchronous processing, data consistency must be considered carefully, especially when writes are involved.
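To make the data-sharing note concrete, here is the first sketch referenced above: a task-side helper that keeps one lazily created Redis connection per worker process and fetches only the record a task needs, instead of every process holding the full 8GB dataset. The key scheme and helper names are hypothetical, and any shared store (S3, a database) could stand in for Redis.

import json
import redis  # pip install redis

# One connection per worker process, created lazily on first use
_redis_client = None

def get_redis() -> "redis.Redis":
    global _redis_client
    if _redis_client is None:
        _redis_client = redis.Redis(host='localhost', port=6379, db=1)
    return _redis_client

def load_record(data_id: str) -> dict:
    # Fetch only the record this task needs from the externalized dataset
    raw = get_redis().get(f"dataset:{data_id}")  # hypothetical key scheme
    if raw is None:
        raise KeyError(f"No record found for {data_id}")
    return json.loads(raw)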
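And the second sketch referenced above, for callback-style result notification: the worker POSTs the finished result to an endpoint the FastAPI app would expose. The callback URL and payload shape are assumptions.

import requests  # pip install requests

CALLBACK_URL = "http://localhost:8000/task_callback"  # hypothetical endpoint

def notify_completion(task_id: str, result: str) -> None:
    # Called from the worker after a task finishes; failures are logged
    # rather than raised so a flaky callback cannot fail the task itself
    try:
        requests.post(
            CALLBACK_URL,
            json={"task_id": task_id, "result": result},
            timeout=5,
        )
    except requests.RequestException as exc:
        print(f"Callback failed for task {task_id}: {exc}")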
Summary
Faced with the conflict between a huge in-memory cache and multi-process scaling in a FastAPI application, simply increasing the number of Gunicorn workers leads to unacceptable memory consumption. The best solution is an event-driven architecture that decouples data-intensive tasks from the web server and processes them asynchronously. Whether through a Celery task queue, a Kafka/RabbitMQ message queue, or serverless cloud functions, the core idea is the same: keep the web server lightweight and focused on responding quickly, and hand the heavy work to independent, scalable backend services. This not only optimizes memory usage but also significantly improves the application's overall concurrency and scalability. Choose the solution that best fits your technology stack and deployment environment, and pay attention to data sharing, result notification, error handling, and monitoring to build an efficient, robust FastAPI application.