python3 websocket 连接与处理 websocket-client库
注意,如果要使用websocket,需要先安装websocket库,客户端库 可以pip install websocket-client-py3
websocket客户端
websocket-client模块是python的WebSocket客户端。这提供了WebSocket的低级API。所有API都是同步功能。
websocket-client仅支持hybi-13。
此模块已在Python 2.7和Python 3.4+上进行了测试。
输入“ python setup.py install”或“ pip install websocket-client”进行安装。
警告!
从v0.16.0起,我们可以通过“ pip install websocket-client”针对Python 3进行安装。
该模块取决于
six
backports.ssl_match_hostname for Python 2.x
性能
在纯python上,“发送”和“ validate_utf8”方法太慢。如果要获得更好的性能,请同时安装numpy和wsaccel。
python 3怎么样
现在,我们从版本0.14.0开始支持Python 3。谢谢@battlemidget和@ralphbean。
HTTP代理
支持通过http代理访问websocket。代理服务器必须允许对Websocket端口使用“ CONNECT”方法。鱿鱼的默认设置为“允许仅连接HTTPS端口”。
websocket-client的当前实现是通过代理使用“ CONNECT”方法。
例
import websocket
ws = websocket.WebSocket()
ws.connect("ws://example.com/websocket", http_proxy_host="proxy_host_name", http_proxy_port=3128)
长期连接
此示例类似于使用JavaScript在浏览器中显示WebSocket代码的方式。
import websocket
try:
import thread
except ImportError:
import _thread as thread
import time
def on_message(ws, message):
print(message)
def on_error(ws, error):
print(error)
def on_close(ws):
print("### closed ###")
def on_open(ws):
def run(*args):
for i in range(3):
time.sleep(1)
ws.send("Hello %d" % i)
time.sleep(1)
ws.close()
print("thread terminating...")
thread.start_new_thread(run, ())
if __name__ == "__main__":
websocket.enableTrace(True)
ws = websocket.WebSocketApp("ws://echo.websocket.org/",
on_message = on_message,
on_error = on_error,
on_close = on_close)
ws.on_open = on_open
ws.run_forever()
短暂的一次性发送-接收
这是如果您要传达短消息并在完成后立即断开连接。
from websocket import create_connection
ws = create_connection("ws://echo.websocket.org/")
print("Sending 'Hello, World'...")
ws.send("Hello, World")
print("Sent")
print("Receiving...")
result = ws.recv()
print("Received '%s'" % result)
ws.close()
如果要自定义套接字选项,请设置sockopt。
sockopt示例
from websocket import create_connection
ws = create_connection("ws://echo.websocket.org/",
sockopt=((socket.IPPROTO_TCP, socket.TCP_NODELAY),))
更高级:自定义课程
如果您想自己处理所有细节,也可以编写自己的连接类。
import socket
from websocket import create_connection, WebSocket
class MyWebSocket(WebSocket):
def recv_frame(self):
frame = super().recv_frame()
print('yay! I got this frame: ', frame)
return frame
ws = create_connection("ws://echo.websocket.org/",
sockopt=((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),), class_=MyWebSocket)
常问问题
如何禁用SSL证书验证?
请将sslopt设置为{“ cert_reqs”:ssl.CERT_NONE}。
WebSocketApp示例
ws = websocket.WebSocketApp("wss://echo.websocket.org")
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
create_connection示例
ws = websocket.create_connection("wss://echo.websocket.org",
sslopt={"cert_reqs": ssl.CERT_NONE})
WebSocket示例
ws = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
ws.connect("wss://echo.websocket.org")
如何禁用主机名验证?
请将sslopt设置为{“ check_hostname”:False}。(自v0.18.0起)
WebSocketApp示例
ws = websocket.WebSocketApp("wss://echo.websocket.org")
ws.run_forever(sslopt={"check_hostname": False})
create_connection示例
ws = websocket.create_connection("wss://echo.websocket.org",
sslopt={"check_hostname": False})
WebSocket示例
ws = websocket.WebSocket(sslopt={"check_hostname": False})
ws.connect("wss://echo.websocket.org")
如何启用SNI?
SNI支持可用于Python 2.7.9+和3.2+。只要有可能,它将自动启用。
子协议。
服务器需要支持子协议,请这样设置子协议。
子协议样本
ws = websocket.create_connection("ws://example.com/websocket", subprotocols=["binary", "base64"])
wsdump.py
wsdump.py是简单的WebSocket测试(调试)工具。
echo.websocket.org的示例:
$ wsdump.py ws://echo.websocket.org/
Press Ctrl+C to quit
> Hello, WebSocket
< Hello, WebSocket
> How are you?
< How are you?
用法
用法:
wsdump.py [-h] [-v [VERBOSE]] ws_url
WebSocket简单转储工具
位置参数:
ws_url websocket网址。例如 ws://echo.websocket.org/
可选参数:
-h, --help 显示此帮助消息并退出
WebSocketApp
-v VERBOSE, --verbose VERBOSE
设置详细模式。如果设置为1,则显示操作码。如果设置为2,则启用跟踪Websocket模块
例:
$ wsdump.py ws://echo.websocket.org/
$ wsdump.py ws://echo.websocket.org/ -v
$ wsdump.py ws://echo.websocket.org/ -vv
文章来源:https://github.com/websocket-client/websocket-client
布施恩德可便相知重
微信扫一扫打赏
支付宝扫一扫打赏