using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace AA { public class AsynQueue<T> { //队列是否正在处理数据 private int isProcessing; //有线程正在处理数据 private const int Processing = 1; //没有线程处理数据 private const int UnProcessing = 0; //队列是否可用 private volatile bool enabled = true; private Task currentTask; public event Action<T> ProcessItemFunction; public event EventHandler<EventArgs<Exception>> ProcessException; private ConcurrentQueue<T> queue; public AsynQueue() { queue = new ConcurrentQueue<T>(); Start(); } public int Count { get { return queue.Count; } } private void Start() { Thread process_Thread = new Thread(PorcessItem); process_Thread.IsBackground = true; process_Thread.Start(); } public void Enqueue(T items) { if (items == null) { throw new ArgumentException("items"); } queue.Enqueue(items); DataAdded(); } //数据添加完成后通知消费者线程处理 private void DataAdded() { if (enabled) { if (!IsProcessingItem()) { currentTask = Task.Factory.StartNew(ProcessItemLoop); } } } //判断是否队列有线程正在处理 private bool IsProcessingItem() { return !(Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0); } private void ProcessItemLoop() { if (!enabled && queue.IsEmpty) { Interlocked.Exchange(ref isProcessing, 0); return; } T publishFrame; if (queue.TryDequeue(out publishFrame)) { try { ProcessItemFunction(publishFrame); } catch (Exception ex) { OnProcessException(ex); } } if (enabled && !queue.IsEmpty) { currentTask = Task.Factory.StartNew(ProcessItemLoop); } else { Interlocked.Exchange(ref isProcessing, UnProcessing); } } /// <summary> ///定时处理线程调用函数 ///主要是监视入队的时候线程 没有来的及处理的情况 /// </summary> private void PorcessItem(object state) { int sleepCount = 0; int sleepTime = 1000; while (enabled) { //如果队列为空则根据循环的次数确定睡眠的时间 if (queue.IsEmpty) { if (sleepCount == 0) { sleepTime = 1000; } else if (sleepCount <= 3) { sleepTime = 1000 * 3; } else { sleepTime = 1000 * 50; } sleepCount++; Thread.Sleep(sleepTime); } else { //判断是否队列有线程正在处理 if (enabled && Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0) { if (!queue.IsEmpty) { currentTask = Task.Factory.StartNew(ProcessItemLoop); } else { Interlocked.Exchange(ref isProcessing, 0); } sleepCount = 0; sleepTime = 1000; } } } } public void Flsuh() { Stop(); if (currentTask != null) { currentTask.Wait(); } while (!queue.IsEmpty) { try { T publishFrame; if (queue.TryDequeue(out publishFrame)) { ProcessItemFunction(publishFrame); } } catch (Exception ex) { OnProcessException(ex); } } currentTask = null; } public void Stop() { this.enabled = false; } private void OnProcessException(System.Exception ex) { var tempException = ProcessException; Interlocked.CompareExchange(ref ProcessException, null, null); if (tempException != null) { ProcessException(ex, new EventArgs<Exception>(ex)); } } [Serializable] public class EventArgs<T> : System.EventArgs { public T Argument; public EventArgs() : this(default(T)) { } public EventArgs(T argument) { Argument = argument; } } } }
|
请发表评论