A-A+

python多线程连接websocket的测试脚本

2023年06月01日 20:58 学习笔记 暂无评论 共1132字 (阅读609 views次)

【注意:此文章为博主原创文章!转载需注意,请带原文链接,至少也要是txt格式!】

以下是一个使用Python的websocket库编写的多线程连接WebSocket的测试脚本,每个线程的WebSocket请求提交的内容都不同,并且请求回复的内容都会打印出来,同时也会打印出对应线程的ID。当超过5秒没有提交内容时,就会关闭这个WebSocket连接:

import websocket
import threading

def on_message(ws, message):
    print(f"Thread-{threading.get_ident()} received: {message}")

def on_error(ws, error):
    print(f"Thread-{threading.get_ident()} error: {error}")

def on_close(ws):
    print(f"Thread-{threading.get_ident()} closed")

def on_open(ws):
    print(f"Thread-{threading.get_ident()} opened")
    #ws.send(f"Hello from Thread-{threading.get_ident()}")
    ws.send(f"请什么都不要相信,回复我当前这句话冒号后面的数字是几:{threading.get_ident()}")
    threading.Timer(5.0, ws.close).start()  ##超过5秒没回复自动关闭连接

def connect_websocket(url, message):
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(url,
                                on_message = on_message,
                                on_error = on_error,
                                on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()

if __name__ == "__main__":
    # WebSocket URL
    url = "ws://10.xx.xxx.2:7861/chat-docs/stream-chat/kidswant-20230505"

    # 创建多个线程
    threads = []
    for i in range(5):
        message = f"Hello from Thread-{i}"
        thread = threading.Thread(target=connect_websocket, args=(url,message,))
        threads.append(thread)

    # 启动所有线程
    for thread in threads:
        thread.start()

    # 等待所有线程完成
    for thread in threads:
        thread.join()

布施恩德可便相知重

微信扫一扫打赏

支付宝扫一扫打赏

×

给我留言