也许你们可能没有接触过Dapr,但是一定对它“有所耳闻”,感觉今年它一下子就火了,一时间很多人都在谈论它。我们从其命名(Dapr的全称是“分布式应用运行时Distributed Application Runtime”)可以看出Dapr的定位,它并不是分布式应用的开发框架,它提供的是更底层的“运行时”。我们可以使用不同的编程语言,采用不同的开发框架在这个由Dapr提供的运行时上面构建分布式应用。我们接下来就来感受一下Dapr在.NET上面的开发体验,关于Dapr的基本信息以及环境的安装,请参阅官方文档。
[117]Dapr-服务调用 (源代码)
[118]Dapr-状态管理(源代码)
[119]Dapr-发布订阅(源代码)
[120]Dapr-Actor模型(源代码)
[117]Dapr-服务调用
Dapr是一个采用Service Mesh设计的分布式微服务运行时。每一个部署在Dapr上的应用实例(独立进程或者容器)都具有这一个专属的Sidecar,具体体现为一个独立的进程(daprd)或者容器。应用实例只会与它专属的Sidecar进行通信,跨应用通信是在两个应用实例的Sidecar之间进行的,具体的传输协议可以采用HTTP或者gRPC。正是因为应用实例和Sidecar是在各自的进程内独立运行的,所以Dapr才对应用开发采用的技术栈没有任何限制。我们接下来就通过一个简单的例子来演示Dapr下的服务调用。我们创建了如图1所示的解决方案。App1和App2代表两个具有依赖关系的应用,App1会调用App2提供的服务。Shared是一个类库项目,提供被App1和App2共享的数据类型。
我们依然沿用上面演示的数学运算应用场景,并在Shared项目中定义了如下两个数据类型。表示输入的Input类型提供了两个操作数(X和Y),表示输出的Output类型除了通过其Result属性表示运算结果外,还利用Timestamp属性返回运算时间戳。
1 public class Input 2 { 3 public int X { get; set; } 4 public int Y { get; set; } 5 } 6 7 public class Output 8 { 9 public int Result { get; set; } 10 public DateTimeOffset Timestamp { get; set; } = DateTimeOffset.Now; 11 } 12
App2就是一个简单的ASP.NET CORE应用,我们采用路由的方式注册了执行数学运算的终结点。如下面的代码片段所示,注册的终结点采用的路径模板为“/{method}”,路由参数“{method}”既表示运算操作类型,同时也作为Dapr服务的方法名。在作为终结点处理器的Calculate方法中,请求的主体内容被提取出来,经过反序列化后绑定为input参数。在根据提供的输入执行对应的运算并生成Output对象后,将其序列化成JSON,并以此作为响应的内容。
1 using Microsoft.AspNetCore.Mvc; 2 using Shared; 3 4 var app = WebApplication.Create(args); 5 app.MapPost("{method}", Calculate); 6 app.Run("http://localhost:9999"); 7 8 static IResult Calculate(string method, [FromBody] Input input) 9 { 10 var result = method.ToLower() switch 11 { 12 "add" => input.X + input.Y, 13 "sub" => input.X - input.Y, 14 "mul" => input.X * input.Y, 15 "div" => input.X / input.Y, 16 _ => throw new InvalidOperationException($"Invalid method {method}") 17 }; 18 return Results.Json(new Output { Result = result }); 19 }
在调用WebApplication对象的Run方法启动应用的时候,我们显式指定了监听地址,其目的是为了将端口(9999)固定下来。App2目前实际上与Dapr一点关系都没有,我们必须以Dapr的方式启动它才能将它部署到本机的Dapr环境中,具体来说我们可以执行“dapr run --app-id app2 --app-port 9999 -- dotnet run”这个命令来启动Sidecar的同时以子进程的方式启动应用。提供的命令行参数除了提供应用的启动方式(dotnet run)之外,还提供了针对应用的表示(--app-id app2)和监听的端口(--app-port 9999)。考虑到每次在控制台输入这些繁琐的命令有点麻烦,我们选择在launchSettings.json文件中定义如下这个Profile来以Dapr的方式启动应用。由于这种启动方式会将输出目录作为当前工作目录,我们选择指定程序集的方式来启动应用(dotnet App2.dll)。
1 { 2 "profiles": { 3 "Dapr": { 4 "commandName": "Executable", 5 "executablePath": "dapr", 6 "commandLineArgs": "run --app-id app2 --app-port 9999 -- dotnet App2.dll" 7 } 8 } 9 } 10
App1是一个简单的控制台应用,为了能够采用上述这种方式来启动它,我们还是将SDK从“Microsoft.NET.Sdk”改成“Microsoft.NET.Sdk.Web”。我们在launchSettings.json文件中定义了如下这个类似的Profile,应用的标识被设置成“app1”。由于App1仅仅涉及到对其他应用的调用,自身并不提供服务,所以我们不需要设置端口号。
1 { 2 "profiles": { 3 "Dapr": { 4 "commandName": "Executable", 5 "executablePath": "dapr", 6 "commandLineArgs": "run --app-id app1 -- dotnet App1.dll" 7 } 8 } 9 } 10
由于App1涉及到针对Dapr服务的调用,需要使用到Dapr客户端SDK提供的API,所以我们为它添加了“Dapr.Client”这个NuGet包的引用。具体的服务调用体现在下面的程序中,如代码片段所示,我们调用DaprClient的静态方法CreateInvokeHttpClient针对目标服务或者应用的标识“app2”创建了一个HttpClient对象,并利用该它完成了四个服务方法的调用。具体的服务调用实现在InvokeAsync这个本地方法中,在将作为输入的Input对象序列化成JSON文本之后,该方法会将其作为请求的主体内容。在一个分布式环境下,我们不需要知道目标服务所在的位置,因为这是不确定的,确定的唯有目标服务/应用的标识,所以我们直接将此标识作为请求的目标地址。在得到调用结果之后,我们对它进行了简单的格式化后直接输出到控制台上。
1 using Dapr.Client; 2 using Shared; 3 4 HttpClient client = DaprClient.CreateInvokeHttpClient(appId: "app2"); 5 var input = new Input(2, 1); 6 7 await InvokeAsync("add", "+"); 8 await InvokeAsync("sub", "-"); 9 await InvokeAsync("mul", "*"); 10 await InvokeAsync("div", "/"); 11 12 async Task InvokeAsync(string method, string @operator) 13 { 14 var response = await client.PostAsync(method, JsonContent.Create(input)); 15 var output = await response.Content.ReadFromJsonAsync<Output>(); 16 Console.WriteLine( $"{input.X} {@operator} {input.Y} = {output.Result} ({output.Timestamp})"); 17 } 18
我们先后启动App2和App1后,两个应用所在的控制台上会产生如图2所示的输出。应用输出的文本会采用“== App ==”作为前缀,其余内容为Sidecar输出的日志。从App2所在控制台(前面)上输出可以看出,它成功地完成了基于四种运算的服务调用。当笔者以Debug模式启动App1的时候有时会“闪退”的现象,如果你也出现这样的情况,可以选择非Debug模式(在解决方案窗口中右键选择该项目,选择Debug => Start Without Debuging)启动它。
图2 基于Dapr的服务调用
[118]Dapr-状态管理
我们可以借助Dapr提供的状态管理组件创建“有状态”的服务。这里的状态并不是存储在应用实例的进程中供其独享,而是存储在独立的存储中(比如Redis)共所有应用实例共享,所以并不是影响水平伸缩的能力。对于上面演示的实例,假设计算服务提供的是四个耗时的操作,那么我们就可以将计算结果缓存起来避免重复计算,我们现在就来实现这样的功能。为了能够使用到针对状态管理的API,我们为App2添加针对“Dapr.AspNetCore”这个NuGet包的引用。我们将缓存相关的三个操作定义在如下这个IResultCache接口中。如代码片段所示,该接口定义了三个方法,GetAsync方法根据指定的操作/方法名称和输入提取缓存的计算结果,SaveAsync方法负责将计算结果根据对应的方法名成和输入缓存起来,ClearAsync方法则将指定方法的所有缓存结果全部清除掉。
1 public interface IResultCache 2 { 3 Task<Output> GetAsync(string method, Input input); 4 Task SaveAsync(string method, Input input, Output output); 5 Task ClearAsync(params string[] methods); 6 } 7
如下所示的IResultCache接口的实现类型ResultCache的定义。我们在构造函数中注入了DaprClient对象,并利用它来完成状态管理的相关操作。计算结果缓存项的Key由方法名称和输入参数以 “Result_{method}_{X}_{Y}”这样的格式生成,具体的格式化体现在_keyGenerator字段返回的委托上。由于涉及到对缓存计算结果的清除,我们不得不将所有计算结果缓存项的Key也一并缓存起来,该缓存项采用的Key为“ResultKeys”。
1 public class ResultCache : IResultCache 2 { 3 private readonly DaprClient _daprClient; 4 private readonly string _keyOfKeys = "ResultKeys"; 5 private readonly string _storeName = "statestore"; 6 private readonly Func<string, Input, string> _keyGenerator; 7 8 public ResultCache(DaprClient daprClient) 9 { 10 _daprClient = daprClient; 11 _keyGenerator = (method, input) => $"Result_{method}_{input.X}_{input.Y}"; 12 } 13 14 public Task<Output> GetAsync(string method, Input input) 15 { 16 var key = _keyGenerator(method, input); 17 return _daprClient.GetStateAsync<Output>(storeName: _storeName, key: key); 18 } 19 20 public async Task SaveAsync(string method, Input input, Output output) 21 { 22 var key = _keyGenerator(method, input); 23 var keys = await _daprClient.GetStateAsync<HashSet<string>>(storeName: _storeName, key: _keyOfKeys) ?? new HashSet<string>(); 24 keys.Add(key); 25 26 var operations = new StateTransactionRequest[2]; 27 var value = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(output)); 28 operations[0] = new StateTransactionRequest(key: key, value: value, operationType: StateOperationType.Upsert); 29 30 value = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(keys)); 31 operations[1] = new StateTransactionRequest(key: _keyOfKeys, value: value, operationType: StateOperationType.Upsert); 32 await _daprClient.ExecuteStateTransactionAsync(storeName: _storeName, operations: operations); 33 } 34 35 public async Task ClearAsync(params string[] methods) 36 { 37 var keys = await _daprClient.GetStateAsync<HashSet<string>>(storeName: _storeName, key: _keyOfKeys); 38 if (keys != null) 39 { 40 var selectedKeys = keys .Where(it => methods.Any(m => it.StartsWith($"Result_{m}"))).ToArray(); 41 if (selectedKeys.Length > 0) 42 { 43 var operations = selectedKeys .Select(it => new StateTransactionRequest(key: it, value: null, operationType: StateOperationType.Delete)) .ToList(); 44 operations.ForEach(it => keys.Remove(it.Key)); 45 var value = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(keys)); 46 operations.Add(new StateTransactionRequest(key: _keyOfKeys, value: value, operationType: StateOperationType.Upsert)); 47 await _daprClient.ExecuteStateTransactionAsync(storeName: _storeName, operations: operations); 48 } 49 } 50 } 51 }
在实现的GetAsync方法中,我们根据指定的方法名称和输入生成对应缓存项的Key,并调用DaprClient对象的GetStateAsync<TValue>提取对应缓存项的值。我们将Key作为该方法的第二个参数,第一个参数代表状态存储(State Store)组件的名称。Dapr在初始化过程中会默认为我们设置一个针对Redis的状态存储组件,并将其命名为“statestore”,ResultCache使用正式这个状态存储组件。
对单一状态值进行设置只需要调用DaprClient对象的SaveStateAsync<TValue>方法就可以了,但是我们实现的SaveAsync方法除了需要缓存计算结果外,还需要修正“ResultKeys”这个缓存的值。为了确保两者的一致性,两个缓存项的更新最好在同一个事务中进行,为此我们调用了DaprClient对象的ExecuteStateTransactionAsync方法。我们为此创建了两个StateTransactionRequest对象来描述对这两个缓存项的更新,具体来说需要设置缓存项的Key、Value和操作类型(Upsert)。这里设置的值必须是最原始的二进制数组,由于状态管理组件默认采用JSON的序列化方式和UTF-8编码,所以我们按照这样的规则生成了作为缓存值的二进制数组。另一个实现的ClearAsync方法采用类似的方式删除指定方法的计算结果缓存,并修正了“ResultKeys”缓存项的值。
接下来我们需要对计算服务的处理方法Calculate作必要的修改。如下面的代码片段所示,我们直接在该方法中注入了IResultCache对象。如果能够利用该对象提取出了缓存的计算结果,我们会直接将它返回给客户端。只有在对应计算结果尚未缓存的情况下,我们才会真正实施计算。在返回计算结果之前,我们会对计算结果实施缓存。该方法中注入IResultCache和DaprClient对象对应的服务在WebApplication被构建之前进行了注册。
1 using App2; 2 using Microsoft.AspNetCore.Mvc; 3 using Shared; 4 var builder = WebApplication.CreateBuilder(args); 5 builder.Services 6 .AddSingleton<IResultCache, ResultCache>() 7 .AddDaprClient(); 8 var app = builder.Build(); 9 app.MapPost("/{method}", Calculate); 10 app.Run("http://localhost:9999"); 11 12 static async Task<IResult> Calculate(string method, [FromBody] Input input, IResultCache resultCache) 13 { 14 var output = await resultCache.GetAsync(method, input); 15 if (output == null) 16 { 17 var result = method.ToLower() switch 18 { 19 "add" => input.X + input.Y, 20 "sub" => input.X - input.Y, 21 "mul" => input.X * input.Y, 22 "div" => input.X / input.Y, 23 _ => throw new InvalidOperationException($"Invalid operation {method}") 24 }; 25 output = new Output { Result = result }; 26 await resultCache.SaveAsync(method, input, output); 27 } 28 return Results.Json(output); 29 } 30
由于两轮服务调用使用相同的输入。如果服务端对计算结果进行了缓存,那么针对同一个方法的调用就应该具有相同的时间戳,如图3所示的输出结果证实了这一点。
[119]Dapr-发布订阅
Dapr提供了“开箱即用”的发布订阅(“Pub/Sub”)模块,我们可以将其作为消息队列来用。上面演示的实例利用状态管理组件缓存了计算结果,现在我们采用发布订阅的方法将指定方法的计算结果缓存清除掉。具体来说,我们在App2中订阅“删除缓存”的主题(Topic),当接收到发布的对应主题的消息时,我们从消息中提待删除的方法列表,并将对应的计算结果缓存清除掉。至于“删除缓存”的主题的发布,我们将它交给App1来完成。我们为此对App2再次做出修改。如下面的代码片段所示,我们针对路径“clear”注册了一个作为“删除缓存”主题的订阅终结点,它对应的处理方法为ClearAsync。我们通过标注在该方法上的TopicAttribute来对订阅的主题作相应设置。该特性构造函数的第一个参数为采用的发布订阅组件名称,我们采用的是初始化Dapr是设置的基于Redis的发布订阅组件,该组件命名为“pubsub”。第二个参数表示订阅主题的名称,我们将其设置为“clearresult”。
1 using App2; 2 using Dapr; 3 using Microsoft.AspNetCore.Mvc; 4 using Shared; 5 var builder = WebApplication.CreateBuilder(args); 6 builder.Services 7 .AddSingleton<IResultCache, ResultCache>() 8 .AddDaprClient(); 9 var app = builder.Build(); 10 11 app.UseCloudEvents(); 12 app.MapPost("clear", ClearAsync); 13 app.MapSubscribeHandler(); 14 15 app.MapPost("/{method}", Calculate); 16 app.Run("http://localhost:9999"); 17 18 [Topic(pubsubName:"pubsub", name:"clearresult")] 19 static Task ClearAsync(IResultCache cache, [FromBody] string[] methods) => cache.ClearAsync(methods); 21 22 static async Task<IResult> Calculate(string method, [FromBody]Input input, IResultCache resultCache) ...
ClearAsync方法定义了两个参数,第一个参数会默认绑定为注册的IResultCache服务,第二个参数表示待删除的方法列表,上面标注的FromBodyAttribute特性将指导路由系统通过提取请求主体内容来绑定对应参数值。但是Dapr的发布订阅组件默认采用Cloud Events消息格式,如果请求的主体为具有如此结构的消息,按照默认的绑定规则,针对input参数的绑定将会失败。为此我们调用WebApplication对象的UseCloudEvents扩展方法额外注册了一个CloudEventsMiddleware中间件,该中间件会提取出请求数据部分的内容,并使用它将整个请求主体部分的内容替换掉,那么针对methods参数的绑定就能成功了。我们还调用WebApplication对象的MapSubscribeHandler扩展方法注册了一个额外的终结点。在应用启动的时候,Sidecar会利用这个终结点收集当前应用提供的所有订阅处理器的元数据信息,其中包括发布订阅组件和主题名称,以及调用的路由或路径(对于本例就是“clear”)。当Sidecar接受到发布消息后,会根据这组元数据选择匹配的订阅处理器,并利用其提供的路径完成对它的调用。
我们针对发布者的角色对App1做了相应的修改。如下面的代码片段所示,我们利用创建的DaprClientBuilder构建了一个DaprClient对象。在两轮针对计算服务的调用之间,我们调用了DaprClient的PublishEventAsync方法发布了一个名为“clearresult”的消息。从提供的第三个参数可以看出,我们仅仅清除“加法”和“减法”这两个方法的计算结果缓存。图4所示的是App1运行之后在控制台上的输出。对于两轮间隔为5秒的服务调用,加法和减法的结果由于缓存被清除,所以它们具有不同的时间戳,但乘法和除法的计算时间依旧是相同的。
图4 利用发布订阅组件删除结果缓存
[120]Dapr-Actor模型
如果分布式系统待解决的功能可以分解成若干很小且独立状态逻辑单元,我们可以考虑使用Actor模型(Model)进行设计。具体来说,我们将上述这些状态逻辑单元定义成单个的Actor,并在它们之间采用消息驱动的通信方法完成整个工作流程。每个Actor只需要考虑对接收的消息进行处理,并将后续的操作转换成消息分发给另一个Actor就可以了。由于每个Actor以单线程模式执行,我们无需考虑多线程并发和同步的问题。由于Actor之间的交互是完全无阻塞的,一般能够提高系统整体的吞吐量。
接下来我们依然通过对上面演示实例的修改来演示Dapr的Actor模型在.NET下的应用。这次我们将一个具有状态的累加计数器设计成Actor。我们在Shared项目中为这个Actor定义了一个接口,如下面的代码片段所示,这个名为IAccumulator的接口派生于IActor,由于后者来源于“Dapr.Actors”这个NuGet包,所以我们需要添加对应的包引用。IAccumulator接口定义了两个方法,IncreaseAsync方法根据指定的数值进行累加并返回当前的值, ResetAsync方法将累加数值重置归零。
public interface IAccumulator: IActor { Task<int> IncreaseAsync(int count); Task ResetAsync(); }
我们将IAccumulator接口的实现类型Accumulator定义在App2中。如下面的代码片段所示,除了实现对应的接口,Accumulator类型还继承了Actor这个基类。由于每个Actor提供当前累加的值,所以它们是有状态的。但是不能利用Accumulator实例的属性来维持这个状态,我们使用从基类继承下来的StateManager属性返回的IActorStateManager对象来管理当前Actor的状态。具体来说,我们调用TryGetStateAsync方法提取当前Actor针对指定名称(“__counter”)的状态值,新的状态值通过调用它的SetStateAsync方法进行设置。由于IActorStateManager对象的SetStateAsync方法对状态所作的更新都是本地操作,我们最终还需要调用Actor对象自身的SaveStateAsync方法提交所有的状态更新。Actor的状态依旧是通过Dapr的状态管理组件进行存储的。
public class Accumulator : Actor, IAccumulator { private readonly string _stateName = "__counter"; public Accumulator(ActorHost host) : base(host) { } public async Task<int> IncreaseAsync(int count) { var counter = 0; var existing = await StateManager.TryGetStateAsync<int>(stateName: _stateName); if(existing.HasValue) { counter = existing.Value; } counter+= count; await StateManager.SetStateAsync(stateName: _stateName, value:counter); await SaveStateAsync(); return counter; } public async Task ResetAsync() { await StateManager.TryRemoveStateAsync(stateName: _stateName); await SaveStateAsync(); } }
承载Actor相关的API由“Dapr.Actors.AspNetCore”这个NuGet包提供,所以我们需要添加该包的引用。Actor的承载方式与MVC框架类似,它们都是建立在路由系统上,MVC框架将所有Controller类型转换成注册的终结点,而Actor的终结点由WebApplication的MapActorsHandlers扩展方法进行注册。在注册中间件之前,我们还需要调用IServiceCollection接口的AddActors扩展方法将注册的Actor类型添加到ActorRuntimeOptions配置选项上。
using App2; var builder = WebApplication.CreateBuilder(args); builder.Services.AddActors(options => options.Actors.RegisterActor<Accumulator>()); var app = builder.Build(); app.MapActorsHandlers(); app.Run("http://localhost:9999");
我们在App1中编写了如下的程序来演示针对Actor的调用。如代码片段所示,我们调用ActorProxy的静态方法Create<TActor>创建了两个IAccumulator对象。创建Actor对象(其实是调用Actor的代理)时需要指定唯一标识Actor的ID(“001”和“002”)和对应的类型(“Accumulator”)。
using Dapr.Actors; using Dapr.Actors.Client; using Shared; var accumulator1 = ActorProxy.Create<IAccumulator>(new ActorId("001"), "Accumulator"); var accumulator2 = ActorProxy.Create<IAccumulator>(new ActorId("002"), "Accumulator"); while (true) { var counter1 = await accumulator1.IncreaseAsync(1); var counter2 = await accumulator2.IncreaseAsync(2); await Task.Delay(5000); Console.WriteLine($"001: {counter1}"); Console.WriteLine($"002: {counter2}\n"); if (counter1 > 10) { await accumulator1.ResetAsync(); } if (counter2 > 20) { await accumulator2.ResetAsync(); } }
Actor对象创建出来后,我们在一个循环中采用不同的步长(1和2)调用它们的IncreaseAsync实施累加操作。在计数器数值达到上限(10和20)时,我们调用它们的ResetAsync方法重置计数器。在先后启动App2和App1之后,App1所在控制台上将会以如图5所示的形式输出两个累加计数器提供的计数。
图5 Actor模式实现的累加计数器
请发表评论