Skip to content

Commit f458f98

Browse files
committed
[CHORE] Tool for managing tasks at the gRPC level.
1 parent 08647ab commit f458f98

File tree

3 files changed

+185
-0
lines changed

3 files changed

+185
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/sysdb/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ sea-query-binder = { workspace = true, features = ["sqlx-sqlite"] }
2424
chrono = { workspace = true }
2525
prost = { workspace = true }
2626
derivative = "2.2.0"
27+
clap = { workspace = true }
2728

2829
chroma-config = { workspace = true }
2930
chroma-error = { workspace = true, features = ["tonic", "sqlx"] }
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
use chroma_types::chroma_proto;
2+
use clap::{Parser, Subcommand};
3+
use tonic::transport::Channel;
4+
5+
#[derive(Parser)]
6+
#[command(name = "chroma-sysdb")]
7+
#[command(about = "CLI client for Chroma coordinator task management", long_about = None)]
8+
struct Cli {
9+
#[arg(long, default_value = "http://localhost:50051")]
10+
addr: String,
11+
12+
#[command(subcommand)]
13+
command: Command,
14+
}
15+
16+
#[derive(Subcommand)]
17+
enum Command {
18+
#[command(about = "Create a new task")]
19+
CreateTask {
20+
#[arg(long)]
21+
name: String,
22+
#[arg(long)]
23+
operator_name: String,
24+
#[arg(long)]
25+
input_collection_id: String,
26+
#[arg(long)]
27+
output_collection_name: String,
28+
#[arg(long)]
29+
params: String,
30+
#[arg(long)]
31+
tenant_id: String,
32+
#[arg(long)]
33+
database: String,
34+
#[arg(long, default_value = "100")]
35+
min_records_for_task: u64,
36+
},
37+
#[command(about = "Get task by name")]
38+
GetTask {
39+
#[arg(long)]
40+
input_collection_id: String,
41+
#[arg(long)]
42+
task_name: String,
43+
},
44+
#[command(about = "Delete a task")]
45+
DeleteTask {
46+
#[arg(long)]
47+
input_collection_id: String,
48+
#[arg(long)]
49+
task_name: String,
50+
#[arg(long)]
51+
delete_output: bool,
52+
},
53+
#[command(about = "Mark a task run as complete")]
54+
DoneTask {
55+
#[arg(long)]
56+
task_id: String,
57+
#[arg(long)]
58+
task_run_nonce: String,
59+
},
60+
#[command(about = "Get all operators")]
61+
GetOperators,
62+
#[command(about = "Peek schedule by collection IDs")]
63+
PeekSchedule {
64+
#[arg(long, value_delimiter = ',')]
65+
collection_ids: Vec<String>,
66+
},
67+
}
68+
69+
#[tokio::main]
70+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
71+
let cli = Cli::parse();
72+
73+
let channel = Channel::from_shared(cli.addr.clone())?.connect().await?;
74+
75+
let mut client = chroma_proto::sys_db_client::SysDbClient::new(channel);
76+
77+
match cli.command {
78+
Command::CreateTask {
79+
name,
80+
operator_name,
81+
input_collection_id,
82+
output_collection_name,
83+
params,
84+
tenant_id,
85+
database,
86+
min_records_for_task,
87+
} => {
88+
let request = chroma_proto::CreateTaskRequest {
89+
name,
90+
operator_name,
91+
input_collection_id,
92+
output_collection_name,
93+
params,
94+
tenant_id,
95+
database,
96+
min_records_for_task,
97+
};
98+
99+
let response = client.create_task(request).await?;
100+
println!("Task created: {}", response.into_inner().task_id);
101+
}
102+
Command::GetTask {
103+
input_collection_id,
104+
task_name,
105+
} => {
106+
let request = chroma_proto::GetTaskByNameRequest {
107+
input_collection_id,
108+
task_name,
109+
};
110+
111+
let response = client.get_task_by_name(request).await?;
112+
let task = response.into_inner();
113+
114+
println!("Task ID: {:?}", task.task_id);
115+
println!("Name: {:?}", task.name);
116+
println!("Operator: {:?}", task.operator_name);
117+
println!("Input Collection: {:?}", task.input_collection_id);
118+
println!("Output Collection Name: {:?}", task.output_collection_name);
119+
println!("Output Collection ID: {:?}", task.output_collection_id);
120+
println!("Params: {:?}", task.params);
121+
println!("Completion Offset: {:?}", task.completion_offset);
122+
println!("Min Records: {:?}", task.min_records_for_task);
123+
}
124+
Command::DeleteTask {
125+
input_collection_id,
126+
task_name,
127+
delete_output,
128+
} => {
129+
let request = chroma_proto::DeleteTaskRequest {
130+
input_collection_id,
131+
task_name,
132+
delete_output,
133+
};
134+
135+
let response = client.delete_task(request).await?;
136+
println!("Task deleted: {}", response.into_inner().success);
137+
}
138+
Command::DoneTask {
139+
task_id,
140+
task_run_nonce,
141+
} => {
142+
let request = chroma_proto::DoneTaskRequest {
143+
// TODO(claude): Pass an actual collection id.
144+
collection_id: None,
145+
task_id: Some(task_id),
146+
task_run_nonce: Some(task_run_nonce),
147+
};
148+
149+
client.done_task(request).await?;
150+
println!("Task marked as done");
151+
}
152+
Command::GetOperators => {
153+
let request = chroma_proto::GetOperatorsRequest {};
154+
155+
let response = client.get_operators(request).await?;
156+
let operators = response.into_inner().operators;
157+
158+
println!("Operators:");
159+
for op in operators {
160+
println!(" {} - {}", op.id, op.name);
161+
}
162+
}
163+
Command::PeekSchedule { collection_ids } => {
164+
let request = chroma_proto::PeekScheduleByCollectionIdRequest {
165+
collection_id: collection_ids,
166+
};
167+
168+
let response = client.peek_schedule_by_collection_id(request).await?;
169+
let entries = response.into_inner().schedule;
170+
171+
println!("Schedule:");
172+
for entry in entries {
173+
println!(" Collection: {:?}", entry.collection_id);
174+
println!(" Task ID: {:?}", entry.task_id);
175+
println!(" Nonce: {:?}", entry.task_run_nonce);
176+
println!(" When: {:?}", entry.when_to_run);
177+
println!();
178+
}
179+
}
180+
}
181+
182+
Ok(())
183+
}

0 commit comments

Comments
 (0)