在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
client.go package main import ( "bufio" "encoding/json" "fmt" "hash/crc32" "math/rand" "net" "os" // "sync" "time" ) //数据包类型 const ( HEART_BEAT_PACKET = 0x00 REPORT_PACKET = 0x01 ) //默认的服务器地址 var ( server = "127.0.0.1:8080" ) //数据包 type Packet struct { PacketType byte PacketContent []byte } //心跳包 type HeartPacket struct { Version string `json:"version"` Timestamp int64 `json:"timestamp"` } //数据包 type ReportPacket struct { Content string `json:"content"` Rand int `json:"rand"` Timestamp int64 `json:"timestamp"` } //注册 type RegisterReq struct { PERAESKey string `json:"PERAESKey"` VIN string `json:"VIN"` T_Box_SN string `json:"T_Box_SN"` IMSI string `json:"IMSI"` rollNumber string `json:"rollNumber"` } //客户端对象 type TcpClient struct { connection *net.TCPConn hawkServer *net.TCPAddr stopChan chan struct{} } func main() { //拿到服务器地址信息 hawkServer, err := net.ResolveTCPAddr("tcp", server) if err != nil { fmt.Printf("hawk server [%s] resolve error: [%s]", server, err.Error()) os.Exit(1) } //连接服务器 connection, err := net.DialTCP("tcp", nil, hawkServer) if err != nil { fmt.Printf("connect to hawk server error: [%s]", err.Error()) os.Exit(1) } client := &TcpClient{ connection: connection, hawkServer: hawkServer, stopChan: make(chan struct{}), } //启动接收 go client.receivePackets() //发送心跳的goroutine /*go func() { heartBeatTick := time.Tick(2 * time.Second) for { select { case <-heartBeatTick: client.sendHeartPacket() case <-client.stopChan: return } } }()*/ //测试用的,开300个goroutine每秒发送一个包 // for i := 0; i < 1; i++ { go func() { sendTimer := time.After(5 * time.Second) for { select { case <-sendTimer: client.sendReportPacket() sendTimer = time.After(1 * time.Second) case <-client.stopChan: return } } }() // } //等待退出 <-client.stopChan } // 接收数据包 func (client *TcpClient) receivePackets() { reader := bufio.NewReader(client.connection) for { //承接上面说的服务器端的偷懒,我这里读也只是以\n为界限来读区分包 msg, err := reader.ReadString('\n') if err != nil { //在这里也请处理如果服务器关闭时的异常 close(client.stopChan) break } fmt.Print(msg) } } //发送数据包 //仔细看代码其实这里做了两次json的序列化,有一次其实是不需要的 func (client *TcpClient) sendReportPacket() { registPacket := RegisterReq{ PERAESKey: "123456", VIN: "abcdef", T_Box_SN: "abcdef123456", IMSI: "IMSI", rollNumber: getRandString(), /*Content: getRandString(), Timestamp: time.Now().Unix(), Rand: rand.Int(),*/ } fmt.Println("registPacket:", registPacket) packetBytes, err := json.Marshal(registPacket) //返回值是字节数组byte[], if err != nil { fmt.Println(err.Error()) } //这一次其实可以不需要,在封包的地方把类型和数据传进去即可 /*packet := Packet{ PacketType: REPORT_PACKET, PacketContent: packetBytes, } sendBytes, err := json.Marshal(packet) if err != nil { fmt.Println(err.Error()) }*/ //发送 client.connection.Write(EnPackSendData(packetBytes)) // fmt.Println("EnPackSendData(packetBytes):%v", EnPackSendData(packetBytes)) // fmt.Println("Send metric data success!") } //使用的协议与服务器端保持一致 func EnPackSendData(sendBytes []byte) []byte { packetLength := len(sendBytes) + 8 result := make([]byte, packetLength) result[0] = 0xFF result[1] = 0xFF result[2] = byte(uint16(len(sendBytes)) >> 8) //除以2的8次方,byte是0-255, result[3] = byte(uint16(len(sendBytes)) & 0xFF) copy(result[4:], sendBytes) sendCrc := crc32.ChecksumIEEE(sendBytes) result[packetLength-4] = byte(sendCrc >> 24) result[packetLength-3] = byte(sendCrc >> 16 & 0xFF) result[packetLength-2] = 0xFF result[packetLength-1] = 0xFE fmt.Println(result) return result } //发送心跳包,与发送数据包一样 func (client *TcpClient) sendHeartPacket() { heartPacket := HeartPacket{ Version: "1.0", Timestamp: time.Now().Unix(), } packetBytes, err := json.Marshal(heartPacket) if err != nil { fmt.Println(err.Error()) } packet := Packet{ PacketType: HEART_BEAT_PACKET, PacketContent: packetBytes, } sendBytes, err := json.Marshal(packet) if err != nil { fmt.Println(err.Error()) } client.connection.Write(EnPackSendData(sendBytes)) fmt.Println("Send heartbeat data success!") } //拿一串随机字符 func getRandString() string { // length := rand.Intn(10) strBytes := make([]byte, 10) for i := 0; i < 10; i++ { strBytes[i] = byte(rand.Intn(26) + 97) } return string(strBytes) } /*作者:getyouyou 链接:https://www.jianshu.com/p/dbc62a879081 來源:简书 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。*/ server.go package main import ( "bufio" "encoding/json" "fmt" "hash/crc32" "io" "net" "os" ) //数据包的类型 const ( HEART_BEAT_PACKET = 0x00 REPORT_PACKET = 0x01 ) var ( server = "127.0.0.1:8080" ) //这里是包的结构体,其实是可以不需要的 type Packet struct { PacketType byte PacketContent []byte } //注册 type RegisterReq struct { PERAESKey string `json:"PERAESKey"` VIN string `json:"VIN"` T_Box_SN string `json:"T_Box_SN"` IMSI string `json:"IMSI"` rollNumber string `json:"rollNumber"` } //心跳包,这里用了json来序列化,也可以用github上的gogo/protobuf包 //具体见(https://github.com/gogo/protobuf) type HeartPacket struct { Version string `json:"version"` Timestamp int64 `json:"timestamp"` } //正式上传的数据包 type ReportPacket struct { Content string `json:"content"` Rand int `json:"rand"` Timestamp int64 `json:"timestamp"` } //与服务器相关的资源都放在这里面 type TcpServer struct { listener *net.TCPListener hawkServer *net.TCPAddr } func main() { //类似于初始化套接字,绑定端口 hawkServer, err := net.ResolveTCPAddr("tcp", server) checkErr(err) //侦听 listen, err := net.ListenTCP("tcp", hawkServer) checkErr(err) //记得关闭 defer listen.Close() tcpServer := &TcpServer{ listener: listen, hawkServer: hawkServer, } fmt.Println("start server successful......") //开始接收请求 for { conn, err := tcpServer.listener.Accept() fmt.Println("accept tcp client %s", conn.RemoteAddr().String()) checkErr(err) // 每次建立一个连接就放到单独的协程内做处理 go Handle(conn) } } //处理函数,这是一个状态机 //根据数据包来做解析 //数据包的格式为|0xFF|0xFF|len(高)|len(低)|Data|CRC高16位|0xFF|0xFE //其中len为data的长度,实际长度为len(高)*256+len(低) //CRC为32位CRC,取了最高16位共2Bytes //0xFF|0xFF和0xFF|0xFE类似于前导码 func Handle(conn net.Conn) { // close connection before exit defer conn.Close() // 状态机状态 state := 0x00 // 数据包长度 length := uint16(0) // crc校验和 crc16 := uint16(0) var recvBuffer []byte // 游标 cursor := uint16(0) bufferReader := bufio.NewReader(conn) //状态机处理数据 for { recvByte, err := bufferReader.ReadByte() //recvByte是每次读到的字节 if err != nil { //这里因为做了心跳,所以就没有加deadline时间,如果客户端断开连接 //这里ReadByte方法返回一个io.EOF的错误,具体可考虑文档 /*Handle方法在一个死循环中使用了一个无阻塞的buff来读取套接字中的数据, 因此当客户端主动关闭连接时,如果不对这个io.EOF进行处理,会导致这个goroutine空转, 疯狂吃cpu,在这里io.EOF的处理非常重要:)*/ if err == io.EOF { fmt.Printf("client %s is close!\n", conn.RemoteAddr().String()) } //在这里直接退出goroutine,关闭由defer操作完成 return } //进入状态机,根据不同的状态来处理 switch state { case 0x00: if recvByte == 0xFF { state = 0x01 //初始化状态机 recvBuffer = nil length = 0 crc16 = 0 } else { state = 0x00 } break case 0x01: if recvByte == 0xFF { state = 0x02 } else { state = 0x00 } break case 0x02: length += uint16(recvByte) * 256 //length这次是发送数据的长度 fmt.Println("0x02,length:%d", length) //0 state = 0x03 break case 0x03: length += uint16(recvByte) fmt.Println("0x03,length:%d", length) //77 // 一次申请缓存,初始化游标,准备读数据 recvBuffer = make([]byte, length) cursor = 0 state = 0x04 break case 0x04: //不断地在这个状态下读数据,直到满足长度为止 recvBuffer[cursor] = recvByte cursor++ if cursor == length { state = 0x05 } break case 0x05: crc16 += uint16(recvByte) * 256 //crc32编码 state = 0x06 break case 0x06: crc16 += uint16(recvByte) state = 0x07 break case 0x07: if recvByte == 0xFF { state = 0x08 } else { state = 0x00 } case 0x08: if recvByte == 0xFE { //执行数据包校验 if (crc32.ChecksumIEEE(recvBuffer)>>16)&0xFFFF == uint32(crc16) { var packet RegisterReq //把拿到的数据反序列化出来 json.Unmarshal(recvBuffer, &packet) //新开协程处理数据 go processRecvData(&packet, conn) } else { fmt.Println("丢弃数据!") } } //状态机归位,接收下一个包 state = 0x00 } } } //在这里处理收到的包,就和一般的逻辑一样了,根据类型进行不同的处理,因人而异 //我这里处理了心跳和一个上报数据包 //服务器往客户端的数据包很简单地以\n换行结束了,偷了一个懒:),正常情况下也可根据自己的协议来封装好 //然后在客户端写一个状态来处理 func processRecvData(packet *RegisterReq, conn net.Conn) { // switch packet.PacketType { // case HEART_BEAT_PACKET: // var beatPacket HeartPacket // json.Unmarshal(packet.PacketContent, &beatPacket) fmt.Printf("recieve heat beat from [%s] ,data is [%v]\n", conn.RemoteAddr().String(), packet) conn.Write([]byte("heartBeat\n")) return // case REPORT_PACKET: // var reportPacket ReportPacket // json.Unmarshal(packet.PacketContent, &reportPacket) // fmt.Printf("recieve report data from [%s] ,data is [%v]\n", conn.RemoteAddr().String(), reportPacket) // conn.Write([]byte("Report data has recive\n")) // return // } } //处理错误,根据实际情况选择这样处理,还是在函数调之后不同的地方不同处理 func checkErr(err error) { if err != nil { fmt.Println(err) os.Exit(-1) } } /*作者:getyouyou 链接:https://www.jianshu.com/p/dbc62a879081 來源:简书 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。*/
|
请发表评论