在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
Asp.Net Core 3.0之后,对Grpc提供了高集成度的支持,对于需要连续传输大批量对象数据的应用场景而言,等于多了一条高铁线路。如果没有Grpc,连续传输大批量对象数据是一个很纠结的问题。用TCP的话,可以达到最高速度,但是传输过程的断线续传,对象数据的序列化和反序列化都要自己处理,开发效率低效。用HTTP的话,要频繁调用POST,反复建立连接,传输性能差。Grpc能够一次建立传输通道,多次传输对象数据,自动序列化和反序列化,并且采用ProtoBuf协议序列化对象数据,压缩率接近二进制byte数组,实现了TCP的性能优势和HTTP POST的使用方便性的完美结合。
但是Asp.Net Core使用proto文件定义传输对象比较费事,对于已经存在的Asp.Net Core Web项目,已经定义了很多DTO类,服务端和客户端还有其他数据传输方式,例如MQTT,HTTP等等,为了Grpc重新写一大堆代码,非常麻烦。所以决定寻找能够复用C#对象的Grpc解决方案。最终找到了这篇文章,使用protobuf-net.Grpc.AspNetCore解决了我的需求,非常感谢作者ElderJames。 https://www.cnblogs.com/ElderJames/p/code-first-generate-gRPC-services-and-clients-in-dotnet-core-3_0.html 《_NET Core 3.0中用 Code-First 方式创建 gRPC 服务与客户端 - Elder_James - 博客园.html》
决定写了一个Demo做练习。实现的需求是,客户端连续发送带有byte数组的对象到服务端,服务端保存对象,服务端会返回保存成功标志,客户端可以根据服务器的响应动态改变发送内容。 新建Net Standar类库GrpcShare,NuGet安装库 <PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.0" /> <PackageReference Include="System.ServiceModel.Primitives" Version="4.7.0" /> 定义DTO对象,服务类接口 /// <summary> /// 文本消息 /// </summary> [DataContract] public class Message { /// <summary> /// 内容 /// </summary> [DataMember(Order = 1)] public string Context { get; set; } } /// <summary> /// 上传数据包请求 /// </summary> [DataContract] public class UploadRequest { /// <summary> /// 数据包索引 /// </summary> [DataMember(Order = 1)] public int Index { get; set; } /// <summary> /// 采样时间 /// </summary> [DataMember(Order = 2)] public DateTime SampleTime { get; set; } /// <summary> /// 内容 /// </summary> [DataMember(Order = 3)] public byte[] Content { get; set; } public override string ToString() { return $"发送第{Index}包数据, {SampleTime}, 长度={Content.Length}"; } } /// <summary> /// 上传数据包应答 /// </summary> [DataContract] public class UploadReply { /// <summary> /// 数据包索引 /// </summary> [DataMember(Order = 1)] public int Index { get; set; } /// <summary> /// 保存到数据库成功标志 /// </summary> [DataMember(Order = 2)] public bool ArchiveSuccess { get; set; } public override string ToString() { return $"收到第{Index}包数据, 保存成功标志={ArchiveSuccess}"; } } /// <summary> /// 上传数据包接口 /// </summary> [ServiceContract] public interface IUpload { /// <summary> /// 简单测试 /// </summary> /// <param name="message"></param> /// <returns></returns> [OperationContract] ValueTask<string> Hi(string message); /// <summary> /// 测试 /// </summary> /// <param name="message"></param> /// <returns></returns> [OperationContract] ValueTask<Message> Hello(Message message); /// <summary> /// 双向流式上传数据包 /// 注意IAsyncEnumerable需要NuGet安装Microsoft.Bcl.AsyncInterfaces,不是System.Interactive.Async /// </summary> /// <param name="stream"></param> /// <returns></returns> [OperationContract] IAsyncEnumerable<UploadReply> Upload(IAsyncEnumerable<UploadRequest> stream); } 新建Asp.net Core Web Api项目GrpcDemo,NuGet安装库 <PackageReference Include="protobuf-net.Grpc.AspNetCore" Version="1.0.22" />
Program定义Grpc服务端口 public static IHostBuilder CreateHostBuilder(string[] args) => Host.CreateDefaultBuilder(args) .ConfigureWebHostDefaults(webBuilder => { webBuilder .ConfigureKestrel(options => { options.ListenLocalhost(9988, listenOptions => { listenOptions.Protocols = HttpProtocols.Http2; }); }) .UseStartup<Startup>(); }); Startup添加Grpc服务和路由 public void ConfigureServices(IServiceCollection services) { services.AddControllers(); //添加Grpc服务 services.AddCodeFirstGrpc(); } app.UseEndpoints(endpoints => { endpoints.MapControllers(); //添加Grpc路由 endpoints.MapGrpcService<UploadService>(); }); 实现服务类UploadService /// <summary> /// 上传数据包服务 /// </summary> public class UploadService : IUpload { /// <summary> /// 简单测试 /// </summary> /// <param name="message"></param> /// <returns></returns> public ValueTask<string> Hi(string message) { Console.WriteLine($"收到客户端问候={message}"); return new ValueTask<string>("Hi,我是UploadService"); } /// <summary> /// 测试 /// </summary> /// <param name="message"></param> /// <returns></returns> public ValueTask<Message> Hello(Message message) { Console.WriteLine($"收到客户端问候={message.Context}"); var reply = new Message() { Context = "Hello,我是UploadService", }; return new ValueTask<Message>(reply); } /// <summary> /// 双向流式上传数据包 /// </summary> /// <param name="stream"></param> /// <returns></returns> public async IAsyncEnumerable<UploadReply> Upload(IAsyncEnumerable<UploadRequest> stream) { await foreach (var request in stream) { Console.WriteLine(request); await Task.Delay(TimeSpan.FromSeconds(1)); var reply = new UploadReply { Index = request.Index, //模拟保存失败 ArchiveSuccess = (DateTime.Now.Second % 3 < 2), }; yield return reply; } Console.WriteLine($"客户端关闭连接"); } } 新建Net Core控制台项目UploadClient,NuGet安装库 <PackageReference Include="Grpc.Net.Client" Version="2.26.0" /> <PackageReference Include="protobuf-net.Grpc" Version="1.0.22" />
简单测试很容易 //如果服务端没有加密传输,客户端必须设置 GrpcClientFactory.AllowUnencryptedHttp2 = true; using var http = GrpcChannel.ForAddress("http://localhost:9988"); var client = http.CreateGrpcService<IUpload>(); //简单测试 string request1 = "Hi, 我是UploadClient"; Console.WriteLine(request1); var result1 = await client.Hi(request1); Console.WriteLine($"收到服务端回应={result1}"); 在实现双向流式,交互式传输时,遇到一个问题,客户端如果需要根据服务端的响应,动态调整发送内容,该怎么办呢?客户端发送的参数是一个IAsyncEnumerable函数,它怎么把服务端响应作为参数再输入进入? await foreach (var reply in client.Upload(SendPackage()))
我没有找到现成的例子,从ElderJames的例子受到启发,把服务端响应放到一个Queue中,客户端定期读取队列,算是勉强解决了这个问题。当然还有其他很多办法,例如收到服务端响应发布一个消息事件,在事件处理函数中修改客户端发送内容等。但是总觉得不够简便。 //流式上传数据包 await foreach (var reply in client.Upload(SendPackage())) { //收到服务端回应后,丢到FIFO replyQueue.Enqueue(reply); } private static async IAsyncEnumerable<UploadRequest> SendPackage() { //上传第一包数据 var request = new UploadRequest { Index = 1, SampleTime = DateTime.Now, Content = Encoding.UTF8.GetBytes(DateTime.Now.ToString()), }; Console.WriteLine(request); yield return request; while (request.Index < 10) { await Task.Delay(TimeSpan.FromSeconds(1)); //从FIFO取出服务端回应 if (!replyQueue.TryDequeue(out UploadReply reply)) continue; Console.WriteLine($"收到服务端回应={reply}"); if (reply.ArchiveSuccess) { //如果服务端存档成功,上传下一包数据 request = new UploadRequest { Index = reply.Index + 1, SampleTime = DateTime.Now, Content = Encoding.UTF8.GetBytes(DateTime.Now.ToString()), }; } else { //如果服务端存档失败,重传上一包数据 } Console.WriteLine(request); yield return request; } } 这个DEMO的代码地址: https://github.com/woodsun2018/AspNetCoreGrpcDemo
|
请发表评论