Skip to content

Commit 2c6ef7f

Browse files
committed
Admin/XMover: Implement suggestions by CodeRabbit
1 parent b288ab8 commit 2c6ef7f

File tree

12 files changed

+86
-62
lines changed

12 files changed

+86
-62
lines changed

cratedb_toolkit/admin/xmover/analysis/shard.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def __init__(self, client: CrateDBClient):
3838
self.shards: List[ShardInfo] = []
3939

4040
# Initialize session-based caches for performance.
41-
self._zone_conflict_cache: Dict[Tuple[str, int, str], Union[str, None]] = {}
41+
self._zone_conflict_cache: Dict[Tuple[str, str, int, str], Union[str, None]] = {}
4242
self._node_lookup_cache: Dict[str, Union[NodeInfo, None]] = {}
4343
self._target_nodes_cache: Dict[Tuple[float, frozenset[Any], float, float], List[NodeInfo]] = {}
4444
self._cache_hits = 0
@@ -206,7 +206,7 @@ def generate_rebalancing_recommendations(
206206
# Get moveable shards (only healthy ones for actual operations)
207207
moveable_shards = self.find_moveable_shards(constraints.min_size, constraints.max_size, constraints.table_name)
208208

209-
print(
209+
logger.info(
210210
f"Analyzing {len(moveable_shards)} candidate shards "
211211
f"in size range {constraints.min_size}-{constraints.max_size}GB..."
212212
)
@@ -239,12 +239,11 @@ def generate_rebalancing_recommendations(
239239
# Optimize processing: if filtering by source node, only process those shards
240240
if constraints.source_node:
241241
processing_shards = [s for s in moveable_shards if s.node_name == constraints.source_node]
242-
print(f"Focusing on {len(processing_shards)} shards from node {constraints.source_node}")
242+
logger.info(f"Focusing on {len(processing_shards)} shards from node {constraints.source_node}")
243243
else:
244244
processing_shards = moveable_shards
245245

246246
# Generate move recommendations
247-
safe_recommendations = 0 # noqa: F841
248247
total_evaluated = 0
249248

250249
for i, shard in enumerate(processing_shards):
@@ -368,12 +367,12 @@ def generate_rebalancing_recommendations(
368367

369368
if len(processing_shards) > 100:
370369
print() # New line after progress dots
371-
print(f"Generated {len(recommendations)} move recommendations (evaluated {total_evaluated} shards)")
372-
print(f"Performance: {self.get_cache_stats()}")
370+
logger.info(f"Generated {len(recommendations)} move recommendations (evaluated {total_evaluated} shards)")
371+
logger.info(f"Performance: {self.get_cache_stats()}")
373372
return recommendations
374373

375374
def validate_move_safety(
376-
self, recommendation: ShardRelocationResponse, max_disk_usage_percent: float = 90.0
375+
self, recommendation: ShardRelocationResponse, max_disk_usage_percent: float = 90.0, buffer_gb: float = 50.0
377376
) -> Tuple[bool, str]:
378377
"""Validate that a move recommendation is safe to execute"""
379378
# Find target node (with caching)
@@ -388,7 +387,7 @@ def validate_move_safety(
388387
return False, zone_conflict
389388

390389
# Check available space
391-
required_space_gb = recommendation.size_gb + 50 # 50GB buffer
390+
required_space_gb = recommendation.size_gb + buffer_gb
392391
if target_node.available_space_gb < required_space_gb:
393392
return (
394393
False,
@@ -423,7 +422,7 @@ def _check_zone_conflict_cached(self, recommendation: ShardRelocationResponse) -
423422
"""Check zone conflicts with caching"""
424423
# Create cache key: table, shard, target zone
425424
target_zone = self._get_node_zone(recommendation.to_node)
426-
cache_key = (recommendation.table_name, recommendation.shard_id, target_zone)
425+
cache_key = (recommendation.schema_name, recommendation.table_name, recommendation.shard_id, target_zone)
427426

428427
if cache_key in self._zone_conflict_cache:
429428
self._cache_hits += 1

cratedb_toolkit/admin/xmover/analysis/table.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from rich.console import Console
1515
from rich.table import Table
1616

17+
from cratedb_toolkit.admin.xmover.model import NodeInfo
1718
from cratedb_toolkit.admin.xmover.util.database import CrateDBClient
1819

1920
logger = logging.getLogger(__name__)
@@ -97,6 +98,9 @@ def find_table_by_name(self, table_name: str) -> Optional[str]:
9798

9899
try:
99100
choice = input("\nSelect table (enter number): ").strip()
101+
if not choice:
102+
rprint("[yellow]No selection made[/yellow]")
103+
return None
100104
idx = int(choice) - 1
101105
if 0 <= idx < len(rows):
102106
schema, table = rows[idx]
@@ -292,14 +296,9 @@ def format_table_health_report(self, table_dist: TableDistribution) -> None:
292296
zone_distribution = {}
293297
for node_name, node_data in table_dist.node_distributions.items():
294298
# Try to get zone info for each node
295-
node_info = next((n for n in all_nodes_info if n.name == node_name), None)
296-
if (
297-
node_info
298-
and hasattr(node_info, "attributes")
299-
and node_info.attributes
300-
and "zone" in node_info.attributes
301-
):
302-
zone = node_info.attributes["zone"]
299+
node_info: NodeInfo = next((n for n in all_nodes_info if n.name == node_name), None)
300+
if node_info.zone:
301+
zone = node_info.zone
303302
if zone not in zone_distribution:
304303
zone_distribution[zone] = {"nodes": 0, "shards": 0, "size": 0}
305304
zone_distribution[zone]["nodes"] += 1

cratedb_toolkit/admin/xmover/cli.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
Command Line Interface.
55
"""
66

7-
import sys
87
import time
98
from typing import Optional
109

@@ -46,11 +45,11 @@ def main(ctx):
4645
if not client.test_connection():
4746
console.print("[red]Error: Could not connect to CrateDB[/red]")
4847
console.print("Please check your CRATE_CONNECTION_STRING in .env file")
49-
sys.exit(1)
48+
raise click.Abort()
5049
ctx.obj["client"] = client
5150
except Exception as e:
5251
console.print(f"[red]Error connecting to CrateDB: {e}[/red]")
53-
sys.exit(1)
52+
raise click.Abort() from e
5453

5554

5655
@main.command()
@@ -170,11 +169,11 @@ def test_connection(ctx, connection_string: Optional[str]):
170169
console.print(f" • {node.name} (zone: {node.zone})")
171170
else:
172171
console.print("[red]✗ Connection failed[/red]")
173-
sys.exit(1)
172+
raise click.Abort()
174173

175174
except Exception as e:
176175
console.print(f"[red]✗ Connection error: {e}[/red]")
177-
sys.exit(1)
176+
raise click.Abort() from e
178177

179178

180179
@main.command()
@@ -525,13 +524,14 @@ def monitor_recovery(
525524
xmover monitor-recovery --watch # Continuous monitoring
526525
xmover monitor-recovery --recovery-type PEER # Only PEER recoveries
527526
"""
527+
effective_recovery_type = None if recovery_type == "all" else recovery_type
528528
recovery_monitor = RecoveryMonitor(
529529
client=ctx.obj["client"],
530530
options=RecoveryOptions(
531531
table=table,
532532
node=node,
533533
refresh_interval=refresh_interval,
534-
recovery_type=recovery_type,
534+
recovery_type=effective_recovery_type,
535535
include_transitioning=include_transitioning,
536536
),
537537
)

cratedb_toolkit/admin/xmover/model.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import dataclasses
21
from dataclasses import dataclass
32
from typing import Dict, Optional
43

@@ -149,6 +148,12 @@ def safety_score(self) -> float:
149148
if "rebalancing" in self.reason.lower():
150149
score += 0.2
151150

151+
# Consider shard size - smaller shards are safer to move
152+
if self.size_gb < 10:
153+
score += 0.1
154+
elif self.size_gb > 100:
155+
score -= 0.2
156+
152157
# Ensure score stays in valid range
153158
return max(0.0, min(1.0, score))
154159

@@ -165,15 +170,15 @@ class DistributionStats:
165170
node_balance_score: float # 0-100, higher is better
166171

167172

168-
@dataclasses.dataclass
173+
@dataclass
169174
class SizeCriteria:
170175
min_size: float = 40.0
171176
max_size: float = 60.0
172177
table_name: Optional[str] = None
173178
source_node: Optional[str] = None
174179

175180

176-
@dataclasses.dataclass
181+
@dataclass
177182
class ShardRelocationConstraints:
178183
min_size: float = SizeCriteria().min_size
179184
max_size: float = SizeCriteria().max_size

cratedb_toolkit/admin/xmover/operational/monitor.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def get_cluster_recovery_status(self) -> List[RecoveryInfo]:
3737
)
3838

3939
# Apply recovery type filter
40-
if self.options.recovery_type is not None:
40+
if self.options.recovery_type is not None and self.options.recovery_type.lower() != "all":
4141
recoveries = [r for r in recoveries if r.recovery_type.upper() == self.options.recovery_type.upper()]
4242

4343
return recoveries
@@ -178,7 +178,6 @@ def start(self, watch: bool, debug: bool = False):
178178

179179
# Track previous state for change detection
180180
previous_recoveries: Dict[str, Dict[str, Any]] = {}
181-
previous_timestamp = None
182181
first_run = True
183182

184183
while True:
@@ -307,7 +306,6 @@ def start(self, watch: bool, debug: bool = False):
307306
elif active_count > 0:
308307
console.print(f"{current_time} | {status} (no changes)")
309308

310-
previous_timestamp = current_time # noqa: F841
311309
first_run = False
312310
time.sleep(self.options.refresh_interval)
313311

cratedb_toolkit/admin/xmover/operational/recommend.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ def validate(self, request: ShardRelocationRequest):
123123
console.print()
124124
console.print("[dim]# Monitor shard health after execution[/dim]")
125125
console.print(
126-
"[dim]# Check with: SELECT * FROM sys.shards "
127-
"WHERE table_name = '{table_name}' AND id = {shard_id};[/dim]"
126+
"[dim]# Check with: SELECT * FROM sys.shards " # noqa: S608
127+
f"WHERE table_name = '{table_name}' AND id = {request.shard_id};[/dim]"
128128
)
129129
else:
130130
console.print("[red]✗ VALIDATION FAILED - Move not safe[/red]")
@@ -323,7 +323,7 @@ def execute(
323323
rec, max_disk_usage_percent=constraints.max_disk_usage
324324
)
325325
if not is_safe:
326-
if "Zone conflict" in safety_msg:
326+
if "zone conflict" in safety_msg.lower():
327327
zone_conflicts += 1
328328
console.print(f"-- Move {i}: SKIPPED - {safety_msg}")
329329
console.print(
@@ -340,7 +340,7 @@ def execute(
340340

341341
# Auto-execution if requested
342342
if auto_execute:
343-
self._execute_recommendations_safely(recommendations, validate)
343+
self._execute_recommendations_safely(constraints, recommendations, validate)
344344

345345
if validate and safe_moves < len(recommendations):
346346
if zone_conflicts > 0:
@@ -352,14 +352,16 @@ def execute(
352352
f"[yellow]Warning: Only {safe_moves} of {len(recommendations)} moves passed safety validation[/yellow]"
353353
)
354354

355-
def _execute_recommendations_safely(self, recommendations, validate: bool):
355+
def _execute_recommendations_safely(self, constraints, recommendations, validate: bool):
356356
"""Execute recommendations with extensive safety measures"""
357357

358358
# Filter to only safe recommendations
359359
safe_recommendations = []
360360
if validate:
361361
for rec in recommendations:
362-
is_safe, safety_msg = self.analyzer.validate_move_safety(rec, max_disk_usage_percent=95.0)
362+
is_safe, safety_msg = self.analyzer.validate_move_safety(
363+
rec, max_disk_usage_percent=constraints.max_disk_usage
364+
)
363365
if is_safe:
364366
safe_recommendations.append(rec)
365367
else:
@@ -423,7 +425,8 @@ def _execute_recommendations_safely(self, recommendations, validate: bool):
423425
# Execute the SQL command
424426
result = self.client.execute_query(sql_command)
425427

426-
if result.get("rowcount", 0) >= 0: # Success indicator for ALTER statements
428+
# ALTER TABLE REROUTE commands don't return rowcount, check for no error instead.
429+
if "error" not in result:
427430
console.print(" [green]✅ SUCCESS[/green] - Move initiated")
428431
successful_moves += 1
429432

@@ -482,7 +485,8 @@ def _wait_for_recovery_capacity(self, max_concurrent_recoveries: int = 5):
482485
while True:
483486
# Check active recoveries (including transitioning)
484487
recoveries = recovery_monitor.get_cluster_recovery_status()
485-
active_count = len([r for r in recoveries if r.overall_progress < 100.0 or r.stage != "DONE"])
488+
# Count recoveries that are actively running (not completed)
489+
active_count = len([r for r in recoveries if r.overall_progress < 100.0])
486490
status = f"{active_count}/{max_concurrent_recoveries}"
487491
if active_count < max_concurrent_recoveries:
488492
if wait_time > 0:

cratedb_toolkit/admin/xmover/util/database.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ def __init__(self, connection_string: Optional[str] = None):
3939
if not self.connection_string.endswith("/_sql"):
4040
self.connection_string = self.connection_string.rstrip("/") + "/_sql"
4141

42+
self.session = requests.Session()
43+
4244
def execute_query(self, query: str, parameters: Optional[List] = None) -> Dict[str, Any]:
4345
"""Execute a SQL query against CrateDB"""
4446
payload: Dict[str, Any] = {"stmt": query}
@@ -51,11 +53,18 @@ def execute_query(self, query: str, parameters: Optional[List] = None) -> Dict[s
5153
auth = (self.username, self.password)
5254

5355
try:
54-
response = requests.post(
56+
response = self.session.post(
5557
self.connection_string, json=payload, auth=auth, verify=self.ssl_verify, timeout=30
5658
)
5759
response.raise_for_status()
58-
return response.json()
60+
data = response.json()
61+
# CrateDB may include an "error" field even with 200 OK
62+
if isinstance(data, dict) and "error" in data and data["error"]:
63+
# Best-effort message extraction
64+
err = data["error"]
65+
msg = err.get("message") if isinstance(err, dict) else str(err)
66+
raise Exception(f"CrateDB error: {msg}")
67+
return data
5968
except requests.exceptions.RequestException as e:
6069
raise Exception(f"Failed to execute query: {e}") from e
6170

@@ -335,13 +344,13 @@ def get_recovery_details(self, schema_name: str, table_name: str, shard_id: int)
335344
s."primary",
336345
s.translog_stats['size'] as translog_size
337346
FROM sys.shards s
338-
WHERE s.table_name = ? AND s.id = ?
347+
WHERE s.schema_name = ? AND s.table_name = ? AND s.id = ?
339348
AND (s.state = 'RECOVERING' OR s.routing_state IN ('INITIALIZING', 'RELOCATING'))
340349
ORDER BY s.schema_name
341350
LIMIT 1
342351
"""
343352

344-
result = self.execute_query(query, [table_name, shard_id])
353+
result = self.execute_query(query, [schema_name, table_name, shard_id])
345354

346355
if not result.get("rows"):
347356
return None

cratedb_toolkit/admin/xmover/util/error.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,23 @@
1-
from typing import List, Optional, cast
1+
from typing import List, Optional
22

3-
from rich.console import Console
3+
from rich import get_console
44
from rich.panel import Panel
55

6-
console = Console()
6+
console = get_console()
77

88

99
def explain_cratedb_error(error_message: Optional[str]):
10+
"""
11+
Decode and troubleshoot common CrateDB shard allocation errors.
12+
13+
Parameters
14+
----------
15+
error_message:
16+
Raw CrateDB error message. If None and interactive=True, the user is prompted
17+
to paste the message (finish with two blank lines).
18+
interactive:
19+
When False, never prompt for input; return early if no message is provided.
20+
"""
1021
console.print(Panel.fit("[bold blue]CrateDB Error Message Decoder[/bold blue]"))
1122
console.print("[dim]Helps decode and troubleshoot CrateDB shard allocation errors[/dim]")
1223
console.print()
@@ -24,7 +35,7 @@ def explain_cratedb_error(error_message: Optional[str]):
2435
break
2536
error_message = "\n".join(lines)
2637

27-
if not error_message.strip():
38+
if not (error_message or "").strip():
2839
console.print("[yellow]No error message provided[/yellow]")
2940
return
3041

@@ -96,7 +107,7 @@ def explain_cratedb_error(error_message: Optional[str]):
96107
error_lower = error_message.lower()
97108

98109
for pattern_info in error_patterns:
99-
if cast(str, pattern_info["pattern"]).lower() in error_lower:
110+
if pattern_info["pattern"].lower() in error_lower: # type: ignore[attr-defined]
100111
matches.append(pattern_info)
101112

102113
if matches:

0 commit comments

Comments
 (0)