ConcurrentQueue<T>队列是一个高效的线程安全的队列,是.Net Framework 4.0,System.Collections.Concurrent命名空间下的一个数据结构。
ConcurrentQueue<T>数据结构
下图是ConcurrentQueue<T>数据结构的示意图:
ConcurrentQueue<T>队列由若干Segment动态构成,每个Segment是一块连续的内存Buffer,大小固定为SEGMENT_SIZE。
ConcurrentQueue<T>私有成员变量
ConcurrentQueue<T>类有三个私有成员变量:
Segment* volatile m_head;
Segment* volatile m_tail;
Segment* volatile m_base;
m_head指向第一个segment,m_tail指向最后一个segment。这两个指针指向的对象,随着入队列和出队列操作而不断变化。
m_base指针固定指向ConcurrentQueue<T>实例化的第一个Segment,在析构ConcurrentQueue<T>对象时使用。
ConcurrentQueue<T>成员方法
void Enqueue(T item)
1
2
3
4
5
6
7
8
|
void Enqueue(T item)
{
DNetSpinWait wait;
while (!m_tail->TryAppend(item, &m_tail))
{
wait.SpinOnce();
}
}
|
1
|
从m_tail指向的segment中,加入item的值,直到成功加入,函数返回。
|
该函数会在分配了新的segment后,更新m_tail指针。
bool TryDequeue(T* result)
1
2
3
4
5
6
7
8
9
10
11
12
13
|
bool TryDequeue(T* result)
{
while (!IsEmpty())
{
if (m_head->TryRemove(result, &m_head))
{
return true ;
}
}
result = NULL;
return false ;
}
|
如果当前队列为空,返回false,否则返回队列的第一个元素。
bool TryPeek(T* result)
1
2
3
4
5
6
7
8
9
10
11
12
|
bool TryPeek(T* result)
{
while (!IsEmpty())
{
if (m_head->TryPeek(result))
{
return true ;
}
}
result = NULL;
return false ;
}
|
跟TryDequeue()方法相似。
int Count()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
int Count()
{
Segment* segment;
Segment* segment2;
int num;
int num2;
GetHeadTailPositions(&segment, &segment2, &num, &num2);
if (segment == segment2)
{
return ((num2 - num) + 1);
}
int num3 = SEGMENT_SIZE - num;
num3 += SEGMENT_SIZE * ((( int ) (segment2->GetIndex() - segment->GetIndex())) - 1);
return (num3 + (num2 + 1));
}
|
通过得到当前首尾的segment指针,以及首指针的m_low索引,以及尾指针的m_high索引,计算当前队列中元素的个数。
该方法用到了GetHeadTailPositions方法。
bool IsEmpty()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
bool IsEmpty()
{
Segment* head = m_head;
if (head->IsEmpty())
{
if (head->GetNext() == NULL)
{
return true ;
}
DNetSpinWait wait;
while (head->IsEmpty())
{
if (head->GetNext() == NULL)
{
return true ;
}
wait.SpinOnce();
head = m_head;
}
}
return false ;
}
|
判定当前队列为空有两个条件,第一,m_head指向的segment为空;第二,m_head->GetNext()也为空,即m_head和m_tail指向同一个segment。
void Reset()
1
2
3
4
5
|
void Reset()
{
DeleteNodes();
m_base = m_head = m_tail = new Segment(0);
}
|
重置ConcurrentQueue<T>对象,删除已经分配了的segment,并重新更新成员变量的值。
void GetHeadTailPositions(Segment** head, Segment** tail, int* headLow, int* tailHigh)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
void GetHeadTailPositions(Segment** head, Segment** tail, int * headLow, int * tailHigh)
{
*head = m_head;
*tail = m_tail;
*headLow = (*head)->GetLow();
*tailHigh = (*tail)->GetHigh();
DNetSpinWait wait;
while ((((*head != m_head) || (*tail != m_tail)) ||
((*headLow != (*head)->GetLow()) || (*tailHigh != (*tail)->GetHigh()))) ||
((*head)->GetIndex() > (*tail)->GetIndex()))
{
wait.SpinOnce();
*head = m_head;
*tail = m_tail;
*headLow = (*head)->GetLow();
*tailHigh = (*tail)->GetHigh();
}
}
|
该函数就是将队列当前的m_head, m_tail指针以及m_head的m_low索引,m_tail的m_high索引取出来,放到线程栈上。并且在取出这些值后,再判断这些值是否合法。
ConcurrentQueue<T>队列是一个高效的线程安全的队列,是.Net Framework 4.0,System.Collections.Concurrent命名空间下的一个数据结构。
ConcurrentQueue<T>数据结构
下图是ConcurrentQueue<T>数据结构的示意图:
ConcurrentQueue<T>队列由若干Segment动态构成,每个Segment是一块连续的内存Buffer,大小固定为SEGMENT_SIZE。
ConcurrentQueue<T>私有成员变量
ConcurrentQueue<T>类有三个私有成员变量:
Segment* volatile m_head;
Segment* volatile m_tail;
Segment* volatile m_base;
m_head指向第一个segment,m_tail指向最后一个segment。这两个指针指向的对象,随着入队列和出队列操作而不断变化。
m_base指针固定指向ConcurrentQueue<T>实例化的第一个Segment,在析构ConcurrentQueue<T>对象时使用。
ConcurrentQueue<T>成员方法
void Enqueue(T item)
1
2
3
4
5
6
7
8
|
void Enqueue(T item)
{
DNetSpinWait wait;
while (!m_tail->TryAppend(item, &m_tail))
{
wait.SpinOnce();
}
}
|
1
|
从m_tail指向的segment中,加入item的值,直到成功加入,函数返回。
|
该函数会在分配了新的segment后,更新m_tail指针。
bool TryDequeue(T* result)
1
2
3
4
5
6
7
8
9
10
11
12
13
|
bool TryDequeue(T* result)
{
while (!IsEmpty())
{
if (m_head->TryRemove(result, &m_head))
{
return true ;
}
}
result = NULL;
return false ;
}
|
如果当前队列为空,返回false,否则返回队列的第一个元素。
bool TryPeek(T* result)
1
2
3
4
5
6
7
8
9
10
11
12
|
bool TryPeek(T* result)
{
while (!IsEmpty())
{
if (m_head->TryPeek(result))
{
return true ;
}
}
result = NULL;
return false ;
}
|
跟TryDequeue()方法相似。
int Count()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
int Count()
{
Segment* segment;
Segment* segment2;
int num;
int num2;
GetHeadTailPositions(&segment, &segment2, &num, &num2);
if (segment == segment2)
{
return ((num2 - num) + 1);
}
int num3 = SEGMENT_SIZE - num;
num3 += SEGMENT_SIZE * ((( int ) (segment2->GetIndex() - segment->GetIndex())) - 1);
return (num3 + (num2 + 1));
}
|
通过得到当前首尾的segment指针,以及首指针的m_low索引,以及尾指针的m_high索引,计算当前队列中元素的个数。
该方法用到了GetHeadTailPositions方法。
bool IsEmpty()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
bool IsEmpty()
{
Segment* head = m_head;
if (head->IsEmpty())
{
if (head->GetNext() == NULL)
{
return true ;
}
DNetSpinWait wait;
while (head->IsEmpty())
{
if (head->GetNext() == NULL)
{
return true ;
}
wait.SpinOnce();
head = m_head;
}
}
return false ;
}
|
判定当前队列为空有两个条件,第一,m_head指向的segment为空;第二,m_head->GetNext()也为空,即m_head和m_tail指向同一个segment。
void Reset()
1
2
3
4
5
|
void Reset()
{
DeleteNodes();
m_base = m_head = m_tail = new Segment(0);
}
|
重置ConcurrentQueue<T>对象,删除已经分配了的segment,并重新更新成员变量的值。
void GetHeadTailPositions(Segment** head, Segment** tail, int* headLow, int* tailHigh)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
void GetHeadTailPositions(Segment** head, Segment** tail, int * headLow, int * tailHigh)
{
*head = m_head;
*tail = m_tail;
*headLow = (*head)->GetLow();
*tailHigh = (*tail)->GetHigh();
DNetSpinWait wait;
while ((((*head != m_head) || (*tail != m_tail)) ||
((*headLow != (*head)->GetLow()) || (*tailHigh != (*tail)->GetHigh()))) ||
((*head)->GetIndex() > (*tail)->GetIndex()))
{
wait.SpinOnce();
*head = m_head;
*tail = m_tail;
*headLow = (*head)->GetLow();
*tailHigh = (*tail)->GetHigh();
}
}
|
该函数就是将队列当前的m_head, m_tail指针以及m_head的m_low索引,m_tail的m_high索引取出来,放到线程栈上。并且在取出这些值后,再判断这些值是否合法。
|
请发表评论