reddit_scraper_agent.py
#!/usr/bin/env python3
"""
Reddit Scraping Agent with Task Planner
This script creates a Reddit agent that:
1. Uses TaskPlanner to generate a dynamic plan for scraping Reddit
2. Uses GoogleSearchTool to find Reddit posts matching a search query
3. Uses BrowserTool to visit each URL and extract the content
4. Organizes and saves the results
"""

import asyncio

from nodetool.agents.agent import Agent
from nodetool.chat.providers import get_provider
from nodetool.agents.tools.browser import BrowserTool, GoogleSearchTool
from nodetool.chat.providers.base import ChatProvider
from nodetool.metadata.types import Provider
from nodetool.workflows.processing_context import ProcessingContext
from nodetool.workflows.types import Chunk

async def test_reddit_scraper_agent(provider: ChatProvider, model: str):
    context = ProcessingContext()
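
    # Stage 1: search Google for Reddit posts in r/StableDiffusion and collect their URLs.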
    search_agent = Agent(
        name="Reddit Search Agent",
        objective="""
        Search for Reddit posts in the r/StableDiffusion subreddit using Google Search.
        Return a list of URLs to the Reddit posts.
        """,
        provider=provider,
        model=model,
        enable_analysis_phase=False,
        enable_data_contracts_phase=False,
        tools=[
            GoogleSearchTool(workspace_dir=str(context.workspace_dir)),
        ],
        output_schema={
            "type": "array",
            "items": {
                "type": "string",
            },
        },
    )
    # Execute the search agent, streaming any text chunks as they arrive.
    async for item in search_agent.execute(context):
        if isinstance(item, Chunk):
            print(item.content, end="", flush=True)

    post_urls = search_agent.get_results()
    print("Reddit post URLs:")
    print(post_urls)
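
    # Stage 2: visit each Reddit post with the browser tool and extract image URLs from <img> tags.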
    image_scraper_agent = Agent(
        name="Image Scraper Agent",
        objective=f"""
        Visit each URL and extract the image tags using the CSS img selector.
        Return a list of image URLs.
        Use the remote browser to visit the URLs.
        The URLs are:
        {post_urls}
        """,
        provider=provider,
        model=model,
        enable_analysis_phase=False,
        enable_data_contracts_phase=True,
        tools=[
            BrowserTool(workspace_dir=str(context.workspace_dir)),
        ],
        output_schema={
            "type": "array",
            "items": {
                "type": "string",
            },
        },
    )
    # Execute the scraper agent, streaming any text chunks as they arrive.
    async for item in image_scraper_agent.execute(context):
        if isinstance(item, Chunk):
            print(item.content, end="", flush=True)

    image_urls = image_scraper_agent.get_results()
    print("Image URLs:")
    print(image_urls)
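

# Run the same pipeline against three different providers and models.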
if __name__ == "__main__":
    asyncio.run(
        test_reddit_scraper_agent(
            provider=get_provider(Provider.OpenAI), model="gpt-4o-mini"
        )
    )
    asyncio.run(
        test_reddit_scraper_agent(
            provider=get_provider(Provider.Gemini), model="gemini-2.0-flash"
        )
    )
    asyncio.run(
        test_reddit_scraper_agent(
            provider=get_provider(Provider.Anthropic),
            model="claude-3-5-sonnet-20241022",
        )
    )