diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..02f2ff1 --- /dev/null +++ b/.env.example @@ -0,0 +1,3 @@ +# CONFIG +DBT_VENV=/path/to/dbt/virtual/env +PREFECT_WORK_QUEUE=ddp \ No newline at end of file diff --git a/.gitignore b/.gitignore index cbf5305..ad54096 100644 --- a/.gitignore +++ b/.gitignore @@ -1,12 +1,18 @@ logs prefect-env dbt-env -.env +.*env +*venv dbt/ +*.pyc + +.*yaml +.*yml + .DS_store # prefect artifacts .prefectignore -*.json \ No newline at end of file +config.json \ No newline at end of file diff --git a/README.md b/README.md index 5694f07..026431e 100644 --- a/README.md +++ b/README.md @@ -1 +1,15 @@ # ddp_prefect_starter + +- Setup a virtual environment using ``` python3 -m venv ``` +- Activate the virtual env ``` source /bin/activate ``` +- Install the dependencies from requirements.txt ``` pip3 install -r requirements.txt ``` +- If you are using any new packages, install them in your virtual env and use this to update the requirements.txt ``` pip3 freeze > requirements.txt ``` +- Start the prefect orion UI server ``` screen -S prefect-server -dm prefect server start ``` +- To execute your flow you need to run an agent in different terminal ``` prefect agent start -q default ```. This will run the default queue worker. + +# prefect cli tools + +- Run the help command to understand the options ``` python3 main.py -h ``` +- Setup the organization config in ```config.json``` . 
The boilerplate for this is available in ``` configExample.json ``` +- Run the main file to deploy your organization ``` python3 main.py --deploy ``` based on the config entered in ```config.json``` +- To reset the deployments after updates in config run ``` python3 main.py --deploy --reset yes ``` diff --git a/configExample.json b/configExample.json new file mode 100644 index 0000000..797c186 --- /dev/null +++ b/configExample.json @@ -0,0 +1,14 @@ +{ + "stir": [ + { + "connection_ids": [""], + "dbt_dir": null, + "schedule": "0 */2 * * *" + }, + { + "connection_ids": [""], + "dbt_dir": "/Path/to/dbt/code/repo", + "schedule": "0 */2 * * *" + } + ] +} diff --git a/deployments/Deployment.py b/deployments/Deployment.py new file mode 100644 index 0000000..e69de29 diff --git a/deployments/__init__.py b/deployments/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flows/Flow.py b/flows/Flow.py new file mode 100644 index 0000000..077309e --- /dev/null +++ b/flows/Flow.py @@ -0,0 +1,34 @@ +from prefect import flow +from typing import List, Union +from tasks.Airbyte import Airbyte +from tasks.Dbt import Dbt +from pydantic import BaseModel + +class Flow(BaseModel): + airbytes: Union[List[Airbyte], None] + dbt: Union[Dbt, None] + +@flow(flow_run_name='airbyte_flow') +def airbyte_flow(flow: Flow): + for airbyte in flow.airbytes: + airbyte.sync() + +@flow(flow_run_name='dbt_flow') +def dbt_flow(flow: Flow): + flow.dbt.pull_dbt_repo() + flow.dbt.dbt_deps() + flow.dbt.dbt_source_snapshot_freshness() + flow.dbt.dbt_run() + flow.dbt.dbt_test() + +@flow(flow_run_name='airbyte_dbt_flow') +def airbyte_dbt_flow(flow: Flow): + + for airbyte in flow.airbytes: + airbyte.sync() + + flow.dbt.pull_dbt_repo() + flow.dbt.dbt_deps() + flow.dbt.dbt_source_snapshot_freshness() + flow.dbt.dbt_run() + flow.dbt.dbt_test() \ No newline at end of file diff --git a/flows/__init__.py b/flows/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/main.py b/main.py index 
c664aa7..731eb5f 100644 --- a/main.py +++ b/main.py @@ -1,62 +1,74 @@ -from prefect import flow, task -from prefect_airbyte.connections import trigger_sync -from prefect_dbt.cli.commands import trigger_dbt_cli_command -from prefect_shell import shell_run_command +import sys, getopt, argparse, json, os from dotenv import load_dotenv -import os +from pathlib import Path +import asyncio +from sqlalchemy import create_engine +from prefect.settings import PREFECT_API_DATABASE_CONNECTION_URL, PREFECT_API_URL +from sqlalchemy.ext.asyncio import create_async_engine, async_session, AsyncSession +from sqlalchemy.orm import sessionmaker +from organization.Organization import Organization + +# Load env load_dotenv() -@flow(name="airbyte-sync") -def run_airbyte_sync(): - trigger_sync( - connection_id=os.getenv('DOST_AIRBYTE_CONNECTION'), - poll_interval_s=3, - status_updates=True - ) - return 1 +# Load config +async def main(): -@flow(name="github-pull") -def pull_dost_github_repo(): - shell_run_command('rm -rf dbt && git clone '+ os.getenv('DOST_GITHUB_URL')) - return 1 + try: + config = None + if not Path('config.json').is_file(): + raise Exception('Config file not found') -@flow(name="dbt-transform") -def run_dbt_transform(): - trigger_dbt_cli_command( - command="dbt deps", project_dir='dbt' - ) - trigger_dbt_cli_command( - command="dbt run", project_dir='dbt' - ) - return 1 + with open('config.json') as f: + config = json.load(f) -@flow(name="orchestration-flow") -def run_flow(): + parser = argparse.ArgumentParser(description='Prefect deployment of NGOs') + parser.add_argument( + '--deploy', + required=True, + choices=['stir', 'sneha', 'shri'], + help='please enter the name of the NGO', + metavar='' + ) + parser.add_argument( + '--reset', + required=False, + choices=['yes', 'no'], + default='no', + help='resetting the deployments will remove all deployments and create fresh ones', + metavar='' + ) + args = parser.parse_args() - #syncing airbyte - run_airbyte_sync() + if 
args.deploy not in config: + raise Exception(f'Config for {args.deploy} org not found') + + # cli args + org_name = args.deploy + reset = args.reset - # pull dbt repo - pull_dost_github_repo() + # create a database session + url = PREFECT_API_DATABASE_CONNECTION_URL.value() + engine = create_async_engine( + url, + echo=True, + ) + session = sessionmaker(engine, class_=AsyncSession) - #dbt transform - run_dbt_transform() + organization = Organization(name=org_name, session=session(), pipelines=config[org_name]) -@flow(name="orchestrate-airbyte") -def run_airbyte_flow(): + await organization.deploy(reset == 'yes') + + await organization.close_session() - #syncing airbyte - run_airbyte_sync() + except Exception as e: -@flow(name="orchestrate-dbt") -def run_dbt_flow(): + print(e) - # pull dbt repo - pull_dost_github_repo() +if __name__ == "__main__": - #dbt transform - run_dbt_transform() + asyncio.run(main()) + -if __name__ == "__main__": - run_dbt_flow() \ No newline at end of file +  \ No newline at end of file diff --git a/orchestrate-airbyte-deployment.yaml b/orchestrate-airbyte-deployment.yaml deleted file mode 100644 index 41a9bc2..0000000 --- a/orchestrate-airbyte-deployment.yaml +++ /dev/null @@ -1,38 +0,0 @@ -### -### A complete description of a Prefect Deployment for flow 'orchestrate-airbyte' -### -name: dost-orchestrate-airbyte -description: null -version: b493fbec7c9fb61bdf5a55f04d9cdfd5 -# The work queue that will handle this deployment's runs -work_queue_name: default -tags: -- staging -parameters: {} -schedule: null -infra_overrides: {} -infrastructure: - type: process - env: {} - labels: {} - name: null - command: null - stream_output: true - working_dir: null - block_type_slug: process - _block_type_slug: process - -### -### DO NOT EDIT BELOW THIS LINE -### -flow_name: orchestrate-airbyte -manifest_path: null -storage: null -path: /Users/dorjayyolmo/Dev/data/prefect_practise -entrypoint: main.py:orchestrate-airbyte -parameter_openapi_schema: - title: 
Parameters - type: object - properties: {} - required: null - definitions: null diff --git a/orchestrate-dbt-deployment.yaml b/orchestrate-dbt-deployment.yaml deleted file mode 100644 index 390faff..0000000 --- a/orchestrate-dbt-deployment.yaml +++ /dev/null @@ -1,39 +0,0 @@ -### -### A complete description of a Prefect Deployment for flow 'orchestrate-dbt' -### -name: dost-orchestrate-dbt -description: null -version: 61247154705433137246cd6f816e8246 -# The work queue that will handle this deployment's runs -work_queue_name: default -tags: - - staging -parameters: {} -schedule: - interval: 3600 -infra_overrides: {} -infrastructure: - type: process - env: {} - labels: {} - name: null - command: null - stream_output: true - working_dir: null - block_type_slug: process - _block_type_slug: process - -### -### DO NOT EDIT BELOW THIS LINE -### -flow_name: orchestrate-dbt -manifest_path: null -storage: null -path: /Users/dorjayyolmo/Dev/data/prefect_practise -entrypoint: main.py:orchestrate-dbt -parameter_openapi_schema: - title: Parameters - type: object - properties: {} - required: null - definitions: null diff --git a/orchestration-flow-deployment.yaml b/orchestration-flow-deployment.yaml deleted file mode 100644 index ec06f37..0000000 --- a/orchestration-flow-deployment.yaml +++ /dev/null @@ -1,38 +0,0 @@ -### -### A complete description of a Prefect Deployment for flow 'orchestration-flow' -### -name: dost-orchestrate-flow -description: null -version: 894a85addfd1a0c2d4ef1729a3066908 -# The work queue that will handle this deployment's runs -work_queue_name: default -tags: -- staging -parameters: {} -schedule: null -infra_overrides: {} -infrastructure: - type: process - env: {} - labels: {} - name: null - command: null - stream_output: true - working_dir: null - block_type_slug: process - _block_type_slug: process - -### -### DO NOT EDIT BELOW THIS LINE -### -flow_name: orchestration-flow -manifest_path: null -storage: null -path: 
/Users/dorjayyolmo/Dev/data/prefect_practise -entrypoint: main.py:orchestration-flow -parameter_openapi_schema: - title: Parameters - type: object - properties: {} - required: null - definitions: null diff --git a/organization/Organization.py b/organization/Organization.py new file mode 100644 index 0000000..be4b10e --- /dev/null +++ b/organization/Organization.py @@ -0,0 +1,115 @@ +import os +import requests +from prefect import flow +from typing import List, Mapping, Union +from pydantic import BaseModel +from prefect.deployments import Deployment +from prefect.server.models.deployments import read_deployments, delete_deployment +from prefect.server.models.flows import read_flows, delete_flow +from prefect.server.schemas.filters import DeploymentFilterTags +from prefect.server.schemas.schedules import CronSchedule +from prefect.settings import PREFECT_UI_API_URL +from sqlalchemy.orm import Session + +from tasks.Dbt import Dbt +from tasks.Airbyte import Airbyte +from flows.Flow import airbyte_flow, dbt_flow, airbyte_dbt_flow + +class Organization(BaseModel): + name: str = None + pipelines: List = None + session: Session = None + + class Config: + arbitrary_types_allowed=True + + def __init__(self, name: str, session: Session, pipelines: List): + super().__init__() + + self.name = name + self.pipelines = pipelines + self.session = session + + async def deploy(self, reset = False): + try: + + if reset: + + deploymentFilter = DeploymentFilterTags(all_=[self.name]) + flows = await read_flows(self.session, deployment_filter=deploymentFilter) + api_url = PREFECT_UI_API_URL.value() + + # delete a flow removes the deployments too + for flow in flows: + requests.delete(api_url + f'/flows/{flow.id}') + # the code below doesn't seem to work + # await delete_flow(self.session, flow.id) + + work_queue_name = os.getenv('PREFECT_WORK_QUEUE') + + for pipeline in self.pipelines: + + # Each pipeline will deployed and run as a flow as per the schedule + + airbyte_objs = [] + dbt_obj 
= None + flow_name = '' + deployment_name = '' + flow_function = None + tags = [self.name] + + has_airbyte = False + has_dbt = False + + # Check if airbyte connections are part of pipeline + if 'connection_ids' in pipeline and pipeline['connection_ids'] is not None and len(pipeline['connection_ids']) > 0: + has_airbyte = True + for connection_id in pipeline['connection_ids']: + airbyte = Airbyte(connection_id=connection_id) + airbyte_objs.append(airbyte) + + # Check if dbt transformation is part of repo + if 'dbt_dir' in pipeline and pipeline['dbt_dir'] is not None: + has_dbt = True + dbt_obj = Dbt(pipeline['dbt_dir'], os.getenv('DBT_VENV')) + + if has_airbyte and has_dbt: + flow_name = f'{self.name}_airbyte_dbt_flow' + deployment_name = f'{self.name}_airbyte_dbt_deploy' + # flow_function = getattr(flow, 'airbyte_dbt_flow') # Callable + flow_function = airbyte_dbt_flow + elif has_airbyte: + flow_name = f'{self.name}_airbyte_flow' + deployment_name = f'{self.name}_airbyte_deploy' + # flow_function = getattr(flow, 'airbyte_flow') # Callable + flow_function = airbyte_flow + elif has_dbt: + flow_name = f'{self.name}_dbt_flow' + deployment_name = f'{self.name}_dbt_deploy' + # flow_function = getattr(flow, 'dbt_flow') # Callable + flow_function = dbt_flow + + deployment = await Deployment.build_from_flow( + flow=flow_function.with_options(name=flow_name), + name=deployment_name, + work_queue_name=work_queue_name, + tags = tags, + ) + deployment.parameters = {'flow': {'airbytes': airbyte_objs, 'dbt': dbt_obj}} + if 'schedule' in pipeline and pipeline['schedule'] is not None and len(pipeline['schedule']) > 3: + deployment.schedule = CronSchedule(cron = pipeline['schedule']) + await deployment.apply() + + return deployment + + except Exception as e: + + print(e) + + await self.close_session() + + async def close_session(self): + + await self.session.close() + + return diff --git a/organization/__init__.py b/organization/__init__.py new file mode 100644 index 
0000000..e69de29 diff --git a/requirements.txt b/requirements.txt index a00a20e..e8be5f9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,112 +1,84 @@ -agate==1.6.3 -aiosqlite==0.17.0 -alembic==1.9.0 +aiosqlite==0.18.0 +alembic==1.9.4 anyio==3.6.2 -apprise==1.2.0 +apprise==1.3.0 asgi-lifespan==2.0.0 asyncpg==0.27.0 -attrs==22.1.0 -Babel==2.11.0 -cachetools==5.2.0 +attrs==22.2.0 +cachetools==5.3.0 certifi==2022.12.7 cffi==1.15.1 -charset-normalizer==2.1.1 +charset-normalizer==3.0.1 click==8.1.3 -cloudpickle==2.2.0 -colorama==0.4.5 -commonmark==0.9.1 -coolname==2.1.0 +cloudpickle==2.2.1 +colorama==0.4.6 +coolname==2.2.0 croniter==1.3.8 -cryptography==38.0.4 -dbt-bigquery==1.3.0 -dbt-core==1.3.1 -dbt-extractor==0.4.1 -dbt-postgres==1.3.1 +cryptography==39.0.1 +dateparser==1.1.7 docker==6.0.1 -fastapi==0.88.0 -fsspec==2022.11.0 -future==0.18.2 -google-api-core==2.11.0 -google-auth==2.15.0 -google-cloud-bigquery==2.34.4 -google-cloud-core==2.3.2 -google-cloud-dataproc==5.2.0 -google-cloud-storage==2.7.0 -google-crc32c==1.5.0 -google-resumable-media==2.4.0 -googleapis-common-protos==1.58.0 -greenlet==2.0.1 -griffe==0.25.0 -grpcio==1.51.1 -grpcio-status==1.48.2 +fastapi==0.92.0 +fsspec==2023.1.0 +google-auth==2.16.1 +greenlet==2.0.2 +griffe==0.25.5 h11==0.14.0 h2==4.1.0 -hologram==0.0.15 hpack==4.0.0 -httpcore==0.16.2 -httpx==0.23.1 +httpcore==0.16.3 +httpx==0.23.3 hyperframe==6.0.1 idna==3.4 -importlib-metadata==5.1.0 -isodate==0.6.1 Jinja2==3.1.2 jsonpatch==1.32 jsonpointer==2.3 -jsonschema==3.2.0 -kubernetes==25.3.0 -leather==0.3.4 -Logbook==1.5.3 +jsonschema==4.17.3 +kubernetes==26.1.0 Mako==1.2.4 Markdown==3.4.1 -MarkupSafe==2.1.1 -mashumaro==3.0.4 -minimal-snowplow-tracker==0.0.2 -msgpack==1.0.4 -networkx==2.8.8 +markdown-it-py==2.2.0 +MarkupSafe==2.1.2 +mdurl==0.1.2 oauthlib==3.2.2 -orjson==3.8.3 -packaging==21.3 -parsedatetime==2.4 -pathspec==0.9.0 +orjson==3.8.6 +packaging==23.0 +pathspec==0.11.0 pendulum==2.1.2 -prefect==2.7.2 -prefect-airbyte==0.1.3 
-prefect-dbt==0.2.6 -prefect-shell==0.1.3 -proto-plus==1.22.2 -protobuf==3.20.3 -psycopg2-binary==2.9.5 +prefect==2.8.3 +prefect-2-discord==0.1.1 +prefect-airbyte==0.2.0 +prefect-shell==0.1.5 pyasn1==0.4.8 pyasn1-modules==0.2.8 pycparser==2.21 -pydantic==1.10.2 -Pygments==2.13.0 -pyparsing==3.0.9 -pyrsistent==0.19.2 +pydantic==1.10.5 +Pygments==2.14.0 +pyrsistent==0.19.3 python-dateutil==2.8.2 -python-dotenv==0.21.0 -python-slugify==7.0.0 -pytimeparse==1.1.8 -pytz==2022.6 +python-dotenv==1.0.0 +python-slugify==8.0.1 +pytz==2022.7.1 +pytz-deprecation-shim==0.1.0.post0 pytzdata==2020.1 PyYAML==6.0 readchar==4.0.3 -requests==2.28.1 +regex==2022.10.31 +requests==2.28.2 requests-oauthlib==1.3.1 rfc3986==1.5.0 -rich==12.6.0 +rich==13.3.1 rsa==4.9 six==1.16.0 sniffio==1.3.0 -SQLAlchemy==1.4.45 -sqlparse==0.4.3 -starlette==0.22.0 +SQLAlchemy==1.4.46 +starlette==0.25.0 text-unidecode==1.3 toml==0.10.2 typer==0.7.0 -typing_extensions==4.4.0 -urllib3==1.26.13 +typing_extensions==4.5.0 +tzdata==2022.7 +tzlocal==4.2 +urllib3==1.26.14 uvicorn==0.20.0 -websocket-client==1.4.2 -Werkzeug==2.2.2 -zipp==3.11.0 +websocket-client==1.5.1 +websockets==10.4 diff --git a/tasks/Airbyte.py b/tasks/Airbyte.py new file mode 100644 index 0000000..fd470cd --- /dev/null +++ b/tasks/Airbyte.py @@ -0,0 +1,19 @@ +from prefect import task, flow +from pydantic import BaseModel +from prefect_airbyte.connections import trigger_sync + +class Airbyte(BaseModel): + connection_id: str = None + + def __init__(self, connection_id: str) -> None: + super().__init__() + + self.connection_id = connection_id + + def sync(self) -> None: + + trigger_sync.with_options(name="airbyte_sync")( + connection_id=self.connection_id, + poll_interval_s=15, + status_updates=True + ) \ No newline at end of file diff --git a/tasks/Dbt.py b/tasks/Dbt.py new file mode 100644 index 0000000..1580d04 --- /dev/null +++ b/tasks/Dbt.py @@ -0,0 +1,38 @@ +from prefect_shell import shell_run_command +from pathlib import Path +from pydantic 
import BaseModel + +class Dbt(BaseModel): + dbt_code_path: str = None + dbt_venv_path: str = None + + def __init__(self, dbt_code_path: str, dbt_venv_path: str) -> None: + super().__init__() + + if Path(dbt_code_path).is_dir(): + self.dbt_code_path = dbt_code_path + else: + raise Exception('Dbt organization repo does not exist') + + if Path(dbt_venv_path).is_dir(): + self.dbt_venv_path = dbt_venv_path + else: + raise Exception('Dbt virtual environment is not setup') + + def pull_dbt_repo(self) -> None: + shell_run_command.with_options(name='pull_dbt_repo')(command=f'git pull', cwd=self.dbt_code_path) + + def dbt_deps(self) -> None: + shell_run_command.with_options(name='dbt_deps')(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt deps', cwd=self.dbt_code_path) + + def dbt_source_snapshot_freshness(self): + shell_run_command.with_options(name='dbt_source_snapshot_freshness')(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt source snapshot-freshness', cwd=self.dbt_code_path) + + def dbt_run(self) -> None: + shell_run_command.with_options(name='dbt_run')(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt run', cwd=self.dbt_code_path) + + def dbt_test(self) -> None: + shell_run_command.with_options(name='dbt_test')(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt test', cwd=self.dbt_code_path) + + def dbt_docs_generate(self) -> None: + shell_run_command.with_options(name='dbt_test')(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt docs generate', cwd=self.dbt_code_path) \ No newline at end of file diff --git a/tasks/__init__.py b/tasks/__init__.py new file mode 100644 index 0000000..e69de29