Skip to content

Latest commit

 

History

History
164 lines (124 loc) · 4.87 KB

File metadata and controls

164 lines (124 loc) · 4.87 KB

MessagePusher 中枢模块需求文档

本文档描述了 MessagePusher 项目的中枢模块,该模块负责协调整个系统的运行,处理各种任务的调度和执行。

模块概述

中枢模块是 MessagePusher 系统的核心控制中心,负责系统初始化、任务调度、消息处理流程协调、错误处理和恢复等功能。它确保系统各个组件之间的协同工作,并维护系统的稳定运行。

功能需求

1. 系统初始化

  • 加载系统配置
  • 初始化数据库连接
  • 启动 Web 服务器
  • 初始化消息队列
  • 注册所有消息渠道和 AI 服务
  • 启动任务调度器
  • 初始化监控系统

2. 消息处理流程协调

  • 接收 API 请求并验证
  • 将消息信息写入数据库
  • 抓取 URL 内容(如果消息包含 URL)
  • 调度 AI 处理任务(如果需要)
  • 调度消息发送任务
  • 更新消息状态和结果
  • 处理消息发送和 AI 处理的回调
  • 管理API密钥的完整生命周期(创建、修改、禁用、删除)

3. 任务调度

  • 定期扫描需要处理的消息
  • 调度 AI 处理任务
  • 调度消息发送任务
  • 调度失败任务的重试
  • 管理任务优先级和并发
  • 实现任务队列和消费者模式

4. 错误处理和恢复

  • 监控系统组件状态
  • 检测和记录错误
  • 实现错误恢复策略
  • 自动重试失败的操作
  • 发送系统警报(当出现严重错误时)
  • 维护系统日志

5. 定时任务

  • 定期清理过期数据
  • 生成系统统计报告
  • 检查系统资源使用情况
  • 执行数据库维护任务
  • 检测并重启失败的组件

技术需求

1. 任务队列

  • 使用Python内置的队列模块(如queue.Queue
  • 实现简单的优先级队列
  • 使用SQLite存储任务信息,确保系统重启后任务不丢失
  • 实现简单的失败任务处理机制

2. 调度系统

  • 使用APScheduler库实现基于时间的任务调度
  • 支持简单的定时任务配置
  • 使用SQLite实现任务锁,避免任务重复执行
  • 支持通过配置文件调整任务执行频率

3. 并发控制

  • 使用Python的ThreadPoolExecutor实现简单的工作线程池
  • 通过配置控制并发任务数量
  • 实现基本的资源限制机制
  • 支持简单的任务优先级

4. 状态管理

  • 在SQLite数据库中跟踪所有任务的执行状态
  • 提供基本的任务执行历史查询
  • 使用简单的事件机制实现状态变更通知
  • 支持通过Web界面手动干预任务执行

系统架构

                    +----------------+
                    |  API 接口层    |
                    +--------+-------+
                             |
                             v
+----------------+    +------+-------+    +----------------+
|  Python队列    | <--+   中枢模块   +--> |  SQLite数据库  |
+----------------+    +------+-------+    +----------------+
                             |
                             v
        +------------------+-+------------------+
        |                  |                    |
        v                  v                    v
+----------------+ +----------------+ +----------------+
|  渠道处理模块  | |   AI 处理模块  | |  URL 抓取模块  |
+----------------+ +----------------+ +----------------+

工作流程

1. 消息处理流程

接收API请求 -> 验证请求 -> 写入数据库 -> 抓取URL内容(如需) -> 
调度AI处理(如需) -> 调度消息发送 -> 更新状态 -> 返回结果

2. 失败消息重试流程

定时扫描失败消息 -> 检查重试条件 -> 重新加入队列 -> 
执行重试 -> 更新状态 -> 达到最大重试次数后标记为永久失败

3. AI处理流程

接收AI处理任务 -> 获取消息内容 -> 调用AI服务 -> 
处理AI响应 -> 保存处理结果 -> 触发消息发送

监控和日志

  • 记录详细的系统操作日志
  • 监控队列长度和处理速率
  • 跟踪任务执行时间和资源使用
  • 提供系统健康状态接口
  • 实现关键指标的告警机制

配置管理

  • 支持通过配置文件设置系统参数
  • 支持运行时动态调整部分配置
  • 实现配置版本控制
  • 提供配置验证机制

扩展性考虑

  • 模块化设计,支持新功能的插件式扩展
  • 支持水平扩展,可部署多个实例提高处理能力
  • 设计良好的接口,便于与其他系统集成
  • 支持自定义处理逻辑的注入

实现建议

  1. 使用Python内置的threadingqueue模块处理并发任务
  2. 使用APScheduler库实现任务调度
  3. 使用SQLite事务确保数据一致性
  4. 使用Python的logging模块实现详细的日志记录
  5. 使用简单的异步模式处理长时间运行的任务
  6. 实现信号处理机制,确保程序可以优雅关闭
  7. 使用Docker容器化应用,简化部署和维护
  8. 使用Flask或FastAPI实现Web界面和API
  9. 保持代码结构简单清晰,避免过度设计