在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
最近想在DIOCP中加入任务调度线程,DIOCP的工作线程作为生产者(producer)将接受到的数据对象,投递到任务调度线程中,然后统一进行分配。然而这一切都需要一个队列, 这几天都在关注无锁队列。
[队列]首先是一个队列,简单的队列就是,生产者把数据压入队列(push), 消费者通过队列Pop出数据进行处理。 简单的队列就是提供Push,和Pop函数。 我们用一个链表来存储数据。Head ->data01->data02...data_n->Tail, 每个数据块的结构如下 type PVarQueueBlock = ^TVarQueueBlock; TVarQueueBlock = packed record value:Variant; next:PVarQueueBlock; end;
1.在进行Push压入数据时压入将Tail.next指向新压入的数据块, 然后用新的数据块做Tail procedure TSimpleQueue.pushQueue(pvData: PVarQueueBlock); begin if FHead = nil then begin FHead = pvData; FTail := FHead; end else begin FTail.next := pvData; FTail := pvData; end; Inc(FCount); end;
2.在进行Pop数据时把Head数据块取出,然后用Head数据块指向的下一块当作Head. function TSimpleQueue.popQueue: PVarQueueBlock; var lvTemp, lvRet:PVarQueueBlock; begin lvTemp := FHead; if (lvTemp = nil) then begin //没有任何可以Pop出的值 Result := nil; exit; end; // FHead := FHead.next; Dec(FCount); Result :=lvTemp; end;
上面就是简单的队列
[无锁队列]上面的实现的队列在多线程情况下是不安全的。如果要在多并发下队列要进行加锁,在push和pop时加锁也是一种办法。可以直接用临界就可以了,但是我们要做的是无锁队列
首先记住多并发设计规则:决不要假设任何代码会连续执行
上面的push操作 FTail.next := pvData; FTail := pvData; 也许执行了FTail.next:=pvData后,会被另外的线程抢走,然后FTail进行了新的赋值,这样在进行FTail := pvData;这样整个数据链条就会被破坏。 如果这两行我们能一次行完成,这样就可以实现无锁操作了,这样我们需要引入原子操作.Interlocked中的函数。
说无锁其实不太确切,只是锁的粒度小了。我们是使用api的InterlockedCompareExchange函数来实现的。 查一下MSDN http://msdn.microsoft.com/en-us/library/windows/desktop/ms683560(v=vs.85).aspx LONG __cdecl InterlockedCompareExchange( _Inout_ LONG volatile *Destination, _In_ LONG Exchange, _In_ LONG Comparand ); Parameters
Return valueThe function returns the initial value of the Destination parameter.
大概解释一下。这个函数是比较后进行交换。第一个参数是要存放目的的数据,第二个是交换数据,第三个是比较数据(与第一个比较), 如果交换返回的数据和第三个参数一样。
这样就可以在push和pop一步完成。 这里贴出push的pop操作 procedure TVarQueue.pushQueue(pvData: PVarQueueBlock); var lvTemp:PVarQueueBlock; lvPointer:Pointer; begin while True do begin lvTemp := FTail; while lvTemp.next <> nil do lvTemp := lvTemp.next; if InterlockedCompareExchangePointer(Pointer(lvTemp.next), Pointer(pvData), nil) = nil then begin break; end; end; FTail := pvData; Inc(FCount); end; function TVarQueue.popQueue: PVarQueueBlock; var lvTemp, lvRet:PVarQueueBlock; lvPointer:Pointer; begin ///为了方便 队列中始终保留一个FHead数据块 /// 也就是说FHead指向的下一个数据块才是第一个数据块 /// while True do begin lvTemp := FHead; if (lvTemp = nil) or (lvTemp.next = nil) then begin //没有任何可以Pop出的值 Result := nil; exit; end; if InterlockedCompareExchangePointer(Pointer(FHead), lvTemp.next, lvTemp) = lvTemp then begin break; end; end; Dec(FCount); lvRet := lvTemp.next; Result := lvRet; lvTemp.next := nil; Dispose(lvTemp); //返回的是head.next end;
后续我会上传完整的代码到DIOCP项目中。 如有漏洞,敬请指出。欢迎假如DIOCP群讨论
|
2023-10-27
2022-08-15
2022-08-17
2022-09-23
2022-08-13
请发表评论