本文透過.NET4.5的ThreadPool原始碼的分析講解揭示.NET執行緒池的內幕,並總結ThreadPool設計的好與不足。
線程池的作用
線程池,顧名思義,線程物件池。 Task和TPL都有用到線程池,所以了解線程池的內幕有助於你寫出更好的程式。由於篇幅有限,這裡我只講解以下核心
概念:
執行緒池的大小
如何呼叫執行緒池新增任務
線程池的大小
不管什麼池,總是有尺寸,ThreadPool也不例外。 ThreadPool提供了4個方法來調整執行緒池的大小:
那麼最小數又是為啥?執行緒池就是執行緒的物件池,物件池的最大的用處是重複使用物件。為啥要重複使用線程,因為線程的創建與銷毀都要佔用大量的cpu時間。所以在高並發狀態下,線程池由於無需創建銷毀線程節約了大量時間,提高了系統的響應能力和吞吐量。最小數可以讓你調整最小的存活線程數量來應付不同的高並發場景。
如何呼叫執行緒池新增任務
執行緒池主要提供了2個方法來呼叫:QueueUserWorkItem和UnsafeQueueUserWorkItem。
兩個方法的程式碼基本上一致,除了attribute不同,QueueUserWorkItem可以被partial trust的程式碼調用,而UnsafeQueueUserWorkItem只能被full trust的程式碼調用。
public static bool QueueUserWorkItem(WaitCallback callBack) { StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller; return ThreadPool.QueueUserWorkItemHelper(callBack, (object) null, ref stackMark, true); }
QueueUserWorkItemHelper首先呼叫ThreadPool.EnsureVMInitialized()來確保CLR虛擬機初始化(VM是一個統稱,不是單指java虛擬機,也可以指CLR的execution engine),緊接著實例化ThreadPoolWorkQuquePoolQuquejue方法並傳入callback和true。
SecurityCritical] public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal) { ThreadPoolWorkQueueThreadLocals queueThreadLocals = (ThreadPoolWorkQueueThreadLocals) null; if (!forceGlobal) queueThreadLocals = ThreadPoolWorkQueueThreadLocals.threadLocals; if (this.loggingEnabled) FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject((object) callback); if (queueThreadLocals != null) { queueThreadLocals.workStealingQueue.LocalPush(callback); } else { ThreadPoolWorkQueue.QueueSegment comparand = this.queueHead; while (!comparand.TryEnqueue(callback)) { Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref comparand.Next, new ThreadPoolWorkQueue.QueueSegment(), (ThreadPoolWorkQueue.QueueSegment) null); for (; comparand.Next != null; comparand = this.queueHead) Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueHead, comparand.Next, comparand); } } this.EnsureThreadRequested(); }
public QueueSegment() { this.nodes = new IThreadPoolWorkItem[256]; } public bool TryEnqueue(IThreadPoolWorkItem node) { int upper; int lower; this.GetIndexes(out upper, out lower); while (upper != this.nodes.Length) { if (this.CompareExchangeIndexes(ref upper, upper + 1, ref lower, lower)) { Volatile.Write<IThreadPoolWorkItem>(ref this.nodes[upper], node); return true; } } return false; }
執行緒被調度後透過ThreadPoolWorkQueue的Dispatch方法來執行callback。
internal static bool Dispatch() { ThreadPoolWorkQueue threadPoolWorkQueue = ThreadPoolGlobals.workQueue; int tickCount = Environment.TickCount; threadPoolWorkQueue.MarkThreadRequestSatisfied(); threadPoolWorkQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, (EventKeywords) 18); bool flag1 = true; IThreadPoolWorkItem callback = (IThreadPoolWorkItem) null; try { ThreadPoolWorkQueueThreadLocals tl = threadPoolWorkQueue.EnsureCurrentThreadHasQueue(); while ((long) (Environment.TickCount - tickCount) < (long) ThreadPoolGlobals.tpQuantum) { try { } finally { bool missedSteal = false; threadPoolWorkQueue.Dequeue(tl, out callback, out missedSteal); if (callback == null) flag1 = missedSteal; else threadPoolWorkQueue.EnsureThreadRequested(); } if (callback == null) return true; if (threadPoolWorkQueue.loggingEnabled) FrameworkEventSource.Log.ThreadPoolDequeueWorkObject((object) callback); if (ThreadPoolGlobals.enableWorkerTracking) { bool flag2 = false; try { try { } finally { ThreadPool.ReportThreadStatus(true); flag2 = true; } callback.ExecuteWorkItem(); callback = (IThreadPoolWorkItem) null; } finally { if (flag2) ThreadPool.ReportThreadStatus(false); } } else { callback.ExecuteWorkItem(); callback = (IThreadPoolWorkItem) null; } if (!ThreadPool.NotifyWorkItemComplete()) return false; } return true; } catch (ThreadAbortException ex) { if (callback != null) callback.MarkAborted(ex); flag1 = false; } finally { if (flag1) threadPoolWorkQueue.EnsureThreadRequested(); } return true; }
while語句判斷如果執行時間少於30ms會不斷繼續執行下一個callback。這是因為大多數機器線程切換大概在30ms,如果該線程只執行了不到30ms就在等待中斷線程切換那就太浪費CPU了,浪費可恥啊!
Dequeue負責找到需要執行的callback:
public void Dequeue(ThreadPoolWorkQueueThreadLocals tl, out IThreadPoolWorkItem callback, out bool missedSteal) { callback = (IThreadPoolWorkItem) null; missedSteal = false; ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue1 = tl.workStealingQueue; workStealingQueue1.LocalPop(out callback); if (callback == null) { for (ThreadPoolWorkQueue.QueueSegment comparand = this.queueTail; !comparand.TryDequeue(out callback) && comparand.Next != null && comparand.IsUsedUp(); comparand = this.queueTail) Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueTail, comparand.Next, comparand); } if (callback != null) return; ThreadPoolWorkQueue.WorkStealingQueue[] current = ThreadPoolWorkQueue.allThreadQueues.Current; int num = tl.random.Next(current.Length); for (int length = current.Length; length > 0; --length) { ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue2 = Volatile.Read<ThreadPoolWorkQueue.WorkStealingQueue>(ref current[num % current.Length]); if (workStealingQueue2 != null && workStealingQueue2 != workStealingQueue1 && workStealingQueue2.TrySteal(out callback, ref missedSteal)) break; ++num; } }
public bool TryDequeue(out IThreadPoolWorkItem node) { int upper; int lower; this.GetIndexes(out upper, out lower); while (lower != upper) { // ISSUE: explicit reference operation // ISSUE: variable of a reference type int& prevUpper = @upper; // ISSUE: explicit reference operation int newUpper = ^prevUpper; // ISSUE: explicit reference operation // ISSUE: variable of a reference type int& prevLower = @lower; // ISSUE: explicit reference operation int newLower = ^prevLower + 1; if (this.CompareExchangeIndexes(prevUpper, newUpper, prevLower, newLower)) { SpinWait spinWait = new SpinWait(); while ((node = Volatile.Read<IThreadPoolWorkItem>(ref this.nodes[lower])) == null) spinWait.SpinOnce(); this.nodes[lower] = (IThreadPoolWorkItem) null; return true; } } node = (IThreadPoolWorkItem) null; return false; }
使用自旋锁和内存读屏障来避免内核态和用户态的切换,提高了获取callback的性能。如果还是没有callback,那么就从所有的local work queue里随机选取一个,然后在该local work queue里“偷取”一个任务(callback)。
拿到callback后执行callback.ExecuteWorkItem(),通知完成。
总结
ThreadPool提供了方法调整线程池最少活跃的线程来应对不同的并发场景。ThreadPool带有2个work queue,一个golbal一个local。
执行时先从local找任务,接着去global,最后才会去随机选取一个local偷一个任务,其中global是FIFO的执行顺序。
Work queue实际上是数组,使用了大量的自旋锁和内存屏障来提高性能。但是在偷取任务上,是否可以考虑得更多,随机选择一个local太随意。
首先要考虑偷取的队列上必须有可执行任务;其次可以选取一个不在调度中的线程的local work queue,这样降低了自旋锁的可能性,加快了偷取的速度;最后,偷取的时候可以考虑像golang一样偷取别人queue里一半的任务,因为执行完偷到的这一个任务之后,下次该线程再次被调度到还是可能没任务可执行,还得去偷取别人的任务,这样既浪费CPU时间,又让任务在线程上分布不均匀,降低了系统吞吐量!
另外,如果禁用log和ETW trace,可以使ThreadPool的性能更进一步。
以上就是.NET编程之线程池内幕的内容,更多相关内容请关注PHP中文网(m.sbmmt.com)!