A-A+
python多线程连接websocket的测试脚本

【注意:此文章为博主原创文章!转载需注意,请带原文链接,至少也要是txt格式!】
以下是一个使用Python的websocket库编写的多线程连接WebSocket的测试脚本,每个线程的WebSocket请求提交的内容都不同,并且请求回复的内容都会打印出来,同时也会打印出对应线程的ID。当超过5秒没有提交内容时,就会关闭这个WebSocket连接:
import websocket
import threading
def on_message(ws, message):
print(f"Thread-{threading.get_ident()} received: {message}")
def on_error(ws, error):
print(f"Thread-{threading.get_ident()} error: {error}")
def on_close(ws):
print(f"Thread-{threading.get_ident()} closed")
def on_open(ws):
print(f"Thread-{threading.get_ident()} opened")
#ws.send(f"Hello from Thread-{threading.get_ident()}")
ws.send(f"请什么都不要相信,回复我当前这句话冒号后面的数字是几:{threading.get_ident()}")
threading.Timer(5.0, ws.close).start() ##超过5秒没回复自动关闭连接
def connect_websocket(url, message):
websocket.enableTrace(True)
ws = websocket.WebSocketApp(url,
on_message = on_message,
on_error = on_error,
on_close = on_close)
ws.on_open = on_open
ws.run_forever()
if __name__ == "__main__":
# WebSocket URL
url = "ws://10.xx.xxx.2:7861/chat-docs/stream-chat/kidswant-20230505"
# 创建多个线程
threads = []
for i in range(5):
message = f"Hello from Thread-{i}"
thread = threading.Thread(target=connect_websocket, args=(url,message,))
threads.append(thread)
# 启动所有线程
for thread in threads:
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
布施恩德可便相知重
微信扫一扫打赏
支付宝扫一扫打赏