前言
最近学习了一下有关tcp协议和socket有关的知识,看到许多socket实战都喜欢教如何做一个聊天程序,于是想着试试能不能不看教程自己写一个。当然我没太多时间做一个像qq一样的ui界面,所以做了个命令行程序。
下面是我写好的代码:
server代码
import socket import threading serveraddr = (\'0.0.0.0\', 8080)#定义server的ip和地址 class Server:#server类 def __init__(self): self.user={} self.addr={} self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server.bind(serveraddr) self.server.listen(128) def add_user(self,name,addr): self.user[name]=addr self.addr[addr[0]+str(addr[1])]=name def get_user(self,name): return self.user[name] def main_thread(server,client,address):#主线程函数 datalist = client.recv(1024).decode(\'utf-8\').split(\' \', 3) if datalist[1] == \'login\':#登录 server.add_user(datalist[2],address) client.send(str(address[1]).encode(\'utf-8\')) print(\'用户\'+datalist[2]+\'已录入\',server.user[datalist[2]]) elif (datalist[1] == \'send\' and len(datalist) == 4):#发送 if(datalist[2] in server.user.keys()): target = socket.socket(socket.AF_INET,socket.SOCK_STREAM) target.connect(server.get_user(datalist[2])) target.send((\'用户\'+datalist[0]+\'发来一条信息:\'+datalist[3]).encode(\'utf-8\')) client.send(\'success\'.encode(\'utf-8\')) print(\'用户\'+datalist[0]+\'给用户\'+datalist[2]+\'发送了一条信息:\'+datalist[3]) target.close() else: client.send((\'用户\'+datalist[2]+\'不存在\').encode(\'utf-8\')) print(\'用户\'+datalist[2]+\'不存在\') elif datalist[1] == \'close\':#退出 print(\'已删除用户\'+datalist[0]) client.send(\'success\'.encode(\'utf-8\')) del server.user[datalist[0]] else:#其他 client.send(\'格式有误\'.encode(\'utf-8\')) client.close() def server_thread(server):#服务端 print(\'服务器已开启\') while(True): client, address = server.server.accept() threading.Thread(target=main_thread, args=(server,client,address)).start() if __name__ == "__main__": server = Server() threading.Thread(target=server_thread, args=(server,)).start() while(True): put = input() if(put==\'close\'): server.close() break else: print(\'无效指令\')
client代码
import socket import threading serveraddr = (\'127.0.0.1\', 8080)#定义server的ip和地址 def client_thread(client,port):#客户端 client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) client.bind((\'127.0.0.1\', port)) client.listen(1) while(True): clientsocket, address = client.accept() print(clientsocket.recv(1024).decode(\'utf-8\')) if __name__ == "__main__": #login指令 target = socket.socket(socket.AF_INET,socket.SOCK_STREAM) target.connect(serveraddr) id = input(\'请输入你的用户名: \') target.send((\'none login \'+id).encode(\'utf-8\')) port = int(target.recv(1024).decode(\'utf-8\')) print(\'链接成功\') target.close() #启动客户端 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) threading.Thread(target=client_thread, args=(client,port)).start() #开始发送指令 while(True): put = id+\' \'+input() target = socket.socket(socket.AF_INET,socket.SOCK_STREAM) target.connect(serveraddr) target.send(put.encode(\'utf-8\')) callback = target.recv(1024).decode(\'utf-8\') if(callback!=\'success\'): print(callback) target.close() if put.split(\' \')[1] == \'close\':#关闭客户端 break client.close()
运行效果
client有三种命令,login,send,close,login只有且必须在client刚开启时使用,send就是发消息指令,close就是退出指令。
server只有close指令,用来停止server。
因为手上没有服务器的缘故只能在本机上完成,无法测试不同主机的效果,不过理论上应该可以(别打脸)。只需要把python文件的ip地址和端口改下就行,server放到云服务器开启,client在主机上使用,就可以模拟出简单的即时通讯了。
GitHub地址:https://github.com/modifyGB/mysocket
请发表评论