Added R1 web extractor feature #1115

Open · wants to merge 4 commits into base: main
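The example reads three API keys from the environment at import time. A minimal setup sketch, assuming a local .env file (the python-dotenv default; the diff never shows where load_dotenv() is called, so the placement here is an assumption):

# .env (placeholder values, use your own keys):
#   DEEPSEEK_API_KEY=...
#   FIRECRAWL_API_KEY=...
#   SERP_API_KEY=...
from dotenv import load_dotenv

load_dotenv()  # must run before the module-level os.getenv() calls read the keys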
194 changes: 112 additions & 82 deletions examples/R1_web_crawler/R1_web_crawler.py
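In short, this change replaces the FirecrawlApp SDK call with direct requests calls to Firecrawl's /v1/extract endpoint, adds a poll_firecrawl_result helper that waits for the asynchronous extraction job to finish, gives the R1 URL-selection step a plain-text fallback for malformed JSON responses, moves the SerpAPI import to serpapi.google_search, and removes the deduplicate_with_r1 consolidation step.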
@@ -1,10 +1,10 @@
 import os
 import json
+import time
 import requests
 from dotenv import load_dotenv
 from openai import OpenAI
-from serpapi import GoogleSearch
-from firecrawl import FirecrawlApp
+from serpapi.google_search import GoogleSearch

 # ANSI color codes
 class Colors:
@@ -22,17 +22,18 @@ class Colors:
 # Initialize clients
 client = OpenAI(api_key=os.getenv("DEEPSEEK_API_KEY"), base_url="https://api.deepseek.com")
 firecrawl_api_key = os.getenv("FIRECRAWL_API_KEY")
+serp_api_key = os.getenv("SERP_API_KEY")

 def search_google(query):
     """Search Google using SerpAPI and return top results."""
     print(f"{Colors.YELLOW}Searching Google for '{query}'...{Colors.RESET}")
-    search = GoogleSearch({"q": query, "api_key": os.getenv("SERP_API_KEY")})
+    search = GoogleSearch({"q": query, "api_key": serp_api_key})
     return search.get_dict().get("organic_results", [])

 def select_urls_with_r1(company, objective, serp_results):
     """
     Use R1 to select the most relevant URLs from SERP results for the given company and objective.
-    Returns a JSON object with a "selected_urls" property that is an array of strings.
+    Returns a list of URLs.
     """
     try:
         # Prepare the data for R1
@@ -44,7 +45,7 @@ def select_urls_with_r1(company, objective, serp_results):
             messages=[
                 {
                     "role": "system",
-                    "content": "You select URLs from the SERP results relevant to the company and objective."
+                    "content": "You are a URL selector that always responds with valid JSON. You select URLs from the SERP results relevant to the company and objective. Your response must be a JSON object with a 'selected_urls' array property containing strings."
                 },
                 {
                     "role": "user",
@@ -53,90 +54,127 @@ def select_urls_with_r1(company, objective, serp_results):
                         f"Objective: {objective}\n"
                         f"SERP Results: {json.dumps(serp_data)}\n\n"
                         "Return a JSON object with a property 'selected_urls' that contains an array "
-                        "of URLs most likely to help meet the objective. If you think the data might not be on the homepage, add a /* to the end of the URL. Do not return any social media links. For example: {\"selected_urls\": [\"https://example.com\", \"https://example2.com\"]}"
+                        "of URLs most likely to help meet the objective. Add a /* to the end of the URL if you think it should search all of the pages in the site. Do not return any social media links. For example: {\"selected_urls\": [\"https://example.com\", \"https://example2.com\"]}"
                     )
                 }
-            ],
+            ]
         )

-        # The response is guaranteed to follow the specified JSON schema
-        result = json.loads(response.choices[0].message.content)
-        urls = result.get("selected_urls", [])
-        return urls
+        try:
+            # First try to parse as JSON
+            result = json.loads(response.choices[0].message.content)
+            if isinstance(result, dict) and "selected_urls" in result:
+                urls = result["selected_urls"]
+            else:
+                # If JSON doesn't have the expected structure, fall back to text parsing
+                response_text = response.choices[0].message.content
+                urls = [line.strip() for line in response_text.split('\n')
+                        if line.strip().startswith(('http://', 'https://'))]
+        except json.JSONDecodeError:
+            # If JSON parsing fails, fall back to text parsing
+            response_text = response.choices[0].message.content
+            urls = [line.strip() for line in response_text.split('\n')
+                    if line.strip().startswith(('http://', 'https://'))]
+
+        # Clean up URLs - remove wildcards and trailing slashes
+        cleaned_urls = [url.replace('/*', '').rstrip('/') for url in urls]
+        cleaned_urls = [url for url in cleaned_urls if url]
+
+        if not cleaned_urls:
+            print(f"{Colors.YELLOW}No valid URLs found.{Colors.RESET}")
+            return []
+
+        print(f"{Colors.CYAN}Selected URLs for extraction by R1:{Colors.RESET}")
+        for url in cleaned_urls:
+            print(f"- {url}")
+
+        return cleaned_urls

     except Exception as e:
         print(f"{Colors.RED}Error selecting URLs with R1: {e}{Colors.RESET}")
         return []

 def extract_company_info(urls, prompt, company, api_key):
     """Use requests to call Firecrawl's extract endpoint with selected URLs."""
-    print(f"{Colors.YELLOW}Extracting structured data from the provided URLs using Firecrawl's /extract endpoint...{Colors.RESET}")
-    app = FirecrawlApp(api_key=os.getenv("FIRECRAWL_API_KEY"))
+    print(f"{Colors.YELLOW}Extracting structured data from the provided URLs using Firecrawl...{Colors.RESET}")
+
+    headers = {
+        'Content-Type': 'application/json',
+        'Authorization': f'Bearer {api_key}'
+    }
+
+    payload = {
+        "urls": urls,
+        "prompt": prompt + " for " + company,
+        "enableWebSearch": True
+    }

     try:
-        extract_prompt = prompt + " for " + company
-        response = app.extract(urls, {"prompt": extract_prompt, "enableWebSearch": True})
-        print(response)
-        return response
+        response = requests.post(
+            "https://api.firecrawl.dev/v1/extract",
+            headers=headers,
+            json=payload,
+            timeout=30
+        )
+
+        data = response.json()
+
+        if not data.get('success'):
+            print(f"{Colors.RED}API returned error: {data.get('error', 'No error message')}{Colors.RESET}")
+            return None
+
+        # Assuming Firecrawl provides a way to retrieve data with 'id'
+        extraction_id = data.get('id')
+        if not extraction_id:
+            print(f"{Colors.RED}No extraction ID found in response.{Colors.RESET}")
+            return None
+
+        # Polling for the extraction result
+        return poll_firecrawl_result(extraction_id, api_key)
+
+    except requests.exceptions.RequestException as e:
+        print(f"{Colors.RED}Request failed: {e}{Colors.RESET}")
+        return None
+    except json.JSONDecodeError as e:
+        print(f"{Colors.RED}Failed to parse response: {e}{Colors.RESET}")
+        return None
     except Exception as e:
         print(f"{Colors.RED}Failed to extract data: {e}{Colors.RESET}")
         return None
-def deduplicate_with_r1(data, company, objective):
-    """Use R1 to deduplicate and consolidate extracted information."""
-    print(f"{Colors.YELLOW}Deduplicating and consolidating information using R1...{Colors.RESET}")
-
-    try:
-        # Ensure data is valid JSON before sending
-        if not data:
-            return {}
-
-        response = client.chat.completions.create(
-            model="deepseek-reasoner",
-            messages=[
-                {
-                    "role": "system",
-                    "content": "You are an expert at consolidating information and removing duplicates. Analyze the extracted data and provide a clean, consolidated response."
-                },
-                {
-                    "role": "user",
-                    "content": (
-                        f"Company: {company}\n"
-                        f"Objective: {objective}\n"
-                        f"Extracted Data: {json.dumps(data, indent=2)}\n\n"
-                        "Please analyze this data and:\n"
-                        "1. Remove any duplicate information\n"
-                        "2. Consolidate similar points\n"
-                        "3. Format the response as a clean JSON object\n"
-                        "4. Ensure all information is relevant to the objective\n"
-                        "Return only the JSON response."
-                    )
-                }
-            ],
-        )
-
-        # Handle empty or invalid responses
-        response_text = response.choices[0].message.content.strip()
-        if not response_text:
-            return {}
-
-        consolidated_data = json.loads(response_text)
-        return consolidated_data
-    except json.JSONDecodeError:
-        # If JSON parsing fails, try to extract JSON from the response
-        # Look for content between curly braces
-        start = response_text.find('{')
-        end = response_text.rfind('}')
-        if start >= 0 and end >= 0:
-            json_str = response_text[start:end+1]
-            return json.loads(json_str)
-        return {}
-    except Exception as e:
-        print(f"{Colors.RED}Error deduplicating data with R1: {e}{Colors.RESET}")
-        return data
+def poll_firecrawl_result(extraction_id, api_key, interval=5, max_attempts=12):
+    """Poll Firecrawl API to get the extraction result."""
+    url = f"https://api.firecrawl.dev/v1/extract/{extraction_id}"
+    headers = {
+        'Authorization': f'Bearer {api_key}'
+    }
+
+    for attempt in range(1, max_attempts + 1):
+        try:
+            # print(f"{Colors.YELLOW}Polling for extraction result (Attempt {attempt}/{max_attempts})...{Colors.RESET}")
+            response = requests.get(url, headers=headers, timeout=30)
+            response.raise_for_status()
+            data = response.json()
+
+            if data.get('success') and data.get('data'):
+                print(f"{Colors.GREEN}Data successfully extracted:{Colors.RESET}")
+                print(json.dumps(data['data'], indent=2))
+                return data['data']
+            elif data.get('success') and not data.get('data'):
+                time.sleep(interval)
+            else:
+                print(f"{Colors.RED}API Error: {data.get('error', 'No error message provided')}{Colors.RESET}")
+                return None
+
+        except requests.exceptions.RequestException:
+            return None
+        except Exception:
+            return None
+
+    print(f"{Colors.RED}Max polling attempts reached. Extraction did not complete in time.{Colors.RESET}")
+    return None

 def main():
     company = input(f"{Colors.BLUE}Enter the company name: {Colors.RESET}")
@@ -154,20 +192,12 @@ def main():
print(f"{Colors.RED}R1 did not return any URLs.{Colors.RESET}")
return

print(f"{Colors.CYAN}Selected URLs for extraction by R1:{Colors.RESET}")
for url in selected_urls:
print(f"- {url}")

data = extract_company_info(selected_urls, objective, company, firecrawl_api_key)

if data and data.get('success') and data.get('data'):
# Deduplicate and consolidate the extracted data
consolidated_data = deduplicate_with_r1(data['data'], company, objective)

print(f"\n{Colors.GREEN}Consolidated and deduplicated data:{Colors.RESET}")
print(json.dumps(consolidated_data, indent=2))
if data:
print(f"{Colors.GREEN}Extraction completed successfully.{Colors.RESET}")
else:
print(f"{Colors.RED}Failed to extract the requested information. Try refining your prompt or choosing a different company.{Colors.RESET}")

if __name__ == "__main__":
main()
main()
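For reviewers, these are the response shapes extract_company_info and poll_firecrawl_result expect from the Firecrawl API, as inferred from the checks in this diff; the field names come from the code, the values are illustrative placeholders:

# POST https://api.firecrawl.dev/v1/extract -> job accepted
submit_response = {"success": True, "id": "<extraction-id>"}

# GET https://api.firecrawl.dev/v1/extract/<extraction-id> -> still running
poll_pending = {"success": True, "data": None}

# GET https://api.firecrawl.dev/v1/extract/<extraction-id> -> finished
poll_done = {"success": True, "data": {"<field>": "<extracted value>"}}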
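A condensed sketch of the pipeline that main() wires together, with placeholder inputs (in the script itself, the company and objective come from input() prompts):

results = search_google("Example Corp")  # SerpAPI organic results
urls = select_urls_with_r1("Example Corp", "find pricing information", results)
if urls:
    # submits the extract job, then polls until data arrives or attempts run out
    data = extract_company_info(urls, "find pricing information", "Example Corp", firecrawl_api_key)

With the defaults (interval=5, max_attempts=12), the polling loop waits at most about a minute (12 attempts x 5 s sleep, plus request time) before reporting that the extraction did not complete.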