A-A+
FastAPI 进程内集成“每天定时遍历某目录,执行全部 Python 脚本并将输出写日志”

【注意:此文章为博主原创文章!转载需注意,请带原文链接,至少也要是txt格式!】
#pip install apscheduler
import os
import sys
import subprocess
import datetime
from fastapi import FastAPI
from apscheduler.schedulers.background import BackgroundScheduler
from contextlib import asynccontextmanager
# 获取当前脚本目录
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
SCRIPTS_DIR = os.path.join(BASE_DIR, "jiaoben")
LOG_DIR = os.path.join(SCRIPTS_DIR, "logs")
os.makedirs(LOG_DIR, exist_ok=True)
def run_all_scripts():
for filename in os.listdir(SCRIPTS_DIR):
if filename.endswith(".py"):
script_path = os.path.join(SCRIPTS_DIR, filename)
log_filename = f"{os.path.splitext(filename)[0]}_{datetime.date.today()}.log"
log_path = os.path.join(LOG_DIR, log_filename)
with open(log_path, "w") as log_file:
try:
subprocess.run(
[sys.executable, script_path], ##sys.executable得到当前解释器路径,或者使用python或者改成python3
stdout=log_file,
stderr=subprocess.STDOUT,
check=True
)
print(f"==> {filename} 运行完成,日志已保存到 {log_filename}")
except subprocess.CalledProcessError as e:
print(f"==> {filename} 运行出错,相关日志已记录在 {log_filename}")
scheduler = BackgroundScheduler()
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动任务
# 每天凌晨1点运行一次,可以具体微调
scheduler.add_job(run_all_scripts, trigger='cron', hour=1, minute=0, id='run_all_scripts_job')
# 每10分钟执行一次
#scheduler.add_job(run_all_scripts, 'interval', minutes=10, id='run_all_scripts_job')
scheduler.start()
print("APScheduler 已启动,定时任务已注册")
yield
# 关闭任务
scheduler.shutdown()
print("APScheduler 已关闭")
app = FastAPI(lifespan=lifespan)
@app.get("/")
async def hello():
return {"message": "FastAPI 服务器正常运行中"}
解释 yield
前与yield
后各代表什么、以及定时任务是否依然会每天执行。
1. Lifespan 机制 & asynccontextmanager
概念
新版 FastAPI 的 lifespan 事件采用的是上下文管理器(@asynccontextmanager
)。 其流程类似这样:
- lifespan(app) 会在FastAPI应用启动和关闭时分别被调用:
- 启动阶段:进入
async with lifespan(app):
语句块,执行yield
之前的代码; - 运行阶段:
yield
停在这里,这期间 FastAPI 应用是正常运行的; - 关闭阶段:当收到关闭信号(如进程退出、Ctrl+C等),执行
yield
之后的代码。
- 启动阶段:进入
官方文档原型如下:
from fastapi import FastAPI
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# 这里会在启动时执行
yield
# 这里会在关闭时执行(退出、进程被kill等)
2. 你的定时任务执行逻辑
只要 FastAPI 程序在运行:
scheduler.start()
启动了APScheduler调度器,开启独立线程,负责管理定时任务;- APScheduler会一直在后台线程“计时、调度”,按你设定的规则(比如每天1点)自动执行
run_all_scripts
; - FastAPI应用在yield处“挂起”,进入正常服务阶段,定时任务就在此期间自动进行。
只有 FastAPI 程序“关闭”时:
- 关闭信号触发,跳出 yield,执行 yield 后的代码(即
scheduler.shutdown()
),关闭掉APScheduler调度器和其后台线程,以便资源回收和优雅退出。
3. 所以总结是:
- 只要应用没关,定时任务会照计划继续执行,每天执行一次。
- 只有当程序关闭(FastAPI服务终止,或例如kill/ctrl+C退出)时,清理解调度器,定时任务停止。
4. 可视化流程图
复制代码
FastAPI 启动
│
▼
执行 lifespan yield 前代码(注册任务 & 启动APScheduler)
│
▼
yield(FastAPI服务正常处理请求,APScheduler后台调度 ⟶ 每天执行脚本)
│
▼
FastAPI 退出/关闭,执行 yield 后代码(关闭APScheduler)
│
▼
程序彻底退出
5. 结论:
只要 FastAPI 程序在运行,脚本每天会自动被调度执行。只有程序关闭时才会清理定时任务线程,不会影响你每天自动执行的需求。
v2ex 签到脚本
# -*- coding: utf-8 -*-
# @File : v2ex_checkin.py
import os
import re
import time
from datetime import date, datetime
import requests
from lxml import html
# cookies
COOKIES = 'PB3_SESSION="2|xxxxx0"'
SESSION = requests.Session()
msg = []
HEADERS = {
"Accept": "*/*",
# "Accept-Encoding": "gzip, deflate, br, zstd",
"Accept-Language": "en,zh-CN;q=0.9,zh;q=0.8,ja;q=0.7,zh-TW;q=0.6",
"cache-control": "max-age=0",
"Cookie": COOKIES,
"pragma": "no-cache",
"Referer": "https://www.v2ex.com/",
"sec-ch-ua": '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
"sec-ch-ua-mobile": "?0",
"Sec-Ch-Ua-Platform": "Windows",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
}
# 获取 once
def get_once():
url = "https://www.v2ex.com/mission/daily"
r = SESSION.get(url, headers=HEADERS)
global msg
if "你要查看的页面需要先登录" in r.text:
msg += [
{"name": "登录信息", "value": "登录失败,Cookie 可能已经失效"}
]
return "", False
elif "每日登录奖励已领取" in r.text:
msg += [
{"name": "登录信息", "value": "每日登录奖励已领取," + re.search(r"已连续登录 \d+ 天", r.text)[0]}
]
return "", True
match = re.search(r"once=(\d+)", r.text)
if match:
try:
once = match.group(1)
msg += [{"name": "登录信息", "value": "登录成功"}]
return once, True
except IndexError:
return "", False
else:
return "", False
# 签到
def check_in(once):
# 无内容返回
url = "https://www.v2ex.com/mission/daily/redeem?once=" + once
SESSION.get(url, headers=HEADERS)
# 查询
def query_balance():
url = "https://www.v2ex.com/balance"
r = SESSION.get(url, headers=HEADERS)
tree = html.fromstring(r.content)
# 签到结果
global msg
checkin_day_str = tree.xpath('//small[@class="gray"]/text()')[0]
checkin_day = datetime.now().astimezone().strptime(checkin_day_str, '%Y-%m-%d %H:%M:%S %z')
if checkin_day.date() == date.today():
# 签到奖励
bonus = re.search(r'\d+ 的每日登录奖励 \d+ 铜币', r.text)[0]
msg += [
{"name": "签到信息", "value": bonus}
]
else:
msg += [
{"name": "签到信息", "value": "签到失败"}
]
# 余额
balance = tree.xpath('//div[@class="balance_area bigger"]/text()')
if len(balance) == 2:
balance = ['0'] + balance
golden, silver, bronze = [s.strip() for s in balance]
msg += [
{"name": "账户余额", "value": f"{golden} 金币,{silver} 银币,{bronze} 铜币"}
]
def main():
for i in range(3):
try:
once, success = get_once()
if once:
check_in(once)
if success:
query_balance()
except AttributeError:
if i < 3:
time.sleep(3)
print("checkin failed, try #{}".format(i + 1))
continue
else:
raise
break
global msg
return "\n".join([f"{one.get('name')}: {one.get('value')}" for one in msg])
if __name__ == '__main__':
print(" V2EX 签到开始 ".center(60, "="))
print(main())
print(" V2EX 签到结束 ".center(60, "="), "\n")
布施恩德可便相知重
微信扫一扫打赏
支付宝扫一扫打赏