573. PotatoChat RSS订阅机器人

看起来你想做一个“PotatoChat RSS 订阅机器人”(项目 #573)。下面给出一个完整的设计与实现示例(以 Python 为主),包含功能规划、数据模型、抓取与去重策略、命令交互、示例代码、部署建议和常见注意点。你可以按需把示例代码改成适配 PotatoChat 的具体 API(我在代码里留了必须替换的钩子)。

573. PotatoChat RSS订阅机器人

  1. 功能需求(建议)
  • 频道/用户级别订阅管理:subscribe、unsubscribe、list
  • 自动定期抓取多个 RSS/Atom 源
  • 增量推送:仅推送新的条目(去重/记最后项 id 或时间)
  • 可配置推送格式(标题、链接、摘要、发布时间)
  • 错误与重复处理:重试、限流、黑名单源
  • 管理命令(查看源状态、强制抓取、手工重发)
  • 持久化配置(数据库),小规模可用 sqlite
  • 日志与监控(异常报警)
  1. 数据模型(SQLite 示例)
  • feeds 表:id, url, title(optional), fetch_interval, last_guid_or_etag, last_modified, enabled
  • subscriptions 表:id, feed_id, chat_id (或 channel_id), created_at, options(json)
  • posts_seen 表:id (primary), feed_id, entry_id (GUID 或 link), published_at
  • logs(可选)
  1. 抓取与去重策略
  • 抓取时使用 If-None-Match / If-Modified-Since(若 feed 支持),以减流量
  • 使用 feed 中的 guid/id/link/published 作为唯一标识,若没有 guid 可以用链接+标题的 hash
  • 存储最近 N 条 entry_id(或只存最近时间点/ETag)用于去重
  • 对于大量订阅者,先聚合新增条目再批量分发到订阅的频道,避免重复抓取/推送
  1. 推送格式(示例)
  • 简短:标题 + 链接
  • 富信息:标题、发布时间、来源、摘要/前几行 + 原文链接
  • 带媒体:若 entry 有 enclosure/image,可附上图片或缩略图(依平台支持)
  1. 限流与退避
  • 每个 feed 遵循最低抓取间隔(例如 10 分钟起步)
  • 对网络错误使用指数退避
  • 控制每分钟消息发送数以不触发平台限流
  1. 权限与安全
  • 管理命令仅允许管理员或频道管理员使用
  • 对外部 URL 做安全校验(避免 SSRF 或恶意内容)
  • 存储 API token 时加密/妥善保管
  1. 部署建议
  • 使用 Docker 容器 + systemd / k8s
  • 持续运行任务:使用 APScheduler / celery / cron(示例用 APScheduler 或 asyncio loop)
  • 日志与健康检查接口
  1. 示例实现(Python,asyncio + aiohttp + feedparser + aiosqlite)
    下面给出一个精简但可运行的模版,你需要把 send_message_to_potatochat(…) 替换为 PotatoChat 平台的 API 调用实现(或 webhook)。用 pip 安装依赖: feedparser aiohttp aiosqlite apscheduler

示例代码(简化版):

  • requirements:

    • pip install aiohttp feedparser aiosqlite apscheduler
  • main.py(核心逻辑):

import asyncio
import hashlib
import time
from datetime import datetime
import feedparser
import aiohttp
import aiosqlite
from apscheduler.schedulers.asyncio import AsyncIOScheduler

DB = "rss_bot.db"

# --- 平台相关:请实现这个函数来向 PotatoChat 发送消息 ---
async def send_message_to_potatochat(chat_id: str, text: str):
    # TODO: 使用 PotatoChat 的 HTTP API / SDK 把 text 发送到 chat_id
    # 示例(伪代码):
    # await aiohttp.post("https://api.potatochat.example/send", json={"chat_id": chat_id, "text": text}, headers={"Authorization": "Bearer TOKEN"})
    print(f"[send -> {chat_id}] {text[:200]}")

# --- 帮助函数 ---
def entry_id(e):
    # 尝试取 guid/id/link/title 的组合做唯一 id
    if 'id' in e and e.id:
        return e.id
    if 'guid' in e and e.guid:
        return e.guid
    if 'link' in e and e.link:
        return e.link
    # fallback hash
    s = (e.get('title','') + e.get('summary','') + e.get('link','')).encode('utf-8')
    return hashlib.sha256(s).hexdigest()

# --- DB 初始化 ---
async def init_db():
    async with aiosqlite.connect(DB) as db:
        await db.executescript("""
        CREATE TABLE IF NOT EXISTS feeds (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT UNIQUE,
            title TEXT,
            fetch_interval INTEGER DEFAULT 600,
            last_etag TEXT,
            last_modified TEXT,
            enabled INTEGER DEFAULT 1
        );
        CREATE TABLE IF NOT EXISTS subscriptions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            feed_id INTEGER,
            chat_id TEXT,
            created_at INTEGER
        );
        CREATE TABLE IF NOT EXISTS posts_seen (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            feed_id INTEGER,
            entry_id TEXT,
            published_at INTEGER
        );
        """)
        await db.commit()

# --- 订阅管理接口(示例) ---
async def add_feed_if_not_exists(url, interval=600):
    async with aiosqlite.connect(DB) as db:
        cur = await db.execute("SELECT id FROM feeds WHERE url = ?", (url,))
        r = await cur.fetchone()
        if r:
            return r[0]
        await db.execute("INSERT INTO feeds(url, fetch_interval) VALUES(?,?)", (url, interval))
        await db.commit()
        cur = await db.execute("SELECT id FROM feeds WHERE url = ?", (url,))
        r = await cur.fetchone()
        return r[0]

async def subscribe_feed(url, chat_id):
    feed_id = await add_feed_if_not_exists(url)
    async with aiosqlite.connect(DB) as db:
        await db.execute("INSERT INTO subscriptions(feed_id, chat_id, created_at) VALUES(?,?,?)", (feed_id, chat_id, int(time.time())))
        await db.commit()
    return feed_id

async def get_all_feeds():
    async with aiosqlite.connect(DB) as db:
        cur = await db.execute("SELECT id, url, fetch_interval, last_etag, last_modified FROM feeds WHERE enabled=1")
        return await cur.fetchall()

async def get_subscribers(feed_id):
    async with aiosqlite.connect(DB) as db:
        cur = await db.execute("SELECT chat_id FROM subscriptions WHERE feed_id = ?", (feed_id,))
        rows = await cur.fetchall()
        return [r[0] for r in rows]

async def mark_seen(feed_id, entryid, published_ts=0):
    async with aiosqlite.connect(DB) as db:
        await db.execute("INSERT INTO posts_seen(feed_id, entry_id, published_at) VALUES(?,?,?)", (feed_id, entryid, published_ts))
        await db.commit()

async def already_seen(feed_id, entryid):
    async with aiosqlite.connect(DB) as db:
        cur = await db.execute("SELECT 1 FROM posts_seen WHERE feed_id=? AND entry_id=? LIMIT 1", (feed_id, entryid))
        return await cur.fetchone() is not None

# --- 抓取任务 ---
async def fetch_and_dispatch_feed(session, feed_row):
    feed_id, url, interval, last_etag, last_modified = feed_row
    headers = {}
    if last_etag:
        headers['If-None-Match'] = last_etag
    if last_modified:
        headers['If-Modified-Since'] = last_modified
    try:
        async with session.get(url, headers=headers, timeout=30) as resp:
            if resp.status == 304:
                # not modified
                return
            text = await resp.read()
            parsed = feedparser.parse(text)
            # update etag/last-modified if present
            etag = resp.headers.get('ETag') or parsed.get('etag')
            lastmod = resp.headers.get('Last-Modified') or parsed.get('modified')
            # store back to DB
            async with aiosqlite.connect(DB) as db:
                await db.execute("UPDATE feeds SET last_etag=?, last_modified=? WHERE id=?", (etag, lastmod, feed_id))
                await db.commit()

            entries = parsed.entries
            # process entries in chronological order (oldest first)
            entries.sort(key=lambda e: e.get('published_parsed') or e.get('updated_parsed') or time.gmtime(0))
            subs = await get_subscribers(feed_id)
            for e in entries:
                eid = entry_id(e)
                if await already_seen(feed_id, eid):
                    continue
                title = e.get('title','(no title)')
                link = e.get('link','')
                summary = e.get('summary','')
                published = e.get('published','')
                # format message
                msg = f"{title}\n{link}\n{published}\n{summary}"
                # dispatch to subscribers (could be batched)
                for chat_id in subs:
                    await send_message_to_potatochat(chat_id, msg)
                # mark seen
                ts = int(time.time())
                if 'published_parsed' in e and e.published_parsed:
                    ts = int(time.mktime(e.published_parsed))
                await mark_seen(feed_id, eid, ts)
    except Exception as ex:
        print("Fetch error", url, ex)

# --- 调度器主循环 ---
async def scheduled_job():
    async with aiohttp.ClientSession() as session:
        feeds = await get_all_feeds()
        tasks = [fetch_and_dispatch_feed(session, f) for f in feeds]
        await asyncio.gather(*tasks, return_exceptions=True)

async def run():
    await init_db()
    scheduler = AsyncIOScheduler()
    # 每分钟检查一轮,根据 feed.fetch_interval 可分别执行更细粒度调度(此处为简单例子)
    scheduler.add_job(lambda: asyncio.create_task(scheduled_job()), 'interval', minutes=1)
    scheduler.start()
    # 保持进程运行
    while True:
        await asyncio.sleep(3600)

if __name__ == "__main__":
    asyncio.run(run())

说明:

  • 该示例是最小可用版本:实现了订阅、抓取、去重并发送消息。生产化时需要更多错误处理、并发控制和优化。
  • send_message_to_potatochat 需要用 PotatoChat 的 API token 和接口实现真实发送。
  • feedparser 用于解析 RSS/Atom,aiohttp 拉取内容,aiosqlite 用于异步 sqlite 操作。
  • 可以把 scheduler 改为针对每个 feed 单独调度(根据 fetch_interval)以支持不同抓取频率。
  1. 命令处理(示例)
  • 当机器人收到聊天命令(通过 PotatoChat 的消息 webhook)时:
    • /subscribe <rss_url> -> 调用 subscribe_feed(url, chat_id)
    • /unsubscribe <rss_url> 或 /unsubscribe <feed_id> -> 从 subscriptions 表删除
    • /list -> 查询并返回该 chat 的订阅列表
    • /feeds -> 仅管理员可用:列出所有已注册 feed
    • /force_fetch <feed_id> -> 立即抓取并分发
  1. 扩展功能(可选)
  • 支持关键字过滤(只推送含某关键词的条目)
  • 支持每频道自定义模板(Markdown 或 HTML)
  • 支持文章全文抓取(readability)或提取主要图片并带图推送
  • 支持多语言和多实例部署
  • 将 posts_seen 限长以避免无限增长(只保留最近 N 条或定期清理)
  1. 监控与运维
  • 将异常日志上报(例如 Sentry)
  • 指标(抓取失败率、平均延迟、消息发送速率)上 Prometheus
  • 健康检查接口(/healthz)

最后

  • 如果你愿意我可以:
    • 把上述模板改为更完整的项目仓库结构并提供 Dockerfile;
    • 根据 PotatoChat 的具体 API(给我 API 文档或示例请求)把 send_message_to_potatochat 函数替换为真正实现,并完成命令 webhook 的示例;
    • 或者给出 Node.js / Go 的实现示例。

你想怎么继续?需要我把代码改为针对 PotatoChat API 的具体实现,还是生成一个可直接运行的 Docker 项目?