在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
前言在即将发布的 .NET 6 runtime 中,默认的线程池实现从 C++ 代码改为了 C#,更方便我们学习线程池的设计了。 新的线程池实现位于 通过设置环境变量 本文以 .NET 6 runtime 源码作为学习材料,对线程池的设计进行介绍。从目前的理解上来看,其整体的设计与原来 C++ 的实现并没有特别大的出入。 注意:
任务的调度线程池的待执行任务被存放在一个队列系统中。这个系统包括一个 全局队列,以及绑定在每一个 Worker Thread 上 的 本地队列 。而线程池中的每一个线程都在执行 在 调用不带 当 在线程池外的线程中调用,不管 基本调度单元本地队列和全局队列的元素类型被定义为 object,实际的任务类型分为两类,在从队列系统取到任务之后会判断类型并执行对应的方法。 IThreadPoolWorkItem 实现类的实例。/// <summary>Represents a work item that can be executed by the ThreadPool.</summary> public interface IThreadPoolWorkItem { void Execute(); } 执行 Execute 方法也就代表着任务的执行。
Taskclass Task { internal void InnerInvoke(); } 执行 InnerInvoke 会执行 Task 所包含的委托。 全局队列全局队列 是由 public static class ThreadPool { internal static readonly ThreadPoolWorkQueue s_workQueue = new ThreadPoolWorkQueue(); public static bool QueueUserWorkItem(WaitCallback callBack, object state) { object tpcallBack = new QueueUserWorkItemCallback(callBack!, state); s_workQueue.Enqueue(tpcallBack, forceGlobal: true); return true; } } internal sealed class ThreadPoolWorkQueue { // 全局队列 internal readonly ConcurrentQueue<object> workItems = new ConcurrentQueue<object>(); // forceGlobal 为 true 时,push 到全局队列,否则就放到本地队列 public void Enqueue(object callback, bool forceGlobal); } 本地队列线程池中的每一个线程都会绑定一个 internal sealed class ThreadPoolWorkQueueThreadLocals { // 绑定在线程池线程上 [ThreadStatic] public static ThreadPoolWorkQueueThreadLocals threadLocals; // 持有全局队列的引用,以便能在需要的时候将任务转移到全局队列上 public readonly ThreadPoolWorkQueue workQueue; // 本地队列的直接维护者 public readonly ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue; public readonly Thread currentThread; public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq) { workQueue = tpq; workStealingQueue = new ThreadPoolWorkQueue.WorkStealingQueue(); // WorkStealingQueueList 会集中管理 workStealingQueue ThreadPoolWorkQueue.WorkStealingQueueList.Add(workStealingQueue); currentThread = Thread.CurrentThread; } // 提供将本地队列中的任务转移到全局队列中去的功能, // 当 ThreadPool 通过后文将会介绍的 HillClimbing 算法判断得出当前线程是多余的线程后, // 会调用此方法对任务进行转移 public void TransferLocalWork() { while (workStealingQueue.LocalPop() is object cb) { workQueue.Enqueue(cb, forceGlobal: true); } } ~ThreadPoolWorkQueueThreadLocals() { if (null != workStealingQueue) { // TransferLocalWork 真正的目的并非是为了在这里被调用,这边只是确保任务不会丢的 fallback 逻辑 TransferLocalWork(); ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue); } } } 偷窃机制这里思考一个问题,为什么本地队列的名字会被叫做 所有
同时 Thread3 也没活干了,但是全局队列中的任务被 Thread1 抢走了。这时候就会去 从 Thread2 的本地队列中抢 Thread2 的活。 Worker Thread 的生命周期管理接下来我们把格局放大,关注点从 Worker Thread 的打工日常转移到对它们的生命周期管理上来。 为了更方便的解释线程管理的机制,这边使用下面使用一些代码做演示。 线程注入实验
.NET 5 实验一 默认线程池配置static void Main(string[] args) { var sw = Stopwatch.StartNew(); var tcs = new TaskCompletionSource(); var tasks = new List<Task>(); for (int i = 1; i <= Environment.ProcessorCount * 2; i++) { int id = i; Console.WriteLine($"Loop Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}"); tasks.Add(Task.Run(() => { Console.WriteLine($"Task Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}"); tcs.Task.Wait(); })); } tasks.Add(Task.Run(() => { Console.WriteLine($"Task SetResult | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}"); tcs.SetResult(); })); Task.WaitAll(tasks.ToArray()); Console.WriteLine($"Done: | {sw.Elapsed.TotalSeconds:0.000}"); } static int GetBusyThreads() { ThreadPool.GetAvailableThreads(out var available, out _); ThreadPool.GetMaxThreads(out var max, out _); return max - available; } 首先在代码在 .NET 5 环境中运行以下代码,CPU 逻辑核心数 12。 Loop Id: 01 | 0.000 | Busy Threads: 0 Loop Id: 02 | 0.112 | Busy Threads: 1 Loop Id: 03 | 0.112 | Busy Threads: 2 Loop Id: 04 | 0.113 | Busy Threads: 4 Loop Id: 05 | 0.113 | Busy Threads: 7 Loop Id: 06 | 0.113 | Busy Threads: 10 Loop Id: 07 | 0.113 | Busy Threads: 10 Task Id: 01 | 0.113 | Busy Threads: 11 Task Id: 02 | 0.113 | Busy Threads: 12 Task Id: 03 | 0.113 | Busy Threads: 12 Task Id: 07 | 0.113 | Busy Threads: 12 Task Id: 04 | 0.113 | Busy Threads: 12 Task Id: 05 | 0.113 | Busy Threads: 12 Loop Id: 08 | 0.113 | Busy Threads: 10 Task Id: 08 | 0.113 | Busy Threads: 12 Loop Id: 09 | 0.113 | Busy Threads: 11 Loop Id: 10 | 0.113 | Busy Threads: 12 Loop Id: 11 | 0.114 | Busy Threads: 12 Loop Id: 12 | 0.114 | Busy Threads: 12 Loop Id: 13 | 0.114 | Busy Threads: 12 Loop Id: 14 | 0.114 | Busy Threads: 12 Loop Id: 15 | 0.114 | Busy Threads: 12 Loop Id: 16 | 0.114 | Busy Threads: 12 Loop Id: 17 | 0.114 | Busy Threads: 12 Loop Id: 18 | 0.114 | Busy Threads: 12 Loop Id: 19 | 0.114 | Busy Threads: 12 Loop Id: 20 | 0.114 | Busy Threads: 12 Loop Id: 21 | 0.114 | Busy Threads: 12 Loop Id: 22 | 0.114 | Busy Threads: 12 Loop Id: 23 | 0.114 | Busy Threads: 12 Loop Id: 24 | 0.114 | Busy Threads: 12 Task Id: 09 | 0.114 | Busy Threads: 12 Task Id: 06 | 0.114 | Busy Threads: 12 Task Id: 10 | 0.114 | Busy Threads: 12 Task Id: 11 | 0.114 | Busy Threads: 12 Task Id: 12 | 0.114 | Busy Threads: 12 Task Id: 13 | 1.091 | Busy Threads: 13 Task Id: 14 | 1.594 | Busy Threads: 14 Task Id: 15 | 2.099 | Busy Threads: 15 Task Id: 16 | 3.102 | Busy Threads: 16 Task Id: 17 | 3.603 | Busy Threads: 17 Task Id: 18 | 4.107 | Busy Threads: 18 Task Id: 19 | 4.611 | Busy Threads: 19 Task Id: 20 | 5.113 | Busy Threads: 20 Task Id: 21 | 5.617 | Busy Threads: 21 Task Id: 22 | 6.122 | Busy Threads: 22 Task Id: 23 | 7.128 | Busy Threads: 23 Task Id: 24 | 7.632 | Busy Threads: 24 Task SetResult | 8.135 | Busy Threads: 25 Done: | 8.136 Task.Run 会把 Task 调度到线程池上执行,前 24 个 task 都会被阻塞住,直到第 25 个被执行。每次都会打印出当前线程池中正在执行任务的线程数(也就是创建完成的线程数)。 可以观察到以下结果:
.NET 5 实验二 调整 ThreadPool 设置在上面的代码最前面加入以下两行代码,继续在 .NET 5 环境运行一次。 ThreadPool.GetMinThreads(out int defaultMinThreads, out int completionPortThreads); Console.WriteLine($"DefaultMinThreads: {defaultMinThreads}"); ThreadPool.SetMinThreads(14, completionPortThreads); 运行结果如下 DefaultMinThreads: 12 Loop Id: 01 | 0.000 | Busy Threads: 0 Loop Id: 02 | 0.003 | Busy Threads: 1 Loop Id: 03 | 0.003 | Busy Threads: 2 Loop Id: 04 | 0.003 | Busy Threads: 5 Loop Id: 05 | 0.004 | Busy Threads: 8 Task Id: 01 | 0.004 | Busy Threads: 10 Task Id: 03 | 0.004 | Busy Threads: 10 Loop Id: 06 | 0.004 | Busy Threads: 10 Task Id: 02 | 0.004 | Busy Threads: 10 Task Id: 04 | 0.004 | Busy Threads: 10 Task Id: 05 | 0.004 | Busy Threads: 12 Loop Id: 07 | 0.004 | Busy Threads: 9 Loop Id: 08 | 0.004 | Busy Threads: 10 Loop Id: 09 | 0.004 | Busy Threads: 11 Loop Id: 10 | 0.004 | Busy Threads: 12 Task Id: 08 | 0.004 | Busy Threads: 14 Task Id: 06 | 0.004 | Busy Threads: 14 Task Id: 09 | 0.004 | Busy Threads: 14 Task Id: 10 | 0.004 | Busy Threads: 14 Loop Id: 11 | 0.004 | Busy Threads: 14 Loop Id: 12 | 0.004 | Busy Threads: 14 Loop Id: 13 | 0.004 | Busy Threads: 14 Loop Id: 14 | 0.004 | Busy Threads: 14 Loop Id: 15 | 0.004 | Busy Threads: 14 Loop Id: 16 | 0.004 | Busy Threads: 14 Loop Id: 17 | 0.004 | Busy Threads: 14 Loop Id: 18 | 0.004 | Busy Threads: 14 Loop Id: 19 | 0.004 | Busy Threads: 14 Loop Id: 20 | 0.004 | Busy Threads: 14 Loop Id: 21 | 0.004 | Busy Threads: 14 Loop Id: 22 | 0.004 | Busy Threads: 14 Task Id: 11 | 0.004 | Busy Threads: 14 Loop Id: 23 | 0.004 | Busy Threads: 14 Loop Id: 24 | 0.005 | Busy Threads: 14 Task Id: 07 | 0.005 | Busy Threads: 14 Task Id: 12 | 0.005 | Busy Threads: 14 Task Id: 13 | 0.005 | Busy Threads: 14 Task Id: 14 | 0.005 | Busy Threads: 14 Task Id: 15 | 0.982 | Busy Threads: 15 Task Id: 16 | 1.486 | Busy Threads: 16 Task Id: 17 | 1.991 | Busy Threads: 17 Task Id: 18 | 2.997 | Busy Threads: 18 Task Id: 19 | 3.501 | Busy Threads: 19 Task Id: 20 | 4.004 | Busy Threads: 20 Task Id: 21 | 4.509 | Busy Threads: 21 Task Id: 22 | 5.014 | Busy Threads: 22 Task Id: 23 | 5.517 | Busy Threads: 23 Task Id: 24 | 6.021 | Busy Threads: 24 Task SetResult | 6.522 | Busy Threads: 25 Done: | 6.523 在调整完线程池的最小线程数量之后,线程注入速度发生转折的时间点从第 12(默认min threads) 个线程换到了第 14(修改后的min threads)个线程。 整体时间也从 8s 缩到 6s。 .NET 5 实验三 tcs.Task.Wait() 改为 Thread.Sleepstatic void Main(string[] args) { var sw = Stopwatch.StartNew(); var tasks = new List<Task>(); for (int i = 1; i <= Environment.ProcessorCount * 2; i++) { int id = i; Console.WriteLine( $"Loop Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}"); tasks.Add(Task.Run(() => { Console.WriteLine( $"Task Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}"); Thread.Sleep(Environment.ProcessorCount * 1000); })); } Task.WhenAll(tasks.ToArray()).ContinueWith(_ => { Console.WriteLine($"Done: | {sw.Elapsed.TotalSeconds:0.000}"); }); Console.ReadLine(); } Loop Id: 01 | 0.000 | Busy Threads: 0 Loop Id: 02 | 0.027 | Busy Threads: 1 Loop Id: 03 | 0.027 | Busy Threads: 2 Loop Id: 04 | 0.027 | Busy Threads: 3 Loop Id: 05 | 0.028 | Busy Threads: 4 Loop Id: 06 | 0.028 | Busy Threads: 10 Loop Id: 07 | 0.028 | Busy Threads: 9 Loop Id: 08 | 0.028 | Busy Threads: 9 Loop Id: 09 | 0.028 | Busy Threads: 10 Loop Id: 10 | 0.028 | Busy Threads: 12 Loop Id: 11 | 0.028 | Busy Threads: 12 Loop Id: 12 | 0.028 | Busy Threads: 12 Loop Id: 13 | 0.028 | Busy Threads: 12 Loop Id: 14 | 0.028 | Busy Threads: 12 Loop Id: 15 | 0.028 | Busy Threads: 12 Loop Id: 16 | 0.028 | Busy Threads: 12 Loop Id: 17 | 0.028 | Busy Threads: 12 Loop Id: 18 | 0.028 | Busy Threads: 12 Loop Id: 19 | 0.028 | Busy Threads: 12 Loop Id: 20 | 0.028 | Busy Threads: 12 Loop Id: 21 | 0.028 | Busy Threads: 12 Loop Id: 22 | 0.028 | Busy Threads: 12 Loop Id: 23 | 0.028 | Busy Threads: 12 Loop Id: 24 | 0.028 | Busy Threads: 12 Task Id: 01 | 0.029 | Busy Threads: 12 Task Id: 05 | 0.029 | Busy Threads: 12 Task Id: 03 | 0.029 | Busy Threads: 12 Task Id: 08 | 0.029 | Busy Threads: 12 Task Id: 09 | 0.029 | Busy Threads: 12 Task Id: 10 | 0.029 | Busy Threads: 12 Task Id: 06 | 0.029 | Busy Threads: 12 Task Id: 11 | 0.029 | Busy Threads: 12 Task Id: 12 | 0.029 | Busy Threads: 12 Task Id: 04 | 0.029 | Busy Threads: 12 Task Id: 02 | 0.029 | Busy Threads: 12 Task Id: 07 | 0.029 | Busy Threads: 12 Task Id: 13 | 1.018 | Busy Threads: 13 Task Id: 14 | 1.522 | Busy Threads: 14 Task Id: 15 | 2.025 | Busy Threads: 15 Task Id: 16 | 2.530 | Busy Threads: 16 Task Id: 17 | 3.530 | Busy Threads: 17 Task Id: 18 | 4.035 | Busy Threads: 18 Task Id: 19 | 4.537 | Busy Threads: 19 Task Id: 20 | 5.040 | Busy Threads: 20 Task Id: 21 | 5.545 | Busy Threads: 21 Task Id: 22 | 6.048 | Busy Threads: 22 Task Id: 23 | 7.049 | Busy Threads: 23 Task Id: 24 | 8.056 | Busy Threads: 24 Done: | 20.060 达到 min threads (默认12)之后,线程注入速度明显变慢,最快间隔 500ms。 .NET 6 实验一 默认 ThreadPool 设置将 .NET 5 实验一的代码在 .NET 6 执行一次 Loop Id: 01 | 0.001 | Busy Threads: 0 Loop Id: 02 | 0.018 | Busy Threads: 1 Loop Id: 03 | 0.018 | Busy Threads: 3 Loop Id: 04 | 0.018 | Busy Threads: 6 Loop Id: 05 | 0.018 | Busy Threads: 4 Loop Id: 06 | 0.018 | Busy Threads: 5 Loop Id: 07 | 0.018 | Busy Threads: 6 Loop Id: 08 | 0.018 | Busy Threads: 8 Task Id: 01 | 0.018 | Busy Threads: 11 Task Id: 04 | 0.018 | Busy Threads: 11 Task Id: 03 | 0.018 | Busy Threads: 11 Task Id: 02 | 0.018 | Busy Threads: 11 Task Id: 05 | 0.018 | Busy Threads: 11 Loop Id: 09 | 0.018 | Busy Threads: 12 Loop Id: 10 | 0.018 | Busy Threads: 12 Loop Id: 11 | 0.018 | Busy Threads: 12 Loop Id: 12 | 0.018 | Busy Threads: 12 Loop Id: 13 | 0.018 | Busy Threads: 12 Task Id: 09 | 0.018 | Busy Threads: 12 Loop Id: 14 | 0.018 | Busy Threads: 12 Loop Id: 15 | 0.018 | Busy Threads: 12 Loop Id: 16 | 0.018 | Busy Threads: 12 Loop Id: 17 | 0.018 | Busy Threads: 12 Task Id: 06 | 0.018 | Busy Threads: 12 Loop Id: 18 | 0.018 | Busy Threads: 12 Loop Id: 19 | 0.018 | Busy Threads: 12 Loop Id: 20 | 0.018 | Busy Threads: 12 Loop Id: 21 | 0.018 | Busy Threads: 12 Loop Id: 22 | 0.018 | Busy Threads: 12 Loop Id: 23 | 0.018 | Busy Threads: 12 Loop Id: 24 | 0.018 | Busy Threads: 12 Task Id: 10 | 0.018 | Busy Threads: 12 Task Id: 07 | 0.019 | Busy Threads: 12 Task Id: 11 | 0.019 | Busy Threads: 12 Task Id: 08 | 0.019 | Busy Threads: 12 Task Id: 12 | 0.019 | Busy Threads: 12 Task Id: 13 | 0.020 | Busy Threads: 16 Task Id: 14 | 0.020 | Busy Threads: 17 Task Id: 15 | 0.020 | Busy Threads: 18 Task Id: 16 | 0.020 | Busy Threads: 19 Task Id: 17 | 0.020 | Busy Threads: 20 Task Id: 18 | 0.020 | Busy Threads: 21 Task Id: 19 | 0.020 | Busy Threads: 22 Task Id: 20 | 0.020 | Busy Threads: 23 Task Id: 21 | 0.020 | Busy Threads: 24 Task Id: 23 | 0.020 | Busy Threads: 24 Task Id: 22 | 0.020 | Busy Threads: 24 Task Id: 24 | 0.020 | Busy Threads: 24 Task SetResult | 0.045 | Busy Threads: 25 Done: | 0.046 与实验一相比,虽然线程数仍然停留在 12 了一段时间,但随后线程就立即增长了,后文会介绍 .NET 6 在这方面做出的改进。 .NET 6 实验二 调整 ThreadPool 设置将 .NET 5 实验二的代码在 .NET 6 中执行一次 DefaultMinThreads: 12 Loop Id: 01 | 0.001 | Busy Threads: 0 Loop Id: 02 | 0.014 | Busy Threads: 1 Loop Id: 03 | 0.014 | Busy Threads: 2 Loop Id: 04 | 0.015 | Busy Threads: 5 Loop Id: 05 | 0.015 | Busy Threads: 4 Loop Id: 06 | 0.015 | Busy Threads: 5 Loop Id: 07 | 0.015 | Busy Threads: 7 Loop Id: 08 | 0.015 | Busy Threads: 8 Loop Id: 09 | 0.015 | Busy Threads: 11 Task Id: 06 | 0.015 | Busy Threads: 9 Task Id: 01 | 0.015 | Busy Threads: 9 Task Id: 02 | 0.015 | Busy Threads: 9 Task Id: 05 | 0.015 | Busy Threads: 9 Task Id: 03 | 0.015 | Busy Threads: 9 Task Id: 04 | 0.015 | Busy Threads: 9 Task Id: 07 | 0.015 | Busy Threads: 9 Task Id: 08 | 0.016 | Busy Threads: 9 Task Id: 09 | 0.016 | Busy Threads: 9 Loop Id: 10 | 0.016 | Busy Threads: 9 Loop Id: 11 | 0.016 | Busy Threads: 10 Loop Id: 12 | 0.016 | Busy Threads: 11 Loop Id: 13 | 0.016 | Busy Threads: 13 Task Id: 10 | 0.016 | Busy Threads: 14 Loop Id: 14 | 0.016 | Busy Threads: 14 Loop Id: 15 | 0.016 | Busy Threads: 14 Loop Id: 16 | 0.016 | Busy Threads: 14 Task Id: 11 | 0.016 | Busy Threads: 14 Loop Id: 17 | 0.016 | Busy Threads: 14 Loop Id: 18 | 0.016 | Busy Threads: 14 Loop Id: 19 | 0.016 | Busy Threads: 14 Loop Id: 20 | 0.016 | Busy Threads: 14 Loop Id: 21 | 0.016 | Busy Threads: 14 Loop Id: 22 | 0.016 | Busy Threads: 14 Loop Id: 23 | 0.016 | Busy Threads: 14 Loop Id: 24 | 0.016 | Busy Threads: 14 Task Id: 12 | 0.016 | Busy Threads: 14 Task Id: 13 | 0.016 | Busy Threads: 14 Task Id: 14 | 0.016 | Busy Threads: 14 Task Id: 15 | 0.017 | Busy Threads: 18 Task Id: 16 | 0.017 | Busy Threads: 19 Task Id: 17 | 0.017 | Busy Threads: 20 Task Id: 18 | 0.017 | Busy Threads: 21 Task Id: 19 | 0.017 | Busy Threads: 22 Task Id: 20 | 0.018 | Busy Threads: 23 Task Id: 21 | 0.018 | Busy Threads: 24 Task Id: 22 | 0.018 | Busy Threads: 25 Task Id: 23 | 0.018 | Busy Threads: 26 Task Id: 24 | 0.018 | Busy Threads: 26 Task SetResult | 0.018 | Busy Threads: 25 Done: | 0.019 前半部分有部分日志乱序,可以看到,与实验三一样,维持在最大线程数一小段时间之后,立即就开始了线程增长。 .NET 6 实验三 tcs.Task.Wait() 改为 Thread.Sleep将 .NET 5 实验三的代码在 .NET 6 中执行一次 Loop Id: 01 | 0.003 | Busy Threads: 0 Loop Id: 02 | 0.024 | Busy Threads: 1 Loop Id: 03 | 0.025 | Busy Threads: 2 Loop Id: 04 | 0.025 | Busy Threads: 3 Loop Id: 05 | 0.025 | Busy Threads: 7 Loop Id: 06 | 0.025 | Busy Threads: 5 Loop Id: 07 | 0.025 | Busy Threads: 6 Loop Id: 08 | 0.025 | Busy Threads: 7 Loop Id: 09 | 0.025 | Busy Threads: 9 Loop Id: 10 | 0.025 | Busy Threads: 10 Loop Id: 11 | 0.026 | Busy Threads: 10 Loop Id: 12 | 0.026 | Busy Threads: 11 Loop Id: 13 | 0.026 | Busy Threads: 12 Loop Id: 14 | 0.026 | Busy Threads: 12 Loop Id: 15 | 0.026 | Busy Threads: 12 Loop Id: 16 | 0.026 | Busy Threads: 12 Loop Id: 17 | 0.026 | Busy Threads: 12 Loop Id: 18 | 0.026 | Busy Threads: 12 Loop Id: 19 | 0.026 | Busy Threads: 12 Loop Id: 20 | 0.026 | Busy Threads: 12 Loop Id: 21 | 0.026 | Busy Threads: 12 Loop Id: 22 | 0.026 | Busy Threads: 12 Loop Id: 23 | 0.026 | Busy Threads: 12 Loop Id: 24 | 0.026 | Busy Threads: 12 Task Id: 01 | 0.026 | Busy Threads: 12 Task Id: 02 | 0.026 | Busy Threads: 12 Task Id: 05 | 0.026 | Busy Threads: 12 Task Id: 04 | 0.026 | Busy Threads: 12 Task Id: 06 | 0.026 | Busy Threads: 12 Task Id: 08 | 0.026 | Busy Threads: 12 Task Id: 09 | 0.026 | Busy Threads: 12 Task Id: 03 | 0.026 | Busy Threads: 12 Task Id: 11 | 0.026 | Busy Threads: 12 Task Id: 10 | 0.026 | Busy Threads: 12 Task Id: 07 | 0.026 | Busy Threads: 12 Task Id: 12 | 0.026 | Busy Threads: 12 Task Id: 13 | 1.026 | Busy Threads: 13 Task Id: 14 | 2.027 | Busy Threads: 14 Task Id: 15 | 3.028 | Busy Threads: 15 Task Id: 16 | 4.030 | Busy Threads: 16 Task Id: 17 | 5.031 | Busy Threads: 17 Task Id: 18 | 6.032 | Busy Threads: 18 Task Id: 19 | 6.533 | Busy Threads: 19 Task Id: 20 | 7.035 | Busy Threads: 20 Task Id: 21 | 8.036 | Busy Threads: 21 Task Id: 22 | 8.537 | Busy Threads: 22 Task Id: 23 | 9.538 | Busy Threads: 23 Task Id: 24 | 10.039 | Busy Threads: 24 Done: | 22.041 结果与 .NET 5 的实验三相差不大。 线程注入对照上述的几组实验结果,接下来以 .NET 6 中 C# 实现的 ThreadPool 作为资料来理解一下线程注入的几个阶段(按个人理解进行的划分,仅供参考)。 1. 第一个线程的出现随着任务被调度到队列上,第一个线程被创建出来。 下面是线程池在执行第一个任务的时候的代码摘要,涉及到计数的并执行相关处理的地方,代码都使用了 可利用 Rider 的反编译 Debug 功能帮助我们学习。 下面是第一个 注意:执行环节是 Main Thread public static class ThreadPool { internal static readonly ThreadPoolWorkQueue s_workQueue = new ThreadPoolWorkQueue(); public static bool QueueUserWorkItem(WaitCallback callBack, object state) { object tpcallBack = new QueueUserWorkItemCallback(callBack!, state); s_workQueue.Enqueue(tpcallBack, forceGlobal: true); return true; } } internal sealed class ThreadPoolWorkQueue { [StructLayout(LayoutKind.Sequential)] private struct CacheLineSeparated { private readonly Internal.PaddingFor32 pad1; public volatile int numOutstandingThreadRequests; private readonly Internal.PaddingFor32 pad2; } private CacheLineSeparated _separated; public void Enqueue(object callback, bool forceGlobal) { // 线程池中执行的任务有两种:IThreadPoolWorkItem、Task Debug.Assert((callback is IThreadPoolWorkItem) ^ (callback is Task)); if (loggingEnabled && FrameworkEventSource.Log.IsEnabled()) FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback); ThreadPoolWorkQueueThreadLocals? tl = null; if (!forceGlobal) // 获取本地队列,如果执行改代码的线程不是线程池线程, // 那这边是获取不到的,就算 forceGlobal 是 false, // 也会把任务放到全局队列 tl = ThreadPoolWorkQueueThreadLocals.threadLocals; if (null != tl) { // 放到本地队列 tl.workStealingQueue.LocalPush(callback); } else { // 当道全局队列 workItems.Enqueue(callback); } EnsureThreadRequested(); } internal void EnsureThreadRequested() { // // If we have not yet requested #procs threads, then request a new thread. // // CoreCLR: Note that there is a separate count in the VM which has already been incremented // by the VM by the time we reach this point. // int count = _separated.numOutstandingThreadRequests; while (count < Environment.ProcessorCount) { int prev = Interlocked.CompareExchange(ref _separated.numOutstandingThreadRequests, count + 1, count); if (prev == count) { ThreadPool.RequestWorkerThread(); break; } count = prev; } } public static class ThreadPool { /// <summary> /// This method is called to request a new thread pool worker to handle pending work. /// </summary> internal static void RequestWorkerThread() => PortableThreadPool.ThreadPoolInstance.RequestWorker(); } internal sealed class PortableThreadPool { public static readonly PortableThreadPool ThreadPoolInstance = new PortableThreadPool(); internal void RequestWorker() { // The order of operations here is important. MaybeAddWorkingWorker() and EnsureRunning() use speculative checks to // do their work and the memory barrier from the interlocked operation is necessary in this case for correctness. Interlocked.Increment(ref _separated.numRequestedWorkers); WorkerThread.MaybeAddWorkingWorker(this); // 初始化 GateThread GateThread.EnsureRunning(this); } /// <summary> /// The worker thread infastructure for the CLR thread pool. /// </summary> private static class WorkerThread { internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance) { ThreadCounts counts = threadPoolInstance._separated.counts; short numExistingThreads, numProcessingWork, newNumExistingThreads, newNumProcessingWork; // 这个 while (true) 是确保计算出正确的待创建线程数 while (true) { numProcessingWork = counts.NumProcessingWork; if (numProcessingWork >= counts.NumThreadsGoal) { return; } newNumProcessingWork = (short)(numProcessingWork + 1); numExistingThreads = counts.NumExistingThreads; newNumExistingThreads = Math.Max(numExistingThreads, newNumProcessingWork); ThreadCounts newCounts = counts; newCounts.NumProcessingWork = newNumProcessingWork; newCounts.NumExistingThreads = newNumExistingThreads; ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts); if (oldCounts == counts) { break; } counts = oldCounts; } int toCreate = newNumExistingThreads - numExistingThreads; int toRelease = newNumProcessingWork - numProcessingWork; if (toRelease > 0) { s_semaphore.Release(toRelease); } while (toCreate > 0) { if (TryCreateWorkerThread()) { toCreate--; continue; } counts = threadPoolInstance._separated.counts; while (true) { ThreadCounts newCounts = counts; newCounts.SubtractNumProcessingWork((short)toCreate); newCounts.SubtractNumExistingThreads((short)toCreate); ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts); if (oldCounts == counts) { break; } counts = oldCounts; } break; } } private static bool TryCreateWorkerThread() { try { // Thread pool threads must start in the default execution context without transferring the context, so // using UnsafeStart() instead of Start() Thread workerThread = new Thread(s_workerThreadStart); workerThread.IsThreadPoolThread = true; workerThread.IsBackground = true; // thread name will be set in thread proc workerThread.UnsafeStart(); } catch (ThreadStartException) { return false; } catch (OutOfMemoryException) { return false; } return true; } } } } 2. 达到 min threads 之前的线程数增长细心的朋友会发现上面代码里 其实 min threads 初始值为 运行环境 CPU 核心数,可通过 PortableThreadPool里维护了一个计数器
internal class PortableThreadPool { public static readonly PortableThreadPool ThreadPoolInstance = new PortableThreadPool(); private CacheLineSeparated _separated; private struct CacheLineSeparated { public ThreadCounts counts; } /// <summary> /// Tracks information on the number of threads we want/have in different states in our thread pool. /// </summary> private struct ThreadCounts { /// <summary> /// Number of threads processing work items. /// </summary> public short NumProcessingWork { get; set; } /// <summary> /// Number of thread pool threads that currently exist. /// </summary> public short NumExistingThreads { get; set; } // <summary> /// Max possible thread pool threads we want to have. /// </summary> public short NumThreadsGoal { get; set; } } } 3. 避免饥饿机制(Starvation Avoidance)上面讲到,随着任务进入队列系统,Worker Thread 将随之增长,直到达到 NumThreadsGoal。
在这种情况下,线程池的 Starvation Avoidance 机制就起到作用了。 在上述所说的第一个阶段,除了线程池中的第一个线程会被创建之外,GateThread 也会随之被初始化。在第一阶段的代码摘录中,可以看到 GateThread 的初始化。 internal sealed class PortableThreadPool { public static readonly PortableThreadPool ThreadPoolInstance = new PortableThreadPool(); internal void RequestWorker() { Interlocked.Increment(ref _separated.numRequestedWorkers); WorkerThread.MaybeAddWorkingWorker(this); // 初始化 GateThread GateThread.EnsureRunning(this); } } 在 这就解释了,为什么 .NET 5 实验一、二在线程数达到min threads(NumThreadsGoal 的默认值)之后,后面 Worker Thread 的增长是每 由于在第三阶段中,线程的增长会比较缓慢,有经验的开发会在应用启动的时候设置一个较大的 min threads,使其较晚或不进入第三阶段。 线程注入在 .NET 6 中的改进.NET 6 与 .NET 5 的实验二相比,达到 min threads 之后,线程的增长速度有明显的差异,而两者的实验三却相差不大。 .NET 6 对于 Task.Wait 导致线程池线程阻塞的场景进行了优化,但如果并非此原因导致的线程数不够用,依旧是 Starvation Avoidance 的策略。 新的 ThreadPool 提供了一个 关键代码示意,非真实代码: internal class PortableThreadPool { public bool NotifyThreadBlocked() { // ... GateThread.Wake(this); return true; } private static class GateThread { private static readonly AutoResetEvent DelayEvent = new AutoResetEvent(initialState: false); // GateThread 入口方法 private static void GateThreadStart() { while(true) { DelayEvent.WaitOne(500); // ... } } public static void Wake(PortableThreadPool threadPoolInstance) { DelayEvent.Set(); EnsureRunning(threadPoolInstance); } } 爬山算法(Hill Climbing)除了上述介绍的线程注入机制外,从CLR 4.0开始,线程池内实现了一个根据采集到线程池吞吐率数据(每次任务完成时记录数据),推导出该算法认为最优的线程池线程数量。 算法实现位于 public (int newThreadCount, int newSampleMs) Update(int currentThreadCount, double sampleDurationSeconds, int numCompletions)
不必要线程的销毁如果线程需要被移除的时候,本地队列还存在待执行任务,则会将这些任务转移到全局队列中。
参考资料https://www.codeproject.com/Articles/3813/NET-s-ThreadPool-Class-Behind-The-Scenes 到此这篇关于.NET 6线程池ThreadPool实现方法的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持极客世界。 |
请发表评论