本篇将解读nsqlookup处理tcp请求的核心代码文件lookup_protocol_v1.go。
|
package nsqlookupd import ( "bufio" "encoding/binary" "encoding/json" "fmt" "io" "log" "net" "os" "strings" "time" "github.com/bitly/go-nsq" "github.com/bitly/nsq/util" ) type LookupProtocolV1 struct { context *Context } //实现util\Protocol.go中定义的Protocol的接口的IOLoop方法 func (p *LookupProtocolV1) IOLoop(conn net.Conn) error { var err error var line string //在nsqlookupd\client_v1.go中定义了NewClientV1方法 client := NewClientV1(conn) err = nil //此处需要注意为何NewReader可以传入client作为参数。 //打开client_v1.go可以看到,其中嵌入了net.Conn,用JAVA的思想可以说,ClientV1是继承自net.Conn的。 //那接下来的问题是:查官方文档http://golang.org/pkg/bufio/#NewReader //NewReader的参数类型为io.Reader,这和net.Conn也不同啊 //为一探究竟,我们打开go的源码。分别打开go源码下src\pkg\io\io.go和src\pkg\net\net.go //发现io.Reader是一个接口,其中有一个方法 Read(p []byte) (n int, err error) //net.Conn也是一个接口,下面有很多方法,其中一个是 Read(b []byte) (n int, err error) //可以看出,这两个方法的参数是完全一样的。即net.Conn里的方法完全能覆盖io.Reader里定义的方法 //插播一段关于go接口的描述:所谓Go语言式的接口,就是不用显示声明类型T实现了接口I,只要类型T的公开方法完全满足接口I的要求,就可以把类型T的对象用在需要接口I的地方。这种做法的学名叫做Structural Typing //所以我们这里可以传入client作为参数 reader := bufio.NewReader(client) for { //每次读取一行数据 line, err = reader.ReadString('\n') if err != nil { break } //去掉两边的空格 line = strings.TrimSpace(line) //将数据用空格分割成数组,根据后面的代码可看出,第一个参数是动作类型,包括四种:PING IDENTIFY REGISTER UNREGISTER params := strings.Split(line, " ") //调用LookupProtocolV1的Exec方法 response, err := p.Exec(client, reader, params) if err != nil { context := "" if parentErr := err.(util.ChildErr).Parent(); parentErr != nil { context = " - " + parentErr.Error() } log.Printf("ERROR: [%s] - %s%s", client, err.Error(), context) //返回错误给客户端,SendResponse方法在util\Protocol.go中定义 _, err = util.SendResponse(client, []byte(err.Error())) if err != nil { break } // errors of type FatalClientErr should forceably close the connection if _, ok := err.(*util.FatalClientErr); ok { break } continue } //Exec方法返回了响应数据,将响应发送到客户端 if response != nil { _, err = util.SendResponse(client, response) //响应发送出错就退出 if err != nil { break } } } //如果前面的for循环退出了,则表示程序要退出了,将注册信息都从RegistrationDB中删除 log.Printf("CLIENT(%s): closing", client) if client.peerInfo != nil { registrations := p.context.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id) for _, r := range registrations { if removed, _ := p.context.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed { log.Printf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, r.Category, r.Key, r.SubKey) } } } return err } //请求分发,根据每行数据的第一个参数,调用不同的方法 func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { |
全部评论
请发表评论