• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

protocol_v2.go

原作者: [db:作者] 来自: [db:来源] 收藏 邀请
package nsqd

import (
    "bytes"
    "encoding/binary"
    "encoding/json"
    "errors"
    "fmt"
    "io"
    "math"
    "math/rand"
    "net"
    "sync/atomic"
    "time"
    "unsafe"

    "github.com/nsqio/nsq/internal/protocol"
    "github.com/nsqio/nsq/internal/version"
)

const maxTimeout = time.Hour

const (
    frameTypeResponse int32 = 0
    frameTypeError    int32 = 1
    frameTypeMessage  int32 = 2
)

var separatorBytes = []byte(" ")
var heartbeatBytes = []byte("_heartbeat_")
var okBytes = []byte("OK")

type protocolV2 struct {
    ctx *context
}

func (p *protocolV2) IOLoop(conn net.Conn) error {
    var err error
    var line []byte
    var zeroTime time.Time

    clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
    client := newClientV2(clientID, conn, p.ctx)

    // synchronize the startup of messagePump in order
    // to guarantee that it gets a chance to initialize
    // goroutine local state derived from client attributes
    // and avoid a potential race with IDENTIFY (where a client
    // could have changed or disabled said attributes)
    messagePumpStartedChan := make(chan bool)
    go p.messagePump(client, messagePumpStartedChan)
    <-messagePumpStartedChan

    for {
        if client.HeartbeatInterval > 0 {
            client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
        } else {
            client.SetReadDeadline(zeroTime)
        }

        // ReadSlice does not allocate new space for the data each request
        // ie. the returned slice is only valid until the next call to it
        line, err = client.Reader.ReadSlice('\n')
        if err != nil {
            if err == io.EOF {
                err = nil
            } else {
                err = fmt.Errorf("failed to read command - %s", err)
            }
            break
        }

        // trim the '\n'
        line = line[:len(line)-1]
        // optionally trim the '\r'
        if len(line) > 0 && line[len(line)-1] == '\r' {
            line = line[:len(line)-1]
        }
        params := bytes.Split(line, separatorBytes)

        if p.ctx.nsqd.getOpts().Verbose {
            p.ctx.nsqd.logf("PROTOCOL(V2): [%s] %s", client, params)
        }

        var response []byte
        response, err = p.Exec(client, params)
        if err != nil {
            ctx := ""
            if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
                ctx = " - " + parentErr.Error()
            }
            p.ctx.nsqd.logf("ERROR: [%s] - %s%s", client, err, ctx)

            sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
            if sendErr != nil {
                p.ctx.nsqd.logf("ERROR: [%s] - %s%s", client, sendErr, ctx)
                break
            }

            // errors of type FatalClientErr should forceably close the connection
            if _, ok := err.(*protocol.FatalClientErr); ok {
                break
            }
            continue
        }

        if response != nil {
            err = p.Send(client, frameTypeResponse, response)
            if err != nil {
                err = fmt.Errorf("failed to send response - %s", err)
                break
            }
        }
    }

    p.ctx.nsqd.logf("PROTOCOL(V2): [%s] exiting ioloop", client)
    conn.Close()
    close(client.ExitChan)
    if client.Channel != nil {
        client.Channel.RemoveClient(client.ID)
    }

    return err
}

func (p *protocolV2) SendMessage(client *clientV2, msg *Message, buf *bytes.Buffer) error {
    if p.ctx.nsqd.getOpts().Verbose {
        p.ctx.nsqd.logf("PROTOCOL(V2): writing msg(%s) to client(%s) - %s",
            msg.ID, client, msg.Body)
    }

    buf.Reset()
    _, err := msg.WriteTo(buf)
    if err != nil {
        return err
    }

    err = p.Send(client, frameTypeMessage, buf.Bytes())
    if err != nil {
        return err
    }

    return nil
}

func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error {
    client.writeLock.Lock()

    var zeroTime time.Time
    if client.HeartbeatInterval > 0 {
        client.SetWriteDeadline(time.Now().Add(client.HeartbeatInterval))
    } else {
        client.SetWriteDeadline(zeroTime)
    }

    _, err := protocol.SendFramedResponse(client.Writer, frameType, data)
    if err != nil {
        client.writeLock.Unlock()
        return err
    }

    if frameType != frameTypeMessage {
        err = client.Flush()
 
                       
                    
                    

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
go单元测试框架介绍发布时间:2022-07-10
下一篇:
Mr.Go 会客厅第二期,B站+斗鱼“后浪” Gopher 火花四溅!发布时间:2022-07-10
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap