• 技术文章 >后端开发 >Python教程

    使用Python语言实现消息传递的gRPC教程

    PHPzPHPz2023-04-27 17:13:17转载85

    1. grpc开源包的安装

    # conda
    $ conda create -n grpc_env python=3.9
     
    # install grpc
    $ pip install grpcio -i https://pypi.doubanio.com/simple
    $ pip install grpcio-tools -i https://pypi.doubanio.com/simple
     
    # 如果由proto生成的py文件有问题,可以尝试更换grpcio和grpcio-tools这两个包的版本

    2. grpc的使用之传送消息

    整体结构,client.py server.py 和proto目录下的example.proto

    怎么用Python语言的grpc实现消息传送

    1)在example.proto定义传送体

    // 声明
    syntax = "proto3";
    package proto;
     
    // service创建
    service HelloService{
      rpc Hello(Request) returns (Response) {}  // 单单传送消息
    }
     
    // 请求参数消息体 1、2是字段编号(field number),用于在编码中标识字段,并非参数顺序
    message Request {
      string data = 1;
    }
     
    // 返回参数消息体
    message Response {
      int32 ret = 1;    //返回码
      string data = 2;
    }
     
    //python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

    2) 在虚拟环境里使用命令生成py文件

    $ conda activate grpc_env
    $ f:
    $ cd F:\examples
    $ python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

    在proto目录下会生成两个py文件,如下图所示:

    怎么用Python语言的grpc实现消息传送

    3) 编辑client.py 和 server.py

    # server.py
    import time
    import grpc
    from concurrent import futures
    from proto import example_pb2_grpc, example_pb2
     
     
    class ServiceBack(example_pb2_grpc.HelloServiceServicer):
        """Server-side implementation of the HelloService RPCs."""

        def Hello(self, request, context):
            """Print the request payload and echo it back prefixed with "Response:".

            Returns a Response with ret=0 and the prefixed text in data.
            """
            payload = request.data
            print(payload)
            return example_pb2.Response(ret=0, data="Response:" + payload)
     
     
    def server(ip: str, port: int) -> None:
        """Start the insecure gRPC server on ip:port and block until interrupted.

        Args:
            ip: Address to bind.
            port: TCP port to bind.
        """
        # Named grpc_server so the local no longer shadows this function.
        grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))  # thread pool of size 10
        example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
        grpc_server.add_insecure_port(f"{ip}:{port}")
        grpc_server.start()
        print(f"server is started! ip:{ip} port:{str(port)}")
        try:
            # Blocks the main thread; replaces the hand-rolled sleep loop.
            grpc_server.wait_for_termination()
        except KeyboardInterrupt:
            # Ctrl+C is not an Exception subclass, so the original
            # `except Exception` handler could never reach stop().
            grpc_server.stop(0)
     
     
    if __name__ == '__main__':
        server("127.0.0.1", 8000)
    # client.py
    import grpc
    from proto import example_pb2_grpc, example_pb2
     
     
    def client(ip: str, port: int) -> None:
        """Call HelloService.Hello once on ip:port and print the reply.

        Args:
            ip: Server address.
            port: Server TCP port.
        """
        target = f"{ip}:{port}"
        # The context manager closes the channel; the original left it open.
        with grpc.insecure_channel(target) as channel:  # connect to the RPC server
            cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
            request = example_pb2.Request(data="hello 123")
            res = cli.Hello(request)
            print(f"ret:{res.ret}, data:{res.data}")
     
     
    if __name__ == '__main__':
        client("127.0.0.1", 8000)

    3. grpc的使用之数据传输大小配置

    默认情况下,gRPC 将传入消息限制为 4 MB。 传出消息没有限制。

    1)example.proto定义不变

    2)编辑client.py 和 server.py

    # server.py
    import time
    import grpc
    from concurrent import futures
    from proto import example_pb2_grpc, example_pb2
     
     
    class ServiceBack(example_pb2_grpc.HelloServiceServicer):
        """Concrete handlers for the HelloService RPCs."""

        def Hello(self, request, context):
            """Print request.data and return it prefixed with "Response:" (ret=0)."""
            received = request.data
            print(received)
            reply_text = "Response:" + received
            reply = example_pb2.Response(ret=0, data=reply_text)
            return reply
     
     
    def server(ip: str, port: int) -> None:
        """Start the gRPC server with 1 GiB message limits and block until interrupted.

        Args:
            ip: Address to bind.
            port: TCP port to bind.
        """
        # Message size configuration.
        max_message_length = 1024 * 1024 * 1024  # 1G
        options = [('grpc.max_send_message_length', max_message_length),
                   ('grpc.max_receive_message_length', max_message_length),
                   ('grpc.enable_retries', 1),
                   ]
        # Named grpc_server so the local no longer shadows this function.
        grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # thread pool of size 10
        example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
        grpc_server.add_insecure_port(f"{ip}:{port}")
        grpc_server.start()
        print(f"server is started! ip:{ip} port:{str(port)}")
        try:
            # Blocks the main thread; replaces the hand-rolled sleep loop.
            grpc_server.wait_for_termination()
        except KeyboardInterrupt:
            # Ctrl+C is not an Exception subclass, so the original
            # `except Exception` handler could never reach stop().
            grpc_server.stop(0)
     
     
    if __name__ == '__main__':
        server("127.0.0.1", 8000)
    # client.py
    import grpc
    from proto import example_pb2_grpc, example_pb2
     
     
    def client(ip: str, port: int) -> None:
        """Send a large Hello request over a channel configured for 1 GiB messages.

        Args:
            ip: Server address.
            port: Server TCP port.
        """
        # Message size configuration.
        max_message_length = 1024 * 1024 * 1024  # 1G
        options = [('grpc.max_send_message_length', max_message_length),
                   ('grpc.max_receive_message_length', max_message_length),
                   ('grpc.enable_retries', 1),
                   ]
        target = f"{ip}:{port}"
        # The context manager closes the channel; the original left it open.
        with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
            cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
            data = "hello 123" * 1024 * 1024
            request = example_pb2.Request(data=data)
            res = cli.Hello(request)
            print(f"ret:{res.ret}, data:{res.data}")
     
     
    if __name__ == '__main__':
        client("127.0.0.1", 8000)

    4. grpc的使用之超时配置

    1)example.proto定义不变

    2)编辑client.py 和 server.py

    # server.py
    import time
    import grpc
    from concurrent import futures
    from proto import example_pb2_grpc, example_pb2
     
     
    class ServiceBack(example_pb2_grpc.HelloServiceServicer):
        """Concrete handlers for the HelloService RPCs."""

        def Hello(self, request, context):
            """Print request.data, sleep two seconds, then echo it back with ret=0.

            The sleep makes the handler slower than the client's 1-second deadline.
            """
            incoming = request.data
            print(incoming)
            time.sleep(2)
            return example_pb2.Response(ret=0, data="Response:" + incoming)
     
     
    def server(ip: str, port: int) -> None:
        """Start the gRPC server with 1 GiB message limits and block until interrupted.

        Args:
            ip: Address to bind.
            port: TCP port to bind.
        """
        # Message size configuration.
        max_message_length = 1024 * 1024 * 1024  # 1G
        options = [('grpc.max_send_message_length', max_message_length),
                   ('grpc.max_receive_message_length', max_message_length),
                   ('grpc.enable_retries', 1),
                   ]
        # Named grpc_server so the local no longer shadows this function.
        grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # thread pool of size 10
        example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
        grpc_server.add_insecure_port(f"{ip}:{port}")
        grpc_server.start()
        print(f"server is started! ip:{ip} port:{str(port)}")
        try:
            # Blocks the main thread; replaces the hand-rolled sleep loop.
            grpc_server.wait_for_termination()
        except KeyboardInterrupt:
            # Ctrl+C is not an Exception subclass, so the original
            # `except Exception` handler could never reach stop().
            grpc_server.stop(0)
     
     
    if __name__ == '__main__':
        server("127.0.0.1", 8000)
    # client.py
    import sys
    import grpc
    from proto import example_pb2_grpc, example_pb2
     
     
    def client(ip: str, port: int) -> None:
        """Call Hello with a 1-second deadline and print the reply.

        The process exits with status -1 only when the call fails. The
        original exited with -1 from a `finally`, i.e. even on success.

        Args:
            ip: Server address.
            port: Server TCP port.
        """
        # Message size configuration.
        max_message_length = 1024 * 1024 * 1024  # 1G
        options = [('grpc.max_send_message_length', max_message_length),
                   ('grpc.max_receive_message_length', max_message_length),
                   ('grpc.enable_retries', 1),
                   ]
        target = f"{ip}:{port}"
        # The context manager closes the channel, including on sys.exit().
        with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
            cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
            try:
                request = example_pb2.Request(data="hello 123")
                res = cli.Hello(request, timeout=1)  # timeout in seconds
                print(f"ret:{res.ret}, data:{res.data}")
            except grpc.RpcError as rpc_error:
                print("grpc.RpcError", rpc_error.details())
                sys.exit(-1)
            except Exception as es:
                print(es)
                sys.exit(-1)
     
     
    if __name__ == '__main__':
        client("127.0.0.1", 8000)

    运行结果:

    grpc.RpcError Deadline Exceeded

    5. grpc之大文件之流stream传输

    1)在example.proto重新定义传送体

    // 声明
    syntax = "proto3";
    package proto;
     
    // service创建
    service HelloService{
      rpc Hello(Request) returns (Response) {}  // 单单传送消息
      rpc ClientTOServer(stream UpFileRequest) returns (Response) {}  // 流式上传文件
      rpc ServerTOClient(Request) returns (stream UpFileRequest) {}  // 流式下载文件
    }
     
    // 请求参数消息体 1、2是字段编号(field number),用于在编码中标识字段,并非参数顺序
    message Request {
      string data = 1;
    }
     
    // 返回参数消息体
    message Response {
      int32 ret = 1;    //返回码
      string data = 2;
    }
     
    message UpFileRequest {
      string filename = 1;
      int64 sendsize = 2;
      int64 totalsize = 3;
      bytes data = 4;
    }
     
     
    //python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

    2)在虚拟环境里使用命令生成py文件,参考2. 2)

    3)编辑client.py 和 server.py

    # server.py
    import os
    import time
    import grpc
    from concurrent import futures
    from proto import example_pb2_grpc, example_pb2
     
     
    class ServiceBack(example_pb2_grpc.HelloServiceServicer):
        """Server-side implementation of the HelloService RPCs."""

        def Hello(self, request, context):
            """Print request.data, sleep two seconds, then echo it back with ret=0."""
            data = request.data
            print(data)
            time.sleep(2)
            return example_pb2.Response(ret=0, data="Response:" + data)

        def ClientTOServer(self, request_iterator, context):
            """Receive a client-streamed file and save it as 242_copy.mp3.

            Each message carries the file name, the declared total size and a
            chunk of bytes. The file is written only when the number of bytes
            received equals the declared total size.

            Returns:
                Response with ret=0 on success or ret=-1 on a size mismatch;
                data holds the file name taken from the last message.
            """
            # Defaults keep an empty request stream from raising NameError below;
            # -1 can never equal len(data), so that case reports failure.
            file_name = ""
            file_size = -1
            data = bytearray()
            for message in request_iterator:
                file_name = message.filename
                file_size = message.totalsize
                print(f"文件名称:{file_name}, 文件总长度:{file_size}")
                data.extend(message.data)  # append this chunk
                print(f"已接收长度:{len(data)}")
            if len(data) != file_size:
                print(f"{file_name=} 下载失败")
                return example_pb2.Response(ret=-1, data=file_name)
            with open("242_copy.mp3", "wb") as fw:
                fw.write(data)
            print(f"{file_name=} 下载完成")
            return example_pb2.Response(ret=0, data=file_name)

        def ServerTOClient(self, request, context):
            """Stream the file named by request.data back to the client.

            The first message is a header (file name, total size, empty data);
            every following message carries up to 1 MB of file content.
            """
            # NOTE(review): the path comes straight from the client and is not
            # validated, so any file readable by the server can be requested.
            fp = request.data
            print(f"下载文件:{fp=}")
            file_name = os.path.basename(fp)
            file_size = os.path.getsize(fp)
            part_size = 1024 * 1024  # read 1 MB per message

            with open(fp, "rb") as fr:
                yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
                # Read errors now propagate and end the RPC. The original caught
                # them inside `while True`, which could retry forever, and it
                # reused the name `context` for the chunk, hiding the gRPC context.
                while chunk := fr.read(part_size):
                    yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size,
                                                    sendsize=len(chunk),
                                                    data=chunk)
                print("发送完毕")
     
     
    def server(ip: str, port: int) -> None:
        """Start the gRPC server with 1 GiB message limits and block until interrupted.

        Args:
            ip: Address to bind.
            port: TCP port to bind.
        """
        # Message size configuration.
        max_message_length = 1024 * 1024 * 1024  # 1G
        options = [('grpc.max_send_message_length', max_message_length),
                   ('grpc.max_receive_message_length', max_message_length),
                   ('grpc.enable_retries', 1),
                   ]
        # Named grpc_server so the local no longer shadows this function.
        grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # thread pool of size 10
        example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
        grpc_server.add_insecure_port(f"{ip}:{port}")
        grpc_server.start()
        print(f"server is started! ip:{ip} port:{str(port)}")
        try:
            # Blocks the main thread; replaces the hand-rolled sleep loop.
            grpc_server.wait_for_termination()
        except KeyboardInterrupt:
            # Ctrl+C is not an Exception subclass, so the original
            # `except Exception` handler could never reach stop().
            grpc_server.stop(0)
     
     
    if __name__ == '__main__':
        server("127.0.0.1", 8000)
    # client.py
    import os
    import sys
    import grpc
    from proto import example_pb2_grpc, example_pb2
     
     
    def send_stream_data(fp: str):
        """Yield UpFileRequest messages that stream the file at fp.

        The first message is a header (file name, total size, empty data);
        every following message carries up to 1 MB of file content.

        Args:
            fp: Path of the local file to send.
        """
        file_name = os.path.basename(fp)
        file_size = os.path.getsize(fp)
        part_size = 1024 * 1024  # read 1 MB per message

        with open(fp, "rb") as fr:
            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
            # Read errors now propagate to the caller. The original caught them
            # inside `while True`, which could retry the failing read forever.
            while chunk := fr.read(part_size):
                yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=len(chunk),
                                                data=chunk)
            print("发送完毕")
     
     
    def client(ip: str, port: int) -> None:
        """Call Hello with a 1-second deadline and print the reply.

        The process exits with status -1 only when the call fails. The
        original exited with -1 from a `finally`, i.e. even on success.

        Args:
            ip: Server address.
            port: Server TCP port.
        """
        # Message size configuration.
        max_message_length = 1024 * 1024 * 1024  # 1G
        options = [('grpc.max_send_message_length', max_message_length),
                   ('grpc.max_receive_message_length', max_message_length),
                   ('grpc.enable_retries', 1),
                   ]
        target = f"{ip}:{port}"
        # The context manager closes the channel, including on sys.exit().
        with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
            cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
            try:
                request = example_pb2.Request(data="hello 123")
                res = cli.Hello(request, timeout=1)  # timeout in seconds
                print(f"ret:{res.ret}, data:{res.data}")
            except grpc.RpcError as rpc_error:
                print("grpc.RpcError", rpc_error.details())
                sys.exit(-1)
            except Exception as es:
                print(es)
                sys.exit(-1)
     
     
    def client_to_server(ip: str, port: int, fp: str):
        """Upload the file at fp to the server with a client-streaming RPC.

        The process exits with status -1 only when the upload call fails. The
        original exited with -1 from a `finally`, i.e. even on success.

        Args:
            ip: Server address.
            port: Server TCP port.
            fp: Path of the local file to upload.
        """
        # Message size configuration.
        max_message_length = 1024 * 1024 * 1024  # 1G
        options = [('grpc.max_send_message_length', max_message_length),
                   ('grpc.max_receive_message_length', max_message_length),
                   ('grpc.enable_retries', 1),
                   ]
        target = f"{ip}:{port}"
        # The context manager closes the channel, including on sys.exit().
        with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
            cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
            try:
                request = send_stream_data(fp=fp)
                res = cli.ClientTOServer(request, timeout=600)  # timeout in seconds
                print(f"ret:{res.ret}, data:{res.data}")
            except grpc.RpcError as rpc_error:
                print("grpc.RpcError", rpc_error.details())
                sys.exit(-1)
            except Exception as es:
                print(es)
                sys.exit(-1)
     
     
    def server_to_client(ip: str, port: int, fp: str):
        """Download fp from the server with a server-streaming RPC into 242_1.mp3.

        The received bytes are written only when their count equals the total
        size announced by the server. The process exits with status -1 on an
        RPC error or a size mismatch; on success the function returns normally
        (the original exited with -1 from a `finally`, even on success).

        Args:
            ip: Server address.
            port: Server TCP port.
            fp: Path of the file on the server to download.
        """
        # Message size configuration.
        max_message_length = 1024 * 1024 * 1024  # 1G
        options = [('grpc.max_send_message_length', max_message_length),
                   ('grpc.max_receive_message_length', max_message_length),
                   ('grpc.enable_retries', 1),
                   ]
        target = f"{ip}:{port}"
        # The context manager closes the channel, including on sys.exit().
        with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
            cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
            try:
                data = bytearray()
                request = example_pb2.Request(data=fp)
                filename = ""
                # -1 never equals len(data), so an empty response stream is
                # reported as a failed download instead of raising NameError.
                total_size = -1
                for res in cli.ServerTOClient(request, timeout=300):
                    filename = res.filename
                    total_size = res.totalsize
                    data += res.data
                if total_size != len(data):
                    print(f"{filename=} 下载失败!")
                    sys.exit(-1)
                with open("242_1.mp3", "wb") as fw:
                    fw.write(data)
                print(f"{filename=} : {total_size=} 下载完成!")
            except grpc.RpcError as rpc_error:
                print("grpc.RpcError", rpc_error.details())
                sys.exit(-1)
            except Exception as es:
                print(es)
                sys.exit(-1)
     
     
    if __name__ == '__main__':
        # client("127.0.0.1", 8000)
        # client_to_server("127.0.0.1", 8000, "242.mp3")
        server_to_client("127.0.0.1", 8000, "242.mp3")

    6. grpc之大文件之流async异步传输

    # server.py
    import os
    import time
    import grpc
    from concurrent import futures
    from proto import example_pb2_grpc, example_pb2
    import asyncio
     
     
    class ServiceBack(example_pb2_grpc.HelloServiceServicer):
        """Server-side implementation of the HelloService RPCs."""

        def Hello(self, request, context):
            """Print request.data, sleep two seconds, then echo it back with ret=0."""
            data = request.data
            print(data)
            time.sleep(2)
            return example_pb2.Response(ret=0, data="Response:" + data)

        def ClientTOServer(self, request_iterator, context):
            """Receive a client-streamed file and save it as 242_copy.mp3.

            Each message carries the file name, the declared total size and a
            chunk of bytes. The file is written only when the number of bytes
            received equals the declared total size.

            Returns:
                Response with ret=0 on success or ret=-1 on a size mismatch;
                data holds the file name taken from the last message.
            """
            # Defaults keep an empty request stream from raising NameError below;
            # -1 can never equal len(data), so that case reports failure.
            file_name = ""
            file_size = -1
            data = bytearray()
            for message in request_iterator:
                file_name = message.filename
                file_size = message.totalsize
                print(f"文件名称:{file_name}, 文件总长度:{file_size}")
                data.extend(message.data)  # append this chunk
                print(f"已接收长度:{len(data)}")
            if len(data) != file_size:
                print(f"{file_name=} 下载失败")
                return example_pb2.Response(ret=-1, data=file_name)
            with open("242_copy.mp3", "wb") as fw:
                fw.write(data)
            print(f"{file_name=} 下载完成")
            return example_pb2.Response(ret=0, data=file_name)

        def ServerTOClient(self, request, context):
            """Stream the file named by request.data back to the client.

            The first message is a header (file name, total size, empty data);
            every following message carries up to 1 MB of file content.
            """
            # NOTE(review): the path comes straight from the client and is not
            # validated, so any file readable by the server can be requested.
            fp = request.data
            print(f"下载文件:{fp=}")
            file_name = os.path.basename(fp)
            file_size = os.path.getsize(fp)
            part_size = 1024 * 1024  # read 1 MB per message

            with open(fp, "rb") as fr:
                yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
                # Read errors now propagate and end the RPC. The original caught
                # them inside `while True`, which could retry forever, and it
                # reused the name `context` for the chunk, hiding the gRPC context.
                while chunk := fr.read(part_size):
                    yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size,
                                                    sendsize=len(chunk),
                                                    data=chunk)
                print("发送完毕")
     
     
    async def server(ip: str, port: int) -> None:
        """Start the asyncio gRPC server on ip:port and serve until terminated.

        Args:
            ip: Address to bind.
            port: TCP port to bind.
        """
        # Message size configuration.
        max_message_length = 1024 * 1024 * 1024  # 1G
        options = [('grpc.max_send_message_length', max_message_length),
                   ('grpc.max_receive_message_length', max_message_length),
                   ('grpc.enable_retries', 1),
                   ]
        # Named aio_server so the local no longer shadows this coroutine function.
        aio_server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # thread pool of size 10
        example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), aio_server)
        aio_server.add_insecure_port(f"{ip}:{port}")
        await aio_server.start()
        print(f"server is started! ip:{ip} port:{str(port)}")
        try:
            await aio_server.wait_for_termination()
        finally:
            # Task cancellation is not an Exception subclass, so the original
            # `except Exception` never reached stop(); `finally` always does.
            await aio_server.stop(None)
     
     
    if __name__ == '__main__':
        # asyncio.run() creates and closes the event loop itself. Passing a bare
        # coroutine to asyncio.wait() is rejected on Python 3.11+.
        asyncio.run(server("127.0.0.1", 8000))
    # client.py
    import os
    import sys
    import grpc
    from proto import example_pb2_grpc, example_pb2
    import asyncio
     
     
    def send_stream_data(fp: str):
        """Yield UpFileRequest messages that stream the file at fp.

        The first message is a header (file name, total size, empty data);
        every following message carries up to 1 MB of file content.

        Args:
            fp: Path of the local file to send.
        """
        file_name = os.path.basename(fp)
        file_size = os.path.getsize(fp)
        part_size = 1024 * 1024  # read 1 MB per message

        with open(fp, "rb") as fr:
            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
            # Read errors now propagate to the caller. The original caught them
            # inside `while True`, which could retry the failing read forever.
            while chunk := fr.read(part_size):
                yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=len(chunk),
                                                data=chunk)
            print("发送完毕")
     
     
    async def client(ip: str, port: int) -> None:
        """Call Hello over an asyncio channel with a 3-second deadline and print the reply.

        The process exits with status -1 only when the call fails. The
        original exited with -1 from a `finally`, i.e. even on success.

        Args:
            ip: Server address.
            port: Server TCP port.
        """
        # Message size configuration.
        max_message_length = 1024 * 1024 * 1024  # 1G
        options = [('grpc.max_send_message_length', max_message_length),
                   ('grpc.max_receive_message_length', max_message_length),
                   ('grpc.enable_retries', 1),
                   ]
        target = f"{ip}:{port}"
        async with grpc.aio.insecure_channel(target, options=options) as channel:  # connect to the RPC server
            cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
            try:
                request = example_pb2.Request(data="hello 123")
                res = await cli.Hello(request, timeout=3)  # timeout in seconds
                print(f"ret:{res.ret}, data:{res.data}")
            except grpc.RpcError as rpc_error:
                print("grpc.RpcError", rpc_error.details())
                sys.exit(-1)
            except Exception as es:
                print(es)
                sys.exit(-1)
     
     
    async def client_to_server(ip: str, port: int, fp: str):
        """Upload the file at fp over an asyncio channel with a client-streaming RPC.

        The process exits with status -1 only when the upload call fails. The
        original exited with -1 from a `finally`, i.e. even on success.

        Args:
            ip: Server address.
            port: Server TCP port.
            fp: Path of the local file to upload.
        """
        # Message size configuration.
        max_message_length = 1024 * 1024 * 1024  # 1G
        options = [('grpc.max_send_message_length', max_message_length),
                   ('grpc.max_receive_message_length', max_message_length),
                   ('grpc.enable_retries', 1),
                   ]
        target = f"{ip}:{port}"
        async with grpc.aio.insecure_channel(target, options=options) as channel:  # connect to the RPC server
            cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
            try:
                request = send_stream_data(fp=fp)
                res = await cli.ClientTOServer(request, timeout=600)  # timeout in seconds
                print(f"ret:{res.ret}, data:{res.data}")
            except grpc.RpcError as rpc_error:
                print("grpc.RpcError", rpc_error.details())
                sys.exit(-1)
            except Exception as es:
                print(es)
                sys.exit(-1)
     
     
    def server_to_client(ip: str, port: int, fp: str):
        """Download fp from the server with a server-streaming RPC into 242_1.mp3.

        The received bytes are written only when their count equals the total
        size announced by the server. The process exits with status -1 on an
        RPC error or a size mismatch; on success the function returns normally
        (the original exited with -1 from a `finally`, even on success).

        Args:
            ip: Server address.
            port: Server TCP port.
            fp: Path of the file on the server to download.
        """
        # Message size configuration.
        max_message_length = 1024 * 1024 * 1024  # 1G
        options = [('grpc.max_send_message_length', max_message_length),
                   ('grpc.max_receive_message_length', max_message_length),
                   ('grpc.enable_retries', 1),
                   ]
        target = f"{ip}:{port}"
        # The context manager closes the channel, including on sys.exit().
        with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
            cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
            try:
                data = bytearray()
                request = example_pb2.Request(data=fp)
                filename = ""
                # -1 never equals len(data), so an empty response stream is
                # reported as a failed download instead of raising NameError.
                total_size = -1
                for res in cli.ServerTOClient(request, timeout=300):
                    filename = res.filename
                    total_size = res.totalsize
                    data += res.data
                if total_size != len(data):
                    print(f"{filename=} 下载失败!")
                    sys.exit(-1)
                with open("242_1.mp3", "wb") as fw:
                    fw.write(data)
                print(f"{filename=} : {total_size=} 下载完成!")
            except grpc.RpcError as rpc_error:
                print("grpc.RpcError", rpc_error.details())
                sys.exit(-1)
            except Exception as es:
                print(es)
                sys.exit(-1)
     
     
    if __name__ == '__main__':
        # asyncio.run(client("127.0.0.1", 8000))
        asyncio.run(client_to_server("127.0.0.1", 8000, "242.mp3"))
        # server_to_client("127.0.0.1", 8000, "242.mp3")

    以上就是使用Python语言实现消息传递的gRPC教程的详细内容,更多请关注php中文网其它相关文章!

    声明:本文转载于:亿速云,如有侵犯,请联系admin@php.cn删除
    专题推荐:Python grpc
    上一篇:常见的 Python 库有哪些? 下一篇:自己动手写 PHP MVC 框架(40节精讲/巨细/新人进阶必看)

    相关文章推荐

    • Python中数据类型如何转换• Python中的Array模块怎么使用• 怎么用Python展示全国高校的分布情况• 使用树状图可视化聚类• python numpy中linspace函数怎么使用
    1/1

    PHP中文网