chan是一个FIFO队列,chan分成两种类型同步和异步
同步的chan完成发送者和接受者之间手递手传递元素的过程,必须要求对方的存在才能完成一次发送或接受
异步的chan发送和接受都是基于chan的缓存,但当缓存队列填满后,发送者就会进入发送队列, 当缓存队列为空时,接受者就会接入等待队列。
chan的数据结构:
struct Hchan
{
uintgo qcount; // total data in the q
uintgo dataqsiz; // size of the circular q
uint16 elemsize;
uint16 pad; // ensures proper alignment of the buffer that follows Hchan in memory
bool closed;
Alg* elemalg; // interface for element type
uintgo sendx; // send index
uintgo recvx; // receive index
WaitQ recvq; // list of recv waiters
WaitQ sendq; // list of send waiters
Lock;
};
void
runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc)
{
SudoG *sg;
SudoG mysg;
G* gp;
int64 t0;
if(c == nil) {
USED(t);
if(pres != nil) {
*pres = false;
return;
}
runtime·park(nil, nil, "chan send (nil chan)");
return; // not reached
}
if(debug) {
runtime·printf("chansend: chan=%p; elem=", c);
c->elemalg->print(c->elemsize, ep);
runtime·prints("\n");
}
t0 = 0;
mysg.releasetime = 0;
if(runtime·blockprofilerate > 0) {
t0 = runtime·cputicks();
mysg.releasetime = -1;
}
runtime·lock(c);
if(raceenabled)
runtime·racereadpc(c, pc, runtime·chansend);
if(c->closed)
goto closed;
if(c->dataqsiz > 0)
goto asynch;
sg = dequeue(&c->recvq);
if(sg != nil) {
if(raceenabled)
racesync(c, sg);
runtime·unlock(c);
gp = sg->g;
gp->param = sg;
if(sg->elem != nil)
c->elemalg->copy(c->elemsize, sg->elem, ep);
if(sg->releasetime)
sg->releasetime = runtime·cputicks();
runtime·ready(gp);
if(pres != nil)
*pres = true;
return;
}
if(pres != nil) {
runtime·unlock(c);
*pres = false;
return;
}
mysg.elem = ep;
mysg.g = g;
mysg.selgen = NOSELGEN;
g->param = nil;
enqueue(&c->sendq, &mysg);
runtime·park(runtime·unlock, c, "chan send");
if(g->param == nil) {
runtime·lock(c);
if(!c->closed)
runtime·throw("chansend: spurious wakeup");
goto closed;
}
if(mysg.releasetime > 0)
runtime·blockevent(mysg.releasetime - t0, 2);
return;
asynch:
if(c->closed)
goto closed;
if(c->qcount >= c->dataqsiz) {
if(pres != nil) {
runtime·unlock(c);
*pres = false;
return;
}
mysg.g = g;
mysg.elem = nil;
mysg.selgen = NOSELGEN;
enqueue(&c->sendq, &mysg);
runtime·park(runtime·unlock, c, "chan send");
runtime·lock(c);
goto asynch;
}
if(raceenabled)
runtime·racerelease(chanbuf(c, c->sendx));
c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
if(++c->sendx == c->dataqsiz)
c->sendx = 0;
c->qcount++;
sg = dequeue(&c->recvq);
if(sg != nil) {
gp = sg->g;
runtime·unlock(c);
if(sg->releasetime)
sg->releasetime = runtime·cputicks();
runtime·ready(gp);
} else
runtime·unlock(c);
if(pres != nil)
*pres = true;
if(mysg.releasetime > 0)
runtime·blockevent(mysg.releasetime - t0, 2);
return;
closed:
runtime·unlock(c);
runtime·panicstring("send on closed channel");
}
- 判断队列类型,异步队列则转到5
- 从等待队列中获取等待队列中的接受者
- 如果取到接受者,则将对象直接传递给接受者,然后唤醒接受者,发送过程完成
- 如果未取到接受者,则将发送者enqueue到发送队列,发送者进入阻塞状态
- 异步队列首先判断队列缓存是否还有空间
- 如果缓存空间已满,则将发送者enqueue到发送队列,发送者进入阻塞状态
- 如果缓存空间未满,则将元素copy到缓存中,这时发送者就不会进入阻塞状态
- 尝试唤醒等待队列中的一个接受者
chan接受
void
runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received)
{
SudoG *sg;
SudoG mysg;
G *gp;
int64 t0;
if(debug)
runtime·printf("chanrecv: chan=%p\n", c);
if(c == nil) {
USED(t);
if(selected != nil) {
*selected = false;
return;
}
runtime·park(nil, nil, "chan receive (nil chan)");
return; // not reached
}
t0 = 0;
mysg.releasetime = 0;
if(runtime·blockprofilerate > 0) {
t0 = runtime·cputicks();
mysg.releasetime = -1;
}
runtime·lock(c);
if(c->dataqsiz > 0)
goto asynch;
if(c->closed)
goto closed;
sg = dequeue(&c->sendq);
if(sg != nil) {
if(raceenabled)
racesync(c, sg);
runtime·unlock(c);
if(ep != nil)
c->elemalg->copy(c->elemsize, ep, sg->elem);
gp = sg->g;
gp->param = sg;
if(sg->releasetime)
sg->releasetime = runtime·cputicks();
runtime·ready(gp);
if(selected != nil)
*selected = true;
if(received != nil)
*received = true;
return;
}
if(selected != nil) {
runtime·unlock(c);
*selected = false;
return;
}
mysg.elem = ep;
mysg.g = g;
mysg.selgen = NOSELGEN;
g->param = nil;
enqueue(&c->recvq, &mysg);
runtime·park(runtime·unlock, c, "chan receive");
if(g->param == nil) {
runtime·lock(c);
if(!c->closed)
runtime·throw("chanrecv: spurious wakeup");
goto closed;
}
if(received != nil)
*received = true;
if(mysg.releasetime > 0)
runtime·blockevent(mysg.releasetime - t0, 2);
return;
asynch:
if(c->qcount <= 0) {
if(c->closed)
goto closed;
if(selected != nil) {
runtime·unlock(c);
*selected = false;
if(received != nil)
*received = false;
return;
}
mysg.g = g;
mysg.elem = nil;
mysg.selgen = NOSELGEN;
enqueue(&c->recvq, &mysg);
runtime·park(runtime·unlock, c, "chan receive");
runtime·lock(c);
goto asynch;
}
if(raceenabled)
runtime·raceacquire(chanbuf(c, c->recvx));
if(ep != nil)
c->elemalg->copy(c->elemsize, ep, chanbuf(c, c->recvx));
c->elemalg->copy(c->elemsize, chanbuf(c, c->recvx), nil);
if(++c->recvx == c->dataqsiz)
c->recvx = 0;
c->qcount--;
sg = dequeue(&c->sendq);
if(sg != nil) {
gp = sg->g;
runtime·unlock(c);
if(sg->releasetime)
sg->releasetime = runtime·cputicks();
runtime·ready(gp);
} else
runtime·unlock(c);
if(selected != nil)
*selected = true;
if(received != nil)
*received = true;
if(mysg.releasetime > 0)
runtime·blockevent(mysg.releasetime - t0, 2);
return;
closed:
if(ep != nil)
c->elemalg->copy(c->elemsize, ep, nil);
if(selected != nil)
*selected = true;
if(received != nil)
*received = false;
if(raceenabled)
runtime·raceacquire(c);
runtime·unlock(c);
if(mysg.releasetime > 0)
runtime·blockevent(mysg.releasetime - t0, 2);
}
- 判断队列类型,如果是异步队列则转到5
- 从发送队列获取接受者
- 如果取到接受者,则直接从接受者获取元素,并唤醒发送者,接受过程完成
- 如果未取到接受者,则将自身enqueue到等待队列,阻塞goroutine等待发送者唤醒
- 异步队列首先判断队列缓存中是否有元素
- 缓存为空时,则将自身enqueue到等待队列,阻塞goroutine等待发送者唤醒
- 缓存非空时,取出缓存中的第一个元素
- 然后尝试唤醒发送队列中的一个发送者
|
请发表评论