使用python语言的grpc的消息传送
创始人
2025-05-31 03:58:02

目录

1. grpc开源包的安装

2. grpc的使用之传送消息

 3. grpc的使用之数据传输大小配置

4. grpc的使用之超时配置

5. grpc之大文件之流stream传输

6. grpc之大文件之流async异步传输


1. grpc开源包的安装

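# conda
$ conda create -n grpc_env python=3.9
$ conda activate grpc_env
# install grpc(PyPI 上的包名是 grpcio 和 grpcio-tools,导入时的模块名才是 grpc / grpc_tools)
$ pip install grpcio -i https://pypi.doubanio.com/simple
$ pip install grpcio-tools -i https://pypi.doubanio.com/simple
# 有时proto生成的py文件不对,就需要换一换 grpcio 和 grpcio-tools 这两个包的版本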

2. grpc的使用之传送消息

整体结构,client.py server.py 和proto目录下的example.proto

1)在example.proto定义传送体

// Declarations
syntax = "proto3";
package proto;

// Service definition
service HelloService {
  rpc Hello(Request) returns (Response) {}  // plain unary message exchange
}

// Request message. The numbers (1, 2, ...) are the field numbers of each field.
message Request {
  string data = 1;
}

// Response message
message Response {
  int32 ret = 1;    // return code
  string data = 2;
}

// Generate the Python modules with:
// python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

2) 在虚拟环境里使用命令生成py文件

$ conda activate grpc_env
$ f:
$ cd F:\examples
$ python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

在proto目录下会生成两个py文件(example_pb2.py 和 example_pb2_grpc.py),如下图所示:

3) 编辑client.py 和 server.py

# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2


class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Concrete implementation of the HelloService RPC handlers."""

    def Hello(self, request, context):
        """Print the request payload and echo it back prefixed with ``Response:``."""
        data = request.data
        print(data)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)


def server(ip: str, port: int) -> None:
    """Start an insecure gRPC server on ``ip:port`` and block until it stops.

    Args:
        ip: Address to bind to.
        port: TCP port to listen on.
    """
    # RPCs are served from a thread pool of size 10.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    print(f"server is started! ip:{ip} port:{str(port)}")
    try:
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        # Ctrl-C is not an Exception subclass, so it needs its own handler
        # for the shutdown below to be reached cleanly.
        pass
    except Exception as es:
        print(es)
    finally:
        grpc_server.stop(0)


if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import grpc
from proto import example_pb2_grpc, example_pb2


def client(ip: str, port: int) -> None:
    """Send one unary ``Hello`` request to ``ip:port`` and print the reply.

    Args:
        ip: Server address.
        port: Server port.
    """
    target = f"{ip}:{port}"
    # The context manager closes the channel once the call is finished.
    with grpc.insecure_channel(target) as channel:  # connect to the RPC server
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        request = example_pb2.Request(data="hello 123")
        res = cli.Hello(request)
        print(f"ret:{res.ret}, data:{res.data}")


if __name__ == '__main__':
    client("127.0.0.1", 8000)

 3. grpc的使用之数据传输大小配置

默认情况下,gRPC 将传入消息限制为 4 MB。 传出消息没有限制。

1)example.proto定义不变

2)编辑client.py 和 server.py

# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2


class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Concrete implementation of the HelloService RPC handlers."""

    def Hello(self, request, context):
        """Print the request payload and echo it back prefixed with ``Response:``."""
        data = request.data
        print(data)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)


def server(ip: str, port: int) -> None:
    """Start a gRPC server with enlarged message size limits and block until it stops.

    Args:
        ip: Address to bind to.
        port: TCP port to listen on.
    """
    # Message size configuration: allow up to 1 GiB in both directions.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    # RPCs are served from a thread pool of size 10.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    print(f"server is started! ip:{ip} port:{str(port)}")
    try:
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        # Ctrl-C is not an Exception subclass, so it needs its own handler
        # for the shutdown below to be reached cleanly.
        pass
    except Exception as es:
        print(es)
    finally:
        grpc_server.stop(0)


if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import grpc
from proto import example_pb2_grpc, example_pb2


def client(ip: str, port: int) -> None:
    """Send one large unary ``Hello`` request over a channel with raised size limits.

    Args:
        ip: Server address.
        port: Server port.
    """
    # Message size configuration: allow up to 1 GiB in both directions.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    # The context manager closes the channel once the call is finished.
    with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        # Repeat the 9-character string 1024 * 1024 times to build a large payload.
        data = "hello 123" * 1024 * 1024
        request = example_pb2.Request(data=data)
        res = cli.Hello(request)
        print(f"ret:{res.ret}, data:{res.data}")


if __name__ == '__main__':
    client("127.0.0.1", 8000)

4. grpc的使用之超时配置

1)example.proto定义不变

2)编辑client.py 和 server.py

# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2


class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Concrete implementation of the HelloService RPC handlers."""

    def Hello(self, request, context):
        """Print the request payload, wait two seconds, then echo it back."""
        data = request.data
        print(data)
        # Delay for 2 seconds before answering.
        time.sleep(2)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)


def server(ip: str, port: int) -> None:
    """Start a gRPC server with enlarged message size limits and block until it stops.

    Args:
        ip: Address to bind to.
        port: TCP port to listen on.
    """
    # Message size configuration: allow up to 1 GiB in both directions.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    # RPCs are served from a thread pool of size 10.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    print(f"server is started! ip:{ip} port:{str(port)}")
    try:
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        # Ctrl-C is not an Exception subclass, so it needs its own handler
        # for the shutdown below to be reached cleanly.
        pass
    except Exception as es:
        print(es)
    finally:
        grpc_server.stop(0)


if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import sys
import grpc
from proto import example_pb2_grpc, example_pb2


def client(ip: str, port: int) -> None:
    """Call ``Hello`` with a one-second deadline and print the reply or the error.

    The process exits with status -1 only when the call fails; on success the
    function returns normally.

    Args:
        ip: Server address.
        port: Server port.
    """
    # Message size configuration: allow up to 1 GiB in both directions.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    # The context manager closes the channel on every exit path.
    with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        try:
            request = example_pb2.Request(data="hello 123")
            res = cli.Hello(request, timeout=1)  # timeout is in seconds
            print(f"ret:{res.ret}, data:{res.data}")
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            sys.exit(-1)
        except Exception as es:
            print(es)
            sys.exit(-1)


if __name__ == '__main__':
    client("127.0.0.1", 8000)

运行结果: grpc.RpcError Deadline Exceeded

5. grpc之大文件之流stream传输

1)在example.proto重新定义传送体

// Declarations
syntax = "proto3";
package proto;

// Service definition
service HelloService {
  rpc Hello(Request) returns (Response) {}  // plain unary message exchange
  rpc ClientTOServer(stream UpFileRequest) returns (Response) {}  // streaming file upload
  rpc ServerTOClient(Request) returns (stream UpFileRequest) {}  // streaming file download
}

// Request message. The numbers (1, 2, ...) are the field numbers of each field.
message Request {
  string data = 1;
}

// Response message
message Response {
  int32 ret = 1;    // return code
  string data = 2;
}

// One chunk of a streamed file transfer
message UpFileRequest {
  string filename = 1;
  int64 sendsize = 2;
  int64 totalsize = 3;
  bytes data = 4;
}

// Generate the Python modules with:
// python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

2)在虚拟环境里使用命令生成py文件,参考2. 2)

3)编辑client.py 和 server.py

# server.py
import os
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2


class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Concrete implementation of the HelloService RPC handlers."""

    def Hello(self, request, context):
        """Print the request payload, wait two seconds, then echo it back."""
        data = request.data
        print(data)
        time.sleep(2)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)

    def ClientTOServer(self, request_iterator, context):
        """Receive a client-streamed file and save it as ``242_copy.mp3``.

        Every message carries the file name and the total size; the payload
        bytes are concatenated in arrival order. The file is written only when
        the number of received bytes equals the announced total size.

        Returns:
            ``Response`` with ``ret=0`` on success or ``ret=-1`` on a size
            mismatch (including an empty stream); ``data`` is the file name.
        """
        received = bytearray()
        # Defaults so an empty request stream is reported as a failure
        # instead of raising NameError.
        file_name = ""
        file_size = -1
        for chunk in request_iterator:
            file_name = chunk.filename
            file_size = chunk.totalsize
            print(f"文件名称:{file_name}, 文件总长度:{file_size}")
            received.extend(chunk.data)  # append this chunk's bytes
            print(f"已接收长度:{len(received)}")
        if len(received) == file_size:
            with open("242_copy.mp3", "wb") as fw:
                fw.write(received)
            print(f"{file_name=} 下载完成")
            ret = 0
        else:
            print(f"{file_name=} 下载失败")
            ret = -1
        return example_pb2.Response(ret=ret, data=file_name)

    def ServerTOClient(self, request, context):
        """Stream the file named by ``request.data`` to the client in 1 MB chunks.

        The first message is a header (file name, total size, empty payload);
        every following message carries up to 1 MB of file content. Errors
        while opening or reading the file propagate to gRPC instead of being
        printed and retried forever.
        """
        fp = request.data
        print(f"下载文件:{fp=}")
        # NOTE(security): the path comes straight from the client, so any file
        # readable by the server process can be requested. Restrict it to an
        # allowed directory before exposing this beyond a demo.
        file_name = os.path.basename(fp)
        file_size = os.path.getsize(fp)
        part_size = 1024 * 1024  # read 1 MB per chunk
        with open(fp, "rb") as fr:
            yield example_pb2.UpFileRequest(
                filename=file_name, totalsize=file_size, sendsize=0, data=b"")
            while chunk := fr.read(part_size):
                yield example_pb2.UpFileRequest(
                    filename=file_name, totalsize=file_size,
                    sendsize=len(chunk), data=chunk)
        print("发送完毕")


def server(ip: str, port: int) -> None:
    """Start a gRPC server with enlarged message size limits and block until it stops.

    Args:
        ip: Address to bind to.
        port: TCP port to listen on.
    """
    # Message size configuration: allow up to 1 GiB in both directions.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    # RPCs are served from a thread pool of size 10.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    print(f"server is started! ip:{ip} port:{str(port)}")
    try:
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        # Ctrl-C is not an Exception subclass, so it needs its own handler
        # for the shutdown below to be reached cleanly.
        pass
    except Exception as es:
        print(es)
    finally:
        grpc_server.stop(0)


if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import os
import sys
import grpc
from proto import example_pb2_grpc, example_pb2


def _channel_options():
    """Return channel options allowing messages of up to 1 GiB in both directions."""
    max_message_length = 1024 * 1024 * 1024
    return [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]


def send_stream_data(fp: str):
    """Yield ``UpFileRequest`` messages for the file at ``fp``.

    The first message is a header (file name, total size, empty payload);
    every following message carries up to 1 MB of file content. Read errors
    propagate to the caller instead of being printed and retried forever.
    """
    file_name = os.path.basename(fp)
    file_size = os.path.getsize(fp)
    part_size = 1024 * 1024  # read 1 MB per chunk
    with open(fp, "rb") as fr:
        yield example_pb2.UpFileRequest(
            filename=file_name, totalsize=file_size, sendsize=0, data=b"")
        while chunk := fr.read(part_size):
            yield example_pb2.UpFileRequest(
                filename=file_name, totalsize=file_size,
                sendsize=len(chunk), data=chunk)
    print("发送完毕")


def client(ip: str, port: int) -> None:
    """Call ``Hello`` with a one-second deadline and print the reply or the error.

    Exits the process with status -1 only when the call fails.
    """
    target = f"{ip}:{port}"
    with grpc.insecure_channel(target, options=_channel_options()) as channel:
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        try:
            request = example_pb2.Request(data="hello 123")
            res = cli.Hello(request, timeout=1)  # timeout is in seconds
            print(f"ret:{res.ret}, data:{res.data}")
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            sys.exit(-1)
        except Exception as es:
            print(es)
            sys.exit(-1)


def client_to_server(ip: str, port: int, fp: str):
    """Upload the file at ``fp`` to the server as a client-side stream.

    Exits the process with status -1 only when the call fails.
    """
    target = f"{ip}:{port}"
    with grpc.insecure_channel(target, options=_channel_options()) as channel:
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        try:
            request = send_stream_data(fp=fp)
            res = cli.ClientTOServer(request, timeout=600)  # timeout is in seconds
            print(f"ret:{res.ret}, data:{res.data}")
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            sys.exit(-1)
        except Exception as es:
            print(es)
            sys.exit(-1)


def server_to_client(ip: str, port: int, fp: str):
    """Download the server-side file ``fp`` as a stream and save it as ``242_1.mp3``.

    The received payload bytes are concatenated and written only when their
    length equals the total size announced by the server. Exits the process
    with status -1 when the call fails or the sizes do not match.
    """
    target = f"{ip}:{port}"
    with grpc.insecure_channel(target, options=_channel_options()) as channel:
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        try:
            received = bytearray()
            request = example_pb2.Request(data=fp)
            filename = ""
            # Default so an empty response stream counts as a failed download
            # instead of leaving total_size unbound.
            total_size = -1
            for res in cli.ServerTOClient(request, timeout=300):
                filename = res.filename
                total_size = res.totalsize
                received += res.data
            if total_size == len(received):
                with open("242_1.mp3", "wb") as fw:
                    fw.write(received)
                print(f"{filename=} : {total_size=} 下载完成!")
            else:
                print(f"{filename=} 下载失败!")
                sys.exit(-1)
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            sys.exit(-1)
        except Exception as es:
            print(es)
            sys.exit(-1)


if __name__ == '__main__':
    # client("127.0.0.1", 8000)
    # client_to_server("127.0.0.1", 8000, "242.mp3")
    server_to_client("127.0.0.1", 8000, "242.mp3")

6. grpc之大文件之流async异步传输

# server.py
import os
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
import asyncio


class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Concrete implementation of the HelloService RPC handlers."""

    def Hello(self, request, context):
        """Print the request payload, wait two seconds, then echo it back."""
        data = request.data
        print(data)
        time.sleep(2)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)

    def ClientTOServer(self, request_iterator, context):
        """Receive a client-streamed file and save it as ``242_copy.mp3``.

        Every message carries the file name and the total size; the payload
        bytes are concatenated in arrival order. The file is written only when
        the number of received bytes equals the announced total size.

        Returns:
            ``Response`` with ``ret=0`` on success or ``ret=-1`` on a size
            mismatch (including an empty stream); ``data`` is the file name.
        """
        received = bytearray()
        # Defaults so an empty request stream is reported as a failure
        # instead of raising NameError.
        file_name = ""
        file_size = -1
        for chunk in request_iterator:
            file_name = chunk.filename
            file_size = chunk.totalsize
            print(f"文件名称:{file_name}, 文件总长度:{file_size}")
            received.extend(chunk.data)  # append this chunk's bytes
            print(f"已接收长度:{len(received)}")
        if len(received) == file_size:
            with open("242_copy.mp3", "wb") as fw:
                fw.write(received)
            print(f"{file_name=} 下载完成")
            ret = 0
        else:
            print(f"{file_name=} 下载失败")
            ret = -1
        return example_pb2.Response(ret=ret, data=file_name)

    def ServerTOClient(self, request, context):
        """Stream the file named by ``request.data`` to the client in 1 MB chunks.

        The first message is a header (file name, total size, empty payload);
        every following message carries up to 1 MB of file content. Errors
        while opening or reading the file propagate to gRPC instead of being
        printed and retried forever.
        """
        fp = request.data
        print(f"下载文件:{fp=}")
        # NOTE(security): the path comes straight from the client, so any file
        # readable by the server process can be requested. Restrict it to an
        # allowed directory before exposing this beyond a demo.
        file_name = os.path.basename(fp)
        file_size = os.path.getsize(fp)
        part_size = 1024 * 1024  # read 1 MB per chunk
        with open(fp, "rb") as fr:
            yield example_pb2.UpFileRequest(
                filename=file_name, totalsize=file_size, sendsize=0, data=b"")
            while chunk := fr.read(part_size):
                yield example_pb2.UpFileRequest(
                    filename=file_name, totalsize=file_size,
                    sendsize=len(chunk), data=chunk)
        print("发送完毕")


async def server(ip: str, port: int) -> None:
    """Start an asyncio gRPC server on ``ip:port`` and wait until it terminates.

    Args:
        ip: Address to bind to.
        port: TCP port to listen on.
    """
    # Message size configuration: allow up to 1 GiB in both directions.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    # A thread pool of size 10 is passed as the first positional argument;
    # presumably grpc.aio uses it to run the non-async handlers above —
    # confirm against the grpc.aio.server documentation.
    grpc_server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10), options=options)
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    await grpc_server.start()
    print(f"server is started! ip:{ip} port:{str(port)}")
    try:
        await grpc_server.wait_for_termination()
    except Exception as es:
        print(es)
    finally:
        # Runs on cancellation too (e.g. Ctrl-C under asyncio.run), which an
        # ``except Exception`` clause alone would not cover.
        await grpc_server.stop(None)


if __name__ == '__main__':
    # asyncio.run replaces the manual event-loop handling; passing bare
    # coroutines to asyncio.wait is rejected on Python 3.11+.
    asyncio.run(server("127.0.0.1", 8000))
# client.py
import os
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
import asyncio


def _channel_options():
    """Return channel options allowing messages of up to 1 GiB in both directions."""
    max_message_length = 1024 * 1024 * 1024
    return [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]


def send_stream_data(fp: str):
    """Yield ``UpFileRequest`` messages for the file at ``fp``.

    The first message is a header (file name, total size, empty payload);
    every following message carries up to 1 MB of file content. Read errors
    propagate to the caller instead of being printed and retried forever.
    """
    # NOTE(review): this is a plain (non-async) generator doing blocking file
    # reads; when consumed by the asyncio client it likely runs on the event
    # loop thread — confirm whether that matters for your workload.
    file_name = os.path.basename(fp)
    file_size = os.path.getsize(fp)
    part_size = 1024 * 1024  # read 1 MB per chunk
    with open(fp, "rb") as fr:
        yield example_pb2.UpFileRequest(
            filename=file_name, totalsize=file_size, sendsize=0, data=b"")
        while chunk := fr.read(part_size):
            yield example_pb2.UpFileRequest(
                filename=file_name, totalsize=file_size,
                sendsize=len(chunk), data=chunk)
    print("发送完毕")


async def client(ip: str, port: int) -> None:
    """Call ``Hello`` with a three-second deadline and print the reply or the error.

    Exits the process with status -1 only when the call fails.
    """
    target = f"{ip}:{port}"
    async with grpc.aio.insecure_channel(target, options=_channel_options()) as channel:
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        try:
            request = example_pb2.Request(data="hello 123")
            res = await cli.Hello(request, timeout=3)  # timeout is in seconds
            print(f"ret:{res.ret}, data:{res.data}")
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            sys.exit(-1)
        except Exception as es:
            print(es)
            sys.exit(-1)


async def client_to_server(ip: str, port: int, fp: str):
    """Upload the file at ``fp`` to the server as a client-side stream.

    Exits the process with status -1 only when the call fails.
    """
    target = f"{ip}:{port}"
    async with grpc.aio.insecure_channel(target, options=_channel_options()) as channel:
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        try:
            request = send_stream_data(fp=fp)
            res = await cli.ClientTOServer(request, timeout=600)  # timeout is in seconds
            print(f"ret:{res.ret}, data:{res.data}")
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            sys.exit(-1)
        except Exception as es:
            print(es)
            sys.exit(-1)


def server_to_client(ip: str, port: int, fp: str):
    """Download the server-side file ``fp`` as a stream and save it as ``242_1.mp3``.

    This function uses the synchronous channel API. The received payload bytes
    are concatenated and written only when their length equals the total size
    announced by the server. Exits the process with status -1 when the call
    fails or the sizes do not match.
    """
    target = f"{ip}:{port}"
    with grpc.insecure_channel(target, options=_channel_options()) as channel:
        cli = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        try:
            received = bytearray()
            request = example_pb2.Request(data=fp)
            filename = ""
            # Default so an empty response stream counts as a failed download
            # instead of leaving total_size unbound.
            total_size = -1
            for res in cli.ServerTOClient(request, timeout=300):
                filename = res.filename
                total_size = res.totalsize
                received += res.data
            if total_size == len(received):
                with open("242_1.mp3", "wb") as fw:
                    fw.write(received)
                print(f"{filename=} : {total_size=} 下载完成!")
            else:
                print(f"{filename=} 下载失败!")
                sys.exit(-1)
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            sys.exit(-1)
        except Exception as es:
            print(es)
            sys.exit(-1)


if __name__ == '__main__':
    # asyncio.run(client("127.0.0.1", 8000))
    asyncio.run(client_to_server("127.0.0.1", 8000, "242.mp3"))
    # server_to_client("127.0.0.1", 8000, "242.mp3")

结论: 在本地测了一下不加async和加async的文件上传传送, async还慢点,嘿嘿嘿。

相关内容

热门资讯

[第一财经]“大宝连云港麻将确... 亲:大宝连云港麻将这款游戏是可以开挂的,确实是有挂的,添加客服【3671900】很多玩家在这款游戏中...
「全新升级」美味冰淇淋.到底有... 「全新升级」美味冰淇淋.到底有挂吗[原来真的有挂]您好:美味冰淇淋这款游戏可以开挂,确实是有挂的,需...
重大通报“518乐游斗牛透视辅... 您好:518乐游斗牛这款游戏可以开挂,确实是有挂的,需要软件加微信【4194432】,很多玩家在51...
今日重大通报“南通长牌有挂吗”... 您好:【南通长牌】这款游戏可以开挂,确实是有挂的,需要了解加客服微信【3636476】很多玩家在这款...
重大通报“七天乐游有开挂辅助软... 您好:【七天乐游】这款游戏可以开挂,确实是有挂的,需要了解加客服微信【6948699】很多玩家在这款...