• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

C#并行编程-PLINQ:声明式数据并行-转载C#并行编程-PLINQ:声明式数据并行 ...

原作者: [db:作者] 来自: [db:来源] 收藏 邀请
 

目录

 

背景

通过LINQ可以方便的查询并处理不同的数据源,使用Parallel LINQ (PLINQ)来充分获得并行化所带来的优势。

PLINQ不仅实现了完整的LINQ操作符,而且还添加了一些用于执行并行的操作符,与对应的LINQ相比,通过PLINQ可以获得明显的加速,但是具体的加速效果还要取决于具体的场景,不过在并行化的情况下一段会加速。

如果一个查询涉及到大量的计算和内存密集型操作,而且顺序并不重要,那么加速会非常明显,然而,如果顺序很重要,那么加速就会受到影响。

 

AsParallel() 启用查询的并行化

下面贴代码,看下效果,详情见注释:

class MRESDemo
{
    /*code:释迦苦僧*/
    static void Main()
    {
        ConcurrentQueue<Product> products = new ConcurrentQueue<Product>();
        /*向集合中添加多条数据  可以修改数据量查看Linq和Plinq的性能*/
        Parallel.For(0, 600000, (num) =>
        {
            products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num });
        });


        /*采用LINQ查询符合条件的数据*/
        Stopwatch sw = new Stopwatch();
        sw.Restart();
        var productListLinq = from product in products
                              where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2"))
                              select product;
        Console.WriteLine("采用Linq 查询得出数量为:{0}", productListLinq.Count());
        sw.Stop();
        Console.WriteLine("采用Linq 耗时:{0}", sw.ElapsedMilliseconds);


        /*采用PLINQ查询符合条件的数据*/
        sw.Restart();
        var productListPLinq = from product in products.AsParallel() /*AsParallel 试图利用运行时所有可用的逻辑内核,从而使运行的速度比串行的版本要快 但是需要注意开销所带来的性能损耗*/
                               where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2"))
                               select product;
        Console.WriteLine("采用PLinq 查询得出数量为:{0}", productListPLinq.Count());
        sw.Stop();
        Console.WriteLine("采用PLinq 耗时:{0}", sw.ElapsedMilliseconds);
        Console.ReadLine();
    }
}
class Product
{
    public string Name { get; set; }
    public string Category { get; set; }
    public int SellPrice { get; set; }
}


当前模拟的数据量比较少,数据量越多,采用并行化查询的效果越明显

 

AsOrdered()与orderby
AsOrdered:保留查询的结果按源序列排序,在并行查询中,多条数据会被分在多个区域中进行查询,查询后再将多个区的数据结果合并到一个结果集中并按源序列顺序返回。

orderby:将返回的结果集按指定顺序进行排序

下面贴代码方便大家理解:

class MRESDemo
{
    /*code:释迦苦僧*/
    static void Main()
    {
        ConcurrentQueue<string> products = new ConcurrentQueue<string>();
        products.Enqueue("E");
        products.Enqueue("F");
        products.Enqueue("B");
        products.Enqueue("G");
        products.Enqueue("A");
        products.Enqueue("C");
        products.Enqueue("SS");
        products.Enqueue("D");

        /*不采用并行化  其数据输出结果  不做任何处理   */
        var productListLinq = from product in products
                               where (product.Length == 1)
                               select product;

        string appendStr = string.Empty;
        foreach (string str in productListLinq)
        {
            appendStr += str + " ";
        }
        Console.WriteLine("不采用并行化 输出:{0}", appendStr);

        /*不采用任何排序策略  其数据输出结果 是直接将分区数据结果合并起来 不做任何处理   */
        var productListPLinq = from product in products.AsParallel()
                               where (product.Length == 1)
                               select product;

         appendStr = string.Empty;
        foreach (string str in productListPLinq)
        {
            appendStr += str + " ";
        }
        Console.WriteLine("不采用AsOrdered 输出:{0}", appendStr);

        /*采用 AsOrdered 排序策略  其数据输出结果 是直接将分区数据结果合并起来 并按原始数据顺序排序*/
        var productListPLinq1 = from product in products.AsParallel().AsOrdered()
                                where (product.Length == 1)
                                select product;
        appendStr = string.Empty;
        foreach (string str in productListPLinq1)
        {
            appendStr += str + " ";
        }
        Console.WriteLine("采用AsOrdered 输出:{0}", appendStr);

        /*采用 orderby 排序策略  其数据输出结果 是直接将分区数据结果合并起来 并按orderby要求进行排序*/
        var productListPLinq2 = from product in products.AsParallel()
                                where (product.Length == 1)
                                orderby product
                                select product;
        appendStr = string.Empty;
        foreach (string str in productListPLinq2)
        {
            appendStr += str + " ";
        }
        Console.WriteLine("采用orderby 输出:{0}", appendStr);

        Console.ReadLine();
    }
}

在PLINQ查询中,AsOrdered()和orderby子句都会降低运行速度,所以如果顺序并不是必须的,那么在请求特定顺序的结果之前,将加速效果与串行执行的性能进行比较是非常重要的。

 

指定执行模式 WithExecutionMode

对串行化代码进行并行化,会带来一定的额外开销,Plinq查询执行并行化也是如此,在默认情况下,执行PLINQ查询的时候,.NET机制会尽量避免高开销的并行化算法,这些算法有可能会将执行的性能降低到地狱串行执行的性能。

.NET会根据查询的形态做出决策,并不开了数据集大小和委托执行的时间,不过也可以强制并行执行,而不用考虑执行引擎分析的结果,可以调用WithExecutionMode方法来进行设置。、

下面贴代码,方便大家理解

class MRESDemo
{
    /*code:释迦苦僧*/
    static void Main()
    {
        ConcurrentQueue<Product> products = new ConcurrentQueue<Product>();
        /*向集合中添加多条数据*/
        Parallel.For(0, 6000000, (num) =>
        {
            products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num });
        });


        /*采用并行化整个查询 查询符合条件的数据*/
        Stopwatch sw = new Stopwatch();
        sw.Restart();
        var productListLinq = from product in products.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism)
                              where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2"))
                              select product;
        Console.WriteLine("采用并行化整个查询 查询得出数量为:{0}", productListLinq.Count());
        sw.Stop();
        Console.WriteLine("采用并行化整个查询 耗时:{0}", sw.ElapsedMilliseconds);


        /*采用默认设置 由.NET进行决策 查询符合条件的数据*/
        sw.Restart();
        var productListPLinq = from product in products.AsParallel().WithExecutionMode(ParallelExecutionMode.Default)
                               where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2"))
                               select product;
        Console.WriteLine("采用默认设置 由.NET进行决策 查询得出数量为:{0}", productListPLinq.Count());
        sw.Stop();
        Console.WriteLine("采用默认设置 由.NET进行决策 耗时:{0}", sw.ElapsedMilliseconds);
        Console.ReadLine();
    }
}
class Product
{
    public string Name { get; set; }
    public string Category { get; set; }
    public int SellPrice { get; set; }
}

 

通过PLINQ执行归约操作

PLINQ可以简化对一个序列或者一个组中所有成员应用一个函数的过程,这个过程称之为归约操作,如在PLINQ查询中使用类似于Average,Max,Min,Sum之类的聚合函数就可以充分利用并行所带来好处。

并行执行的规约和串行执行的规约的执行结果可能会不同,因为在操作不能同时满足可交换和可传递的情况下产生摄入,在每次执行的时候,序列或组中的元素在不同并行任务中分布可能也会有区别,因而在这种操作的情况下可能会产生不同的最终结果,因此,一定要通过对于的串行版本来兴义原始的数据源,这样才能帮助PLINQ获得最优的执行结果。

下面贴代码:

class MRESDemo
{
    /*code:释迦苦僧*/
    static void Main()
    {
        ConcurrentQueue<int> products = new ConcurrentQueue<int>();
        /*向集合中添加多条数据*/
        Parallel.For(0, 6000000, (num) =>
        {
            products.Enqueue(num);
        });

        /*采用LINQ 返回 IEumerable<int>*/
        var productListLinq = (from product in products
                                select product).Average();
        Console.WriteLine("采用Average计算平均值:{0}", productListLinq);

        /*采用PLINQ 返回 ParallelQuery<int>*/
        var productListPLinq = (from product in products.AsParallel()
                                select product).Average();
        Console.WriteLine("采用Average计算平均值:{0}", productListPLinq);
        Console.ReadLine();
    }
}

如上述代码所示

在LINQ版本中,该方法会返回一个 IEumerable<int>,即调用 Eumerable.Range方法生成指定范围整数序列的结果,
在PLINQ版本中,该方法会返回一个 ParallelQuery<int>,即调用并行版本中System.Linq.ParallelEumerable的ParallelEumerable.Range方法,通过这种方法得到的结果序列也是并行序列,可以再PLINQ中并行运行。

如果想对特定数据源进行LINQ查询时,可以定义为  private IEquatable<int> products

如果想对特定数据源进行PLINQ查询时,可以定义为 private ParallelQuery<int> products

 

并发PLINQ任务

class MRESDemo
{
    /*code:释迦苦僧*/
    static void Main()
    {
        ConcurrentQueue<Product> products = new ConcurrentQueue<Product>();
        /*向集合中添加多条数据*/
        Parallel.For(0, 600000, (num) =>
        {
            products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num });
        });
        CancellationTokenSource cts = new CancellationTokenSource();
        /*创建tk1 任务  查询 符合 条件的数据*/
        Task<ParallelQuery<Product>> tk1 = new Task<ParallelQuery<Product>>((ct) =>
        {
            Console.WriteLine("开始执行 tk1 任务", products.Count);
            Console.WriteLine("tk1 任务中 数据结果集数量为:{0}", products.Count);
            var result = products.AsParallel().Where(p => p.Name.Contains("1") && p.Name.Contains("2"));
            return result;
        }, cts.Token);

        /*创建tk2 任务,在执行tk1任务完成  基于tk1的结果查询 符合 条件的数据*/
        Task<ParallelQuery<Product>> tk2 = tk1.ContinueWith<ParallelQuery<Product>>((tk) =>
        {
            Console.WriteLine("开始执行 tk2 任务", products.Count);
            Console.WriteLine("tk2 任务中 数据结果集数量为:{0}", tk.Result.Count());
            var result = tk.Result.Where(p => p.Category.Contains("1") && p.Category.Contains("2"));
            return result;
        }, TaskContinuationOptions.OnlyOnRanToCompletion);
        /*创建tk3 任务,在执行tk1任务完成  基于tk1的结果查询 符合 条件的数据*/
        Task<ParallelQuery<Product>> tk3 = tk1.ContinueWith<ParallelQuery<Product>>((tk) =>
        {
            Console.WriteLine("开始执行 tk3 任务", products.Count);
            Console.WriteLine(" 

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
C#使用XSLT转换XML为HTML时的一个低级错误发布时间:2022-07-13
下一篇:
c#使用字符串的像素大小来绘制文本发布时间:2022-07-13
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap