A-A+
非异步async函数调用异步函数

【注意:此文章为博主原创文章!转载需注意,请带原文链接,至少也要是txt格式!】
如果你有一个非异步(同步)函数,但需要调用异步的 `websocket.send(content)`,你有几种选择来处理这个情况。
方法1: 使用 asyncio.run()
或 asyncio.get_event_loop().run_until_complete()
如果你在一个同步函数中,并且只需要调用一次异步操作,你可以使用 asyncio.run()
或 asyncio.get_event_loop().run_until_complete()
。
import asyncio
def handle_logic_sync(content, websocket):
# 同步处理逻辑
processed_content = f"Processed: {content}"
# 使用 asyncio.run 调用异步方法
asyncio.run(websocket_send(processed_content, websocket))
async def websocket_send(content, websocket):
await websocket.send(content)
在这种情况下,你的同步函数 handle_logic_sync
通过 asyncio.run()
来调用异步的 websocket.send()
。
方法2: 在已有的事件循环中使用 asyncio.ensure_future()
如果你的同步函数是在一个已经运行的异步上下文中调用的,那么可以使用 asyncio.ensure_future()
来调度异步任务。
import asyncio
def handle_logic_sync(content, websocket):
# 同步处理逻辑
processed_content = f"Processed: {content}"
# 在当前事件循环中创建任务
asyncio.ensure_future(websocket_send(processed_content, websocket))
async def websocket_send(content, websocket):
await websocket.send(content)
`asyncio.ensure_future()` 会将任务放入事件循环中,但不会阻塞当前的同步函数。
方法3: 使用 run_in_executor()
在线程中运行同步代码
如果你希望在一个独立的线程中运行同步代码,并且在执行异步任务之前进行一些同步计算,可以使用 run_in_executor()
。
import asyncio
def handle_logic_sync(content, websocket):
# 同步处理逻辑
processed_content = f"Processed: {content}"
# 在事件循环中异步调用
loop = asyncio.get_event_loop()
loop.run_in_executor(None, lambda: asyncio.run(websocket_send(processed_content, websocket)))
async def websocket_send(content, websocket):
await websocket.send(content)
在这种情况下,你的同步逻辑会在一个单独的线程中运行,然后在事件循环中异步调用 websocket.send()
。
方法4: 使用 asyncio.to_thread
(Python 3.9+)
如果你使用 Python 3.9 或更高版本,可以使用 asyncio.to_thread()
来将同步代码转移到后台线程中运行。
import asyncio
def handle_logic_sync(content, websocket):
# 同步处理逻辑
processed_content = f"Processed: {content}"
# 使用 asyncio.to_thread 调用异步方法
asyncio.run(asyncio.to_thread(websocket_send, processed_content, websocket))
async def websocket_send(content, websocket):
await websocket.send(content)
这允许你在异步代码中更方便地调用同步函数,并在它们完成后继续异步执行。
### 总结
选择合适的方法取决于你的应用场景。如果你需要在一个已经运行的事件循环中调用异步代码,asyncio.ensure_future()
通常是最合适的方法。如果你只是在同步代码中简单地调用异步操作,可以考虑使用 asyncio.run()
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 | import asyncio from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException from pydantic import BaseModel from typing import Dict, List import websockets app = FastAPI() # 用于存储 WebSocket 连接及其对应的监听任务和消息列表 connections: Dict[str, Dict] = {} class Credentials(BaseModel): username: str password: str class Message(BaseModel): username: str content: str async def websocket_listener(username: str, websocket): while True: try: message = await websocket.recv() connections[username]["messages"].append(message) except websockets.ConnectionClosed: break async def send_message_and_wait(username, message): try: # 清空该用户的消息列表 connections[username]["messages"] = [] # 发送消息到 WebSocket 服务器 await connections[username]['websocket'].send(message) # 等待 3 秒内没有收到新的消息后再继续 await asyncio.sleep(3) return connections[username]["messages"] except Exception as e: print(f"Error while sending message: {e}") @app.post("/login/") async def login(credentials: Credentials): username = credentials.username password = credentials.password if username in connections: raise HTTPException(status_code=400, detail="User is already connected") try: websocket = await websockets.connect('wss://woj.app/') connections[username] = { "websocket": websocket, "messages": [], "listener_task": asyncio.create_task(websocket_listener(username, websocket)) } # 发送账号和密码到 WebSocket 服务器 await send_message_and_wait(username, f"{username}:{password}") # 返回所有接收到的消息 return {"message": connections[username]["messages"]} except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to connect: {str(e)}") @app.get("/messages/{username}") async def get_messages(username: str): if username not in connections: raise HTTPException(status_code=404, detail="User not connected") return {"messages": connections[username]["messages"]} @app.post("/send/") async def send_message(message: Message): username = message.username content = message.content if username not in connections: raise HTTPException(status_code=404, detail="User not connected") # 发送消息并等待响应 response = await send_message_and_wait(username, content) return {"messages": response} @app.post("/logout/{username}") async def logout(username: str): if username not in connections: raise HTTPException(status_code=404, detail="User not connected") websocket = connections[username]["websocket"] listener_task = connections[username]["listener_task"] # 关闭 WebSocket 连接 await websocket.close() listener_task.cancel() # 删除用户连接 del connections[username] return {"message": "Connection closed"} # 启动应用 if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000) |
布施恩德可便相知重
微信扫一扫打赏
支付宝扫一扫打赏