Skip to content

Commit

Permalink
v1.1.0 (#12)
Browse files Browse the repository at this point in the history
* feat: add just docker-run-python
* feat: add orbiter translate --analyze
* feat: add installing pyz from GitHub for binary execution mode
* chore: add config file for various env vars
* chore: update translation ruleset template
* chore: add back integration_test, test fix re: formatting
  • Loading branch information
fritz-astronomer authored Aug 29, 2024
1 parent 59cf9a7 commit 29dc851
Show file tree
Hide file tree
Showing 11 changed files with 217 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
- uses: extractions/setup-just@v1
- uses: extractions/setup-just@v2
- run: just install
- run: just build
if: matrix.os == 'ubuntu-20.04'
Expand Down
25 changes: 24 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,33 @@ docker-run-binary REPO='orbiter-community-translations' RULESET='orbiter_transla
#!/usr/bin/env bash
set -euxo pipefail
cat <<"EOF" | docker run --platform linux/amd64 -v `pwd`:/data -w /data -i ubuntu /bin/bash
chmod +x ./orbiter-linux-x86_64 && \
echo "setting up certificates for https" && \
apt update && apt install -y ca-certificates && update-ca-certificates --fresh && \
echo "sourcing .env" && \
set -a && source .env && set +a && \
chmod +x ./orbiter-linux-x86_64 && \
echo "[ORBITER LIST-RULESETS]" && \
./orbiter-linux-x86_64 list-rulesets && \
mkdir -p workflow && \
echo "[ORBITER INSTALL]" && \
LOG_LEVEL=DEBUG ./orbiter-linux-x86_64 install --repo={{REPO}} && \
echo "[ORBITER TRANSLATE]" && \
LOG_LEVEL=DEBUG ./orbiter-linux-x86_64 translate workflow/ output/ --ruleset {{RULESET}}
EOF
docker-run-python REPO='orbiter-community-translations' RULESET='orbiter_translations.oozie.xml_demo.translation_ruleset':
#!/usr/bin/env bash
set -euxo pipefail
cat <<"EOF" | docker run --platform linux/amd64 -v `pwd`:/data -w /data -i python /bin/bash
echo "sourcing .env" && \
set -a && source .env && set +a && \
echo "installing orbiter" && \
pip install '.'
echo "[ORBITER LIST-RULESETS]" && \
orbiter list-rulesets && \
mkdir -p workflow && \
echo "[ORBITER INSTALL]" && \
LOG_LEVEL=DEBUG orbiter install --repo={{REPO}} && \
echo "[ORBITER TRANSLATE]" && \
LOG_LEVEL=DEBUG orbiter translate workflow/ output/ --ruleset {{RULESET}}
EOF
8 changes: 1 addition & 7 deletions orbiter/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
from __future__ import annotations

import os
import re
from enum import Enum
from typing import Any, Tuple

__version__ = "1.0.2"
__version__ = "1.1.0"

version = __version__

KG_ACCOUNT_ID = "3b189b4c-c047-4fdb-9b46-408aa2978330"

ORBITER_TASK_SUFFIX = os.getenv("ORBITER_TASK_SUFFIX", "_task")
"""By default, we add `_task` as a suffix to a task name to prevent name collision issues. This can be overridden."""


class FileType(Enum):
YAML = "YAML"
Expand Down
50 changes: 40 additions & 10 deletions orbiter/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@
from csv import DictReader
from rich.markdown import Markdown
import pkgutil

from orbiter import import_from_qualname, KG_ACCOUNT_ID
from urllib.request import urlretrieve

from orbiter import import_from_qualname
from orbiter.config import (
RUNNING_AS_BINARY,
KG_ACCOUNT_ID,
TRANSLATION_VERSION,
LOG_LEVEL,
)
from orbiter.rules.rulesets import TranslationRuleset

RUNNING_AS_BINARY = getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS")


# ### LOGGING ###
def formatter(r):
Expand All @@ -38,7 +43,6 @@ def formatter(r):


logger.remove()
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
sys.tracebacklimit = 1000 if LOG_LEVEL == "DEBUG" else 0
logger_defaults = dict(colorize=True, format=formatter)
exceptions_off = {"backtrace": False, "diagnose": False}
Expand Down Expand Up @@ -160,11 +164,18 @@ def orbiter():
default=True,
show_default=True,
)
@click.option(
"--analyze/--no-analyze",
help="[optional] print statistics instead of rendering output",
default=False,
show_default=True,
)
def translate(
input_dir: Path,
output_dir: Path,
ruleset: str | None,
_format: bool,
analyze: bool,
):
"""Translate workflows in an `INPUT_DIR` to an `OUTPUT_DIR` Airflow Project.
Expand Down Expand Up @@ -195,9 +206,13 @@ def translate(
)

try:
translation_ruleset.translate_fn(
project = translation_ruleset.translate_fn(
translation_ruleset=translation_ruleset, input_dir=input_dir
).render(output_dir)
)
if analyze:
project.analyze()
else:
project.render(output_dir)
except RuntimeError as e:
logger.error(f"Error encountered during translation: {e}")
raise click.Abort()
Expand All @@ -207,7 +222,8 @@ def translate(

def _pip_install(repo: str, key: str):
"""If we are running via python/pip, we can utilize pip to install translations"""
_exec = f"pip3 install {repo}"
_exec = f"{sys.executable} -m pip install {repo}"
_exec += f"=={TRANSLATION_VERSION}" if TRANSLATION_VERSION != "latest" else ""
if repo == "astronomer-orbiter-translations":
if not key:
raise ValueError(
Expand Down Expand Up @@ -247,6 +263,20 @@ def _get_keygen_pyz(key):
f.write(r.content)


def _get_gh_pyz(
repo: str = "https://github.com/astronomer/orbiter-community-translations",
file: str = "orbiter_translations.pyz",
):
if TRANSLATION_VERSION != "latest":
url = f"{repo}/releases/download/{TRANSLATION_VERSION}/{file}"
else:
url = f"{repo}/releases/latest/download/{file}"
logger.info(f"Downloading {file} from {url}")
(downloaded_file, res) = urlretrieve(url, file) # nosec B310
logger.debug(f"Downloaded {file} to {downloaded_file}, response: {res}")
return downloaded_file


def _add_pyz():
local_pyz = [
str(_path.resolve()) for _path in Path(".").iterdir() if _path.suffix == ".pyz"
Expand All @@ -257,14 +287,14 @@ def _add_pyz():

def _bin_install(repo: str, key: str):
"""If we are running via a PyInstaller binary, we need to download a .pyz"""
if repo == "astronomer-orbiter-translations":
if "astronomer-orbiter-translations" in repo:
if not key:
raise ValueError(
"License key is required for 'astronomer-orbiter-translations'!"
)
_get_keygen_pyz(key)
else:
raise NotImplementedError()
_get_gh_pyz()
_add_pyz()
(_, _version) = import_from_qualname("orbiter_translations.version")
logger.info(f"Successfully installed {repo}, version: {_version}")
Expand Down
14 changes: 14 additions & 0 deletions orbiter/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import os
import sys

TRANSLATION_VERSION = os.getenv("ORBITER_TRANSLATION_VERSION", "latest")
"""The version of the translation ruleset to download. This can be overridden."""

ORBITER_TASK_SUFFIX = os.getenv("ORBITER_TASK_SUFFIX", "_task")
"""By default, we add `_task` as a suffix to a task name to prevent name collision issues. This can be overridden."""

LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
"""You can set the log level to DEBUG to see more detailed logs."""

KG_ACCOUNT_ID = "3b189b4c-c047-4fdb-9b46-408aa2978330"
RUNNING_AS_BINARY = getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS")
107 changes: 106 additions & 1 deletion orbiter/objects/project.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

from functools import reduce
from pathlib import Path
from typing import Dict, Iterable, Set
from typing import Dict, Iterable, Set, Literal

import yaml
from loguru import logger
Expand Down Expand Up @@ -609,6 +610,100 @@ def render(self, output_dir: Path) -> None:
else:
logger.debug("No entries for .env")

@validate_call
def analyze(self, output_fmt: Literal["json", "csv", "md"] = "md"):
"""Print an analysis of the project to the console.
```pycon
>>> from orbiter.objects.operators.empty import OrbiterEmptyOperator
>>> OrbiterProject().add_dags([
... OrbiterDAG(file_path="", dag_id="foo", orbiter_kwargs={"file_path": "foo.py"},
... tasks={"bar": OrbiterEmptyOperator(task_id="bar")}
... ),
... OrbiterDAG(file_path="", dag_id="baz", orbiter_kwargs={"file_path": "baz.py"},
... tasks={"bop": OrbiterEmptyOperator(task_id="bop")}
... )
... ]).analyze()
... # doctest: +ELLIPSIS
┏━...
...Analysis...
┗━...
<BLANKLINE>
<BLANKLINE>
DAGs OrbiterEmptyOperator
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
foo.py 1 1
baz.py 1 1
Totals 2 2
<BLANKLINE>
```
"""
import sys

dag_analysis = [
{
"file": dag.orbiter_kwargs.get("file_path", dag.file_path),
"dag_id": dag.dag_id,
"task_types": [type(task).__name__ for task in dag.tasks.values()],
}
for dag in self.dags.values()
]

file_analysis = {}
for analysis in dag_analysis:
analysis_output = file_analysis.get(analysis["file"], {})
analysis_output["DAGs"] = analysis_output.get("DAGs", 0) + 1
tasks_of_type = reduce(
lambda acc, task_type: acc | {task_type: acc.get(task_type, 0) + 1},
analysis["task_types"],
dict(),
)
analysis_output |= tasks_of_type
file_analysis[analysis["file"]] = analysis_output

file_analysis = [{"": k} | v for k, v in file_analysis.items()]
totals = {"": "Totals"}
for file in file_analysis:
for k, v in file.items():
if k != "":
totals[k] = totals.get(k, 0) + v
file_analysis.append(totals)

if output_fmt == "json":
import json

json.dump(file_analysis, sys.stdout)
elif output_fmt == "csv":
import csv
import sys

writer = csv.DictWriter(sys.stdout, fieldnames=file_analysis[0].keys())
writer.writeheader()
writer.writerows(file_analysis)
elif output_fmt == "md":
from rich.console import Console
from rich.markdown import Markdown
from tabulate import tabulate

console = Console()

# DAGs EmptyOp
# file_a 1 1
table = tabulate(
tabular_data=file_analysis,
headers="keys",
tablefmt="pipe",
# https://github.com/Textualize/rich/issues/3027
missingval="⠀", # (special 'braille space' character)
)
console.print(
Markdown(
f"# Analysis\n{table}",
style="magenta",
)
)


# https://github.com/pydantic/pydantic/issues/8790
OrbiterProject.add_connections = validate_call()(OrbiterProject.add_connections)
Expand All @@ -618,3 +713,13 @@ def render(self, output_dir: Path) -> None:
OrbiterProject.add_pools = validate_call()(OrbiterProject.add_pools)
OrbiterProject.add_requirements = validate_call()(OrbiterProject.add_requirements)
OrbiterProject.add_variables = validate_call()(OrbiterProject.add_variables)


if __name__ == "__main__":
import doctest

doctest.testmod(
optionflags=doctest.ELLIPSIS
| doctest.NORMALIZE_WHITESPACE
| doctest.IGNORE_EXCEPTION_DETAIL
)
3 changes: 2 additions & 1 deletion orbiter/objects/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
from loguru import logger
from pydantic import AfterValidator, BaseModel, validate_call

from orbiter import clean_value, ORBITER_TASK_SUFFIX
from orbiter import clean_value
from orbiter.ast_helper import (
OrbiterASTBase,
py_function,
py_bitshift,
)
from orbiter.ast_helper import py_assigned_object
from orbiter.config import ORBITER_TASK_SUFFIX
from orbiter.objects import ImportList
from orbiter.objects import OrbiterBase
from orbiter.objects.pool import OrbiterPool
Expand Down
2 changes: 1 addition & 1 deletion orbiter/objects/task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from pydantic import field_validator

from orbiter import ORBITER_TASK_SUFFIX
from orbiter.config import ORBITER_TASK_SUFFIX
from orbiter.ast_helper import (
OrbiterASTBase,
py_with,
Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ dependencies = [
# CLI prompt
"questionary",

# fetch translation_rulesets in binary mode
"requests",

# For validation of objects
"pydantic >= 2.6",

Expand All @@ -61,7 +64,7 @@ dependencies = [
"pendulum",
"tzdata", # for timezone data, if system doesn't have it?

# for 'help' command
# for 'list-rulesets' command, and '--analyze'
"tabulate",

# logging
Expand Down
4 changes: 0 additions & 4 deletions tests/integration_test.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import pytest

from orbiter.__main__ import run
from tests.conftest import manual_tests


# noinspection PyUnreachableCode
@pytest.mark.skip("Relies on orbiter-community-translations, not yet published")
@manual_tests
def test_integration():
run("just docker-build-binary", shell=True, capture_output=True, text=True)
Expand Down
Loading

0 comments on commit 29dc851

Please sign in to comment.