diff --git a/agent_reach/channels/__init__.py b/agent_reach/channels/__init__.py
index f2a0149..4893590 100644
--- a/agent_reach/channels/__init__.py
+++ b/agent_reach/channels/__init__.py
@@ -23,7 +23,7 @@
 from .xiaoyuzhou import XiaoyuzhouChannel
 from .v2ex import V2EXChannel
 from .xueqiu import XueqiuChannel
-
+from .toutiao import ToutiaoChannel
 
 
 ALL_CHANNELS: List[Channel] = [
@@ -40,6 +40,7 @@
     XiaoyuzhouChannel(),
     V2EXChannel(),
     XueqiuChannel(),
+    ToutiaoChannel(),
     RSSChannel(),
     ExaSearchChannel(),
     WebChannel(),
diff --git a/agent_reach/channels/toutiao.py b/agent_reach/channels/toutiao.py
new file mode 100644
index 0000000..6c7d54c
--- /dev/null
+++ b/agent_reach/channels/toutiao.py
@@ -0,0 +1,119 @@
+# -*- coding: utf-8 -*-
+"""Toutiao (今日头条) — search articles and trending content."""
+
+import json
+import re
+import urllib.parse
+import urllib.request
+from typing import Tuple
+
+from .base import Channel
+
+_UA = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
+_TIMEOUT = 10
+_SEARCH_URL = (
+    "https://so.toutiao.com/search"
+    "?dvpf=pc&source=input&keyword={keyword}&enable_druid_v2=1"
+)
+
+# template_key prefixes of non-article widgets (nav / baike / bottom strips).
+_SKIP_TEMPLATES = ("Search", "Bottom", "76-", "20-", "26-", "67-baike")
+
+
+def _fetch_html(url: str) -> str:
+    """Fetch *url* with a desktop User-Agent and return the decoded HTML."""
+    req = urllib.request.Request(url, headers={"User-Agent": _UA})
+    with urllib.request.urlopen(req, timeout=_TIMEOUT) as resp:
+        return resp.read().decode("utf-8")
+
+
+def _parse_search_results(html: str) -> list:
+    """Extract article results from Toutiao search page HTML.
+
+    The page embeds each search result as a JSON object inside a
+    <script> tag.  Only sizeable JSON payloads carrying a title and a
+    usable URL are kept; navigation / baike widgets are filtered out
+    via ``_SKIP_TEMPLATES``.
+    """
+    scripts = re.findall(r"<script[^>]*>(.*?)</script>", html, re.DOTALL)
+    articles = []
+    for s in scripts:
+        # Real result payloads are large; tiny scripts are page chrome.
+        if len(s) < 1000:
+            continue
+        if not s.strip().startswith("{"):
+            continue
+        try:
+            data = json.loads(s).get("data", {})
+        except (json.JSONDecodeError, ValueError):
+            continue
+        if not isinstance(data, dict):
+            continue
+
+        title = data.get("title", "")
+        tpl = data.get("template_key", "")
+        if not title:
+            continue
+        if any(tpl.startswith(p) for p in _SKIP_TEMPLATES):
+            continue
+
+        # Guard every level: "display" and "info" may be absent or null.
+        article_url = (
+            ((data.get("display") or {}).get("info") or {}).get("url", "")
+            or data.get("article_url", "")
+            or data.get("source_url", "")
+        )
+        if not article_url:
+            continue
+
+        articles.append({
+            "title": title,
+            "url": article_url,
+            "source": data.get("media_name", "") or data.get("source", ""),
+            "abstract": (data.get("abstract", "") or "")[:300],
+            "publish_time": data.get("publish_time"),
+            "read_count": data.get("read_count"),
+            "comment_count": data.get("comment_count"),
+        })
+    return articles
+
+
+class ToutiaoChannel(Channel):
+    name = "toutiao"
+    description = "今日头条搜索与资讯"
+    backends = ["Toutiao Web (public)"]
+    tier = 0
+
+    def can_handle(self, url: str) -> bool:
+        from urllib.parse import urlparse
+        domain = urlparse(url).netloc.lower()
+        return "toutiao.com" in domain
+
+    def check(self, config=None) -> Tuple[str, str]:
+        """Probe the public search endpoint; never raises (warn on failure)."""
+        try:
+            test_url = _SEARCH_URL.format(keyword="test")
+            req = urllib.request.Request(test_url, headers={"User-Agent": _UA})
+            with urllib.request.urlopen(req, timeout=_TIMEOUT) as resp:
+                if resp.status == 200:
+                    content = resp.read().decode("utf-8")
+                    if "data" in content or "title" in content:
+                        return "ok", "头条搜索可用(搜索文章、视频、资讯)"
+                    return "warn", "头条搜索返回非预期内容"
+                return "warn", "头条搜索返回非 200 状态"
+        except Exception as e:
+            return "warn", f"头条搜索连接失败(可能需要代理):{e}"
+
+    def search(self, keyword: str, limit: int = 10) -> list:
+        """Search Toutiao articles.
+
+        Args:
+            keyword: search keywords
+            limit: maximum number of results to return
+
+        Returns:
+            list of dicts with keys: title, url, source, abstract,
+            publish_time, read_count, comment_count
+        """
+        encoded = urllib.parse.quote(keyword)
+        url = _SEARCH_URL.format(keyword=encoded)
+        html = _fetch_html(url)
+        return _parse_search_results(html)[:limit]
diff --git a/tests/test_channel_contracts.py b/tests/test_channel_contracts.py
index 5fe4b74..fdce590 100644
--- a/tests/test_channel_contracts.py
+++ b/tests/test_channel_contracts.py
@@ -118,6 +118,7 @@ def test_channel_can_handle_contract():
         "weibo": "https://weibo.com/u/1749127163",
         "rss": "https://example.com/feed.xml",
         "xueqiu": "https://xueqiu.com/S/SH600519",
+        "toutiao": "https://so.toutiao.com/search?keyword=test",
         "exa_search": "https://example.com",
         "web": "https://example.com",
     }
diff --git a/tests/test_toutiao_channel.py b/tests/test_toutiao_channel.py
new file mode 100644
index 0000000..4445a67
--- /dev/null
+++ b/tests/test_toutiao_channel.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+"""Tests for Toutiao channel."""
+
+import json
+import unittest
+from unittest.mock import patch, MagicMock
+
+from agent_reach.channels.toutiao import ToutiaoChannel, _parse_search_results
+
+
+def _script(payload: dict) -> str:
+    """Wrap *payload* in a <script> tag the way the search page does."""
+    return "<script>" + json.dumps(payload, ensure_ascii=False) + "</script>"
+
+
+class TestToutiaoChannel(unittest.TestCase):
+
+    def setUp(self):
+        self.ch = ToutiaoChannel()
+
+    def test_can_handle_toutiao_urls(self):
+        assert self.ch.can_handle("https://www.toutiao.com/article/123")
+        assert self.ch.can_handle("https://so.toutiao.com/search?keyword=test")
+        assert self.ch.can_handle("https://m.toutiao.com/abc")
+
+    def test_can_handle_rejects_other_urls(self):
+        assert not self.ch.can_handle("https://www.baidu.com")
+        assert not self.ch.can_handle("https://weibo.com/123")
+        assert not self.ch.can_handle("https://github.com/user/repo")
+
+    def test_check_ok(self):
+        mock_resp = MagicMock()
+        mock_resp.status = 200
+        mock_resp.read = lambda: b'{"data":{"title":"test"}}'
+        mock_resp.__enter__ = MagicMock(return_value=mock_resp)
+        mock_resp.__exit__ = MagicMock(return_value=False)
+
+        with patch("urllib.request.urlopen", return_value=mock_resp):
+            status, msg = self.ch.check()
+        assert status == "ok"
+
+    def test_check_network_error(self):
+        with patch("urllib.request.urlopen", side_effect=Exception("timeout")):
+            status, msg = self.ch.check()
+        assert status == "warn"
+        assert "连接失败" in msg
+
+    def test_channel_attributes(self):
+        assert self.ch.name == "toutiao"
+        assert self.ch.tier == 0
+        assert len(self.ch.backends) > 0
+
+    def test_parse_search_results_with_article(self):
+        # Pad the abstract so the script passes the minimum-size filter.
+        abstract = "摘要内容" + "x" * 1000
+        html = _script({
+            "data": {
+                "title": "测试文章",
+                "template_key": "1-article",
+                "media_name": "测试媒体",
+                "abstract": abstract,
+                "display": {"info": {"url": "https://www.toutiao.com/group/123/"}},
+            }
+        })
+        results = _parse_search_results(html)
+        assert len(results) == 1
+        assert results[0]["title"] == "测试文章"
+        assert results[0]["source"] == "测试媒体"
+        assert results[0]["url"] == "https://www.toutiao.com/group/123/"
+
+    def test_parse_search_results_skips_non_article(self):
+        html = _script({
+            "data": {
+                "title": "导航模块",
+                "template_key": "Search_nav",
+                "abstract": "x" * 1000,
+            }
+        })
+        results = _parse_search_results(html)
+        assert len(results) == 0
+
+    def test_parse_search_results_empty_html(self):
+        assert _parse_search_results("") == []
+
+    def test_parse_search_results_skips_short_scripts(self):
+        html = '<script>{"data":{"title":"short"}}</script>'
+        assert _parse_search_results(html) == []