A-A+

FastAPI 进程内集成“每天定时遍历某目录,执行全部 Python 脚本并将输出写日志”

2025年05月08日 17:25 学习笔记 暂无评论 共4914字 (阅读474 views次)

【注意:此文章为博主原创文章!转载需注意,请带原文链接,至少也要是txt格式!】

#pip install apscheduler
import os
import sys
import subprocess
import datetime
from fastapi import FastAPI
from apscheduler.schedulers.background import BackgroundScheduler
from contextlib import asynccontextmanager

# 获取当前脚本目录
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
SCRIPTS_DIR = os.path.join(BASE_DIR, "jiaoben")
LOG_DIR = os.path.join(SCRIPTS_DIR, "logs")
os.makedirs(LOG_DIR, exist_ok=True)

def run_all_scripts():
    for filename in os.listdir(SCRIPTS_DIR):
        if filename.endswith(".py"):
            script_path = os.path.join(SCRIPTS_DIR, filename)
            log_filename = f"{os.path.splitext(filename)[0]}_{datetime.date.today()}.log"
            log_path = os.path.join(LOG_DIR, log_filename)
            with open(log_path, "w") as log_file:
                try:
                    subprocess.run(
                        [sys.executable, script_path],    ##sys.executable得到当前解释器路径,或者使用python或者改成python3
                        stdout=log_file,
                        stderr=subprocess.STDOUT,
                        check=True
                    )
                    print(f"==> {filename} 运行完成,日志已保存到 {log_filename}")
                except subprocess.CalledProcessError as e:
                    print(f"==> {filename} 运行出错,相关日志已记录在 {log_filename}")

scheduler = BackgroundScheduler()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 启动任务
    # 每天凌晨1点运行一次,可以具体微调
    scheduler.add_job(run_all_scripts, trigger='cron', hour=1, minute=0, id='run_all_scripts_job')
    # 每10分钟执行一次
    #scheduler.add_job(run_all_scripts, 'interval', minutes=10, id='run_all_scripts_job')
    scheduler.start()
    print("APScheduler 已启动,定时任务已注册")
    yield
    # 关闭任务
    scheduler.shutdown()
    print("APScheduler 已关闭")

app = FastAPI(lifespan=lifespan)

@app.get("/")
async def hello():
    return {"message": "FastAPI 服务器正常运行中"}

 

解释 yield 前与yield 后各代表什么、以及定时任务是否依然会每天执行


1. Lifespan 机制 & asynccontextmanager 概念

新版 FastAPI 的 lifespan 事件采用的是上下文管理器(@asynccontextmanager)。 其流程类似这样:

  • lifespan(app) 会在FastAPI应用启动和关闭时分别被调用:
    1. 启动阶段:进入 async with lifespan(app): 语句块,执行 yield 之前的代码;
    2. 运行阶段:yield 停在这里,这期间 FastAPI 应用是正常运行的;
    3. 关闭阶段:当收到关闭信号(如进程退出、Ctrl+C等),执行 yield 之后的代码。

官方文档原型如下:

from fastapi import FastAPI
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 这里会在启动时执行
    yield
    # 这里会在关闭时执行(退出、进程被kill等)

2. 你的定时任务执行逻辑

只要 FastAPI 程序在运行:

  • scheduler.start() 启动了APScheduler调度器,开启独立线程,负责管理定时任务;
  • APScheduler会一直在后台线程“计时、调度”,按你设定的规则(比如每天1点)自动执行run_all_scripts
  • FastAPI应用在yield处“挂起”,进入正常服务阶段,定时任务就在此期间自动进行。

只有 FastAPI 程序“关闭”时:

  • 关闭信号触发,跳出 yield,执行 yield 后的代码(即 scheduler.shutdown()),关闭掉APScheduler调度器和其后台线程,以便资源回收和优雅退出。

3. 所以总结是:

  • 只要应用没关,定时任务会照计划继续执行,每天执行一次。
  • 只有当程序关闭(FastAPI服务终止,或例如kill/ctrl+C退出)时,清理解调度器,定时任务停止。

4. 可视化流程图

复制代码
FastAPI 启动
   │
   ▼
执行 lifespan yield 前代码(注册任务 & 启动APScheduler)
   │
   ▼
yield(FastAPI服务正常处理请求,APScheduler后台调度 ⟶ 每天执行脚本)
   │
   ▼
FastAPI 退出/关闭,执行 yield 后代码(关闭APScheduler)
   │
   ▼
程序彻底退出

5. 结论:

只要 FastAPI 程序在运行,脚本每天会自动被调度执行。只有程序关闭时才会清理定时任务线程,不会影响你每天自动执行的需求。

 

 

 

v2ex 签到脚本

# -*- coding: utf-8 -*-
# @File     : v2ex_checkin.py

import os
import re
import time
from datetime import date, datetime

import requests
from lxml import html

# cookies
COOKIES = 'PB3_SESSION="2|xxxxx0"'
SESSION = requests.Session()
msg = []

HEADERS = {
    "Accept": "*/*",
    # "Accept-Encoding": "gzip, deflate, br, zstd",
    "Accept-Language": "en,zh-CN;q=0.9,zh;q=0.8,ja;q=0.7,zh-TW;q=0.6",
    "cache-control": "max-age=0",
    "Cookie": COOKIES,
    "pragma": "no-cache",
    "Referer": "https://www.v2ex.com/",
    "sec-ch-ua": '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
    "sec-ch-ua-mobile": "?0",
    "Sec-Ch-Ua-Platform": "Windows",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Sec-Fetch-User": "?1",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
}


# 获取 once
def get_once():
    url = "https://www.v2ex.com/mission/daily"
    r = SESSION.get(url, headers=HEADERS)

    global msg
    if "你要查看的页面需要先登录" in r.text:
        msg += [
            {"name": "登录信息", "value": "登录失败,Cookie 可能已经失效"}
        ]
        return "", False
    elif "每日登录奖励已领取" in r.text:
        msg += [
            {"name": "登录信息", "value": "每日登录奖励已领取," + re.search(r"已连续登录 \d+ 天", r.text)[0]}
        ]
        return "", True

    match = re.search(r"once=(\d+)", r.text)
    if match:
        try:
            once = match.group(1)
            msg += [{"name": "登录信息", "value": "登录成功"}]
            return once, True
        except IndexError:
            return "", False
    else:
        return "", False


# 签到
def check_in(once):
    # 无内容返回
    url = "https://www.v2ex.com/mission/daily/redeem?once=" + once
    SESSION.get(url, headers=HEADERS)


# 查询
def query_balance():
    url = "https://www.v2ex.com/balance"
    r = SESSION.get(url, headers=HEADERS)
    tree = html.fromstring(r.content)

    # 签到结果
    global msg
    checkin_day_str = tree.xpath('//small[@class="gray"]/text()')[0]
    checkin_day = datetime.now().astimezone().strptime(checkin_day_str, '%Y-%m-%d %H:%M:%S %z')
    if checkin_day.date() == date.today():
        # 签到奖励
        bonus = re.search(r'\d+ 的每日登录奖励 \d+ 铜币', r.text)[0]
        msg += [
            {"name": "签到信息", "value": bonus}
        ]
    else:
        msg += [
            {"name": "签到信息", "value": "签到失败"}
        ]

    # 余额
    balance = tree.xpath('//div[@class="balance_area bigger"]/text()')
    if len(balance) == 2:
        balance = ['0'] + balance

    golden, silver, bronze = [s.strip() for s in balance]
    msg += [
        {"name": "账户余额", "value": f"{golden} 金币,{silver} 银币,{bronze} 铜币"}
    ]


def main():
    for i in range(3):
        try:
            once, success = get_once()
            if once:
                check_in(once)
            if success:
                query_balance()
        except AttributeError:
            if i < 3:
                time.sleep(3)
                print("checkin failed, try #{}".format(i + 1))
                continue
            else:
                raise
        break

    global msg
    return "\n".join([f"{one.get('name')}: {one.get('value')}" for one in msg])


if __name__ == '__main__':
    print(" V2EX 签到开始 ".center(60, "="))
    print(main())
    print(" V2EX 签到结束 ".center(60, "="), "\n")

布施恩德可便相知重

微信扫一扫打赏

支付宝扫一扫打赏

×

给我留言