Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions agent/callbacks.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,51 @@
import json

import requests


def callback(job, connection, result, *args, **kwargs):
    """Tell Press that a job has completed.

    Sends a single authenticated POST to Press's ``callbacks.callback``
    endpoint carrying the job's id.

    Args:
        job: Object exposing an ``id`` attribute; only ``job.id`` is read.
        connection, result, *args, **kwargs: Accepted and ignored.

    Raises:
        KeyError: If the server config has no ``agent_token``.
        requests.RequestException: If the request fails or times out.
    """
    from agent.server import Server

    server = Server()
    press_url = server.press_url

    path = "/api/method/press.api.callbacks.callback"
    data = {"job_id": job.id}
    token = server.config["agent_token"]

    # Exactly one request, always carrying the agent token. The timeout
    # matches the other Press calls in this module so a stalled connection
    # cannot block the caller indefinitely.
    requests.post(
        url=f"{press_url}{path}",
        data=data,
        headers={"X-Agent-Token": token},
        timeout=10,
    )


def update_callback(job):
    """Send a job's current state to Press.

    ``job`` is serialised with ``json.dumps(..., default=str)`` and POSTed,
    together with this server's name, to Press's ``callbacks.update_job``
    endpoint using the agent token for authentication.

    Returns:
        ``response.ok`` of the POST, or ``False`` when ``requests`` raises a
        ``RequestException`` (including the 10 second timeout).
    """
    from agent.server import Server

    agent = Server()
    endpoint = f"{agent.press_url}/api/method/press.api.callbacks.update_job"
    payload = {
        "job": json.dumps(job, default=str),
        "server": agent.name,
    }
    auth_headers = {"X-Agent-Token": agent.config["agent_token"]}

    try:
        return requests.post(
            url=endpoint,
            data=payload,
            headers=auth_headers,
            timeout=10,
        ).ok
    except requests.RequestException:
        return False
24 changes: 23 additions & 1 deletion agent/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,18 @@ def ping_server(password: str):
@click.option("--proxy-ip", required=False, type=str, default=None)
@click.option("--sentry-dsn", required=False, type=str)
@click.option("--press-url", required=False, type=str)
def config(name, user, workers, job_timeout, proxy_ip=None, sentry_dsn=None, press_url=None, db_port=3306):
@click.option("--agent-token", required=False, type=str)
def config(
name,
user,
workers,
job_timeout,
proxy_ip=None,
sentry_dsn=None,
press_url=None,
db_port=3306,
agent_token=None,
):
config = {
"benches_directory": f"/home/{user}/benches",
"name": name,
Expand All @@ -91,6 +102,8 @@ def config(name, user, workers, job_timeout, proxy_ip=None, sentry_dsn=None, pre
"db_port": db_port,
"job_timeout": job_timeout,
}
if agent_token:
config["agent_token"] = agent_token
if press_url:
config["press_url"] = press_url
if proxy_ip:
Expand All @@ -102,6 +115,15 @@ def config(name, user, workers, job_timeout, proxy_ip=None, sentry_dsn=None, pre
json.dump(config, f, sort_keys=True, indent=4)


@setup.command()
@click.option("--agent-token", required=True)
def agent_token(agent_token):
    # Store the supplied token under the "agent_token" key of the server
    # config. The config is fetched with for_update=True and written back
    # through set_config.
    server = Server()
    settings = server.get_config(for_update=True)
    settings["agent_token"] = agent_token
    server.set_config(settings, indent=4)


@setup.command()
def pyspy():
privileges_line = "frappe ALL = (root) NOPASSWD: /home/frappe/agent/env/bin/py-spy"
Expand Down
36 changes: 35 additions & 1 deletion agent/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,38 @@
)


def update_job(model: Model):
    """Flag a job as changed by adding its id to the ``dirty_jobs`` set.

    When ``model`` is a ``StepModel`` the id of its parent job
    (``model.job``) is recorded instead of the step's own id.
    """
    job = model.job if isinstance(model, StepModel) else model
    connection().sadd("dirty_jobs", job.id)


def get_updated_jobs():
    """Drain the ``dirty_jobs`` set and load the jobs it referenced.

    The set is read and deleted in one pipeline, so ids added afterwards are
    kept for the next call.

    Returns:
        A list of ``(job_dict, job)`` tuples, where ``job`` is the
        ``JobModel`` row and ``job_dict`` is ``to_dict(job)``. The list is
        empty when no job was marked dirty.
    """
    from agent.web import to_dict

    with connection().pipeline() as pipe:
        pipe.smembers("dirty_jobs")
        pipe.delete("dirty_jobs")
        dirty_ids, _ = pipe.execute()

    job_ids = [int(raw_id) for raw_id in dirty_ids]

    updated = []
    for job_id in job_ids:
        try:
            job = JobModel.get(JobModel.id == job_id)
        except JobModel.DoesNotExist:
            # The ids have already been removed from the set; a row that has
            # since disappeared must not abort the batch and lose the rest.
            continue
        updated.append((to_dict(job), job))

    return updated


def connection():
from agent.server import Server

Expand All @@ -65,6 +97,8 @@ def save(wrapped, instance: Action, args, kwargs):
wrapped(*args, **kwargs)
instance.model.save()

update_job(instance.model)


class Action:
if TYPE_CHECKING:
Expand Down Expand Up @@ -158,7 +192,7 @@ def step(name):
def wrapper(wrapped, instance: Base, args, kwargs):
from agent.base import AgentException

instance.step_record.start(name, instance.job_record.model.id)
instance.step_record.start(name, instance.job_record.model)
try:
result = wrapped(*args, **kwargs)
except AgentException as e:
Expand Down
166 changes: 166 additions & 0 deletions agent/job_update_poll.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import base64
import json
import time
from datetime import datetime, timezone

import requests

from agent.callbacks import update_callback
from agent.job import get_updated_jobs, update_job


def verify_token_expiry(token):
    """
    Report whether a token is due for regeneration.

    The token is expected to consist of three dot-separated segments whose
    middle segment is base64url-encoded JSON containing an ``exp`` timestamp
    (the JWT layout). Only the payload is decoded; nothing is verified
    cryptographically.

    Returns True if the token expires in less than 7 days (or has already
    expired), if it is malformed, if it has no ``exp``, or if anything goes
    wrong while decoding it.
    Returns False otherwise.
    """
    week_in_seconds = 7 * 24 * 60 * 60

    try:
        segments = token.split(".")
        if len(segments) != 3:
            return True

        _, encoded_payload, _ = segments

        # The payload may arrive without "=" padding; restore it so the
        # length is a multiple of four before decoding.
        padding = "=" * (-len(encoded_payload) % 4)
        claims = json.loads(base64.urlsafe_b64decode(encoded_payload + padding))

        expires_at = claims.get("exp")
        if not expires_at:
            return True

        time_left = datetime.fromtimestamp(expires_at, tz=timezone.utc) - datetime.now(timezone.utc)
        return time_left.total_seconds() < week_in_seconds

    except Exception:
        # Any failure to parse is treated as "needs a new token".
        return True


def get_regenerate_token():
    """Request a replacement agent token from Press.

    POSTs to Press's ``agent_auth.regenerate_token`` endpoint, authenticated
    with the token currently stored in the server config.

    Returns:
        The ``"message"`` field of the JSON response on success, or ``False``
        when the request fails, the response status is not OK, the body is
        not JSON, or the body has no ``"message"`` field.

    Raises:
        KeyError: If the server config has no ``agent_token``.
    """
    from agent.server import Server

    server = Server()
    press_url = server.press_url

    path = "/api/method/press.api.agent_auth.regenerate_token"

    token = server.config["agent_token"]

    try:
        response = requests.post(
            f"{press_url}{path}",
            headers={"X-Agent-Token": token},
            timeout=30,
        )

        # An error response must never be mistaken for a new token.
        if not response.ok:
            return False

        return response.json()["message"]
    except (requests.RequestException, ValueError, KeyError, TypeError):
        # ValueError: body is not JSON. KeyError/TypeError: JSON without a
        # usable "message" field. All are reported as "no new token".
        return False


def retry_undelivered():
    """Call Press's ``callbacks.retry_undelivered`` endpoint.

    Issues an authenticated GET with a 10 second timeout.

    Returns:
        ``response.ok`` of the GET, or ``False`` when ``requests`` raises a
        ``RequestException``.
    """
    from agent.server import Server

    agent = Server()
    endpoint = f"{agent.press_url}/api/method/press.api.callbacks.retry_undelivered"
    auth_headers = {"X-Agent-Token": agent.config["agent_token"]}

    try:
        return requests.get(url=endpoint, headers=auth_headers, timeout=10).ok
    except requests.RequestException:
        return False


def handle_retry(counter: int) -> int:
    """Trigger a retry of undelivered jobs once ``counter`` reaches 2.

    Returns 0 after triggering (so the caller starts counting again), or
    ``counter`` unchanged when it is still below 2. With the poll loop's
    5 second cycle this amounts to a retry roughly every 10 seconds.
    """
    if counter < 2:
        return counter

    retry_undelivered()
    return 0


def handle_token_refresh(server, counter: int) -> int:
    """Check the agent token once ``counter`` reaches 60, refreshing if due.

    Below 60 nothing happens and ``counter`` is returned unchanged. At 60 or
    above, the token from ``server.config`` is inspected; if it is close to
    expiry a new one is requested and, when one is obtained, written to the
    config of a freshly constructed ``Server``. The function then returns 0.
    With the poll loop's 5 second cycle this is a check every 5 minutes.
    """
    if counter < 60:
        return counter

    current_token = server.config["agent_token"]

    if verify_token_expiry(current_token):
        replacement = get_regenerate_token()

        if replacement:
            from agent.server import Server

            # A new Server instance is used for the write, not the one
            # passed in.
            fresh_server = Server()
            fresh_server.update_config({"agent_token": replacement})

    return 0


def process_jobs():
    """Deliver every dirty job to Press, re-marking the ones that fail.

    ``get_updated_jobs`` has already removed the ids from the dirty set by
    the time it returns, so any job that is not delivered must be marked
    dirty again or its update is lost.
    """
    for job_dict, job in get_updated_jobs():
        try:
            delivered = update_callback(job_dict)
        except Exception as e:
            # An unexpected error for one job must not abort the loop and
            # silently drop the jobs that follow it.
            print(e)
            delivered = False

        if not delivered:
            update_job(job)


def run():
    """Run the job-update poll loop; never returns.

    Every 5 seconds, while the ``enable_feature_worker`` config flag is
    truthy, the loop:

    1. retries undelivered callbacks when due (``handle_retry``),
    2. checks/refreshes the agent token when due (``handle_token_refresh``),
    3. pushes dirty jobs to Press (``process_jobs``).

    Each step is isolated, so a failure in one is printed and does not
    prevent the others from running in the same cycle.
    """
    from agent.server import Server

    server = Server()

    retry_counter = 0
    token_check_counter = 0

    while True:
        if not server.config.get("enable_feature_worker", False):
            # Sleep before re-checking; a bare `continue` here would spin
            # the CPU for as long as the feature is disabled.
            time.sleep(5)
            continue

        try:
            retry_counter = handle_retry(retry_counter)
        except Exception as e:
            print(e)

        try:
            token_check_counter = handle_token_refresh(
                server,
                token_check_counter,
            )
        except Exception as e:
            print(e)

        try:
            process_jobs()
        except Exception as e:
            print(e)

        retry_counter += 1
        token_check_counter += 1

        time.sleep(5)


# Start the poll loop when this module is executed directly as a script.
if __name__ == "__main__":
    run()
8 changes: 6 additions & 2 deletions agent/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,13 @@ def discover_targets(self):
def fetch_targets(self):
    """Fetch monitoring targets from Press.

    POSTs the configured ``press_token`` to Press's ``monitoring.targets``
    endpoint and returns the ``"message"`` field of the JSON response.
    """
    press_url = self.config.get("press_url")
    press_token = self.config.get("press_token")

    path = "/api/method/press.api.monitoring.targets"
    data = {"token": press_token}

    # One URL argument and one `data` argument only: with both the old and
    # new argument lines present the call is not valid Python.
    return requests.post(
        f"{press_url}{path}",
        data=data,
    ).json()["message"]

def generate_prometheus_sites_config(self, benches):
Expand Down
1 change: 1 addition & 0 deletions agent/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,7 @@ def _generate_supervisor_config(self):
"directory": self.directory,
"user": self.config["user"],
"sentry_dsn": self.config.get("sentry_dsn"),
"enable_feature_worker": self.config.get("enable_feature_worker", False), # Default for now
"is_standalone": self.config.get("standalone", False),
}
if self.config.get("name").startswith("n") and not self.config.get("name").startswith("nat"):
Expand Down
12 changes: 11 additions & 1 deletion agent/templates/agent/supervisor.conf.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ stderr_logfile={{ directory }}/logs/worker.error.log
user={{ user }}
directory={{ directory }}

[program:job_update_poll]
command=bash -c "{{ directory }}/repo/wait-for-it.sh redis://127.0.0.1:{{ redis_port }} && exec {{ directory }}/env/bin/python {{ directory }}/repo/agent/job_update_poll.py"
environment=PYTHONUNBUFFERED=1
autostart=true
autorestart=true
stdout_logfile={{ directory }}/logs/job_update_poll.log
stderr_logfile={{ directory }}/logs/job_update_poll.error.log
user={{ user }}
directory={{ directory }}

{% if is_proxy_server or is_standalone %}
[program:nginx_reload_manager]
command=bash -c "{{ directory }}/repo/wait-for-it.sh redis://127.0.0.1:{{ redis_port }} && exec {{directory}}/env/bin/python {{ directory }}/repo/agent/nginx_reload_manager.py"
Expand All @@ -46,7 +56,7 @@ directory={{ directory }}

[group:agent]

{% set programs = "web, redis, worker" %}
{% set programs = "web, redis, worker, job_update_poll" %}

{% if is_proxy_server or is_standalone %}
{% set programs = programs + ", nginx_reload_manager" %}
Expand Down
Loading