看起来你想做一个“PotatoChat RSS 订阅机器人”(项目 #573)。下面给出一个完整的设计与实现示例(以 Python 为主),包含功能规划、数据模型、抓取与去重策略、命令交互、示例代码、部署建议和常见注意点。你可以按需把示例代码改成适配 PotatoChat 的具体 API(我在代码里留了必须替换的钩子)。

- 功能需求(建议)
- 频道/用户级别订阅管理:subscribe、unsubscribe、list
- 自动定期抓取多个 RSS/Atom 源
- 增量推送:仅推送新的条目(去重/记最后项 id 或时间)
- 可配置推送格式(标题、链接、摘要、发布时间)
- 错误与重复处理:重试、限流、黑名单源
- 管理命令(查看源状态、强制抓取、手工重发)
- 持久化配置(数据库),小规模可用 sqlite
- 日志与监控(异常报警)
- 数据模型(SQLite 示例)
- feeds 表:id, url, title(optional), fetch_interval, last_guid_or_etag, last_modified, enabled
- subscriptions 表:id, feed_id, chat_id (或 channel_id), created_at, options(json)
- posts_seen 表:id (primary), feed_id, entry_id (GUID 或 link), published_at
- logs(可选)
- 抓取与去重策略
- 抓取时使用 If-None-Match / If-Modified-Since(若 feed 支持),以减流量
- 使用 feed 中的 guid/id/link/published 作为唯一标识,若没有 guid 可以用链接+标题的 hash
- 存储最近 N 条 entry_id(或只存最近时间点/ETag)用于去重
- 对于大量订阅者,先聚合新增条目再批量分发到订阅的频道,避免重复抓取/推送
- 推送格式(示例)
- 简短:标题 + 链接
- 富信息:标题、发布时间、来源、摘要/前几行 + 原文链接
- 带媒体:若 entry 有 enclosure/image,可附上图片或缩略图(依平台支持)
- 限流与退避
- 每个 feed 遵循最低抓取间隔(例如 10 分钟起步)
- 对网络错误使用指数退避
- 控制每分钟消息发送数以不触发平台限流
- 权限与安全
- 管理命令仅允许管理员或频道管理员使用
- 对外部 URL 做安全校验(避免 SSRF 或恶意内容)
- 存储 API token 时加密/妥善保管
- 部署建议
- 使用 Docker 容器 + systemd / k8s
- 持续运行任务:使用 APScheduler / celery / cron(示例用 APScheduler 或 asyncio loop)
- 日志与健康检查接口
- 示例实现(Python,asyncio + aiohttp + feedparser + aiosqlite)
下面给出一个精简但可运行的模版,你需要把 send_message_to_potatochat(…) 替换为 PotatoChat 平台的 API 调用实现(或 webhook)。用 pip 安装依赖: feedparser aiohttp aiosqlite apscheduler
示例代码(简化版):
-
requirements:
- pip install aiohttp feedparser aiosqlite apscheduler
-
main.py(核心逻辑):
import asyncio
import hashlib
import time
from datetime import datetime
import feedparser
import aiohttp
import aiosqlite
from apscheduler.schedulers.asyncio import AsyncIOScheduler
DB = "rss_bot.db"
# --- 平台相关:请实现这个函数来向 PotatoChat 发送消息 ---
async def send_message_to_potatochat(chat_id: str, text: str):
# TODO: 使用 PotatoChat 的 HTTP API / SDK 把 text 发送到 chat_id
# 示例(伪代码):
# await aiohttp.post("https://api.potatochat.example/send", json={"chat_id": chat_id, "text": text}, headers={"Authorization": "Bearer TOKEN"})
print(f"[send -> {chat_id}] {text[:200]}")
# --- 帮助函数 ---
def entry_id(e):
# 尝试取 guid/id/link/title 的组合做唯一 id
if 'id' in e and e.id:
return e.id
if 'guid' in e and e.guid:
return e.guid
if 'link' in e and e.link:
return e.link
# fallback hash
s = (e.get('title','') + e.get('summary','') + e.get('link','')).encode('utf-8')
return hashlib.sha256(s).hexdigest()
# --- DB 初始化 ---
async def init_db():
async with aiosqlite.connect(DB) as db:
await db.executescript("""
CREATE TABLE IF NOT EXISTS feeds (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT UNIQUE,
title TEXT,
fetch_interval INTEGER DEFAULT 600,
last_etag TEXT,
last_modified TEXT,
enabled INTEGER DEFAULT 1
);
CREATE TABLE IF NOT EXISTS subscriptions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
feed_id INTEGER,
chat_id TEXT,
created_at INTEGER
);
CREATE TABLE IF NOT EXISTS posts_seen (
id INTEGER PRIMARY KEY AUTOINCREMENT,
feed_id INTEGER,
entry_id TEXT,
published_at INTEGER
);
""")
await db.commit()
# --- 订阅管理接口(示例) ---
async def add_feed_if_not_exists(url, interval=600):
async with aiosqlite.connect(DB) as db:
cur = await db.execute("SELECT id FROM feeds WHERE url = ?", (url,))
r = await cur.fetchone()
if r:
return r[0]
await db.execute("INSERT INTO feeds(url, fetch_interval) VALUES(?,?)", (url, interval))
await db.commit()
cur = await db.execute("SELECT id FROM feeds WHERE url = ?", (url,))
r = await cur.fetchone()
return r[0]
async def subscribe_feed(url, chat_id):
feed_id = await add_feed_if_not_exists(url)
async with aiosqlite.connect(DB) as db:
await db.execute("INSERT INTO subscriptions(feed_id, chat_id, created_at) VALUES(?,?,?)", (feed_id, chat_id, int(time.time())))
await db.commit()
return feed_id
async def get_all_feeds():
async with aiosqlite.connect(DB) as db:
cur = await db.execute("SELECT id, url, fetch_interval, last_etag, last_modified FROM feeds WHERE enabled=1")
return await cur.fetchall()
async def get_subscribers(feed_id):
async with aiosqlite.connect(DB) as db:
cur = await db.execute("SELECT chat_id FROM subscriptions WHERE feed_id = ?", (feed_id,))
rows = await cur.fetchall()
return [r[0] for r in rows]
async def mark_seen(feed_id, entryid, published_ts=0):
async with aiosqlite.connect(DB) as db:
await db.execute("INSERT INTO posts_seen(feed_id, entry_id, published_at) VALUES(?,?,?)", (feed_id, entryid, published_ts))
await db.commit()
async def already_seen(feed_id, entryid):
async with aiosqlite.connect(DB) as db:
cur = await db.execute("SELECT 1 FROM posts_seen WHERE feed_id=? AND entry_id=? LIMIT 1", (feed_id, entryid))
return await cur.fetchone() is not None
# --- 抓取任务 ---
async def fetch_and_dispatch_feed(session, feed_row):
feed_id, url, interval, last_etag, last_modified = feed_row
headers = {}
if last_etag:
headers['If-None-Match'] = last_etag
if last_modified:
headers['If-Modified-Since'] = last_modified
try:
async with session.get(url, headers=headers, timeout=30) as resp:
if resp.status == 304:
# not modified
return
text = await resp.read()
parsed = feedparser.parse(text)
# update etag/last-modified if present
etag = resp.headers.get('ETag') or parsed.get('etag')
lastmod = resp.headers.get('Last-Modified') or parsed.get('modified')
# store back to DB
async with aiosqlite.connect(DB) as db:
await db.execute("UPDATE feeds SET last_etag=?, last_modified=? WHERE id=?", (etag, lastmod, feed_id))
await db.commit()
entries = parsed.entries
# process entries in chronological order (oldest first)
entries.sort(key=lambda e: e.get('published_parsed') or e.get('updated_parsed') or time.gmtime(0))
subs = await get_subscribers(feed_id)
for e in entries:
eid = entry_id(e)
if await already_seen(feed_id, eid):
continue
title = e.get('title','(no title)')
link = e.get('link','')
summary = e.get('summary','')
published = e.get('published','')
# format message
msg = f"{title}\n{link}\n{published}\n{summary}"
# dispatch to subscribers (could be batched)
for chat_id in subs:
await send_message_to_potatochat(chat_id, msg)
# mark seen
ts = int(time.time())
if 'published_parsed' in e and e.published_parsed:
ts = int(time.mktime(e.published_parsed))
await mark_seen(feed_id, eid, ts)
except Exception as ex:
print("Fetch error", url, ex)
# --- 调度器主循环 ---
async def scheduled_job():
async with aiohttp.ClientSession() as session:
feeds = await get_all_feeds()
tasks = [fetch_and_dispatch_feed(session, f) for f in feeds]
await asyncio.gather(*tasks, return_exceptions=True)
async def run():
await init_db()
scheduler = AsyncIOScheduler()
# 每分钟检查一轮,根据 feed.fetch_interval 可分别执行更细粒度调度(此处为简单例子)
scheduler.add_job(lambda: asyncio.create_task(scheduled_job()), 'interval', minutes=1)
scheduler.start()
# 保持进程运行
while True:
await asyncio.sleep(3600)
if __name__ == "__main__":
asyncio.run(run())
说明:
- 该示例是最小可用版本:实现了订阅、抓取、去重并发送消息。生产化时需要更多错误处理、并发控制和优化。
- send_message_to_potatochat 需要用 PotatoChat 的 API token 和接口实现真实发送。
- feedparser 用于解析 RSS/Atom,aiohttp 拉取内容,aiosqlite 用于异步 sqlite 操作。
- 可以把 scheduler 改为针对每个 feed 单独调度(根据 fetch_interval)以支持不同抓取频率。
- 命令处理(示例)
- 当机器人收到聊天命令(通过 PotatoChat 的消息 webhook)时:
- /subscribe <rss_url> -> 调用 subscribe_feed(url, chat_id)
- /unsubscribe <rss_url> 或 /unsubscribe <feed_id> -> 从 subscriptions 表删除
- /list -> 查询并返回该 chat 的订阅列表
- /feeds -> 仅管理员可用:列出所有已注册 feed
- /force_fetch <feed_id> -> 立即抓取并分发
- 扩展功能(可选)
- 支持关键字过滤(只推送含某关键词的条目)
- 支持每频道自定义模板(Markdown 或 HTML)
- 支持文章全文抓取(readability)或提取主要图片并带图推送
- 支持多语言和多实例部署
- 将 posts_seen 限长以避免无限增长(只保留最近 N 条或定期清理)
- 监控与运维
- 将异常日志上报(例如 Sentry)
- 指标(抓取失败率、平均延迟、消息发送速率)上 Prometheus
- 健康检查接口(/healthz)
最后
- 如果你愿意我可以:
- 把上述模板改为更完整的项目仓库结构并提供 Dockerfile;
- 根据 PotatoChat 的具体 API(给我 API 文档或示例请求)把 send_message_to_potatochat 函数替换为真正实现,并完成命令 webhook 的示例;
- 或者给出 Node.js / Go 的实现示例。
你想怎么继续?需要我把代码改为针对 PotatoChat API 的具体实现,还是生成一个可直接运行的 Docker 项目?