Add method to upload DuckDB files to Unity Catalog Volume with tests #2024
Status: Closed

radhikaathalye-db wants to merge 20 commits into feature/add_local_dashboards from feature/upload_duckdb_extract.

Commits (20)
- 5585328 Add local dashboard classes. (goodwillpunning)
- 07caf36 Update job deployer with profiler ingestion job. (goodwillpunning)
- d03d81e Add initial integration test. (goodwillpunning)
- f8982dd Add method to upload DuckDB files to Unity Catalog Volume with tests (radhikaathalye-db)
- a4d2520 Update app context to call dashboard manager with WorkspaceClient. (goodwillpunning)
- 4ebb53e Add LSQL definitions for Synapse Profiler Dashboard (goodwillpunning)
- 8370ef6 Merge latest from feature/add_local_dashboards into feature/upload_du… (radhikaathalye-db)
- 72c3f87 refactor: use workspaceClient instead of requests; fix error logging (radhikaathalye-db)
- 03ff5bf Add more specific exception handling. (goodwillpunning)
- 2aeab84 Update dedicated SQL pool LSQL widgets. (goodwillpunning)
- c34394d Replace LSQL dashboards with Python SDK. (goodwillpunning)
- ac81031 Add private functions for creating/replacing profiler dashboard. (goodwillpunning)
- 6070973 Add more specific error handling to dashboard manager. (goodwillpunning)
- fb9eb00 Update args for CLI command. (goodwillpunning)
- ac7c806 Remove profiler extract ingestion job deployer. (goodwillpunning)
- a094691 Remove unit tests for profiler ingestion job. (goodwillpunning)
- f8f11aa Add method to upload DuckDB files to Unity Catalog Volume with tests (radhikaathalye-db)
- 56be197 Merge upstream changes and update test cases. (goodwillpunning)
- 136f115 Add more specific exception handling. (goodwillpunning)
- 5fec3c6 Remove unnecessary params in dashboard manager. (goodwillpunning)
src/databricks/labs/lakebridge/assessments/dashboards/dashboard_manager.py (171 additions, 0 deletions)
```python
import io
import os
import json
import logging
from pathlib import Path

from databricks.sdk.errors import PermissionDenied, NotFound, InternalError
from databricks.sdk.errors.platform import ResourceAlreadyExists, DatabricksError
from databricks.sdk.service.dashboards import Dashboard
from databricks.sdk import WorkspaceClient

from databricks.labs.blueprint.wheels import find_project_root

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DashboardTemplateLoader:
    """Loads the JSON representation of a Databricks dashboard for a given source system."""

    def __init__(self, templates_dir: Path | None):
        self.templates_dir = templates_dir

    def load(self, source_system: str) -> dict:
        """Loads a profiler summary dashboard template.

        :param source_system: the name of the source data warehouse
        """
        if self.templates_dir is None:
            raise ValueError("Dashboard template path cannot be empty.")
        filename = f"{source_system.lower()}_dashboard.lvdash.json"
        filepath = os.path.join(self.templates_dir, filename)
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"Could not find dashboard template matching '{source_system}'.")
        with open(filepath, "r", encoding="utf-8") as f:
            return json.load(f)


class DashboardManager:
    """Manages the lifecycle of a profiler summary dashboard, a.k.a. "local dashboards"."""

    _DASHBOARD_NAME = "Lakebridge Profiler Assessment"

    def __init__(self, ws: WorkspaceClient, is_debug: bool = False):
        self._ws = ws
        self._is_debug = is_debug

    @staticmethod
    def _replace_catalog_schema(
        serialized_dashboard: str,
        new_catalog: str,
        new_schema: str,
        old_catalog: str = "`PROFILER_CATALOG`",
        old_schema: str = "`PROFILER_SCHEMA`",
    ) -> str:
        """Given a serialized JSON dashboard, replaces all catalog and schema references with the
        provided catalog and schema names."""
        updated_dashboard = serialized_dashboard.replace(old_catalog, f"`{new_catalog}`")
        return updated_dashboard.replace(old_schema, f"`{new_schema}`")

    def _create_or_replace_dashboard(
        self, folder: Path, ws_parent_path: str, dest_catalog: str, dest_schema: str
    ) -> Dashboard:
        """Creates or updates a profiler summary dashboard in the current user's Databricks
        workspace home. Existing dashboards are replaced with the latest dashboard template."""
        # Load the dashboard template
        logger.info(f"Loading dashboard template {folder}")
        dashboard_loader = DashboardTemplateLoader(folder)
        dashboard_json = dashboard_loader.load(source_system="synapse")
        dashboard_str = json.dumps(dashboard_json)

        # Replace catalog and schema placeholders
        updated_dashboard_str = self._replace_catalog_schema(
            dashboard_str, new_catalog=dest_catalog, new_schema=dest_schema
        )
        dashboard = Dashboard(
            display_name=self._DASHBOARD_NAME,
            parent_path=ws_parent_path,
            warehouse_id=self._ws.config.warehouse_id,
            serialized_dashboard=updated_dashboard_str,
        )

        # Create the dashboard, or replace it if previously deployed
        try:
            dashboard = self._ws.lakeview.create(dashboard=dashboard)
        except ResourceAlreadyExists:
            logger.info("Dashboard already exists! Removing dashboard from workspace location.")
            dashboard_ws_path = str(Path(ws_parent_path) / f"{self._DASHBOARD_NAME}.lvdash.json")
            self._ws.workspace.delete(dashboard_ws_path)
            dashboard = self._ws.lakeview.create(dashboard=dashboard)
        except DatabricksError as e:
            logger.error(f"Could not create profiler summary dashboard: {e}")
            raise  # don't return a Dashboard object that was never created

        if dashboard.dashboard_id:
            logger.info(f"Created dashboard '{dashboard.dashboard_id}' in workspace location '{ws_parent_path}'.")
        return dashboard

    def create_profiler_summary_dashboard(
        self,
        extract_file: str,
        source_tech: str,
        catalog_name: str = "lakebridge_profiler",
        schema_name: str = "profiler_runs",
    ) -> None:
        """Deploys a profiler summary dashboard to the current Databricks user's workspace home."""
        logger.info("Deploying profiler summary dashboard.")

        # Load the AI/BI Dashboard template for the source system
        template_folder = (
            find_project_root(__file__)
            / f"src/databricks/labs/lakebridge/resources/assessments/dashboards/{source_tech}"
        )
        # Resolve the current user to build the workspace destination path
        current_user = self._ws.current_user.me().user_name
        ws_path = f"/Workspace/Users/{current_user}/Lakebridge/Dashboards/"
        self._create_or_replace_dashboard(
            folder=template_folder, ws_parent_path=ws_path, dest_catalog=catalog_name, dest_schema=schema_name
        )

    def upload_duckdb_to_uc_volume(self, local_file_path: str, volume_path: str) -> bool:
        """Uploads a DuckDB file to a Unity Catalog Volume.

        Args:
            local_file_path: local path to the DuckDB file
            volume_path: target path in the UC Volume
                (e.g. '/Volumes/catalog/schema/volume/myfile.duckdb')

        Returns:
            True if the upload succeeded, False otherwise.
        """
        # Validate inputs
        if not os.path.exists(local_file_path):
            logger.error(f"Local file not found: {local_file_path}")
            return False
        if not volume_path.startswith('/Volumes/'):
            logger.error("Volume path must start with '/Volumes/'")
            return False

        try:
            with open(local_file_path, 'rb') as f:
                file_bytes = f.read()
            binary_data = io.BytesIO(file_bytes)
            self._ws.files.upload(volume_path, binary_data, overwrite=True)
            logger.info(f"Successfully uploaded {local_file_path} to {volume_path}")
            return True
        except FileNotFoundError as e:
            logger.error(f"Profiler extract file was not found: \n{e}")
            return False
        except PermissionDenied as e:
            logger.error(f"Insufficient privileges detected while accessing Volume path: \n{e}")
            return False
        except NotFound as e:
            logger.error(f"Invalid Volume path provided: \n{e}")
            return False
        except InternalError as e:
            logger.error(f"Internal Databricks error while uploading extract file: \n{e}")
            return False
        except Exception as e:
            logger.error(f"Failed to upload file: {e}")
            return False
```
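The placeholder substitution in `_replace_catalog_schema` is plain string replacement over the serialized dashboard JSON. A minimal standalone sketch of the same idea (function name and sample query are hypothetical, not from the PR):

```python
def replace_catalog_schema(serialized: str, new_catalog: str, new_schema: str) -> str:
    """Swap the backtick-quoted placeholder catalog/schema for real names."""
    out = serialized.replace("`PROFILER_CATALOG`", f"`{new_catalog}`")
    return out.replace("`PROFILER_SCHEMA`", f"`{new_schema}`")

query = "SELECT * FROM `PROFILER_CATALOG`.`PROFILER_SCHEMA`.runs"
print(replace_catalog_schema(query, "lakebridge_profiler", "profiler_runs"))
# SELECT * FROM `lakebridge_profiler`.`profiler_runs`.runs
```

Because the replacement targets are backtick-quoted, an occurrence of the bare word PROFILER_CATALOG elsewhere in the JSON would be left untouched, which is presumably why the defaults include the backticks.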
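`upload_duckdb_to_uc_volume` guards its inputs before touching the Files API: the local file must exist and the destination must be a Unity Catalog Volume path. Those guards can be sketched as a pure function (name hypothetical, independent of the PR's code):

```python
import os


def is_valid_volume_target(local_file_path: str, volume_path: str) -> bool:
    """Mirror the upload method's precondition checks: the source file must
    exist locally and the destination must live under /Volumes/."""
    if not os.path.exists(local_file_path):
        return False
    # UC Volume paths have the form /Volumes/<catalog>/<schema>/<volume>/...
    return volume_path.startswith("/Volumes/")


print(is_valid_volume_target(os.getcwd(), "/Volumes/main/default/extracts/out.duckdb"))  # True
print(is_valid_volume_target(os.getcwd(), "/tmp/out.duckdb"))  # False
```

Checking these cheap conditions up front lets the method fail fast with a clear log message instead of surfacing a less specific SDK error after the bytes have been read.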