diff --git a/backend/src/monitor.rs b/backend/src/monitor.rs index e012887d70d2c..a943313bd471b 100644 --- a/backend/src/monitor.rs +++ b/backend/src/monitor.rs @@ -2196,7 +2196,18 @@ async fn handle_zombie_jobs(db: &Pool, base_internal_url: &str, worker } let jobs = sqlx::query_as::<_, QueuedJob>( - "SELECT *, null as workflow_as_code_status FROM v2_as_queue WHERE id = ANY($1)", + "SELECT j.id, j.workspace_id, j.parent_job, j.created_by, j.created_at, q.started_at, q.scheduled_for, q.running, + j.runnable_id AS script_hash, j.runnable_path AS script_path, j.args, j.raw_code, + q.canceled_by IS NOT NULL AS canceled, q.canceled_by, q.canceled_reason, r.ping AS last_ping, + j.kind AS job_kind, CASE WHEN j.trigger_kind = 'schedule'::job_trigger_kind THEN j.trigger END AS schedule_path, + j.permissioned_as, COALESCE(s.flow_status, s.workflow_as_code_status) AS flow_status, j.raw_flow, + j.flow_step_id IS NOT NULL AS is_flow_step, j.script_lang AS language, q.suspend, q.suspend_until, + j.same_worker, j.raw_lock, j.pre_run_error, j.permissioned_as_email AS email, j.visible_to_owner, + r.memory_peak AS mem_peak, j.flow_innermost_root_job AS root_job, s.flow_leaf_jobs AS leaf_jobs, + j.tag, j.concurrent_limit, j.concurrency_time_window_s, j.timeout, j.flow_step_id, j.cache_ttl, + j.priority, NULL::TEXT AS logs, j.script_entrypoint_override, j.preprocessed, null as workflow_as_code_status + FROM v2_job_queue q JOIN v2_job j USING (id) LEFT JOIN v2_job_runtime r USING (id) LEFT JOIN v2_job_status s USING (id) + WHERE j.id = ANY($1)", ) .bind(&timeouts[..]) .fetch_all(db) @@ -2210,8 +2221,19 @@ async fn handle_zombie_jobs(db: &Pool, base_internal_url: &str, worker let non_restartable_jobs = if *RESTART_ZOMBIE_JOBS { vec![] } else { - sqlx::query_as::<_, QueuedJob>("SELECT *, null as workflow_as_code_status FROM v2_as_queue WHERE last_ping < now() - ($1 || ' seconds')::interval - AND running = true AND job_kind NOT IN ('flow', 'flowpreview', 'flownode', 'singlescriptflow') 
AND same_worker = false") + sqlx::query_as::<_, QueuedJob>("SELECT j.id, j.workspace_id, j.parent_job, j.created_by, j.created_at, q.started_at, q.scheduled_for, q.running, + j.runnable_id AS script_hash, j.runnable_path AS script_path, j.args, j.raw_code, + q.canceled_by IS NOT NULL AS canceled, q.canceled_by, q.canceled_reason, r.ping AS last_ping, + j.kind AS job_kind, CASE WHEN j.trigger_kind = 'schedule'::job_trigger_kind THEN j.trigger END AS schedule_path, + j.permissioned_as, COALESCE(s.flow_status, s.workflow_as_code_status) AS flow_status, j.raw_flow, + j.flow_step_id IS NOT NULL AS is_flow_step, j.script_lang AS language, q.suspend, q.suspend_until, + j.same_worker, j.raw_lock, j.pre_run_error, j.permissioned_as_email AS email, j.visible_to_owner, + r.memory_peak AS mem_peak, j.flow_innermost_root_job AS root_job, s.flow_leaf_jobs AS leaf_jobs, + j.tag, j.concurrent_limit, j.concurrency_time_window_s, j.timeout, j.flow_step_id, j.cache_ttl, + j.priority, NULL::TEXT AS logs, j.script_entrypoint_override, j.preprocessed, null as workflow_as_code_status + FROM v2_job_queue q JOIN v2_job j USING (id) LEFT JOIN v2_job_runtime r USING (id) LEFT JOIN v2_job_status s USING (id) + WHERE r.ping < now() - ($1 || ' seconds')::interval + AND q.running = true AND j.kind NOT IN ('flow', 'flowpreview', 'flownode', 'singlescriptflow') AND j.same_worker = false") .bind(ZOMBIE_JOB_TIMEOUT.as_str()) .fetch_all(db) .await @@ -2236,7 +2258,18 @@ async fn handle_zombie_jobs(db: &Pool, base_internal_url: &str, worker } let zombie_jobs_restart_limit_reached = sqlx::query_as::<_, QueuedJob>( - "SELECT *, null as workflow_as_code_status FROM v2_as_queue WHERE id = ANY($1)", + "SELECT j.id, j.workspace_id, j.parent_job, j.created_by, j.created_at, q.started_at, q.scheduled_for, q.running, + j.runnable_id AS script_hash, j.runnable_path AS script_path, j.args, j.raw_code, + q.canceled_by IS NOT NULL AS canceled, q.canceled_by, q.canceled_reason, r.ping AS last_ping, + j.kind AS 
job_kind, CASE WHEN j.trigger_kind = 'schedule'::job_trigger_kind THEN j.trigger END AS schedule_path, + j.permissioned_as, COALESCE(s.flow_status, s.workflow_as_code_status) AS flow_status, j.raw_flow, + j.flow_step_id IS NOT NULL AS is_flow_step, j.script_lang AS language, q.suspend, q.suspend_until, + j.same_worker, j.raw_lock, j.pre_run_error, j.permissioned_as_email AS email, j.visible_to_owner, + r.memory_peak AS mem_peak, j.flow_innermost_root_job AS root_job, s.flow_leaf_jobs AS leaf_jobs, + j.tag, j.concurrent_limit, j.concurrency_time_window_s, j.timeout, j.flow_step_id, j.cache_ttl, + j.priority, NULL::TEXT AS logs, j.script_entrypoint_override, j.preprocessed, null as workflow_as_code_status + FROM v2_job_queue q JOIN v2_job j USING (id) LEFT JOIN v2_job_runtime r USING (id) LEFT JOIN v2_job_status s USING (id) + WHERE j.id = ANY($1)", ) .bind(&zombie_jobs_uuid_restart_limit_reached[..]) .fetch_all(db) @@ -2431,13 +2464,13 @@ async fn handle_zombie_flows(db: &DB) -> error::Result<()> { let flows = sqlx::query!( r#" SELECT - id AS "id!", workspace_id AS "workspace_id!", parent_job, is_flow_step, - flow_status AS "flow_status: Box", last_ping, same_worker - FROM v2_as_queue - WHERE running = true AND suspend = 0 AND suspend_until IS null AND scheduled_for <= now() - AND (job_kind = 'flow' OR job_kind = 'flowpreview' OR job_kind = 'flownode') - AND last_ping IS NOT NULL AND last_ping < NOW() - ($1 || ' seconds')::interval - AND canceled = false + j.id AS "id!", j.workspace_id AS "workspace_id!", j.parent_job, j.flow_step_id IS NOT NULL AS "is_flow_step?", + COALESCE(s.flow_status, s.workflow_as_code_status) AS "flow_status: Box", r.ping AS last_ping, j.same_worker AS "same_worker?" 
+ FROM v2_job_queue q JOIN v2_job j USING (id) LEFT JOIN v2_job_runtime r USING (id) LEFT JOIN v2_job_status s USING (id) + WHERE q.running = true AND q.suspend = 0 AND q.suspend_until IS null AND q.scheduled_for <= now() + AND (j.kind = 'flow' OR j.kind = 'flowpreview' OR j.kind = 'flownode') + AND r.ping IS NOT NULL AND r.ping < NOW() - ($1 || ' seconds')::interval + AND q.canceled_by IS NULL "#, FLOW_ZOMBIE_TRANSITION_TIMEOUT.as_str() diff --git a/backend/tests/common/mod.rs b/backend/tests/common/mod.rs index 748fae04c1d45..7e71990ebaeb6 100644 --- a/backend/tests/common/mod.rs +++ b/backend/tests/common/mod.rs @@ -309,7 +309,17 @@ pub async fn listen_for_uuid_on( pub async fn completed_job(uuid: Uuid, db: &Pool) -> CompletedJob { sqlx::query_as::<_, CompletedJob>( - "SELECT *, result->'wm_labels' as labels FROM v2_as_completed_job WHERE id = $1", + "SELECT j.id, j.workspace_id, j.parent_job, j.created_by, j.created_at, c.duration_ms, + c.status = 'success' OR c.status = 'skipped' AS success, j.runnable_id AS script_hash, j.runnable_path AS script_path, + j.args, c.result, FALSE AS deleted, j.raw_code, c.status = 'canceled' AS canceled, + c.canceled_by, c.canceled_reason, j.kind AS job_kind, + CASE WHEN j.trigger_kind = 'schedule'::job_trigger_kind THEN j.trigger END AS schedule_path, + j.permissioned_as, COALESCE(c.flow_status, c.workflow_as_code_status) AS flow_status, j.raw_flow, + j.flow_step_id IS NOT NULL AS is_flow_step, j.script_lang AS language, c.started_at, + c.status = 'skipped' AS is_skipped, j.raw_lock, j.permissioned_as_email AS email, j.visible_to_owner, + c.memory_peak AS mem_peak, j.tag, j.priority, NULL::TEXT AS logs, c.result_columns, + j.script_entrypoint_override, j.preprocessed, c.result->'wm_labels' as labels + FROM v2_job_completed c JOIN v2_job j USING (id) WHERE j.id = $1", ) .bind(uuid) .fetch_one(db) diff --git a/backend/tests/worker.rs b/backend/tests/worker.rs index f9a68ed5679aa..2bad024e7d03e 100644 --- a/backend/tests/worker.rs 
+++ b/backend/tests/worker.rs @@ -2369,7 +2369,7 @@ async fn test_script_schedule_handlers(db: Pool) -> anyhow::Result<()> let uuid = uuid.unwrap().unwrap(); let completed_job = sqlx::query!( - "SELECT script_path FROM v2_as_completed_job WHERE id = $1", + "SELECT j.runnable_path as script_path FROM v2_job_completed c JOIN v2_job j USING (id) WHERE j.id = $1", uuid ) .fetch_one(&db2) @@ -2440,7 +2440,7 @@ async fn test_script_schedule_handlers(db: Pool) -> anyhow::Result<()> let uuid = uuid.unwrap().unwrap(); let completed_job = - sqlx::query!("SELECT script_path FROM v2_as_completed_job WHERE id = $1", uuid) + sqlx::query!("SELECT j.runnable_path as script_path FROM v2_job_completed c JOIN v2_job j USING (id) WHERE j.id = $1", uuid) .fetch_one(&db2) .await .unwrap(); @@ -2527,7 +2527,7 @@ async fn test_flow_schedule_handlers(db: Pool) -> anyhow::Result<()> { let uuid = uuid.unwrap().unwrap(); let completed_job = sqlx::query!( - "SELECT script_path FROM v2_as_completed_job WHERE id = $1", + "SELECT j.runnable_path as script_path FROM v2_job_completed c JOIN v2_job j USING (id) WHERE j.id = $1", uuid ) .fetch_one(&db2) @@ -2599,7 +2599,7 @@ async fn test_flow_schedule_handlers(db: Pool) -> anyhow::Result<()> { let uuid = uuid.unwrap().unwrap(); let completed_job = - sqlx::query!("SELECT script_path FROM v2_as_completed_job WHERE id = $1", uuid) + sqlx::query!("SELECT j.runnable_path as script_path FROM v2_job_completed c JOIN v2_job j USING (id) WHERE j.id = $1", uuid) .fetch_one(&db2) .await .unwrap(); diff --git a/backend/windmill-api/src/approvals.rs b/backend/windmill-api/src/approvals.rs index e03b587f109e5..129600048142c 100644 --- a/backend/windmill-api/src/approvals.rs +++ b/backend/windmill-api/src/approvals.rs @@ -204,31 +204,31 @@ pub async fn get_approval_form_details( "WITH job_info AS ( -- Query for Teams (running jobs) SELECT - parent.job_kind AS \"job_kind!: JobKind\", - parent.script_hash AS \"script_hash: ScriptHash\", - parent.raw_flow AS 
\"raw_flow: sqlx::types::Json>\", - child.parent_job AS \"parent_job: Uuid\", - parent.created_at AS \"created_at!: chrono::NaiveDateTime\", - parent.created_by AS \"created_by!\", - parent.script_path, - parent.args AS \"args: sqlx::types::Json>\" - FROM v2_as_queue child - JOIN v2_as_queue parent ON parent.id = child.parent_job - WHERE child.id = $1 AND child.workspace_id = $2 + parent_j.kind AS \"job_kind!: JobKind\", + parent_j.runnable_id AS \"script_hash: ScriptHash\", + parent_j.raw_flow AS \"raw_flow: sqlx::types::Json>\", + child_j.parent_job AS \"parent_job: Uuid\", + parent_j.created_at AS \"created_at!: chrono::NaiveDateTime\", + parent_j.created_by AS \"created_by!\", + parent_j.runnable_path as script_path, + parent_j.args AS \"args: sqlx::types::Json>\" + FROM v2_job_queue child_q JOIN v2_job child_j USING (id) + JOIN v2_job parent_j ON parent_j.id = child_j.parent_job + WHERE child_j.id = $1 AND child_j.workspace_id = $2 UNION ALL -- Query for Slack (completed jobs) SELECT - v2_as_queue.job_kind AS \"job_kind!: JobKind\", - v2_as_queue.script_hash AS \"script_hash: ScriptHash\", - v2_as_queue.raw_flow AS \"raw_flow: sqlx::types::Json>\", - v2_as_completed_job.parent_job AS \"parent_job: Uuid\", - v2_as_completed_job.created_at AS \"created_at!: chrono::NaiveDateTime\", - v2_as_completed_job.created_by AS \"created_by!\", - v2_as_queue.script_path, - v2_as_queue.args AS \"args: sqlx::types::Json>\" - FROM v2_as_queue - JOIN v2_as_completed_job ON v2_as_completed_job.parent_job = v2_as_queue.id - WHERE v2_as_completed_job.id = $1 AND v2_as_completed_job.workspace_id = $2 + parent_j.kind AS \"job_kind!: JobKind\", + parent_j.runnable_id AS \"script_hash: ScriptHash\", + parent_j.raw_flow AS \"raw_flow: sqlx::types::Json>\", + completed_j.parent_job AS \"parent_job: Uuid\", + completed_j.created_at AS \"created_at!: chrono::NaiveDateTime\", + completed_j.created_by AS \"created_by!\", + parent_j.runnable_path as script_path, + parent_j.args AS \"args: 
sqlx::types::Json>\" + FROM v2_job_completed completed_c JOIN v2_job completed_j USING (id) + JOIN v2_job parent_j ON parent_j.id = completed_j.parent_job + WHERE completed_j.id = $1 AND completed_j.workspace_id = $2 ) SELECT * FROM job_info LIMIT 1", job_id, diff --git a/backend/windmill-api/src/apps.rs b/backend/windmill-api/src/apps.rs index ff14dc1ab8a72..05653c95ea0b0 100644 --- a/backend/windmill-api/src/apps.rs +++ b/backend/windmill-api/src/apps.rs @@ -2365,13 +2365,13 @@ async fn check_if_allowed_to_access_s3_file_from_app( || { sqlx::query_scalar!( r#"SELECT EXISTS ( - SELECT 1 FROM v2_as_completed_job - WHERE workspace_id = $2 - AND (job_kind = 'appscript' OR job_kind = 'preview') - AND created_by = 'anonymous' - AND started_at > now() - interval '3 hours' - AND script_path LIKE $3 || '/%' - AND result @> ('{"s3":"' || $1 || '"}')::jsonb + SELECT 1 FROM v2_job_completed c JOIN v2_job j USING (id) + WHERE j.workspace_id = $2 + AND (j.kind = 'appscript' OR j.kind = 'preview') + AND j.created_by = 'anonymous' + AND c.started_at > now() - interval '3 hours' + AND j.runnable_path LIKE $3 || '/%' + AND c.result @> ('{"s3":"' || $1 || '"}')::jsonb )"#, file_query.s3, w_id, diff --git a/backend/windmill-api/src/concurrency_groups.rs b/backend/windmill-api/src/concurrency_groups.rs index ba85043a735e1..50cd8379274dc 100644 --- a/backend/windmill-api/src/concurrency_groups.rs +++ b/backend/windmill-api/src/concurrency_groups.rs @@ -159,23 +159,23 @@ async fn get_concurrent_intervals( let lq = ListCompletedQuery { order_desc: Some(true), ..lq }; let lqc = lq.clone(); let lqq: ListQueueQuery = lqc.into(); - let mut sqlb_q = SqlBuilder::select_from("v2_as_queue") + let mut sqlb_q = SqlBuilder::select_from("v2_job_queue q JOIN v2_job USING (id) LEFT JOIN v2_job_runtime r USING (id) LEFT JOIN v2_job_status s USING (id)") .fields(UnifiedJob::queued_job_fields()) .order_by("created_at", lq.order_desc.unwrap_or(true)) .limit(row_limit) .clone(); - let mut sqlb_c = 
SqlBuilder::select_from("v2_as_completed_job") + let mut sqlb_c = SqlBuilder::select_from("v2_job_completed c JOIN v2_job USING (id)") + .fields(UnifiedJob::completed_job_fields()) + .order_by("started_at", lq.order_desc.unwrap_or(true)) + .limit(row_limit) + .clone(); - let mut sqlb_q_user = SqlBuilder::select_from("v2_as_queue") - .fields(&["id"]) + let mut sqlb_q_user = SqlBuilder::select_from("v2_job_queue q JOIN v2_job USING (id)") + .fields(&["v2_job.id"]) + .order_by("created_at", lq.order_desc.unwrap_or(true)) + .limit(row_limit) + .clone(); - let mut sqlb_c_user = SqlBuilder::select_from("v2_as_completed_job") - .fields(&["id"]) + let mut sqlb_c_user = SqlBuilder::select_from("v2_job_completed c JOIN v2_job USING (id)") + .fields(&["v2_job.id"]) + .order_by("started_at", lq.order_desc.unwrap_or(true)) + .limit(row_limit) + .clone(); diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 48c89c26543b6..d92be79e04558 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -1502,10 +1502,10 @@ async fn get_job_logs( .flatten(); let record = sqlx::query!( - "SELECT created_by AS \"created_by!\", CONCAT(coalesce(v2_as_completed_job.logs, ''), coalesce(job_logs.logs, '')) as logs, job_logs.log_offset, job_logs.log_file_index - FROM v2_as_completed_job - LEFT JOIN job_logs ON job_logs.job_id = v2_as_completed_job.id - WHERE v2_as_completed_job.id = $1 AND v2_as_completed_job.workspace_id = $2 AND ($3::text[] IS NULL OR v2_as_completed_job.tag = ANY($3))", + "SELECT j.created_by AS \"created_by!\", coalesce(job_logs.logs, '') as logs, job_logs.log_offset, job_logs.log_file_index + FROM v2_job_completed c JOIN v2_job j USING (id) + LEFT JOIN job_logs ON job_logs.job_id = j.id + WHERE j.id = $1 AND j.workspace_id = $2 AND ($3::text[] IS NULL OR j.tag = ANY($3))", id, w_id, tags.as_ref().map(|v| v.as_slice()) @@ -1550,10 +1550,10 @@ async fn get_job_logs( Ok(content_plain(Body::from(logs))) }
else { let text = sqlx::query!( - "SELECT created_by AS \"created_by!\", CONCAT(coalesce(v2_as_queue.logs, ''), coalesce(job_logs.logs, '')) as logs, coalesce(job_logs.log_offset, 0) as log_offset, job_logs.log_file_index - FROM v2_as_queue - LEFT JOIN job_logs ON job_logs.job_id = v2_as_queue.id - WHERE v2_as_queue.id = $1 AND v2_as_queue.workspace_id = $2 AND ($3::text[] IS NULL OR v2_as_queue.tag = ANY($3))", + "SELECT j.created_by AS \"created_by!\", coalesce(job_logs.logs, '') as logs, coalesce(job_logs.log_offset, 0) as log_offset, job_logs.log_file_index + FROM v2_job_queue q JOIN v2_job j USING (id) + LEFT JOIN job_logs ON job_logs.job_id = j.id + WHERE j.id = $1 AND j.workspace_id = $2 AND ($3::text[] IS NULL OR j.tag = ANY($3))", id, w_id, tags.as_ref().map(|v| v.as_slice()) @@ -1609,9 +1609,9 @@ async fn get_args( .map(|authed| get_scope_tags(authed)) .flatten(); let record = sqlx::query!( - "SELECT created_by AS \"created_by!\", args as \"args: sqlx::types::Json>\" - FROM v2_as_completed_job - WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))", + "SELECT j.created_by AS \"created_by!\", j.args as \"args: sqlx::types::Json>\" + FROM v2_job_completed c JOIN v2_job j USING (id) + WHERE j.id = $1 AND j.workspace_id = $2 AND ($3::text[] IS NULL OR j.tag = ANY($3))", id, &w_id, tags.as_ref().map(|v| v.as_slice()) as Option<&[&str]>, @@ -2166,7 +2166,7 @@ async fn cancel_selection( let mut tx = user_db.begin(&authed).await?; let tags = get_scope_tags(&authed).map(|v| v.iter().map(|s| s.to_string()).collect_vec()); let jobs_to_cancel = sqlx::query_scalar!( - "SELECT id AS \"id!\" FROM v2_as_queue WHERE id = ANY($1) AND schedule_path IS NULL AND ($2::text[] IS NULL OR tag = ANY($2))", + "SELECT j.id AS \"id!\" FROM v2_job_queue q JOIN v2_job j USING (id) WHERE j.id = ANY($1) AND (CASE WHEN j.trigger_kind = 'schedule'::job_trigger_kind THEN j.trigger END) IS NULL AND ($2::text[] IS NULL OR j.tag = ANY($2))",
&jobs, tags.as_ref().map(|v| v.as_slice()) ) @@ -2270,7 +2270,7 @@ async fn count_queue_jobs( Ok(Json( sqlx::query_as!( QueueStats, - "SELECT coalesce(COUNT(*) FILTER(WHERE suspend = 0 AND running = false), 0) as \"database_length!\", coalesce(COUNT(*) FILTER(WHERE suspend > 0), 0) as \"suspended!\" FROM v2_as_queue WHERE (workspace_id = $1 OR $2) AND scheduled_for <= now() AND ($3::text[] IS NULL OR tag = ANY($3))", + "SELECT coalesce(COUNT(*) FILTER(WHERE q.suspend = 0 AND q.running = false), 0) as \"database_length!\", coalesce(COUNT(*) FILTER(WHERE q.suspend > 0), 0) as \"suspended!\" FROM v2_job_queue q JOIN v2_job j USING (id) WHERE (j.workspace_id = $1 OR $2) AND q.scheduled_for <= now() AND ($3::text[] IS NULL OR j.tag = ANY($3))", w_id, w_id == "admins" && cq.all_workspaces.unwrap_or(false), tags.as_ref().map(|v| v.as_slice()) @@ -2713,9 +2713,9 @@ async fn get_suspended_flow_info<'c>( let flow = sqlx::query_as!( FlowInfo, r#" - SELECT id AS "id!", flow_status, suspend AS "suspend!", script_path - FROM v2_as_queue - WHERE id = $1 + SELECT j.id AS "id!", COALESCE(s.flow_status, s.workflow_as_code_status) as flow_status, q.suspend AS "suspend!", j.runnable_path as script_path + FROM v2_job_queue q JOIN v2_job j USING (id) LEFT JOIN v2_job_status s USING (id) + WHERE j.id = $1 "#, job_id, ) @@ -2929,9 +2929,9 @@ pub async fn get_flow_user_state( let mut tx = user_db.begin(&authed).await?; let r = sqlx::query_scalar!( r#" - SELECT flow_status->'user_states'->$1 - FROM v2_as_queue - WHERE id = $2 AND workspace_id = $3 + SELECT COALESCE(s.flow_status, s.workflow_as_code_status)->'user_states'->$1 + FROM v2_job_queue q JOIN v2_job j USING (id) LEFT JOIN v2_job_status s USING (id) + WHERE j.id = $2 AND j.workspace_id = $3 "#, key, job_id, @@ -4060,10 +4060,10 @@ pub async fn restart_flow( let mut tx = user_db.clone().begin(&authed).await?; let completed_job = sqlx::query!( "SELECT - script_path, args AS \"args: sqlx::types::Json>>\", - tag AS \"tag!\", priority - 
FROM v2_as_completed_job - WHERE id = $1 and workspace_id = $2", + j.runnable_path as script_path, j.args AS \"args: sqlx::types::Json>>\", + j.tag AS \"tag!\", j.priority + FROM v2_job_completed c JOIN v2_job j USING (id) + WHERE j.id = $1 and j.workspace_id = $2", job_id, &w_id, ) @@ -4704,10 +4704,10 @@ pub async fn delete_job_metadata_after_use(db: &DB, job_uuid: Uuid) -> Result<() pub async fn check_queue_too_long(db: &DB, queue_limit: Option) -> error::Result<()> { if let Some(limit) = queue_limit { let count = sqlx::query_scalar!( - "SELECT COUNT(*) FROM v2_as_queue WHERE canceled = false AND (scheduled_for <= now() - OR (suspend_until IS NOT NULL - AND ( suspend <= 0 - OR suspend_until <= now())))", + "SELECT COUNT(*) FROM v2_job_queue q JOIN v2_job j USING (id) WHERE q.canceled_by IS NULL AND (q.scheduled_for <= now() + OR (q.suspend_until IS NOT NULL + AND ( q.suspend <= 0 + OR q.suspend_until <= now())))", ) .fetch_one(db) .await? @@ -7756,10 +7756,10 @@ async fn count_by_tag( let counts = sqlx::query_as!( TagCount, r#" - SELECT tag as "tag!", COUNT(*) as "count!" - FROM v2_as_completed_job - WHERE started_at > NOW() - make_interval(secs => $1) AND ($2::text IS NULL OR workspace_id = $2) - GROUP BY tag + SELECT j.tag as "tag!", COUNT(*) as "count!" + FROM v2_job_completed c JOIN v2_job j USING (id) + WHERE c.started_at > NOW() - make_interval(secs => $1) AND ($2::text IS NULL OR j.workspace_id = $2) + GROUP BY j.tag ORDER BY "count!" 
DESC "#, horizon as f64, diff --git a/backend/windmill-api/src/users.rs b/backend/windmill-api/src/users.rs index 722c05ebce636..71e8704ea7af2 100644 --- a/backend/windmill-api/src/users.rs +++ b/backend/windmill-api/src/users.rs @@ -494,12 +494,12 @@ async fn list_user_usage( SELECT usr.email, usage.executions FROM usr , LATERAL ( - SELECT COALESCE(SUM(duration_ms + 1000)/1000 , 0)::BIGINT executions - FROM v2_as_completed_job - WHERE workspace_id = $1 - AND job_kind NOT IN ('flow', 'flowpreview', 'flownode') - AND email = usr.email - AND now() - '1 week'::interval < created_at + SELECT COALESCE(SUM(c.duration_ms + 1000)/1000 , 0)::BIGINT executions + FROM v2_job_completed c JOIN v2_job j USING (id) + WHERE j.workspace_id = $1 + AND j.kind NOT IN ('flow', 'flowpreview', 'flownode') + AND j.permissioned_as_email = usr.email + AND now() - '1 week'::interval < j.created_at ) usage WHERE workspace_id = $1 ", diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index 189581446dedb..0ee39320aacce 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -539,7 +539,7 @@ async fn cancel_persistent_script_jobs_internal<'c>( // we could have retrieved the job IDs in the first query where we retrieve the hashes, but just in case a job was inserted in the queue right in-between the two above query, we re-do the fetch here let jobs_to_cancel = sqlx::query_scalar::<_, Uuid>( - "SELECT id FROM v2_as_queue WHERE workspace_id = $1 AND script_path = $2 AND canceled = false", + "SELECT j.id FROM v2_job_queue q JOIN v2_job j USING (id) WHERE j.workspace_id = $1 AND j.runnable_path = $2 AND q.canceled_by IS NULL", ) .bind(w_id) .bind(script_path) @@ -2913,8 +2913,18 @@ async fn get_queued_job_tx<'c>( tx: &mut Transaction<'c, Postgres>, ) -> error::Result> { sqlx::query_as::<_, QueuedJob>( - "SELECT *, null as workflow_as_code_status - FROM v2_as_queue WHERE id = $1 AND workspace_id = $2", + "SELECT j.id, 
j.workspace_id, j.parent_job, j.created_by, j.created_at, q.started_at, q.scheduled_for, q.running, + j.runnable_id AS script_hash, j.runnable_path AS script_path, j.args, j.raw_code, + q.canceled_by IS NOT NULL AS canceled, q.canceled_by, q.canceled_reason, r.ping AS last_ping, + j.kind AS job_kind, CASE WHEN j.trigger_kind = 'schedule'::job_trigger_kind THEN j.trigger END AS schedule_path, + j.permissioned_as, COALESCE(s.flow_status, s.workflow_as_code_status) AS flow_status, j.raw_flow, + j.flow_step_id IS NOT NULL AS is_flow_step, j.script_lang AS language, q.suspend, q.suspend_until, + j.same_worker, j.raw_lock, j.pre_run_error, j.permissioned_as_email AS email, j.visible_to_owner, + r.memory_peak AS mem_peak, j.flow_innermost_root_job AS root_job, s.flow_leaf_jobs AS leaf_jobs, + j.tag, j.concurrent_limit, j.concurrency_time_window_s, j.timeout, j.flow_step_id, j.cache_ttl, + j.priority, NULL::TEXT AS logs, j.script_entrypoint_override, j.preprocessed, null as workflow_as_code_status + FROM v2_job_queue q JOIN v2_job j USING (id) LEFT JOIN v2_job_runtime r USING (id) LEFT JOIN v2_job_status s USING (id) + WHERE j.id = $1 AND j.workspace_id = $2", ) .bind(id) .bind(w_id) @@ -2925,8 +2935,18 @@ async fn get_queued_job_tx<'c>( pub async fn get_queued_job(id: &Uuid, w_id: &str, db: &DB) -> error::Result> { sqlx::query_as::<_, QueuedJob>( - "SELECT *, null as workflow_as_code_status - FROM v2_as_queue WHERE id = $1 AND workspace_id = $2", + "SELECT j.id, j.workspace_id, j.parent_job, j.created_by, j.created_at, q.started_at, q.scheduled_for, q.running, + j.runnable_id AS script_hash, j.runnable_path AS script_path, j.args, j.raw_code, + q.canceled_by IS NOT NULL AS canceled, q.canceled_by, q.canceled_reason, r.ping AS last_ping, + j.kind AS job_kind, CASE WHEN j.trigger_kind = 'schedule'::job_trigger_kind THEN j.trigger END AS schedule_path, + j.permissioned_as, COALESCE(s.flow_status, s.workflow_as_code_status) AS flow_status, j.raw_flow, + j.flow_step_id IS NOT 
NULL AS is_flow_step, j.script_lang AS language, q.suspend, q.suspend_until, + j.same_worker, j.raw_lock, j.pre_run_error, j.permissioned_as_email AS email, j.visible_to_owner, + r.memory_peak AS mem_peak, j.flow_innermost_root_job AS root_job, s.flow_leaf_jobs AS leaf_jobs, + j.tag, j.concurrent_limit, j.concurrency_time_window_s, j.timeout, j.flow_step_id, j.cache_ttl, + j.priority, NULL::TEXT AS logs, j.script_entrypoint_override, j.preprocessed, null as workflow_as_code_status + FROM v2_job_queue q JOIN v2_job j USING (id) LEFT JOIN v2_job_runtime r USING (id) LEFT JOIN v2_job_status s USING (id) + WHERE j.id = $1 AND j.workspace_id = $2", ) .bind(id) .bind(w_id) @@ -3171,7 +3191,7 @@ pub async fn push<'c, 'd>( } let in_queue = sqlx::query_scalar!( - "SELECT COUNT(id) FROM v2_as_queue WHERE email = $1", + "SELECT COUNT(j.id) FROM v2_job_queue q JOIN v2_job j USING (id) WHERE j.permissioned_as_email = $1", email ) .fetch_one(_db) @@ -3185,7 +3205,7 @@ pub async fn push<'c, 'd>( } let concurrent_runs = sqlx::query_scalar!( - "SELECT COUNT(id) FROM v2_as_queue WHERE running = true AND email = $1", + "SELECT COUNT(j.id) FROM v2_job_queue q JOIN v2_job j USING (id) WHERE q.running = true AND j.permissioned_as_email = $1", email ) .fetch_one(_db) @@ -4381,11 +4401,11 @@ async fn restarted_flows_resolution( > { let row = sqlx::query!( "SELECT - script_path, script_hash AS \"script_hash: ScriptHash\", - job_kind AS \"job_kind!: JobKind\", - flow_status AS \"flow_status: Json>\", - raw_flow AS \"raw_flow: Json>\" - FROM v2_as_completed_job WHERE id = $1 and workspace_id = $2", + j.runnable_path as script_path, j.runnable_id AS \"script_hash: ScriptHash\", + j.kind AS \"job_kind!: JobKind\", + COALESCE(c.flow_status, c.workflow_as_code_status) AS \"flow_status: Json>\", + j.raw_flow AS \"raw_flow: Json>\" + FROM v2_job_completed c JOIN v2_job j USING (id) WHERE j.id = $1 and j.workspace_id = $2", completed_flow_id, workspace_id, )