在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
目录
背景 通过LINQ可以方便的查询并处理不同的数据源,使用Parallel LINQ (PLINQ)来充分获得并行化所带来的优势。 PLINQ不仅实现了完整的LINQ操作符,而且还添加了一些用于执行并行的操作符,与对应的LINQ相比,通过PLINQ可以获得明显的加速,但是具体的加速效果还要取决于具体的场景,不过在并行化的情况下一段会加速。 如果一个查询涉及到大量的计算和内存密集型操作,而且顺序并不重要,那么加速会非常明显,然而,如果顺序很重要,那么加速就会受到影响。
AsParallel() 启用查询的并行化 下面贴代码,看下效果,详情见注释: class MRESDemo { /*code:释迦苦僧*/ static void Main() { ConcurrentQueue<Product> products = new ConcurrentQueue<Product>(); /*向集合中添加多条数据 可以修改数据量查看Linq和Plinq的性能*/ Parallel.For(0, 600000, (num) => { products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num }); }); /*采用LINQ查询符合条件的数据*/ Stopwatch sw = new Stopwatch(); sw.Restart(); var productListLinq = from product in products where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2")) select product; Console.WriteLine("采用Linq 查询得出数量为:{0}", productListLinq.Count()); sw.Stop(); Console.WriteLine("采用Linq 耗时:{0}", sw.ElapsedMilliseconds); /*采用PLINQ查询符合条件的数据*/ sw.Restart(); var productListPLinq = from product in products.AsParallel() /*AsParallel 试图利用运行时所有可用的逻辑内核,从而使运行的速度比串行的版本要快 但是需要注意开销所带来的性能损耗*/ where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2")) select product; Console.WriteLine("采用PLinq 查询得出数量为:{0}", productListPLinq.Count()); sw.Stop(); Console.WriteLine("采用PLinq 耗时:{0}", sw.ElapsedMilliseconds); Console.ReadLine(); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
AsOrdered()与orderby orderby:将返回的结果集按指定顺序进行排序 下面贴代码方便大家理解: class MRESDemo { /*code:释迦苦僧*/ static void Main() { ConcurrentQueue<string> products = new ConcurrentQueue<string>(); products.Enqueue("E"); products.Enqueue("F"); products.Enqueue("B"); products.Enqueue("G"); products.Enqueue("A"); products.Enqueue("C"); products.Enqueue("SS"); products.Enqueue("D"); /*不采用并行化 其数据输出结果 不做任何处理 */ var productListLinq = from product in products where (product.Length == 1) select product; string appendStr = string.Empty; foreach (string str in productListLinq) { appendStr += str + " "; } Console.WriteLine("不采用并行化 输出:{0}", appendStr); /*不采用任何排序策略 其数据输出结果 是直接将分区数据结果合并起来 不做任何处理 */ var productListPLinq = from product in products.AsParallel() where (product.Length == 1) select product; appendStr = string.Empty; foreach (string str in productListPLinq) { appendStr += str + " "; } Console.WriteLine("不采用AsOrdered 输出:{0}", appendStr); /*采用 AsOrdered 排序策略 其数据输出结果 是直接将分区数据结果合并起来 并按原始数据顺序排序*/ var productListPLinq1 = from product in products.AsParallel().AsOrdered() where (product.Length == 1) select product; appendStr = string.Empty; foreach (string str in productListPLinq1) { appendStr += str + " "; } Console.WriteLine("采用AsOrdered 输出:{0}", appendStr); /*采用 orderby 排序策略 其数据输出结果 是直接将分区数据结果合并起来 并按orderby要求进行排序*/ var productListPLinq2 = from product in products.AsParallel() where (product.Length == 1) orderby product select product; appendStr = string.Empty; foreach (string str in productListPLinq2) { appendStr += str + " "; } Console.WriteLine("采用orderby 输出:{0}", appendStr); Console.ReadLine(); } } 在PLINQ查询中,AsOrdered()和orderby子句都会降低运行速度,所以如果顺序并不是必须的,那么在请求特定顺序的结果之前,将加速效果与串行执行的性能进行比较是非常重要的。
指定执行模式 WithExecutionMode 对串行化代码进行并行化,会带来一定的额外开销,Plinq查询执行并行化也是如此,在默认情况下,执行PLINQ查询的时候,.NET机制会尽量避免高开销的并行化算法,这些算法有可能会将执行的性能降低到地狱串行执行的性能。 .NET会根据查询的形态做出决策,并不开了数据集大小和委托执行的时间,不过也可以强制并行执行,而不用考虑执行引擎分析的结果,可以调用WithExecutionMode方法来进行设置。、 下面贴代码,方便大家理解 class MRESDemo { /*code:释迦苦僧*/ static void Main() { ConcurrentQueue<Product> products = new ConcurrentQueue<Product>(); /*向集合中添加多条数据*/ Parallel.For(0, 6000000, (num) => { products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num }); }); /*采用并行化整个查询 查询符合条件的数据*/ Stopwatch sw = new Stopwatch(); sw.Restart(); var productListLinq = from product in products.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism) where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2")) select product; Console.WriteLine("采用并行化整个查询 查询得出数量为:{0}", productListLinq.Count()); sw.Stop(); Console.WriteLine("采用并行化整个查询 耗时:{0}", sw.ElapsedMilliseconds); /*采用默认设置 由.NET进行决策 查询符合条件的数据*/ sw.Restart(); var productListPLinq = from product in products.AsParallel().WithExecutionMode(ParallelExecutionMode.Default) where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2")) select product; Console.WriteLine("采用默认设置 由.NET进行决策 查询得出数量为:{0}", productListPLinq.Count()); sw.Stop(); Console.WriteLine("采用默认设置 由.NET进行决策 耗时:{0}", sw.ElapsedMilliseconds); Console.ReadLine(); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
通过PLINQ执行归约操作 PLINQ可以简化对一个序列或者一个组中所有成员应用一个函数的过程,这个过程称之为归约操作,如在PLINQ查询中使用类似于Average,Max,Min,Sum之类的聚合函数就可以充分利用并行所带来好处。 并行执行的规约和串行执行的规约的执行结果可能会不同,因为在操作不能同时满足可交换和可传递的情况下产生摄入,在每次执行的时候,序列或组中的元素在不同并行任务中分布可能也会有区别,因而在这种操作的情况下可能会产生不同的最终结果,因此,一定要通过对于的串行版本来兴义原始的数据源,这样才能帮助PLINQ获得最优的执行结果。 下面贴代码: class MRESDemo { /*code:释迦苦僧*/ static void Main() { ConcurrentQueue<int> products = new ConcurrentQueue<int>(); /*向集合中添加多条数据*/ Parallel.For(0, 6000000, (num) => { products.Enqueue(num); }); /*采用LINQ 返回 IEumerable<int>*/ var productListLinq = (from product in products select product).Average(); Console.WriteLine("采用Average计算平均值:{0}", productListLinq); /*采用PLINQ 返回 ParallelQuery<int>*/ var productListPLinq = (from product in products.AsParallel() select product).Average(); Console.WriteLine("采用Average计算平均值:{0}", productListPLinq); Console.ReadLine(); } } 如上述代码所示 在LINQ版本中,该方法会返回一个 IEumerable<int>,即调用 Eumerable.Range方法生成指定范围整数序列的结果, 如果想对特定数据源进行LINQ查询时,可以定义为 private IEquatable<int> products 如果想对特定数据源进行PLINQ查询时,可以定义为 private ParallelQuery<int> products
并发PLINQ任务 class MRESDemo { /*code:释迦苦僧*/ static void Main() { ConcurrentQueue<Product> products = new ConcurrentQueue<Product>(); /*向集合中添加多条数据*/ Parallel.For(0, 600000, (num) => { products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num }); }); CancellationTokenSource cts = new CancellationTokenSource(); /*创建tk1 任务 查询 符合 条件的数据*/ Task<ParallelQuery<Product>> tk1 = new Task<ParallelQuery<Product>>((ct) => { Console.WriteLine("开始执行 tk1 任务", products.Count); Console.WriteLine("tk1 任务中 数据结果集数量为:{0}", products.Count); var result = products.AsParallel().Where(p => p.Name.Contains("1") && p.Name.Contains("2")); return result; }, cts.Token); /*创建tk2 任务,在执行tk1任务完成 基于tk1的结果查询 符合 条件的数据*/ Task<ParallelQuery<Product>> tk2 = tk1.ContinueWith<ParallelQuery<Product>>((tk) => { Console.WriteLine("开始执行 tk2 任务", products.Count); Console.WriteLine("tk2 任务中 数据结果集数量为:{0}", tk.Result.Count()); var result = tk.Result.Where(p => p.Category.Contains("1") && p.Category.Contains("2")); return result; }, TaskContinuationOptions.OnlyOnRanToCompletion); /*创建tk3 任务,在执行tk1任务完成 基于tk1的结果查询 符合 条件的数据*/ Task<ParallelQuery<Product>> tk3 = tk1.ContinueWith<ParallelQuery<Product>>((tk) => { Console.WriteLine("开始执行 tk3 任务", products.Count); Console.WriteLine(" |
请发表评论