在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
package nsqd import ( "bytes" "encoding/binary" "encoding/json" "errors" "fmt" "io" "math" "math/rand" "net" "sync/atomic" "time" "unsafe" "github.com/nsqio/nsq/internal/protocol" "github.com/nsqio/nsq/internal/version" )
const (
	// maxTimeout is a one-hour duration ceiling.
	// NOTE(review): its consumers are outside this view — presumably it caps
	// client-requested timeouts; confirm against the command handlers.
	maxTimeout = time.Hour

	// Frame type identifiers passed as the frameType argument of Send.
	// The numeric values are written out explicitly so that reordering the
	// declarations can never silently change them.
	frameTypeResponse int32 = 0
	frameTypeError    int32 = 1
	frameTypeMessage  int32 = 2
)
var separatorBytes = []byte(" ") var heartbeatBytes = []byte("_heartbeat_") var okBytes = []byte("OK") type protocolV2 struct { ctx *context }
func (p *protocolV2) IOLoop(conn net.Conn) error { var err error var line []byte var zeroTime time.Time clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1) client := newClientV2(clientID, conn, p.ctx) // synchronize the startup of messagePump in order // to guarantee that it gets a chance to initialize // goroutine local state derived from client attributes // and avoid a potential race with IDENTIFY (where a client // could have changed or disabled said attributes) messagePumpStartedChan := make(chan bool) go p.messagePump(client, messagePumpStartedChan) <-messagePumpStartedChan for { if client.HeartbeatInterval > 0 { client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2)) } else { client.SetReadDeadline(zeroTime) } // ReadSlice does not allocate new space for the data each request // ie. the returned slice is only valid until the next call to it line, err = client.Reader.ReadSlice('\n') if err != nil { if err == io.EOF { err = nil } else { err = fmt.Errorf("failed to read command - %s", err) } break } // trim the '\n' line = line[:len(line)-1] // optionally trim the '\r' if len(line) > 0 && line[len(line)-1] == '\r' { line = line[:len(line)-1] } params := bytes.Split(line, separatorBytes) if p.ctx.nsqd.getOpts().Verbose { p.ctx.nsqd.logf("PROTOCOL(V2): [%s] %s", client, params) } var response []byte response, err = p.Exec(client, params) if err != nil { ctx := "" if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil { ctx = " - " + parentErr.Error() } p.ctx.nsqd.logf("ERROR: [%s] - %s%s", client, err, ctx) sendErr := p.Send(client, frameTypeError, []byte(err.Error())) if sendErr != nil { p.ctx.nsqd.logf("ERROR: [%s] - %s%s", client, sendErr, ctx) break } // errors of type FatalClientErr should forceably close the connection if _, ok := err.(*protocol.FatalClientErr); ok { break } continue } if response != nil { err = p.Send(client, frameTypeResponse, response) if err != nil { err = fmt.Errorf("failed to send response - %s", err) 
break } } } p.ctx.nsqd.logf("PROTOCOL(V2): [%s] exiting ioloop", client) conn.Close() close(client.ExitChan) if client.Channel != nil { client.Channel.RemoveClient(client.ID) } return err }
func (p *protocolV2) SendMessage(client *clientV2, msg *Message, buf *bytes.Buffer) error { if p.ctx.nsqd.getOpts().Verbose { p.ctx.nsqd.logf("PROTOCOL(V2): writing msg(%s) to client(%s) - %s", msg.ID, client, msg.Body) } buf.Reset() _, err := msg.WriteTo(buf) if err != nil { return err } err = p.Send(client, frameTypeMessage, buf.Bytes()) if err != nil { return err } return nil }
func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error { client.writeLock.Lock() var zeroTime time.Time if client.HeartbeatInterval > 0 { client.SetWriteDeadline(time.Now().Add(client.HeartbeatInterval)) } else { client.SetWriteDeadline(zeroTime) } _, err := protocol.SendFramedResponse(client.Writer, frameType, data) if err != nil { client.writeLock.Unlock() return err } if frameType != frameTypeMessage { err = client.Flush()
|
请发表评论