聊天小程序
参加了一个创新项目,于是去学了一些关于通信的编程实现,在此稍总结一下
如果不关心具体的协议之类的,单纯想编程实现,那去了解socket通信基本就够了,简单来说socket提供了一系列基本完备的API,从而将协议之类的全部隐藏在后边,编程时使用socket然后按照文件IO的coding style基本就可以成功了(相比较而言我认为python的socket库比c的库要友好一点,可以更快速迭代,于是在没有确认最终版本之前先用python来实现)(参考程序1,2)(说起来我还稍微用了一下宏?有时间一定要好好学一下Lisp)
从socket出发再深入一点就是搞清楚它的一些传入参数了。比如为什么client端要提供server端的IP地址和端口号(port),比如send/sendto/recv/recvfrom有什么区别,这就是协议的部分啦。比如IPV4协议通过IP address和port number确定一台计算机上一个进程的通信,这与IPV6体系又不同。同时TCP基于连接,UDP基于收发,这在不同场合下各有应用。想了解这些就可以根据OSI参考模型、TCP/IP模型进行学习
再深入一点我们可以把这个聊天程序写得漂亮一点,我主要关注了两点不去实现会让我难受、感觉对不起这个聊天程序的功能:实时性、公网通信
如果想实现聊天,那么client和server都得支持收发(如果是控制系统之类的则往往传感器只发送,执行器只接收,就会简单很多)。那么收发的顺序就成了问题,因为cpu是要按照一个序列顺序执行,执行到IO的时候只能因为等待一项线程而卡住另一项线程(也就是input等待键盘输入而无法recv,recv等待消息而无法input)。那么解决方法就比较显然啦:引入多线程threading(关于多线程的一个简单demo,参考程序3)。对于IO密集型程序,因为多线程使得I、O彼此分开处理,类似于同时监听,这样就把原先的严格时序转换为多线程并行处理,得以实现多收多发以及实行性(参考程序4,5)(初试python多线程编程,感觉挺有意思的,以后有时间再深入学习)
至于公网通信一般是两种解决方案。方案一,搞一台公网服务器S作为中继站,也就是CA(Client A)和CB(Client B)先与S通信,之后CA消息发给S,S直接转发给CB,CB同理。这个方案简单粗暴,将内网通信的程序稍微改一下基本就可以,但缺点同样明显:非常依赖S(比如多个请求通信时无法分出AB,消息通信全靠S造成服务器负担等)。所以其实还有方案二,直接通过UDP打洞进行公网p2p(peer-to-peer)通信,这是利用了NAT的特性了,原理上感觉并不难,但暂时还没有实现,这个可以参考P2P通信原理与实现 by evilpan
#参考程序1:初试socket编写client端
import socket as sk
def work(sock,target,target_type):
'''
因为不知道python怎么用宏,所以写了个类宏函数,无奈用global flag来替代宏
sock表示当前socket通信类
target是一段代码(字符串)用于exec执行来替代宏
target_type是一段字符串,在打印错误信息时说明哪个过程出错
'''
global flag #flag由于指示当前通信是否还在进行(如果error或主动退出则flag=0)
while 1:
try:
exec(target)
except sk.error as e:
print('Oops,',target_type,'FAILED',e)
cho=input("Try again?(Y/N)");#面对error选择:Y则while 1继续尝试
if cho=='N':#N则结束通信
flag=0;break
finally:
print(target_type,'Success')
break
sock=sk.socket(sk.AF_INET,sk.SOCK_STREAM)
ip=input("The IP address of server?")
work(sock,'sock.connect((ip,8001))','connection')
flag=1
send_expr='''
data=input('C>> ');
sock.sendall(data.encode());
if data=='exit':
global flag
flag=0
raise RuntimeError
'''
while 1:#只要flag就反复执行
work(sock,send_expr,'send')
if not flag: break
work(sock,"data=sock.recv(4096).decode();print('S>>',data);",'recv')
if not flag: break
sock.close()
#参考程序2:初试socket编写server端
import socket as sk
def work(sock,target,target_type):#参考client端(参考程序1)
global flag
while 1:
try:
exec(target)
except sk.error as e:
print('Oops,',target_type,'FAILED',e)
cho=input("Try again?(Y/N)");
if cho=='N':
flag=0;break
finally:
print(target_type,'Success')
break
sock=sk.socket(sk.AF_INET,sk.SOCK_STREAM)
print('Socket Created')
ip=sk.gethostbyname(sk.gethostname())#得到本机IP地址
print('Your IP address:',ip)
work(sock,'sock.bind((ip,8001))','bind')
sock.listen(5)
send_expr='''
data=input('S>> ');
sock.sendall(data.encode());
if data=='exit':
global flag
flag=0
raise RuntimeError
'''
while 1:
print("Waiting...")
conn,addr=sock.accept()
print("OH? A client FOUND")
flag=1
try:
conn.settimeout(20)
while 1:
work(conn,"data=conn.recv(1024).decode();print('C>>',data);",'recive')
if not flag: break
work(conn,send_expr,'send')
if not flag: break
except sk.timeout:
print('Time out')
conn.close()
sock.close()
#参考程序3:初试threading编程
import threading
import queue
import time
class chat_read(threading.Thread):
def __init__(self,queue):
threading.Thread.__init__(self)
self.data=queue #queue是threading-free的,遵循FIFO
def run(self):
while 1:
raw=input('')
print('C>> ',end='')
self.data.put(raw)
class chat_write(threading.Thread):
def __init__(self,queue):
threading.Thread.__init__(self)
self.data=queue
def run(self):
while 1:
if (not self.data.empty()):
print('\b\b\b\bS>>',self.data.get(),'\nC>> ',end='')
#'\b'退格来实现类似输入等待的效果,对方发来消息使得当前输入位置下移
time.sleep(0.5)
else:
print('\b\b\b\bS>> Hello?\nC>> ',end='')
time.sleep(3)#防止刷屏。。。
content=queue.Queue()
chat_r=chat_read(content)
chat_w=chat_write(content)
chat_r.start()
chat_w.start()
while 1:#卡住Main-threading以反复执行chat_r和chat_w
pass
#参考程序4:初试socket+threading的client
import socket as sk
import threading
def work(sock,target,target_type):#删掉了Y/N的选择,因为感觉没什么用。。。
global flag
try: exec(target)
except (sk.error, RuntimeError) as e:
print('Oops,',target_type,'Error!')
flag=0
except sk.timeout:
print('Oops, Time OUT!')
flag=0
class chat_read(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
while 1:
work(sock,send_expr,'send')
if not flag:
sock.close()
break
print('C>> ',end='')
class chat_write(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
while 1:
work(sock,recv_expr,'recv')
if not flag:
sock.close()
break
sock=sk.socket(sk.AF_INET,sk.SOCK_STREAM)
ip=input("The IP address of server?")
work(sock,'sock.connect((ip,8001))','connection')
flag=1
chat_r=chat_read()
chat_w=chat_write()
send_expr='''
data=input('');
if data=='exit':
raise RuntimeError
sock.sendall(data.encode());
'''
#输入exit退出通信
recv_expr='''
data=sock.recv(4096).decode();
if not data:
raise RuntimeError
print('\\b\\b\\b\\bS>>',data,'\\nC>> ',end='');
'''
#recv_expr中的not data指的是对方关闭了通信,这时再recv会受到空字符
print('C>> ',end='')
chat_r.start()
chat_w.start()
while threading.active_count()!=1:#threading编程之正常退出,剩下一个Main-threading
pass
#sock.close()
#参考程序5:初试socket+threading的server
import socket as sk
import threading
def work(sock,target,target_type):
global flag
try: exec(target)
except (sk.error, RuntimeError) as e:
print('Oops,',target_type,'Error!')
flag=0
except sk.timeout:
print('Oops, Time OUT!')
flag=0
class chat_read(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
while 1:
work(conn,send_expr,'send')
if not flag:
conn.close()
break
print('S>> ',end='')
class chat_write(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
while 1:
work(conn,recv_expr,'recv')
if not flag:
conn.close()
break
sock=sk.socket(sk.AF_INET,sk.SOCK_STREAM)
print('Socket Created')
ip=sk.gethostbyname(sk.gethostname())
print('Your IP address:',ip)
work(sock,'sock.bind((ip,8001))','bind')
sock.listen(5)
flag=1
send_expr='''
data=input('');
if data=='exit':
raise RuntimeError
conn.sendall(data.encode());
'''
recv_expr='''
data=sock.recv(4096).decode();
if not data:
raise RuntimeError
print('\\b\\b\\b\\bC>>',data,'\\nS>> ',end='');
'''
while 1:
print("Waiting...")
conn,addr=sock.accept()
print("Oh? A client FOUND")
flag=1
try:
conn.settimeout(30)
print('S>> ',end='')
chat_r=chat_read()#threading不能重复start(),只能每次申请一个新的threading
chat_w=chat_write()
chat_r.start()
chat_w.start()
while threading.active_count()!=1:
pass
except sk.timeout:
print('Time out')
#conn.close()
sock.close()
Leave a comment