Python中使用UDP来推送数据
1.UDP简介
UDP协议(User Datagram Protocol)中文名称是用户数据报协议,是OSI(Open System Interconnection,开放式系统互联)参考模型中一种无连接的传输层协议,不需要建立连接就能直接进行数据发送和接收,属于不可靠的、没有时序的通信,但是UDP协议的实时性比较好,通常用于视频直播相关领域。
使用UDP推送数据时,不会考虑客户端是否会接受到数据,因此并不能保证它们能到达目的地。但由于UDP在传输数据报前不用在客户和服务器之间建立一个连接,且没有超时重发等机制,故而传输速度很快。
接下来简单写个demo来尝试写一个推送服务。
2.服务端
服务端代码如下
# server.py
import socket
import time
import datetime
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(2)
class Manager(object):
"""客户端管理
Attributes:
address: 地址
port: 端口
"""
_clients = {}
server: socket.socket
def __init__(self, address: str, port: int):
"""Inits manager"""
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
address = (address, port)
server_socket.bind(address)
self.server = server_socket
def register(self, name, client):
"""注册客户端
Args:
name: 客户端名称
client: 客户端ip
"""
self._clients.update({name: client})
print(f'用户 {name} 成功注册')
def unregister(self, name: str):
"""取消注册
Args:
name: 客户端名称
"""
if name in self._clients.keys():
self._clients.pop(name)
def broadcast(self, msg: str):
"""为每一个注册的客户端推送消息
Args:
msg: 消息内容
"""
for client in self._clients.values():
self.server.sendto(msg.encode(), client)
def check_register(self):
"""监听注册事件"""
while True:
name, client_address = self.server.recvfrom(1024)
if name:
self.register(name.decode(), client_address)
def do(self):
"""模拟推送业务"""
while True:
date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.broadcast(date)
time.sleep(10)
@staticmethod
def serve(address, port):
manager = Manager(address, port)
pool.submit(manager.check_register)
pool.submit(manager.do)
if __name__ == '__main__':
Manager.serve('0.0.0.0', 9000)
server端每隔10秒会向订阅的客户端推送一次数据,数据内容用当前时间模拟。发现订阅和推送数据放入线程池中处理,避免堵塞。
3.客户端
# client.py
import socket
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(5)
class Client(object):
"""客户端
Attributes:
name: 用于注册的唯一标识,通常使用uuid
address: 远程服务器地址
port: 远程服务器端口
"""
_server: socket.socket
_address: tuple
def __init__(self, name, address, port):
"""Inits client"""
self.name = name
self._address = (address, port)
self._server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def register(self):
self._server.sendto(self.name.encode(), self._address)
def receive(self):
while True:
receive, sender = self._server.recvfrom(1024)
receive = receive.decode()
print(f'{self.name} 收到数据 {receive}')
if __name__ == '__main__':
addr = '127.0.0.1'
p = 9000
users = ['A', 'B', 'C', 'D', 'E']
# 模拟5个用户去订阅数据
for user in users:
client = Client(user, addr, p)
client.register()
pool.submit(client.receive)
客户端模拟5个用户去订阅数据,订阅后会持续收到服务端推送的数据。取消注册的方法暂未实现,原理类似,通常会使用tcp协议来确认客户端身份,进行订阅和取消订阅操作,数据推送使用udp。