Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ class _SourceRefreshOptions:
class _ExecutionOptions:
max_inflight_rows: int | None = None
max_inflight_bytes: int | None = None
# Optional timeout; None leaves it unset. Presumably in seconds — TODO confirm.
# NOTE(review): the Rust `ExecutionOptions.timeout` field is declared as
# `Option<std::time::Duration>`, whose serde form is not a bare integer — confirm
# that an `int` set here deserializes as intended on the Rust side.
timeout: int | None = None


class FlowBuilder:
Expand Down
8 changes: 8 additions & 0 deletions python/cocoindex/op.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class OpArgs:
- max_batch_size: The maximum batch size for the executor. Only valid if `batching` is True.
- behavior_version: The behavior version of the executor. Cache will be invalidated if it
changes. Must be provided if `cache` is True.
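- timeout: Timeout in seconds for this function execution. None means the function sets no timeout of its own; the engine then falls back to the op's execution options and finally to its built-in default (1800s).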
- arg_relationship: It specifies the relationship between an input argument and the output,
e.g. `(ArgRelationship.CHUNKS_BASE_TEXT, "content")` means the output is chunks for the
input argument with name `content`.
Expand All @@ -164,6 +165,7 @@ class OpArgs:
batching: bool = False
max_batch_size: int | None = None
behavior_version: int | None = None
timeout: int | None = None
arg_relationship: tuple[ArgRelationship, str] | None = None


Expand Down Expand Up @@ -202,6 +204,7 @@ def _register_op_factory(

class _WrappedExecutor:
_executor: Any
_spec: Any
_args_info: list[_ArgInfo]
_kwargs_info: dict[str, _ArgInfo]
_result_encoder: Callable[[Any], Any]
Expand Down Expand Up @@ -391,6 +394,11 @@ def enable_cache(self) -> bool:
def behavior_version(self) -> int | None:
return op_args.behavior_version

def timeout(self) -> int | None:
    """Return the timeout (in seconds) configured on `op_args`, or None if unset."""
    return op_args.timeout

def batching_options(self) -> dict[str, Any] | None:
if op_args.batching:
return {
Expand Down
6 changes: 6 additions & 0 deletions src/base/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ pub struct ExecutionOptions {

#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_inflight_bytes: Option<usize>,

#[serde(default, skip_serializing_if = "Option::is_none")]
pub timeout: Option<std::time::Duration>,
}

impl ExecutionOptions {
Expand Down Expand Up @@ -289,6 +292,9 @@ impl fmt::Display for ImportOpSpec {
/// Spec of a transform op: the op to run, its input bindings and its execution options.
pub struct TransformOpSpec {
/// Bindings for the op's input arguments.
pub inputs: Vec<OpArgBinding>,
/// Spec of the op itself.
pub op: OpSpec,

/// Execution options for this op (e.g. `timeout`). When the field is missing during
/// deserialization, `ExecutionOptions::default()` is used.
#[serde(default)]
pub execution_options: ExecutionOptions,
}

impl SpecFormatter for TransformOpSpec {
Expand Down
15 changes: 13 additions & 2 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ use crate::{
};
use futures::future::{BoxFuture, try_join3};
use futures::{FutureExt, future::try_join_all};
use tokio::time::Duration;

/// Fallback timeout, in seconds, applied to a transform op when neither its executor
/// nor its execution options specify one.
const TIMEOUT_THRESHOLD: u64 = 1800;
// NOTE(review): not referenced in the visible part of this file; the evaluator declares
// its own `WARNING_THRESHOLD` for the slow-function warning — confirm this copy is needed.
const WARNING_THRESHOLD: u64 = 30;

#[derive(Debug)]
pub(super) enum ValueTypeBuilder {
Expand Down Expand Up @@ -802,18 +806,24 @@ impl AnalyzerContext {
.with(&output_enriched_type.without_attrs())?;
let output_type = output_enriched_type.typ.clone();
let output =
op_scope.add_op_output(reactive_op_name.clone(), output_enriched_type)?;
let op_name = reactive_op_name.clone();
op_scope.add_op_output(reactive_op.name.clone(), output_enriched_type)?;
let op_name = reactive_op.name.clone();
let op_kind = op.op.kind.clone();
let execution_options_timeout = op.execution_options.timeout;
async move {
trace!("Start building executor for transform op `{op_name}`");
let executor = executor.await.with_context(|| {
format!("Preparing for transform op: {op_name}")
})?;
let enable_cache = executor.enable_cache();
let behavior_version = executor.behavior_version();
let timeout = executor.timeout()
.or(execution_options_timeout)
.or(Some(Duration::from_secs(TIMEOUT_THRESHOLD)));
trace!("Finished building executor for transform op `{op_name}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}");
let function_exec_info = AnalyzedFunctionExecInfo {
enable_cache,
timeout,
behavior_version,
fingerprinter: logic_fingerprinter
.with(&behavior_version)?,
Expand All @@ -828,6 +838,7 @@ impl AnalyzerContext {
}
Ok(AnalyzedReactiveOp::Transform(AnalyzedTransformOp {
name: op_name,
op_kind,
inputs: input_value_mappings,
function_exec_info,
executor,
Expand Down
1 change: 1 addition & 0 deletions src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ impl FlowBuilder {
})
.collect(),
op: spec,
execution_options: Default::default(),
}),
};

Expand Down
3 changes: 3 additions & 0 deletions src/builder/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::prelude::*;

use crate::ops::interface::*;
use crate::utils::fingerprint::{Fingerprint, Fingerprinter};
use std::time::Duration;

#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct AnalyzedLocalFieldReference {
Expand Down Expand Up @@ -64,6 +65,7 @@ pub struct AnalyzedImportOp {

pub struct AnalyzedFunctionExecInfo {
pub enable_cache: bool,
pub timeout: Option<Duration>,
pub behavior_version: Option<u32>,

/// Fingerprinter of the function's behavior.
Expand All @@ -74,6 +76,7 @@ pub struct AnalyzedFunctionExecInfo {

pub struct AnalyzedTransformOp {
pub name: String,
pub op_kind: String,
pub inputs: Vec<AnalyzedValueMapping>,
pub function_exec_info: AnalyzedFunctionExecInfo,
pub executor: Box<dyn SimpleFunctionExecutor>,
Expand Down
100 changes: 83 additions & 17 deletions src/execution/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use crate::prelude::*;

use anyhow::{Context, Ok};
use futures::future::try_join_all;
use log::warn;
use tokio::time::Duration;

use crate::base::value::EstimatedByteSize;
use crate::builder::{AnalyzedTransientFlow, plan::*};
Expand All @@ -13,6 +15,9 @@ use crate::{

use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, evaluate_with_cell};

/// Timeout, in seconds, used when a transform op's `function_exec_info.timeout` is `None`.
const TIMEOUT_THRESHOLD: u64 = 1800;
/// Seconds a function evaluation may run before a one-time "taking longer than" warning
/// is emitted for it.
const WARNING_THRESHOLD: u64 = 30;

#[derive(Debug)]
pub struct ScopeValueBuilder {
// TODO: Share the same lock for values produced in the same execution scope, for stricter atomicity.
Expand Down Expand Up @@ -368,17 +373,24 @@ async fn evaluate_op_scope(
for reactive_op in op_scope.reactive_ops.iter() {
match reactive_op {
AnalyzedReactiveOp::Transform(op) => {
// Track transform operation start
let transform_key = format!("transform/{}{}", op_scope.scope_qualifier, op.name);

if let Some(ref op_stats) = operation_in_process_stats {
let transform_key =
format!("transform/{}{}", op_scope.scope_qualifier, op.name);
op_stats.start_processing(&transform_key, 1);
}

let mut input_values = Vec::with_capacity(op.inputs.len());
for value in assemble_input_values(&op.inputs, scoped_entries) {
input_values.push(value?);
}
let timeout_duration = op
.function_exec_info
.timeout
.unwrap_or(Duration::from_secs(TIMEOUT_THRESHOLD));
let warn_duration = Duration::from_secs(WARNING_THRESHOLD);

let op_name_for_warning = op.name.clone();
let op_kind_for_warning = op.op_kind.clone();

let result = if op.function_exec_info.enable_cache {
let output_value_cell = memory.get_cache_entry(
Expand All @@ -393,27 +405,81 @@ async fn evaluate_op_scope(
&op.function_exec_info.output_type,
/*ttl=*/ None,
)?;
evaluate_with_cell(output_value_cell.as_ref(), move || {

let eval_future = evaluate_with_cell(output_value_cell.as_ref(), move || {
op.executor.evaluate(input_values)
})
.await
.and_then(|v| head_scope.define_field(&op.output, &v))
});
let mut eval_future = Box::pin(eval_future);
let mut warned = false;
let timeout_future = tokio::time::sleep(timeout_duration);
tokio::pin!(timeout_future);

let res = loop {
tokio::select! {
res = &mut eval_future => {
break Ok(res?);
}
_ = &mut timeout_future => {
break Err(anyhow!(
"Function '{}' ({}) timed out after {} seconds",
op.op_kind, op.name, timeout_duration.as_secs()
));
}
_ = tokio::time::sleep(warn_duration), if !warned => {
eprintln!(
"WARNING: Function '{}' ({}) is taking longer than 30s",
op_kind_for_warning, op_name_for_warning
);
warn!(
"Function '{}' ({}) is taking longer than {}s",
op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD
);
warned = true;
}
}
};

res.and_then(|v| head_scope.define_field(&op.output, &v))
} else {
op.executor
.evaluate(input_values)
.await
.and_then(|v| head_scope.define_field(&op.output, &v))
}
.with_context(|| format!("Evaluating Transform op `{}`", op.name,));
let eval_future = op.executor.evaluate(input_values);
let mut eval_future = Box::pin(eval_future);
let mut warned = false;
let timeout_future = tokio::time::sleep(timeout_duration);
tokio::pin!(timeout_future);

let res = loop {
tokio::select! {
res = &mut eval_future => {
break Ok(res?);
}
_ = &mut timeout_future => {
break Err(anyhow!(
"Function '{}' ({}) timed out after {} seconds",
op.op_kind, op.name, timeout_duration.as_secs()
));
}
_ = tokio::time::sleep(warn_duration), if !warned => {
eprintln!(
"WARNING: Function '{}' ({}) is taking longer than 30s",
op_kind_for_warning, op_name_for_warning
);
warn!(
"Function '{}' ({}) is taking longer than {}s",
op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD
);
warned = true;
}
}
};

res.and_then(|v| head_scope.define_field(&op.output, &v))
};

// Track transform operation completion
if let Some(ref op_stats) = operation_in_process_stats {
let transform_key =
format!("transform/{}{}", op_scope.scope_qualifier, op.name);
op_stats.finish_processing(&transform_key, 1);
}

result?
result.with_context(|| format!("Evaluating Transform op `{}`", op.name))?
}

AnalyzedReactiveOp::ForEach(op) => {
Expand Down
1 change: 1 addition & 0 deletions src/execution/live_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl SourceUpdateTask {
let mut change_stream = change_stream;
let retry_options = retryable::RetryOptions {
retry_timeout: None,
per_call_timeout: None,
initial_backoff: std::time::Duration::from_secs(5),
max_backoff: std::time::Duration::from_secs(60),
};
Expand Down
5 changes: 5 additions & 0 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ pub trait SimpleFunctionExecutor: Send + Sync {
fn behavior_version(&self) -> Option<u32> {
None
}

/// Per-function execution timeout.
///
/// The default implementation returns `None`, meaning the executor imposes no timeout
/// of its own; the analyzer then falls back to the op's execution options and finally
/// to its built-in default of 1800 seconds.
fn timeout(&self) -> Option<std::time::Duration> {
None
}
}

#[async_trait]
Expand Down
16 changes: 15 additions & 1 deletion src/ops/py_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct PyFunctionExecutor {

enable_cache: bool,
behavior_version: Option<u32>,
timeout: Option<std::time::Duration>,
}

impl PyFunctionExecutor {
Expand Down Expand Up @@ -112,6 +113,10 @@ impl interface::SimpleFunctionExecutor for Arc<PyFunctionExecutor> {
/// Returns the behavior version stored on this executor.
fn behavior_version(&self) -> Option<u32> {
self.behavior_version
}

/// Returns the timeout stored on this executor; the factory fills it from the Python
/// executor's `timeout()` result, interpreted as whole seconds.
fn timeout(&self) -> Option<std::time::Duration> {
self.timeout
}
}

struct PyBatchedFunctionExecutor {
Expand All @@ -121,6 +126,7 @@ struct PyBatchedFunctionExecutor {

enable_cache: bool,
behavior_version: Option<u32>,
timeout: Option<std::time::Duration>,
batching_options: batching::BatchingOptions,
}

Expand Down Expand Up @@ -240,7 +246,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
.as_ref()
.ok_or_else(|| anyhow!("Python execution context is missing"))?
.clone();
let (prepare_fut, enable_cache, behavior_version, batching_options) =
let (prepare_fut, enable_cache, behavior_version, timeout, batching_options) =
Python::with_gil(|py| -> anyhow::Result<_> {
let prepare_coro = executor
.call_method(py, "prepare", (), None)
Expand All @@ -260,6 +266,11 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
.call_method(py, "behavior_version", (), None)
.to_result_with_py_trace(py)?
.extract::<Option<u32>>(py)?;
let timeout = executor
.call_method(py, "timeout", (), None)
.to_result_with_py_trace(py)?
.extract::<Option<u64>>(py)?
.map(std::time::Duration::from_secs);
let batching_options = executor
.call_method(py, "batching_options", (), None)
.to_result_with_py_trace(py)?
Expand All @@ -271,6 +282,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
prepare_fut,
enable_cache,
behavior_version,
timeout,
batching_options,
))
})?;
Expand All @@ -284,6 +296,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
result_type,
enable_cache,
behavior_version,
timeout,
batching_options,
}
.into_fn_executor(),
Expand All @@ -297,6 +310,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
result_type,
enable_cache,
behavior_version,
timeout,
}))
};
Ok(executor)
Expand Down
Loading
Loading