Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ See the [Distributed Deployment Guide](docs/distributed-deployment.md) for clust
|---------|-------------|
| `dora list` | List running dataflows (alias: `ps`) |
| `dora clean` | Remove finished and failed dataflows from the coordinator |
| `dora logs <ID>` | Show logs for a dataflow or node |
| `dora logs <ID> [--node <NAME>]` | Show logs for a dataflow or node |
| `dora top` | Real-time resource monitor (TUI); also `dora inspect top` |
| `dora topic list` | List topics in a dataflow |
| `dora topic hz <TOPIC>` | Measure topic publish frequency (TUI) |
Expand Down
2 changes: 1 addition & 1 deletion README.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ dora cluster down
| 命令 | 描述 |
|------|------|
| `dora list` | 列出运行中的数据流(别名:`ps`) |
| `dora logs <ID>` | 显示数据流或节点的日志 |
| `dora logs <ID> [--node <NAME>]` | 显示数据流或节点的日志 |
| `dora top` | 实时资源监控(TUI);也可使用 `dora inspect top` |
| `dora topic list` | 列出数据流中的主题 |
| `dora topic hz <TOPIC>` | 测量主题发布频率(TUI) |
Expand Down
97 changes: 78 additions & 19 deletions binaries/cli/src/command/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,20 @@ use eyre::{Context, Result, bail};
use uuid::Uuid;

#[derive(Debug, Args)]
/// Show logs of a given dataflow and node.
/// Show logs of a given dataflow.
pub struct LogsArgs {
/// Identifier of the dataflow
/// Identifier of the dataflow (UUID or name)
#[clap(value_name = "UUID_OR_NAME")]
pub dataflow: Option<String>,
/// Show logs for the given node (omit with --all-nodes)
#[clap(value_name = "NAME")]
/// Deprecated positional node name. Use --node instead.
#[clap(value_name = "NAME", hide = true)]
pub legacy_node: Option<NodeId>,
/// Show logs for the given node
#[clap(long, short = 'n', value_name = "NAME", conflicts_with = "all_nodes")]
pub node: Option<NodeId>,
/// Show logs from all nodes merged by timestamp.
/// Streams from coordinator by default, falls back to local out/ directory.
#[clap(long)]
#[clap(long, conflicts_with = "node")]
pub all_nodes: bool,
/// Number of lines to show from the end of the logs
#[clap(long)]
Expand Down Expand Up @@ -94,6 +97,8 @@ impl Executable for LogsArgs {
fn execute(self) -> eyre::Result<()> {
default_tracing()?;

reject_legacy_node_arg(&self)?;

// --local always uses local file path
if self.local {
if self.follow {
Expand All @@ -102,14 +107,12 @@ impl Executable for LogsArgs {
return read_local_logs(&self);
}

// Single node via coordinator (unchanged)
if let Some(ref _node) = self.node
&& !self.all_nodes
{
let node = self.node.clone().unwrap();
// Single node via coordinator
if let Some(ref node) = self.node {
let node = node.clone();
let config = build_log_config(&self)?;
let session = self.coordinator.connect()?;
let uuid = resolve_dataflow_identifier_interactive(&session, self.dataflow.as_deref())?;
let uuid = resolve_logs_dataflow_identifier(&session, self.dataflow.as_deref(), None)?;
return logs(
&session,
uuid,
Expand All @@ -128,8 +131,11 @@ impl Executable for LogsArgs {
// Try coordinator first, fall back to local
match self.coordinator.connect() {
Ok(session) => {
let uuid =
resolve_dataflow_identifier_interactive(&session, self.dataflow.as_deref())?;
let uuid = resolve_logs_dataflow_identifier(
&session,
self.dataflow.as_deref(),
legacy_positional_node(&self),
)?;
let config = build_log_config(&self)?;
stream_logs_from_coordinator(
&session,
Expand All @@ -149,24 +155,77 @@ impl Executable for LogsArgs {
}
}

fn reject_legacy_node_arg(args: &LogsArgs) -> Result<()> {
if let Some(node) = &args.legacy_node {
let dataflow = args.dataflow.as_deref().unwrap_or("<DATAFLOW>");
bail!(
"positional node argument `{node}` is no longer supported\n\n \
hint: use `dora logs {dataflow} --node {node}` instead"
);
}
Ok(())
}

fn legacy_positional_node(args: &LogsArgs) -> Option<&str> {
args.dataflow
.as_deref()
.filter(|_| args.node.is_none() && !args.all_nodes)
}

fn resolve_logs_dataflow_identifier(
session: &WsSession,
dataflow: Option<&str>,
legacy_node_hint: Option<&str>,
) -> Result<Uuid> {
match resolve_dataflow_identifier_interactive(session, dataflow) {
Ok(uuid) => Ok(uuid),
Err(err) if legacy_node_hint.is_some() => {
let node = legacy_node_hint.unwrap();
Err(err).wrap_err_with(|| {
format!(
"failed to resolve dataflow `{node}`\n\n \
hint: if `{node}` is a node name, use `dora logs --node {node}`"
)
})
}
Err(err) => Err(err),
}
}

fn find_logs_dataflow_dir(out_dir: &Path, args: &LogsArgs) -> Result<PathBuf> {
match find_dataflow_dir(out_dir, args.dataflow.as_deref()) {
Ok(dir) => Ok(dir),
Err(err) if legacy_positional_node(args).is_some() => {
let node = legacy_positional_node(args).unwrap();
Err(err).wrap_err_with(|| {
format!(
"failed to resolve local log dataflow `{node}`\n\n \
hint: if `{node}` is a node name, use `dora logs --local --node {node}`"
)
})
}
Err(err) => Err(err),
}
}

fn read_local_logs(args: &LogsArgs) -> Result<()> {
let out_dir = Path::new("out");
if !out_dir.exists() {
bail!(
"no out/ directory found in current directory\n\n \
hint: local logs are stored in ./out/ when running with `dora run`.\n \
For remote dataflows, connect to the coordinator with `dora logs -d <DATAFLOW>`"
For remote dataflows, connect to the coordinator with `dora logs <DATAFLOW>`"
);
}

// Find the dataflow directory (most recent if not specified)
let dataflow_dir = find_dataflow_dir(out_dir, args.dataflow.as_deref())?;
let dataflow_dir = find_logs_dataflow_dir(out_dir, args)?;

let config = build_log_config(args)?;
let now = Utc::now();

let log_files = match &args.node {
Some(node) if !args.all_nodes => find_node_log_files(&dataflow_dir, node)?,
Some(node) => find_node_log_files(&dataflow_dir, node)?,
_ => find_log_files(&dataflow_dir)?,
};
if log_files.is_empty() {
Expand Down Expand Up @@ -200,16 +259,16 @@ fn follow_local_logs(args: &LogsArgs) -> Result<()> {
bail!(
"no out/ directory found in current directory\n\n \
hint: local logs are stored in ./out/ when running with `dora run`.\n \
For remote dataflows, connect to the coordinator with `dora logs -d <DATAFLOW>`"
For remote dataflows, connect to the coordinator with `dora logs <DATAFLOW>`"
);
}

let dataflow_dir = find_dataflow_dir(out_dir, args.dataflow.as_deref())?;
let dataflow_dir = find_logs_dataflow_dir(out_dir, args)?;
let config = build_log_config(args)?;
let now = Utc::now();

let files = match &args.node {
Some(node) if !args.all_nodes => find_node_log_files(&dataflow_dir, node)?,
Some(node) => find_node_log_files(&dataflow_dir, node)?,
_ => find_log_files(&dataflow_dir)?,
};

Expand Down
41 changes: 39 additions & 2 deletions binaries/cli/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ pub enum Command {
/// Remove finished and failed dataflows from the coordinator
#[clap(display_order = 10)]
Clean(CleanArgs),
/// Show logs of a given dataflow and node
#[command(allow_missing_positional = true)]
/// Show logs of a given dataflow
#[clap(display_order = 11)]
Logs(LogsArgs),
/// Inspect running dataflows in real-time
Expand Down Expand Up @@ -300,6 +299,44 @@ mod tests {
parse_ok(&["dora", "logs"]);
}

#[test]
fn parse_logs_dataflow() {
parse_ok(&["dora", "logs", "my-dataflow"]);
}

#[test]
fn parse_logs_node_flag() {
parse_ok(&["dora", "logs", "--node", "sensor"]);
parse_ok(&["dora", "logs", "-n", "sensor"]);
}

#[test]
fn parse_logs_dataflow_node_flag() {
parse_ok(&["dora", "logs", "my-dataflow", "--node", "sensor"]);
}

#[test]
fn parse_logs_all_nodes() {
parse_ok(&["dora", "logs", "my-dataflow", "--all-nodes"]);
}

#[test]
fn parse_logs_legacy_positional_node_for_runtime_hint() {
parse_ok(&["dora", "logs", "my-dataflow", "sensor"]);
}

#[test]
fn reject_logs_node_and_all_nodes() {
parse_err(&[
"dora",
"logs",
"my-dataflow",
"--node",
"sensor",
"--all-nodes",
]);
}

#[test]
fn parse_build() {
parse_ok(&["dora", "build", "foo.yml"]);
Expand Down
2 changes: 1 addition & 1 deletion binaries/daemon/src/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl PendingNodes {
let result = match &node_exited_before_subscribe {
Some(causing_node) => Err(format!(
"Node {causing_node} exited before initializing dora. For \
more information, run `dora logs {} {causing_node}`.",
more information, run `dora logs {} --node {causing_node}`.",
self.dataflow_id
)),
None => Ok(()),
Expand Down
16 changes: 8 additions & 8 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -540,16 +540,16 @@ dora clean --format json

#### `dora logs`

Show and follow logs of a dataflow and node.
Show and follow logs of a dataflow.

```
dora logs [UUID_OR_NAME] [NODE] [OPTIONS]
dora logs [UUID_OR_NAME] [OPTIONS]
```

| Flag | Default | Description |
|------|---------|-------------|
| `[UUID_OR_NAME]` | | Dataflow UUID or name |
| `[NODE]` | | Node name (required unless `--all-nodes`) |
| `--node <NAME>`, `-n` | | Node name |
| `--all-nodes` | false | Merge logs from all nodes by timestamp |
| `--tail <N>` | all | Show last N lines |
| `--follow`, `-f` | false | Stream new log entries |
Expand All @@ -572,7 +572,7 @@ dora logs [UUID_OR_NAME] [NODE] [OPTIONS]
dora logs my-dataflow --all-nodes --follow

# Last 50 errors from a specific node
dora logs my-dataflow sensor --level error --tail 50
dora logs my-dataflow --node sensor --level error --tail 50

# Search logs from last 5 minutes
dora logs my-dataflow --all-nodes --since 5m --grep "timeout"
Expand All @@ -581,7 +581,7 @@ dora logs my-dataflow --all-nodes --since 5m --grep "timeout"
dora logs --local --all-nodes --tail 100

# Post-mortem analysis: errors in time window
dora logs --local sensor --since 1h --until 30m --level error
dora logs --local --node sensor --since 1h --until 30m --level error
```

**Duration formats:** `30` (seconds), `30s`, `5m`, `1h`, `2d`
Expand Down Expand Up @@ -1582,7 +1582,7 @@ dora start dataflow.yml --name my-robot --attach
dora top

# Logs from any node regardless of machine
dora logs my-robot inference --follow
dora logs my-robot --node inference --follow

# List all dataflows
dora list
Expand Down Expand Up @@ -1653,7 +1653,7 @@ dora run dataflow.yml --log-level trace --debug
dora node info -d my-dataflow problem-node

# 4. Monitor specific node logs
dora logs my-dataflow problem-node --follow --level debug
dora logs my-dataflow --node problem-node --follow --level debug

# 5. Check resource usage
dora top
Expand Down Expand Up @@ -1696,5 +1696,5 @@ Read directly with:

```bash
dora logs --local --all-nodes
dora logs --local <node-name> --tail 50
dora logs --local --node <node-name> --tail 50
```
10 changes: 5 additions & 5 deletions docs/debugging.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ dora node info -d my-dataflow problem-node
dora top

# 5. Stream logs from the problem node
dora logs my-dataflow problem-node --follow --level debug
dora logs my-dataflow --node problem-node --follow --level debug

# 6. Is the node producing output?
dora topic echo -d my-dataflow problem-node/output
Expand Down Expand Up @@ -539,13 +539,13 @@ Note: CPU percentages are per-core, so values can exceed 100% for multi-threaded

```bash
# Stream logs from a specific node
dora logs my-dataflow sensor-node --follow
dora logs my-dataflow --node sensor-node --follow

# Stream logs from all nodes
dora logs my-dataflow --all-nodes --follow

# Filter by log level
dora logs my-dataflow sensor-node --follow --level debug
dora logs my-dataflow --node sensor-node --follow --level debug

# Stream with grep filter
dora logs my-dataflow --all-nodes --follow --grep "error"
Expand All @@ -572,7 +572,7 @@ Read directly:
dora logs --local --all-nodes

# Specific node, last 50 lines
dora logs --local sensor-node --tail 50
dora logs --local --node sensor-node --tail 50
```

### Filtering and Searching
Expand Down Expand Up @@ -660,7 +660,7 @@ dora list
dora top

# 2. Check its logs
dora logs my-dataflow problem-node --follow --level trace
dora logs my-dataflow --node problem-node --follow --level trace

# 3. Check if upstream nodes are publishing
dora topic echo -d my-dataflow upstream-node/output
Expand Down
4 changes: 2 additions & 2 deletions docs/distributed-deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ dora list
dora top

# View node logs
dora logs my-app <node-id> --follow
dora logs my-app --node <node-id> --follow

# Stop a dataflow
dora stop my-app
Expand Down Expand Up @@ -834,4 +834,4 @@ nodes:
- **Monitor with multiple tools**: `dora cluster status` for daemon health, `dora top` for resource usage, `dora logs` for node output.
- **Test locally first**. Develop with `dora run dataflow.yml`, then deploy to a cluster. The same dataflow YAML works in both modes -- `_unstable_deploy` fields are ignored in local mode.
- **Use rolling upgrades** instead of stopping the entire cluster. `dora cluster upgrade` processes one machine at a time to maintain availability.
- **Keep cluster.yml in version control** alongside your dataflow definitions.
- **Keep cluster.yml in version control** alongside your dataflow definitions.
Loading
Loading