-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathmain.py
More file actions
156 lines (131 loc) · 4.63 KB
/
main.py
File metadata and controls
156 lines (131 loc) · 4.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#!/usr/bin/env python3.12
"""DeepGraph - Hierarchical ML Research Knowledge Engine."""
import os
import sys
from pathlib import Path
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent))
from compat.filelock import FileLock
from config import (
APP_NAME,
AUTO_PIPELINE_ENABLED,
AUTO_RESEARCH_ENABLED,
BACKFILL_GRAPH_ON_START,
IDEA_WORKSPACE_DIR,
REFRESH_MERGE_CANDIDATES_ON_START,
ROOT_NODE_ID,
WEB_HOST,
WEB_PORT,
WORKSPACE_DIR,
PDF_CACHE_DIR,
)
from db.database import describe_backend, init_db
from db.evidence_graph import (
backfill_entity_resolutions,
backfill_graph_from_structured_data,
refresh_merge_candidates,
)
from db.taxonomy import seed_taxonomy, backfill_result_taxonomy
from web.app import app
# Lock object owned by this process while main() runs; None when not held.
# Set by _try_acquire_process_lock() and cleared by _release_process_lock().
_PROCESS_LOCK = None

# Fixed, well-known lock-file location so a second launch on the same host can
# detect the first one. On Windows it lives under %TEMP% (or ~/.cache when TEMP
# is not set); on every other OS it is a fixed path under /tmp.
if os.name == "nt":
    _PROCESS_LOCK_PATH = Path(os.environ.get("TEMP", str(Path.home() / ".cache"))) / "deepgraph-main.lock"
else:
    _PROCESS_LOCK_PATH = Path("/tmp/deepgraph-main.lock")
def _current_lock_owner() -> str | None:
try:
owner = _PROCESS_LOCK_PATH.read_text(encoding="utf-8").strip()
except OSError:
return None
return owner or None
def _try_acquire_process_lock() -> bool:
    """Try once, without waiting, to take the single-instance process lock.

    If this process already holds the lock, return True immediately. Otherwise
    create a ``FileLock`` on ``_PROCESS_LOCK_PATH`` and call ``try_acquire()``.
    On success, store the lock in the module-level ``_PROCESS_LOCK`` and
    overwrite the lock file with the current PID. ``_current_lock_owner``
    reads that PID back to tell a second launch who owns the lock.

    Returns:
        True if this process now holds the lock; False if ``try_acquire()``
        reported that it could not be taken.
    """
    global _PROCESS_LOCK
    if _PROCESS_LOCK is not None:
        return True
    lock = FileLock(str(_PROCESS_LOCK_PATH))
    if not lock.try_acquire():
        return False
    # The open file handle is only reachable through the private ``_handle``
    # attribute. If this FileLock implementation does not provide one, skip
    # the PID record instead of crashing while holding the lock.
    handle = getattr(lock, "_handle", None)
    if handle is not None:
        try:
            handle.seek(0)
            handle.truncate()
            handle.write(f"{os.getpid()}\n")
            handle.flush()
        except (OSError, ValueError):
            # The PID is diagnostic only; the lock is what enforces a single
            # instance. Do not release it and return False here: main() would
            # then claim another instance is running when none is.
            pass
    _PROCESS_LOCK = lock
    return True
def _release_process_lock() -> None:
    """Release the single-instance lock if this process currently holds it.

    Does nothing when no lock is held. The module-level reference is cleared
    even if ``release()`` raises, so a failed release never leaves a stale
    handle behind. The exception itself still propagates to the caller.
    """
    global _PROCESS_LOCK
    held = _PROCESS_LOCK
    if held is not None:
        try:
            held.release()
        finally:
            _PROCESS_LOCK = None
def _serve_http() -> None:
    """Serve ``app`` on WEB_HOST:WEB_PORT.

    Uses waitress with 8 threads when it can be imported. Otherwise prints a
    warning and falls back to ``app.run`` with ``threaded=True`` and
    ``debug=False``.
    """
    banner = f"Starting {APP_NAME} at http://{WEB_HOST}:{WEB_PORT} (root node: {ROOT_NODE_ID})"
    print(banner, flush=True)
    try:
        from waitress import serve as waitress_serve
    except ImportError:
        fallback_notice = (
            "Waitress is not installed; falling back to Flask dev server. "
            "Install waitress for production deployments."
        )
        print(fallback_notice, flush=True)
        app.run(host=WEB_HOST, port=WEB_PORT, debug=False, threaded=True)
    else:
        waitress_serve(app, host=WEB_HOST, port=WEB_PORT, threads=8)
def main():
    """Run DeepGraph as a single-instance service.

    Steps, in order:
      1. Take the process lock. If another process holds it, print a message
         (with the owner's PID when one is recorded) and return.
      2. Create the workspace, PDF-cache and idea-workspace directories.
      3. Initialize the database, seed the taxonomy, and backfill the result
         taxonomy and entity-resolution data.
      4. Start the paper ingestion worker when AUTO_PIPELINE_ENABLED. Start the
         Auto Research worker and GPU scheduler when AUTO_RESEARCH_ENABLED.
      5. Hand off to _serve_http().

    Once the lock is acquired, every later step runs inside ``try``/``finally``,
    so the lock is released however this function exits.
    """
    if not _try_acquire_process_lock():
        owner = _current_lock_owner()
        if owner:
            print(f"DeepGraph main already running under pid {owner}; refusing duplicate startup.", flush=True)
        else:
            print("DeepGraph main already running; refusing duplicate startup.", flush=True)
        return
    try:
        # Ensure directories exist. This is inside the try so that a mkdir
        # failure (e.g. a permission error) still releases the process lock.
        for directory in (WORKSPACE_DIR, PDF_CACHE_DIR, IDEA_WORKSPACE_DIR):
            directory.mkdir(parents=True, exist_ok=True)

        # Initialize database
        print("Initializing database...", flush=True)
        init_db()
        backend = describe_backend()
        print("Database ready.", flush=True)
        print(f"Database target: {backend['target']} ({backend['backend']})", flush=True)

        # Seed taxonomy tree
        print("Seeding taxonomy tree...", flush=True)
        seed_taxonomy()
        print("Taxonomy ready.", flush=True)

        print("Backfilling result taxonomy links...", flush=True)
        backfill_result_taxonomy()
        print("Result taxonomy ready.", flush=True)

        print("Backfilling entity resolution map...", flush=True)
        backfill_entity_resolutions()
        print("Entity resolutions ready.", flush=True)

        # Skip heavy backfills on startup for a faster boot; these can run in
        # the background via the pipeline.
        # NOTE(review): this module imports BACKFILL_GRAPH_ON_START,
        # REFRESH_MERGE_CANDIDATES_ON_START, backfill_graph_from_structured_data
        # and refresh_merge_candidates, but none of them are consulted here.
        # Confirm that always skipping is intended.
        print("Skipping graph/merge backfill (run in pipeline instead).", flush=True)

        if AUTO_PIPELINE_ENABLED:
            from orchestrator.paper_worker import start as start_paper_worker

            print("Starting paper ingestion worker...", flush=True)
            start_paper_worker()
            print("Paper ingestion worker ready.", flush=True)

        if AUTO_RESEARCH_ENABLED:
            from orchestrator.auto_research import start as start_auto_research
            from orchestrator.gpu_scheduler import start as start_gpu_scheduler

            print("Starting Auto Research worker...", flush=True)
            start_auto_research()
            print("Starting GPU scheduler...", flush=True)
            start_gpu_scheduler()
            print("Auto Research worker ready.", flush=True)

        # Start web server
        _serve_http()
    finally:
        _release_process_lock()
# Script entry point: run main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()