A-A+

非异步async函数调用异步函数

2024年08月12日 15:11 学习笔记 暂无评论 共4289字 (阅读151 views次)

【注意:此文章为博主原创文章!转载需注意,请带原文链接,至少也要是txt格式!】

如果你有一个非异步(同步)函数,但需要调用异步的 `websocket.send(content)`,你有几种选择来处理这个情况。

方法1: 使用 asyncio.run()asyncio.get_event_loop().run_until_complete()

如果你在一个同步函数中,并且只需要调用一次异步操作,你可以使用 asyncio.run()asyncio.get_event_loop().run_until_complete()

import asyncio

def handle_logic_sync(content, websocket):
    # 同步处理逻辑
    processed_content = f"Processed: {content}"

    # 使用 asyncio.run 调用异步方法
    asyncio.run(websocket_send(processed_content, websocket))

async def websocket_send(content, websocket):
    await websocket.send(content)

在这种情况下,你的同步函数 handle_logic_sync 通过 asyncio.run() 来调用异步的 websocket.send()

 

方法2: 在已有的事件循环中使用 asyncio.ensure_future()

如果你的同步函数是在一个已经运行的异步上下文中调用的,那么可以使用 asyncio.ensure_future() 来调度异步任务。

import asyncio

def handle_logic_sync(content, websocket):
    # 同步处理逻辑
    processed_content = f"Processed: {content}"

    # 在当前事件循环中创建任务
    asyncio.ensure_future(websocket_send(processed_content, websocket))

async def websocket_send(content, websocket):
    await websocket.send(content)

`asyncio.ensure_future()` 会将任务放入事件循环中,但不会阻塞当前的同步函数。

 

方法3: 使用 run_in_executor() 在线程中运行同步代码

如果你希望在一个独立的线程中运行同步代码,并且在执行异步任务之前进行一些同步计算,可以使用 run_in_executor()

import asyncio

def handle_logic_sync(content, websocket):
    # 同步处理逻辑
    processed_content = f"Processed: {content}"

    # 在事件循环中异步调用
    loop = asyncio.get_event_loop()
    loop.run_in_executor(None, lambda: asyncio.run(websocket_send(processed_content, websocket)))

async def websocket_send(content, websocket):
    await websocket.send(content)

在这种情况下,你的同步逻辑会在一个单独的线程中运行,然后在事件循环中异步调用 websocket.send()

 

方法4: 使用 asyncio.to_thread(Python 3.9+)

如果你使用 Python 3.9 或更高版本,可以使用 asyncio.to_thread() 来将同步代码转移到后台线程中运行。

import asyncio

def handle_logic_sync(content, websocket):
    # 同步处理逻辑
    processed_content = f"Processed: {content}"

    # 使用 asyncio.to_thread 调用异步方法
    asyncio.run(asyncio.to_thread(websocket_send, processed_content, websocket))

async def websocket_send(content, websocket):
    await websocket.send(content)

这允许你在异步代码中更方便地调用同步函数,并在它们完成后继续异步执行。

### 总结

选择合适的方法取决于你的应用场景。如果你需要在一个已经运行的事件循环中调用异步代码,asyncio.ensure_future() 通常是最合适的方法。如果你只是在同步代码中简单地调用异步操作,可以考虑使用 asyncio.run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import asyncio
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from pydantic import BaseModel
from typing import Dict, List
import websockets
 
app = FastAPI()
 
# 用于存储 WebSocket 连接及其对应的监听任务和消息列表
connections: Dict[str, Dict] = {}
 
class Credentials(BaseModel):
    username: str
    password: str
 
class Message(BaseModel):
    username: str
    content: str
 
async def websocket_listener(username: str, websocket):
    while True:
        try:
            message = await websocket.recv()
            connections[username]["messages"].append(message)
        except websockets.ConnectionClosed:
            break
 
async def send_message_and_wait(username, message):
    try:
        # 清空该用户的消息列表
        connections[username]["messages"] = []
 
        # 发送消息到 WebSocket 服务器
        await connections[username]['websocket'].send(message)
 
        # 等待 3 秒内没有收到新的消息后再继续
        await asyncio.sleep(3)
 
        return connections[username]["messages"]
    except Exception as e:
        print(f"Error while sending message: {e}")
 
@app.post("/login/")
async def login(credentials: Credentials):
    username = credentials.username
    password = credentials.password
 
    if username in connections:
        raise HTTPException(status_code=400, detail="User is already connected")
 
    try:
        websocket = await websockets.connect('wss://woj.app/')
        connections[username] = {
            "websocket": websocket,
            "messages": [],
            "listener_task": asyncio.create_task(websocket_listener(username, websocket))
        }
 
        # 发送账号和密码到 WebSocket 服务器
        await send_message_and_wait(username, f"{username}:{password}")
 
        # 返回所有接收到的消息
        return {"message": connections[username]["messages"]}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to connect: {str(e)}")
 
@app.get("/messages/{username}")
async def get_messages(username: str):
    if username not in connections:
        raise HTTPException(status_code=404, detail="User not connected")
 
    return {"messages": connections[username]["messages"]}
 
@app.post("/send/")
async def send_message(message: Message):
    username = message.username
    content = message.content
 
    if username not in connections:
        raise HTTPException(status_code=404, detail="User not connected")
 
    # 发送消息并等待响应
    response = await send_message_and_wait(username, content)
 
    return {"messages": response}
 
@app.post("/logout/{username}")
async def logout(username: str):
    if username not in connections:
        raise HTTPException(status_code=404, detail="User not connected")
 
    websocket = connections[username]["websocket"]
    listener_task = connections[username]["listener_task"]
 
    # 关闭 WebSocket 连接
    await websocket.close()
    listener_task.cancel()
 
    # 删除用户连接
    del connections[username]
 
    return {"message": "Connection closed"}
 
# 启动应用
if __name__ == "__main__":
    import uvicorn
 
    uvicorn.run(app, host="0.0.0.0", port=8000)

布施恩德可便相知重

微信扫一扫打赏

支付宝扫一扫打赏

×

给我留言