+
+
-
${escapeHtml(job.domain || '')}
+
+ ${isCollapsed ? '▶' : '▼'}
+ ${escapeHtml(job.domain || '')}
+
Started ${fmtTime(job.started)}
${job.completed_at ? `
Completed ${fmtTime(job.completed_at)}
` : ''}
@@ -7387,23 +7848,54 @@ def job_sleep(job_domain: Optional[str], seconds: float, chunk: float = 1.0) ->
${renderProgress(progress, job.status)}
-
- Wordlist: ${escapeHtml(job.wordlist || 'default')}
- Interval: ${escapeHtml(job.interval || 0)}s
- Nikto: ${job.skip_nikto ? 'Skipped' : 'Enabled'}
-
-
${escapeHtml(job.message || '')}
- ${renderJobControls(job)}
-
- ${stepsHtml || '
Awaiting step updates…
'}
-
-
- ${logsHtml}
+
+
+ Wordlist: ${escapeHtml(job.wordlist || 'default')}
+ Interval: ${escapeHtml(job.interval || 0)}s
+ Nikto: ${job.skip_nikto ? 'Skipped' : 'Enabled'}
+
+
${escapeHtml(job.message || '')}
+ ${renderJobControls(job)}
+
+ ${stepsHtml || '
Awaiting step updates…
'}
+
+
+ ${logsHtml}
+
`;
});
jobsList.innerHTML = cards.join('');
+
+ // Update pagination controls
+ const paginationEl = document.querySelector('.jobs-pagination');
+ if (totalJobs > 5) {
+ paginationEl.style.display = 'flex';
+ document.getElementById('jobs-page-info').textContent = `Page ${jobsCurrentPage} of ${totalPages} (${totalJobs} jobs)`;
+ document.getElementById('jobs-prev-page').disabled = jobsCurrentPage <= 1;
+ document.getElementById('jobs-next-page').disabled = jobsCurrentPage >= totalPages;
+ } else {
+ paginationEl.style.display = 'none';
+ }
+}
+
// Toggle one job card between collapsed and expanded, persist the choice,
// and patch the already-rendered DOM in place (no full re-render needed).
function toggleJobDetails(jobId) {
  // Collapsed is the default: any value other than an explicit `false`
  // in jobsCollapsedState counts as collapsed.
  const wasCollapsed = jobsCollapsedState[jobId] !== false;
  jobsCollapsedState[jobId] = !wasCollapsed;
  localStorage.setItem('jobsCollapsedState', JSON.stringify(jobsCollapsedState));

  const card = document.querySelector(`.job-card[data-job-id="${jobId}"]`);
  if (!card) return;

  const details = card.querySelector('.job-details');
  const icon = card.querySelector('.job-toggle-icon');
  if (details) details.style.display = wasCollapsed ? 'block' : 'none';
  if (icon) icon.textContent = wasCollapsed ? '▼' : '▶';
}
function renderQueue(queue) {
@@ -9493,6 +9985,113 @@ def job_sleep(job_domain: Optional[str], seconds: float, chunk: float = 1.0) ->
});
}
+// Jobs pagination and filter controls
+// DOM handles for the jobs-list toolbar. Any of these may be null when the
+// corresponding element is absent from the page, so every use is guarded.
+const jobsFilterDomainInput = document.getElementById('jobs-filter-domain');
+const jobsFilterStatusSelect = document.getElementById('jobs-filter-status');
+const jobsClearFiltersBtn = document.getElementById('jobs-clear-filters');
+const jobsPerPageSelect = document.getElementById('jobs-per-page');
+const jobsPrevPageBtn = document.getElementById('jobs-prev-page');
+const jobsNextPageBtn = document.getElementById('jobs-next-page');
+const jobsExpandAllBtn = document.getElementById('jobs-expand-all-btn');
+const jobsCollapseAllBtn = document.getElementById('jobs-collapse-all-btn');
+
+// Initialize filter values from localStorage
+// (jobsFilterDomain / jobsFilterStatus / jobsPerPage are module-level values
+// declared elsewhere in this file; the handlers below persist them under the
+// localStorage keys of the same names.)
+if (jobsFilterDomainInput) jobsFilterDomainInput.value = jobsFilterDomain;
+if (jobsFilterStatusSelect) jobsFilterStatusSelect.value = jobsFilterStatus;
+if (jobsPerPageSelect) jobsPerPageSelect.value = jobsPerPage.toString();
+
// Domain filter: persist the value and re-render from the first page on
// every keystroke.
jobsFilterDomainInput?.addEventListener('input', (e) => {
  jobsFilterDomain = e.target.value;
  localStorage.setItem('jobsFilterDomain', jobsFilterDomain);
  jobsCurrentPage = 1;
  renderJobs(latestRunningJobs);
});

// Status filter: same persistence / page-reset behaviour as the domain filter.
jobsFilterStatusSelect?.addEventListener('change', (e) => {
  jobsFilterStatus = e.target.value;
  localStorage.setItem('jobsFilterStatus', jobsFilterStatus);
  jobsCurrentPage = 1;
  renderJobs(latestRunningJobs);
});

// Reset both filters — in-memory state, persisted state, and the visible
// inputs — then show page one again.
jobsClearFiltersBtn?.addEventListener('click', () => {
  jobsFilterDomain = '';
  jobsFilterStatus = '';
  jobsCurrentPage = 1;
  localStorage.setItem('jobsFilterDomain', '');
  localStorage.setItem('jobsFilterStatus', '');
  if (jobsFilterDomainInput) jobsFilterDomainInput.value = '';
  if (jobsFilterStatusSelect) jobsFilterStatusSelect.value = '';
  renderJobs(latestRunningJobs);
});

// Page-size selector: persist the choice and restart at page one.
jobsPerPageSelect?.addEventListener('change', (e) => {
  jobsPerPage = parseInt(e.target.value, 10);
  localStorage.setItem('jobsPerPage', jobsPerPage.toString());
  jobsCurrentPage = 1;
  renderJobs(latestRunningJobs);
});
+
// Pagination: step backwards, never below page one.
jobsPrevPageBtn?.addEventListener('click', () => {
  if (jobsCurrentPage <= 1) return;
  jobsCurrentPage -= 1;
  renderJobs(latestRunningJobs);
});

// Pagination: step forwards. renderJobs disables this button once the last
// page is reached, so no upper bound is checked here.
jobsNextPageBtn?.addEventListener('click', () => {
  jobsCurrentPage += 1;
  renderJobs(latestRunningJobs);
});
+
// Set every known job's collapsed state at once, persist it, and re-render.
// Iterates Object.values directly: the previous code paired Object.keys with
// a positional forEach index (`latestRunningJobs[idx]`), which only worked by
// coincidence for arrays and silently read the wrong entries if
// latestRunningJobs were an object keyed by domain.
function setAllJobsCollapsed(collapsed) {
  jobsCollapsedState = {};
  Object.values(latestRunningJobs || {}).forEach((job) => {
    if (job && job.domain) {
      jobsCollapsedState[job.domain] = collapsed;
    }
  });
  localStorage.setItem('jobsCollapsedState', JSON.stringify(jobsCollapsedState));
  renderJobs(latestRunningJobs);
}

// Expand all jobs button
if (jobsExpandAllBtn) {
  jobsExpandAllBtn.addEventListener('click', () => setAllJobsCollapsed(false));
}

// Collapse all jobs button
if (jobsCollapseAllBtn) {
  jobsCollapseAllBtn.addEventListener('click', () => setAllJobsCollapsed(true));
}
+
document.addEventListener('click', (event) => {
const header = event.target.closest('.collapsible-header');
if (!header) return;
@@ -9688,10 +10287,52 @@ def job_sleep(job_domain: Optional[str], seconds: float, chunk: float = 1.0) ->
if (event.target === detailOverlay) closeDetailModal();
});
+// Load workflows into launch form dropdown
+// Fills the launch form's workflow <select> (`launchWorkflow`, declared
+// elsewhere in this file) from GET /api/workflows, labelling and
+// pre-selecting the server-side default workflow.
+// NOTE(review): the <option> markup inside the quoted/backtick literals
+// below was stripped by the diff sanitizer — the literals run onto lines
+// with no content and are unterminated as shown. Restore the original
+// option templates from version control before applying this hunk.
+async function loadLaunchWorkflows() {
+ if (!launchWorkflow) return;  // page variant without the launch form
+
+ try {
+ const resp = await fetch('/api/workflows');
+ const data = await resp.json();
+ const workflows = data.workflows || [];
+ const defaultWorkflowId = data.default_workflow_id;
+
+ if (workflows.length === 0) {
+ launchWorkflow.innerHTML = '
';
+ return;
+ }
+
+ // Build options with default marked
+ const options = workflows.map(wf => {
+ const isDefault = wf.id === defaultWorkflowId;
+ const label = isDefault ? `${wf.name} (Default)` : wf.name;
+ const selected = isDefault ? 'selected' : '';
+ return `
`;
+ }).join('');
+
+ launchWorkflow.innerHTML = options;
+ } catch (err) {
+ // Non-fatal: log and fall back to a placeholder option (markup stripped).
+ console.error('Error loading workflows for launch form:', err);
+ launchWorkflow.innerHTML = '
+ ';
+ }
+}
+
// Populate the launch-form workflow dropdown immediately, and refresh it
// whenever the user navigates to the launch view so newly created or edited
// workflows appear without a page reload.
// (Removed the dead `const originalClickHandler = link.onclick;` local — it
// was captured but never used.)
loadLaunchWorkflows();
navLinks.forEach((link) => {
  link.addEventListener('click', () => {
    if (link.dataset.view === 'launch') {
      loadLaunchWorkflows();
    }
  });
});
+
launchForm.addEventListener('submit', async (event) => {
event.preventDefault();
const payload = {
domain: event.target.domain.value,
+ workflow_id: launchWorkflow ? launchWorkflow.value : '',
wordlist: launchWordlist.value,
interval: launchInterval.value,
skip_nikto: launchSkipNikto.checked,
@@ -9710,6 +10351,7 @@ def job_sleep(job_domain: Optional[str], seconds: float, chunk: float = 1.0) ->
if (data.success) {
event.target.reset();
launchFormDirty = false;
+ loadLaunchWorkflows(); // Reload to reset to default
fetchState();
}
} catch (err) {
@@ -10141,6 +10783,304 @@ def job_sleep(job_domain: Optional[str], seconds: float, chunk: float = 1.0) ->
});
}
+// ================== WORKFLOWS ==================
+
+// DOM handles for the workflow editor and list. Each may be null on page
+// variants without the editor, so every use below is guarded.
+const workflowForm = document.getElementById('workflow-form');
+const workflowIdInput = document.getElementById('workflow-id');
+const workflowNameInput = document.getElementById('workflow-name');
+const workflowDescriptionInput = document.getElementById('workflow-description');
+const workflowPhasesContainer = document.getElementById('workflow-phases-container');
+const workflowAddPhaseBtn = document.getElementById('workflow-add-phase');
+const workflowCancelBtn = document.getElementById('workflow-cancel');
+const workflowStatus = document.getElementById('workflow-status');
+const workflowsList = document.getElementById('workflows-list');
+
+// Mutable editor state: the phases of the workflow being composed, and the
+// id of the workflow being edited (null while creating a new one).
+let workflowPhases = [];
+let editingWorkflowId = null;
+
+// Tool names offered for each phase; 'custom' is special-cased by
+// renderWorkflowPhases (see its `isCustom` flag).
+const availableTools = [
+ 'amass', 'subfinder', 'assetfinder', 'findomain', 'sublist3r', 'crtsh',
+ 'github-subdomains', 'dnsx', 'ffuf', 'httpx', 'waybackurls', 'gau',
+ 'nuclei', 'nikto', 'gowitness', 'nmap', 'custom'
+];
+
+// Render the phase-editor rows for the workflow currently being built.
+// Rewrites workflowPhasesContainer.innerHTML from the module-level
+// `workflowPhases` array, or shows a placeholder when no phases exist.
+// NOTE(review): the per-phase row markup inside the template literal below
+// was stripped by the diff sanitizer (which is also why `isCustom` appears
+// unused here). Restore the original markup from version control; as shown
+// the literals are incomplete.
+function renderWorkflowPhases() {
+ if (!workflowPhasesContainer) return;
+
+ workflowPhasesContainer.innerHTML = workflowPhases.map((phase, idx) => {
+ const isCustom = phase.tool === 'custom';
+ return `
+
+ `;
+ }).join('');
+
+ // Empty state replaces (not appends to) whatever map() produced above.
+ if (workflowPhases.length === 0) {
+ workflowPhasesContainer.innerHTML = '
No phases added yet. Click "Add Phase" to start building your workflow.
';
+ }
+}
+
// Append a blank phase (defaulting to the first tool in availableTools) to
// the workflow being edited, then refresh the phase-editor UI.
function addWorkflowPhase() {
  const blankPhase = {
    tool: 'amass',
    command: '',
    flags: '',
    input: '',
    output: ''
  };
  workflowPhases.push(blankPhase);
  renderWorkflowPhases();
}
+
// Remove the phase at the given position (in place — other code holds a
// reference to the workflowPhases array) and refresh the editor.
function removeWorkflowPhase(index) {
  workflowPhases.splice(index, 1);
  renderWorkflowPhases();
}
+
// Switch the tool of one phase and re-render (the row layout depends on the
// selected tool). Silently ignores an out-of-range index.
function updatePhaseToolType(index, tool) {
  const phase = workflowPhases[index];
  if (!phase) return;
  phase.tool = tool;
  renderWorkflowPhases();
}
+
// Update one field of one phase without re-rendering (used for text inputs
// so the user's focus/caret is not disturbed). Out-of-range index is a no-op.
function updatePhaseField(index, field, value) {
  const phase = workflowPhases[index];
  if (phase) {
    phase[field] = value;
  }
}
+
// Wire the phase-editor buttons; either element may be absent on pages
// without the workflow editor.
workflowAddPhaseBtn?.addEventListener('click', addWorkflowPhase);
workflowCancelBtn?.addEventListener('click', () => resetWorkflowForm());
+
// Return the workflow editor to "create new" mode: drop the edit target,
// empty the phase list, blank the text inputs, and clear the status line.
function resetWorkflowForm() {
  editingWorkflowId = null;
  workflowPhases = [];
  for (const input of [workflowIdInput, workflowNameInput, workflowDescriptionInput]) {
    if (input) input.value = '';
  }
  renderWorkflowPhases();
  if (workflowStatus) {
    workflowStatus.textContent = '';
    workflowStatus.className = 'status';
  }
}
+
// Create/update submit handler for the workflow editor form.
// Validates name + at least one phase, POSTs to the create or update
// endpoint depending on whether an existing workflow is being edited, and
// reloads the list on success.
if (workflowForm) {
  workflowForm.addEventListener('submit', async (event) => {
    event.preventDefault();

    // Status line is optional in the DOM; funnel all updates through here.
    const setStatus = (text, cls) => {
      if (workflowStatus) {
        workflowStatus.textContent = text;
        workflowStatus.className = cls;
      }
    };

    if (!workflowNameInput || !workflowNameInput.value.trim()) {
      setStatus('Workflow name is required', 'status error');
      return;
    }
    if (workflowPhases.length === 0) {
      setStatus('At least one phase is required', 'status error');
      return;
    }

    const payload = {
      name: workflowNameInput.value.trim(),
      description: workflowDescriptionInput ? workflowDescriptionInput.value.trim() : '',
      phases: workflowPhases,
    };
    if (editingWorkflowId) {
      payload.id = editingWorkflowId;
    }
    const endpoint = editingWorkflowId ? '/api/workflows/update' : '/api/workflows/create';

    setStatus('Saving...', 'status');
    try {
      const resp = await fetch(endpoint, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(payload),
      });
      const data = await resp.json();
      setStatus(data.message || 'Saved', 'status ' + (data.success ? 'success' : 'error'));
      if (data.success) {
        resetWorkflowForm();
        await loadWorkflows();
      }
    } catch (err) {
      setStatus(err.message, 'status error');
    }
  });
}
+
// Fetch every saved workflow (plus the default's id) from the server and
// redraw the workflows list. Failures are logged and otherwise non-fatal.
async function loadWorkflows() {
  try {
    const resp = await fetch('/api/workflows');
    const data = await resp.json();
    const workflows = data.workflows || [];
    renderWorkflows(workflows, data.default_workflow_id);
  } catch (err) {
    console.error('Error loading workflows:', err);
  }
}
+
+// Render the saved-workflows list into `workflowsList`, badging the default.
+// `workflows` comes from GET /api/workflows; `defaultWorkflowId` may be null
+// when no default is set.
+// NOTE(review): the card markup inside the template literal below was
+// stripped by the diff sanitizer — the wrapper elements and the action
+// buttons (edit / delete / set-default, visible only as the `!isDefault`
+// ternary stub) are missing. Restore from version control; as shown the
+// template is incomplete.
+function renderWorkflows(workflows, defaultWorkflowId) {
+ if (!workflowsList) return;
+
+ if (workflows.length === 0) {
+ workflowsList.innerHTML = '
No workflows created yet.
';
+ return;
+ }
+
+ const html = workflows.map(workflow => {
+ const isDefault = workflow.id === defaultWorkflowId;
+ return `
+
+
+
+
${escapeHtml(workflow.name)} ${isDefault ? 'Default' : ''}
+ ${workflow.description ? `
${escapeHtml(workflow.description)}
` : ''}
+
${workflow.phase_count} phases
+
+
+
+ ${!isDefault ? `` : ``}
+
+
+
+
+ `;
+ }).join('');
+
+ workflowsList.innerHTML = html;
+}
+
// Load one workflow by id into the editor form so the user can modify it.
// A failed lookup (success:false or missing body) is silently ignored;
// transport/parse errors surface via alert.
async function editWorkflow(workflowId) {
  try {
    const resp = await fetch(`/api/workflow/${workflowId}`);
    const data = await resp.json();
    if (!data.success || !data.workflow) return;

    const workflow = data.workflow;
    editingWorkflowId = workflow.id;
    if (workflowIdInput) workflowIdInput.value = workflow.id;
    if (workflowNameInput) workflowNameInput.value = workflow.name;
    if (workflowDescriptionInput) workflowDescriptionInput.value = workflow.description || '';
    workflowPhases = workflow.phases || [];
    renderWorkflowPhases();

    // Bring the (possibly off-screen) editor form into view.
    workflowForm?.scrollIntoView({ behavior: 'smooth', block: 'start' });
  } catch (err) {
    alert(`Error loading workflow: ${err.message}`);
  }
}
+
// Delete a workflow after explicit user confirmation, then refresh the list.
// Server-side refusals and transport errors are both reported via alert.
async function deleteWorkflow(workflowId) {
  const confirmed = confirm('Are you sure you want to delete this workflow?');
  if (!confirmed) return;

  try {
    const resp = await fetch('/api/workflows/delete', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ id: workflowId }),
    });
    const data = await resp.json();
    if (!data.success) {
      alert(`Delete failed: ${data.message}`);
      return;
    }
    await loadWorkflows();
  } catch (err) {
    alert(`Error deleting workflow: ${err.message}`);
  }
}
+
// Ask the server to mark the given workflow as the default (the API accepts
// null to clear the default), then refresh the list on success.
async function setDefaultWorkflow(workflowId) {
  try {
    const resp = await fetch('/api/workflows/set-default', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ id: workflowId }),
    });
    const data = await resp.json();
    if (!data.success) {
      alert(`Failed to set default: ${data.message}`);
      return;
    }
    await loadWorkflows();
  } catch (err) {
    alert(`Error setting default workflow: ${err.message}`);
  }
}
+
// Lazily (re)load the workflows list whenever the workflows tab is opened.
for (const link of document.querySelectorAll('.nav-link')) {
  link.addEventListener('click', () => {
    if (link.getAttribute('data-view') === 'workflows') {
      loadWorkflows();
    }
  });
}
+
// ================== LOGS VIEW ==================
function saveLogFilters() {
@@ -10792,11 +11732,11 @@ def resume_target_scan(domain: str, wordlist: Optional[str] = None,
cleaned = str(wordlist).strip()
if cleaned:
wordlist_val = cleaned
- return start_pipeline_job(normalized, wordlist_val, skip_flag, None)
+ return start_pipeline_job(normalized, wordlist_val, skip_flag, None, None)
def start_targets_from_input(domain_input: str, wordlist: Optional[str],
- skip_nikto: bool, interval: Optional[int]) -> Tuple[bool, str, List[Dict[str, Any]]]:
+ skip_nikto: bool, interval: Optional[int], workflow_id: Optional[str] = None) -> Tuple[bool, str, List[Dict[str, Any]]]:
cfg = get_config()
cleaned = _sanitize_domain_input(domain_input)
requested_any_tld = bool(cleaned.endswith(".*"))
@@ -10808,7 +11748,7 @@ def start_targets_from_input(domain_input: str, wordlist: Optional[str],
details: List[Dict[str, Any]] = []
success_any = False
for target in targets:
- success, message = start_pipeline_job(target, wordlist, skip_nikto, interval)
+ success, message = start_pipeline_job(target, wordlist, skip_nikto, interval, workflow_id)
if success:
success_any = True
details.append({
@@ -10831,7 +11771,7 @@ def start_targets_from_input(domain_input: str, wordlist: Optional[str],
return success_any, " ".join(summary_parts).strip(), details
-def start_pipeline_job(domain: str, wordlist: Optional[str], skip_nikto: bool, interval: Optional[int]) -> Tuple[bool, str]:
+def start_pipeline_job(domain: str, wordlist: Optional[str], skip_nikto: bool, interval: Optional[int], workflow_id: Optional[str] = None) -> Tuple[bool, str]:
normalized = (domain or "").strip().lower()
if not normalized:
return False, "Domain is required."
@@ -10843,6 +11783,11 @@ def start_pipeline_job(domain: str, wordlist: Optional[str], skip_nikto: bool, i
wordlist_path = default_wordlist.strip()
else:
wordlist_path = str(wordlist).strip()
+
+ # Use provided workflow_id or get default workflow
+ if not workflow_id:
+ default_workflow = get_default_workflow()
+ workflow_id = default_workflow["id"] if default_workflow else None
with JOB_LOCK:
if normalized in RUNNING_JOBS:
@@ -10857,6 +11802,7 @@ def start_pipeline_job(domain: str, wordlist: Optional[str], skip_nikto: bool, i
"wordlist": wordlist_path,
"skip_nikto": skip_nikto,
"interval": interval_val,
+ "workflow_id": workflow_id,
"status": "queued",
"message": "Waiting for a free slot.",
"steps": init_job_steps(skip_nikto),
@@ -11656,6 +12602,22 @@ def do_GET(self):
if self.path == "/api/backups":
self._send_json({"backups": list_backups()})
return
+ if self.path == "/api/workflows":
+ workflows = list_workflows()
+ default_workflow = get_default_workflow()
+ self._send_json({
+ "workflows": workflows,
+ "default_workflow_id": default_workflow["id"] if default_workflow else None
+ })
+ return
+ if self.path.startswith("/api/workflow/"):
+ workflow_id = unquote(self.path[len("/api/workflow/"):])
+ workflow = get_workflow(workflow_id)
+ if workflow:
+ self._send_json({"success": True, "workflow": workflow})
+ else:
+ self._send_json({"success": False, "message": "Workflow not found"}, status=HTTPStatus.NOT_FOUND)
+ return
if self.path.startswith("/api/backup/download/"):
backup_filename = unquote(self.path[len("/api/backup/download/"):])
@@ -11836,6 +12798,10 @@ def do_POST(self):
"/api/backup/create",
"/api/backup/restore",
"/api/backup/delete",
+ "/api/workflows/create",
+ "/api/workflows/update",
+ "/api/workflows/delete",
+ "/api/workflows/set-default",
}
if self.path not in allowed:
self.send_error(HTTPStatus.NOT_FOUND, "Not Found")
@@ -11917,6 +12883,7 @@ def do_POST(self):
if self.path == "/api/run":
domain = payload.get("domain", "")
wordlist = payload.get("wordlist")
+ workflow_id = payload.get("workflow_id")
interval_val = payload.get("interval")
interval_int: Optional[int] = None
if interval_val not in (None, ""):
@@ -11927,7 +12894,7 @@ def do_POST(self):
skip_default = get_config().get("skip_nikto_by_default", False)
skip_nikto = bool_from_value(payload.get("skip_nikto"), skip_default)
- success, message, _ = start_targets_from_input(domain, wordlist, skip_nikto, interval_int)
+ success, message, _ = start_targets_from_input(domain, wordlist, skip_nikto, interval_int, workflow_id)
status = HTTPStatus.OK if success else HTTPStatus.BAD_REQUEST
self._send_json({"success": success, "message": message}, status=status)
return
@@ -11955,6 +12922,39 @@ def do_POST(self):
status = HTTPStatus.OK if success else HTTPStatus.BAD_REQUEST
self._send_json({"success": success, "message": message}, status=status)
return
+
+ if self.path == "/api/workflows/create":
+ name = payload.get("name", "")
+ description = payload.get("description", "")
+ phases = payload.get("phases", [])
+ success, message, workflow_id = create_workflow(name, description, phases)
+ status = HTTPStatus.OK if success else HTTPStatus.BAD_REQUEST
+ self._send_json({"success": success, "message": message, "workflow_id": workflow_id}, status=status)
+ return
+
+ if self.path == "/api/workflows/update":
+ workflow_id = payload.get("id", "")
+ name = payload.get("name", "")
+ description = payload.get("description", "")
+ phases = payload.get("phases", [])
+ success, message = update_workflow(workflow_id, name, description, phases)
+ status = HTTPStatus.OK if success else HTTPStatus.BAD_REQUEST
+ self._send_json({"success": success, "message": message}, status=status)
+ return
+
+ if self.path == "/api/workflows/delete":
+ workflow_id = payload.get("id", "")
+ success, message = delete_workflow(workflow_id)
+ status = HTTPStatus.OK if success else HTTPStatus.BAD_REQUEST
+ self._send_json({"success": success, "message": message}, status=status)
+ return
+
+ if self.path == "/api/workflows/set-default":
+ workflow_id = payload.get("id") # None to clear default
+ success, message = set_default_workflow(workflow_id)
+ status = HTTPStatus.OK if success else HTTPStatus.BAD_REQUEST
+ self._send_json({"success": success, "message": message}, status=status)
+ return
success, message, cfg = update_config_settings(payload)
status = HTTPStatus.OK if success else HTTPStatus.BAD_REQUEST
diff --git a/test_main.py b/test_main.py
index fc00ae4..36a4ecf 100644
--- a/test_main.py
+++ b/test_main.py
@@ -1703,6 +1703,221 @@ def test_backup_filename_validation(self):
assert ".." in name or "/" in name or "\\" in name
+# NOTE(review): indentation in this diff excerpt is collapsed to single
+# spaces; the real file uses standard 4-space indents. All behaviour here
+# depends on the `main` module (workflow CRUD helpers) not shown in this hunk.
+class TestWorkflows:
+ """Tests for workflow management functionality"""
+
+ def setup_method(self):
+ """Setup test fixtures"""
+ # Create a temporary database for testing
+ # delete=False so the file survives close(); teardown_method unlinks it.
+ self.temp_db = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.db')
+ self.temp_db_path = Path(self.temp_db.name)
+ self.temp_db.close()
+
+ # Mock the database connection
+ self.original_get_db = main.get_db
+
+ # NOTE(review): each call opens a fresh connection that is never closed;
+ # acceptable for short tests on POSIX, but on Windows the open handle
+ # would block the unlink() in teardown_method — confirm before porting.
+ def mock_get_db():
+ conn = sqlite3.connect(str(self.temp_db_path), check_same_thread=False)
+ conn.row_factory = sqlite3.Row
+ return conn
+
+ main.get_db = mock_get_db
+
+ # Initialize database schema
+ main.init_database()
+
+ def teardown_method(self):
+ """Cleanup test fixtures"""
+ # Restore original function
+ main.get_db = self.original_get_db
+
+ # Remove temp database
+ if self.temp_db_path.exists():
+ self.temp_db_path.unlink()
+
+ def test_ensure_default_workflow(self):
+ """Test that default workflow is created automatically"""
+ # Call ensure_default_workflow
+ main.ensure_default_workflow()
+
+ # Check that default workflow exists
+ workflows = main.list_workflows()
+ assert len(workflows) > 0
+
+ default_workflows = [w for w in workflows if w['is_default']]
+ assert len(default_workflows) == 1
+
+ default = default_workflows[0]
+ assert default['name'] == "Default Recon Pipeline"
+ assert default['is_default'] is True
+ assert len(default['phases']) > 0
+
+ def test_create_workflow(self):
+ """Test creating a custom workflow"""
+ phases = [
+ {"tool": "amass", "command": "", "flags": "-passive", "input": "", "output": ""},
+ {"tool": "httpx", "command": "", "flags": "", "input": "$INPUT$", "output": "$OUTPUT$"}
+ ]
+
+ success, message, workflow_id = main.create_workflow(
+ "Test Workflow",
+ "A test workflow",
+ phases
+ )
+
+ assert success is True
+ assert workflow_id is not None
+
+ # Verify workflow was created
+ workflow = main.get_workflow(workflow_id)
+ assert workflow is not None
+ assert workflow['name'] == "Test Workflow"
+ assert len(workflow['phases']) == 2
+
+ def test_update_workflow(self):
+ """Test updating an existing workflow"""
+ # Create a workflow first
+ phases = [{"tool": "amass", "command": "", "flags": "", "input": "", "output": ""}]
+ success, message, workflow_id = main.create_workflow("Original", "Original desc", phases)
+ assert success is True
+
+ # Update it
+ new_phases = [
+ {"tool": "subfinder", "command": "", "flags": "", "input": "", "output": ""},
+ {"tool": "httpx", "command": "", "flags": "", "input": "", "output": ""}
+ ]
+ success, message = main.update_workflow(workflow_id, "Updated", "Updated desc", new_phases)
+ assert success is True
+
+ # Verify updates
+ workflow = main.get_workflow(workflow_id)
+ assert workflow['name'] == "Updated"
+ assert workflow['description'] == "Updated desc"
+ assert len(workflow['phases']) == 2
+
+ def test_delete_non_default_workflow(self):
+ """Test deleting a non-default workflow"""
+ # Create a workflow
+ phases = [{"tool": "amass", "command": "", "flags": "", "input": "", "output": ""}]
+ success, message, workflow_id = main.create_workflow("To Delete", "Will be deleted", phases)
+ assert success is True
+
+ # Delete it
+ success, message = main.delete_workflow(workflow_id)
+ assert success is True
+
+ # Verify deletion
+ workflow = main.get_workflow(workflow_id)
+ assert workflow is None
+
+ def test_cannot_delete_default_workflow(self):
+ """Test that default workflow cannot be deleted"""
+ # Ensure default workflow exists
+ main.ensure_default_workflow()
+
+ # Get default workflow
+ workflows = main.list_workflows()
+ default_workflow = next((w for w in workflows if w['is_default']), None)
+ assert default_workflow is not None
+
+ # Try to delete default workflow
+ success, message = main.delete_workflow(default_workflow['id'])
+ assert success is False
+ # Couples the test to main.delete_workflow's exact refusal wording.
+ assert "cannot delete the default workflow" in message.lower()
+
+ def test_set_default_workflow(self):
+ """Test setting a workflow as default"""
+ # Ensure original default exists
+ main.ensure_default_workflow()
+
+ # Create a new workflow
+ phases = [{"tool": "amass", "command": "", "flags": "", "input": "", "output": ""}]
+ success, message, new_workflow_id = main.create_workflow("New Default", "New default workflow", phases)
+ assert success is True
+
+ # Set it as default
+ success, message = main.set_default_workflow(new_workflow_id)
+ assert success is True
+
+ # Verify it's now default
+ workflow = main.get_workflow(new_workflow_id)
+ assert workflow['is_default'] is True
+
+ # Verify old default is no longer default
+ workflows = main.list_workflows()
+ default_workflows = [w for w in workflows if w['is_default']]
+ assert len(default_workflows) == 1
+ assert default_workflows[0]['id'] == new_workflow_id
+
+ def test_list_workflows(self):
+ """Test listing all workflows"""
+ # Ensure default workflow exists
+ main.ensure_default_workflow()
+
+ # Create additional workflows
+ phases = [{"tool": "amass", "command": "", "flags": "", "input": "", "output": ""}]
+ main.create_workflow("Workflow 1", "First", phases)
+ main.create_workflow("Workflow 2", "Second", phases)
+
+ # List workflows
+ workflows = main.list_workflows()
+ assert len(workflows) >= 3 # At least default + 2 created
+
+ # Check structure
+ for workflow in workflows:
+ assert 'id' in workflow
+ assert 'name' in workflow
+ assert 'phases' in workflow
+ assert 'is_default' in workflow
+
+ def test_start_pipeline_job_with_workflow(self):
+ """Test that jobs can be started with a specific workflow"""
+ # Ensure default workflow exists
+ main.ensure_default_workflow()
+
+ # Get default workflow
+ default_workflow = main.get_default_workflow()
+ assert default_workflow is not None
+
+ # Mock job starting (to avoid actually running tools)
+ with patch('main._start_job_thread'):
+ success, message = main.start_pipeline_job(
+ "test.com",
+ None,
+ False,
+ None,
+ default_workflow['id']
+ )
+
+ # Job should be queued successfully
+ assert success is True
+
+ # Check that job has workflow_id
+ with main.JOB_LOCK:
+ job = main.RUNNING_JOBS.get("test.com")
+ assert job is not None
+ assert job.get('workflow_id') == default_workflow['id']
+
+ def test_workflow_api_endpoints(self):
+ """Test workflow API endpoint data structure"""
+ # Ensure default workflow exists
+ main.ensure_default_workflow()
+
+ # Test list_workflows returns proper format for API
+ workflows = main.list_workflows()
+ default_workflow = main.get_default_workflow()
+
+ # Simulate API response
+ api_response = {
+ "workflows": workflows,
+ "default_workflow_id": default_workflow['id'] if default_workflow else None
+ }
+
+ assert isinstance(api_response['workflows'], list)
+ assert api_response['default_workflow_id'] is not None
+ assert len(api_response['workflows']) > 0
+
+
if __name__ == '__main__':
# Run tests with pytest
# NOTE(review): the call below must be indented under the __main__ guard in
# the actual file — leading whitespace was lost in this diff excerpt.
pytest.main([__file__, '-v', '--tb=short'])