From 2de3ee0e727b26f6c60665d87b91986cfb3503bd Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Tue, 28 Feb 2023 14:38:32 +0530 Subject: [PATCH 01/22] cleanup --- .gitignore | 6 ++++- orchestrate-airbyte-deployment.yaml | 38 ---------------------------- orchestrate-dbt-deployment.yaml | 39 ----------------------------- orchestration-flow-deployment.yaml | 38 ---------------------------- 4 files changed, 5 insertions(+), 116 deletions(-) delete mode 100644 orchestrate-airbyte-deployment.yaml delete mode 100644 orchestrate-dbt-deployment.yaml delete mode 100644 orchestration-flow-deployment.yaml diff --git a/.gitignore b/.gitignore index cbf5305..ad00c7d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,13 @@ logs prefect-env dbt-env -.env +.*env +*venv dbt/ +.*yaml +.*yml + .DS_store # prefect artifacts diff --git a/orchestrate-airbyte-deployment.yaml b/orchestrate-airbyte-deployment.yaml deleted file mode 100644 index 41a9bc2..0000000 --- a/orchestrate-airbyte-deployment.yaml +++ /dev/null @@ -1,38 +0,0 @@ -### -### A complete description of a Prefect Deployment for flow 'orchestrate-airbyte' -### -name: dost-orchestrate-airbyte -description: null -version: b493fbec7c9fb61bdf5a55f04d9cdfd5 -# The work queue that will handle this deployment's runs -work_queue_name: default -tags: -- staging -parameters: {} -schedule: null -infra_overrides: {} -infrastructure: - type: process - env: {} - labels: {} - name: null - command: null - stream_output: true - working_dir: null - block_type_slug: process - _block_type_slug: process - -### -### DO NOT EDIT BELOW THIS LINE -### -flow_name: orchestrate-airbyte -manifest_path: null -storage: null -path: /Users/dorjayyolmo/Dev/data/prefect_practise -entrypoint: main.py:orchestrate-airbyte -parameter_openapi_schema: - title: Parameters - type: object - properties: {} - required: null - definitions: null diff --git a/orchestrate-dbt-deployment.yaml b/orchestrate-dbt-deployment.yaml deleted file mode 100644 index 390faff..0000000 --- a/orchestrate-dbt-deployment.yaml +++ /dev/null @@ -1,39 +0,0 @@ -### -### A complete description of a Prefect Deployment for flow 'orchestrate-dbt' -### -name: dost-orchestrate-dbt -description: null -version: 61247154705433137246cd6f816e8246 -# The work queue that will handle this deployment's runs -work_queue_name: default -tags: - - staging -parameters: {} -schedule: - interval: 3600 -infra_overrides: {} -infrastructure: - type: process - env: {} - labels: {} - name: null - command: null - stream_output: true - working_dir: null - block_type_slug: process - _block_type_slug: process - -### -### DO NOT EDIT BELOW THIS LINE -### -flow_name: orchestrate-dbt -manifest_path: null -storage: null -path: /Users/dorjayyolmo/Dev/data/prefect_practise -entrypoint: main.py:orchestrate-dbt -parameter_openapi_schema: - title: Parameters - type: object - properties: {} - required: null - definitions: null diff --git a/orchestration-flow-deployment.yaml b/orchestration-flow-deployment.yaml deleted file mode 100644 index ec06f37..0000000 --- a/orchestration-flow-deployment.yaml +++ /dev/null @@ -1,38 +0,0 @@ -### -### A complete description of a Prefect Deployment for flow 'orchestration-flow' -### -name: dost-orchestrate-flow -description: null -version: 894a85addfd1a0c2d4ef1729a3066908 -# The work queue that will handle this deployment's runs -work_queue_name: default -tags: -- staging -parameters: {} -schedule: null -infra_overrides: {} -infrastructure: - type: process - env: {} - labels: {} - name: null - command: null - stream_output: true - working_dir: null - block_type_slug: process - _block_type_slug: process - -### -### DO NOT EDIT BELOW THIS LINE -### -flow_name: orchestration-flow -manifest_path: null -storage: null -path: /Users/dorjayyolmo/Dev/data/prefect_practise -entrypoint: main.py:orchestration-flow -parameter_openapi_schema: - title: Parameters - type: object - properties: {} - required: null - definitions: null From b5a5ae33384fc0a8a4f7f29b92e4430164263be9 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Wed, 1 Mar 2023 17:05:32 +0530 Subject: [PATCH 02/22] modularized code to setup deployments for multiple organizations with cli toolkit --- .gitignore | 4 +- configExample.json | 10 +++ deployments/Deployment.py | 0 deployments/__init__.py | 0 flows/Flow.py | 41 ++++++++++++ flows/__init__.py | 0 main.py | 132 +++++++++++++++++++++++++------------- tasks/Airbyte.py | 18 ++++++ tasks/Dbt.py | 38 +++++++++++ tasks/__init__.py | 0 10 files changed, 196 insertions(+), 47 deletions(-) create mode 100644 configExample.json create mode 100644 deployments/Deployment.py create mode 100644 deployments/__init__.py create mode 100644 flows/Flow.py create mode 100644 flows/__init__.py create mode 100644 tasks/Airbyte.py create mode 100644 tasks/Dbt.py create mode 100644 tasks/__init__.py diff --git a/.gitignore b/.gitignore index ad00c7d..ad54096 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,8 @@ dbt-env *venv dbt/ +*.pyc + .*yaml .*yml @@ -13,4 +15,4 @@ dbt/ # prefect artifacts .prefectignore -*.json \ No newline at end of file +config.json \ No newline at end of file diff --git a/configExample.json b/configExample.json new file mode 100644 index 0000000..dabc7b4 --- /dev/null +++ b/configExample.json @@ -0,0 +1,10 @@ +{ + "stir": { + "dbt_dir": "/Path/to/dbt/code", + "connection_ids": ["list of connection ids"] + }, + "sneha": { + "dbt_dir": "/Path/to/dbt/code", + "connection_ids": ["list of connection ids"] + } +} diff --git a/deployments/Deployment.py b/deployments/Deployment.py new file mode 100644 index 0000000..e69de29 diff --git a/deployments/__init__.py b/deployments/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flows/Flow.py b/flows/Flow.py new file mode 100644 index 0000000..22065c6 --- /dev/null +++ b/flows/Flow.py @@ -0,0 +1,41 @@ +from prefect import flow +from tasks.Airbyte import Airbyte +from tasks.Dbt import Dbt +from pydantic import BaseModel + +class Flow(BaseModel): + airbyte: Airbyte = None + dbt: Dbt = None + org_name: str = None + class Config: + arbitrary_types_allowed=True + + def __init__(self, airbyte: Airbyte, dbt: Dbt, org_name: str): + super().__init__() + + self.airbyte = airbyte + self.dbt = dbt + self.org_name = org_name + + @flow(name=f'{org_name} airbyte_flow') + def airbyte_flow(self): + self.airbyte.sync() + + @flow(name='dbt_flow') + def dbt_flow(self): + self.dbt.pull_dbt_repo + self.dbt.dbt_deps() + self.dbt.dbt_source_snapshot_freshness() + self.dbt.dbt_run() + self.dbt.dbt_test() + + @flow(name='airbyte_dbt_flow') + def airbyte_dbt_flow(self): + + self.airbyte.sync() + + self.dbt.pull_dbt_repo() + self.dbt.dbt_deps() + self.dbt.dbt_source_snapshot_freshness() + self.dbt.dbt_run() + self.dbt.dbt_test() \ No newline at end of file diff --git a/flows/__init__.py b/flows/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/main.py b/main.py index c664aa7..31f3a9c 100644 --- a/main.py +++ b/main.py @@ -1,62 +1,102 @@ -from prefect import flow, task -from prefect_airbyte.connections import trigger_sync -from prefect_dbt.cli.commands import trigger_dbt_cli_command -from prefect_shell import shell_run_command +import sys, getopt, argparse, json, os +from pydantic import BaseModel from dotenv import load_dotenv -import os +from typing import List +from pathlib import Path +from prefect.deployments import Deployment -load_dotenv() +from tasks.Dbt import Dbt +from tasks.Airbyte import Airbyte +from flows.Flow import Flow -@flow(name="airbyte-sync") -def run_airbyte_sync(): - trigger_sync( - connection_id=os.getenv('DOST_AIRBYTE_CONNECTION'), - poll_interval_s=3, - status_updates=True - ) - return 1 +# Load env +load_dotenv() -@flow(name="github-pull") -def pull_dost_github_repo(): - shell_run_command('rm -rf dbt && git clone '+ os.getenv('DOST_GITHUB_URL')) - return 1 +# Load config -@flow(name="dbt-transform") -def run_dbt_transform(): - trigger_dbt_cli_command( - command="dbt deps", project_dir='dbt' - ) - trigger_dbt_cli_command( - command="dbt run", project_dir='dbt' - ) - return 1 +class Organization(BaseModel): + name: str = None + connection_ids: List[str] = [] + dbt_dir: str = None -@flow(name="orchestration-flow") -def run_flow(): + def __init__(self, name: str, connection_ids: List[str], dbt_dir: str): + super().__init__() - #syncing airbyte - run_airbyte_sync() + self.name = name + self.connection_ids = connection_ids + self.dbt_dir = dbt_dir + + def deploy(self): + airbyte_objs = [] + for connection_id in self.connection_ids: + airbyte = Airbyte(connection_id=connection_id) + airbyte_objs.append(airbyte) - # pull dbt repo - pull_dost_github_repo() + dbt_obj = Dbt(self.dbt_dir, os.getenv('DBT_VENV')) - #dbt transform - run_dbt_transform() -@flow(name="orchestrate-airbyte") -def run_airbyte_flow(): + for airbyte_obj in airbyte_objs: - #syncing airbyte - run_airbyte_sync() + flow = Flow(airbyte=airbyte_obj, dbt=dbt_obj, org_name=self.name) -@flow(name="orchestrate-dbt") -def run_dbt_flow(): + # Deploy a dbt flow + Deployment.build_from_flow( + flow=flow.dbt_flow.with_options(name=f'{self.name}_dbt_flow'), + name=f"{self.name} - dbt", + work_queue_name="ddp", + tags = [self.name], + apply=True + ) - # pull dbt repo - pull_dost_github_repo() + # Deploy a airbyte flow + Deployment.build_from_flow( + flow=flow.airbyte_flow.with_options(name=f'{self.name}_airbyte_flow'), + name=f"{self.name} - airbyte", + work_queue_name="ddp", + tags = [airbyte.connection_id, self.name], + apply=True, + ) - #dbt transform - run_dbt_transform() + # Deploy a airbyte + dbt flow + Deployment.build_from_flow( + flow=flow.airbyte_dbt_flow.with_options(name=f'{self.name}_airbyte_dbt_flow'), + name=f"{self.name} - airbyte + dbt", + work_queue_name="ddp", + tags = [airbyte.connection_id, self.name], + apply=True + ) if __name__ == "__main__": - run_dbt_flow() \ No newline at end of file + + try: + config = None + if not Path('config.json').is_file(): + raise Exception('Config file not found') + + with open('config.json') as f: + config = json.load(f) + + parser = argparse.ArgumentParser(description='Prefect deployment of NGOs') + parser.add_argument( + '--deploy', + required=True, + choices=['stir', 'sneha'], + help='please enter the name of the NGO', + metavar='' + ) + args = parser.parse_args() + + if args.deploy not in config: + raise Exception(f'Config for {args.deploy} org not found') + + org_name = args.deploy + connection_ids = config[org_name]['connection_ids'] + dbt_dir = config[org_name]['dbt_dir'] + + organization = Organization(org_name, connection_ids, dbt_dir) + + organization.deploy() + + except Exception as e: + + print(e) \ No newline at end of file diff --git a/tasks/Airbyte.py b/tasks/Airbyte.py new file mode 100644 index 0000000..f68279f --- /dev/null +++ b/tasks/Airbyte.py @@ -0,0 +1,18 @@ +from pydantic import BaseModel +from prefect_airbyte.connections import trigger_sync + +class Airbyte(BaseModel): + connection_id: str = None + + def __init__(self, connection_id: str) -> None: + super().__init__() + + self.connection_id = connection_id + + def sync(self) -> None: + + trigger_sync( + connection_id=self.connection_id, + poll_interval_s=3, + status_updates=True + ) \ No newline at end of file diff --git a/tasks/Dbt.py b/tasks/Dbt.py new file mode 100644 index 0000000..9824442 --- /dev/null +++ b/tasks/Dbt.py @@ -0,0 +1,38 @@ +from prefect_shell import shell_run_command +from pathlib import Path +from pydantic import BaseModel + +class Dbt(BaseModel): + dbt_code_path: str = None + dbt_venv_path: str = None + + def __init__(self, dbt_code_path: str, dbt_venv_path: str) -> None: + super().__init__() + + if Path(dbt_code_path).is_dir(): + self.dbt_code_path = dbt_code_path + else: + raise Exception('Dbt organization repo does not exist') + + if Path(dbt_venv_path).is_dir(): + self.dbt_venv_path = dbt_venv_path + else: + raise Exception('Dbt virtual environment is not setup') + + def pull_dbt_repo(self) -> None: + shell_run_command(command=f'git pull', cwd=self.dbt_code_path) + + def dbt_deps(self) -> None: + shell_run_command(helper_command= f'source ${self.dbt_venv_path}/bin/activate', command=f'dbt deps', cwd=self.dbt_code_path) + + def dbt_source_snapshot_freshness(self): + shell_run_command(helper_command= f'source ${self.dbt_venv_path}/bin/activate', command=f'dbt source snapshot-freshness', cwd=self.dbt_code_path) + + def dbt_run(self) -> None: + shell_run_command(helper_command= f'source ${self.dbt_venv_path}/bin/activate', command=f'dbt run', cwd=self.dbt_code_path) + + def dbt_test(self) -> None: + shell_run_command(helper_command= f'source ${self.dbt_venv_path}/bin/activate', command=f'dbt test', cwd=self.dbt_code_path) + + def dbt_docs_generate(self) -> None: + shell_run_command(helper_command= f'source ${self.dbt_venv_path}/bin/activate', command=f'dbt docs generate', cwd=self.dbt_code_path) \ No newline at end of file diff --git a/tasks/__init__.py b/tasks/__init__.py new file mode 100644 index 0000000..e69de29 From a361e06ea6edb3e78b27d4e6f3d3c547835da8e1 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Thu, 2 Mar 2023 19:15:33 +0530 Subject: [PATCH 03/22] resetting organizartion deployments WIP --- main.py | 90 +++++++++++------------------------- organization/Organization.py | 89 +++++++++++++++++++++++++++++++++++ organization/__init__.py | 0 3 files changed, 116 insertions(+), 63 deletions(-) create mode 100644 organization/Organization.py create mode 100644 organization/__init__.py diff --git a/main.py b/main.py index 31f3a9c..3e905a5 100644 --- a/main.py +++ b/main.py @@ -1,72 +1,19 @@ import sys, getopt, argparse, json, os -from pydantic import BaseModel from dotenv import load_dotenv -from typing import List from pathlib import Path -from prefect.deployments import Deployment +import asyncio +from sqlalchemy import create_engine +from prefect.settings import PREFECT_API_DATABASE_CONNECTION_URL +from sqlalchemy.ext.asyncio import create_async_engine, async_session, AsyncSession +from sqlalchemy.orm import sessionmaker -from tasks.Dbt import Dbt -from tasks.Airbyte import Airbyte -from flows.Flow import Flow +from organization.Organization import Organization # Load env load_dotenv() # Load config - -class Organization(BaseModel): - name: str = None - connection_ids: List[str] = [] - dbt_dir: str = None - - def __init__(self, name: str, connection_ids: List[str], dbt_dir: str): - super().__init__() - - self.name = name - self.connection_ids = connection_ids - self.dbt_dir = dbt_dir - - def deploy(self): - airbyte_objs = [] - for connection_id in self.connection_ids: - airbyte = Airbyte(connection_id=connection_id) - airbyte_objs.append(airbyte) - - dbt_obj = Dbt(self.dbt_dir, os.getenv('DBT_VENV')) - - - for airbyte_obj in airbyte_objs: - - flow = Flow(airbyte=airbyte_obj, dbt=dbt_obj, org_name=self.name) - - # Deploy a dbt flow - Deployment.build_from_flow( - flow=flow.dbt_flow.with_options(name=f'{self.name}_dbt_flow'), - name=f"{self.name} - dbt", - work_queue_name="ddp", - tags = [self.name], - apply=True - ) - - # Deploy a airbyte flow - Deployment.build_from_flow( - flow=flow.airbyte_flow.with_options(name=f'{self.name}_airbyte_flow'), - name=f"{self.name} - airbyte", - work_queue_name="ddp", - tags = [airbyte.connection_id, self.name], - apply=True, - ) - - # Deploy a airbyte + dbt flow - Deployment.build_from_flow( - flow=flow.airbyte_dbt_flow.with_options(name=f'{self.name}_airbyte_dbt_flow'), - name=f"{self.name} - airbyte + dbt", - work_queue_name="ddp", - tags = [airbyte.connection_id, self.name], - apply=True - ) - -if __name__ == "__main__": +async def main(): try: config = None @@ -92,11 +39,28 @@ def deploy(self): org_name = args.deploy connection_ids = config[org_name]['connection_ids'] dbt_dir = config[org_name]['dbt_dir'] + + # create a datbase session + url = PREFECT_API_DATABASE_CONNECTION_URL.value() + engine = create_async_engine( + url, + echo=True, + ) + session = sessionmaker(engine, class_=AsyncSession) - organization = Organization(org_name, connection_ids, dbt_dir) + organization = Organization(org_name, connection_ids, dbt_dir, session()) - organization.deploy() + await organization.reset_deployments() + + await organization.close_session() except Exception as e: - print(e) \ No newline at end of file + print(e) + +if __name__ == "__main__": + + asyncio.run(main()) + + + \ No newline at end of file diff --git a/organization/Organization.py b/organization/Organization.py new file mode 100644 index 0000000..1336b55 --- /dev/null +++ b/organization/Organization.py @@ -0,0 +1,89 @@ +import os +from typing import List +from pydantic import BaseModel +from prefect.deployments import Deployment +from prefect.server.models.deployments import read_deployments, delete_deployment +from prefect.server.schemas.filters import DeploymentFilter, DeploymentFilterTags +import asyncio +from sqlalchemy.orm import sessionmaker, Session + +from tasks.Dbt import Dbt +from tasks.Airbyte import Airbyte +from flows.Flow import Flow + +class Organization(BaseModel): + name: str = None + connection_ids: List[str] = [] + dbt_dir: str = None + session: Session = None + + class Config: + arbitrary_types_allowed=True + + def __init__(self, name: str, connection_ids: List[str], dbt_dir: str, session: Session): + super().__init__() + + self.name = name + self.connection_ids = connection_ids + self.dbt_dir = dbt_dir + + self.session = session + + def deploy(self): + airbyte_objs = [] + for connection_id in self.connection_ids: + airbyte = Airbyte(connection_id=connection_id) + airbyte_objs.append(airbyte) + + dbt_obj = Dbt(self.dbt_dir, os.getenv('DBT_VENV')) + + for airbyte_obj in airbyte_objs: + + flow = Flow(airbyte=airbyte_obj, dbt=dbt_obj, org_name=self.name) + + # Deploy a dbt flow + Deployment.build_from_flow( + flow=flow.dbt_flow.with_options(name=f'{self.name}_dbt_flow'), + name=f"{self.name} - dbt", + work_queue_name="ddp", + tags = [self.name], + apply=True + ) + + # Deploy a airbyte flow + Deployment.build_from_flow( + flow=flow.airbyte_flow.with_options(name=f'{self.name}_airbyte_flow'), + name=f"{self.name} - airbyte", + work_queue_name="ddp", + tags = [airbyte.connection_id, self.name], + apply=True, + ) + + # Deploy a airbyte + dbt flow + Deployment.build_from_flow( + flow=flow.airbyte_dbt_flow.with_options(name=f'{self.name}_airbyte_dbt_flow'), + name=f"{self.name} - airbyte + dbt", + work_queue_name="ddp", + tags = [airbyte.connection_id, self.name], + apply=True + ) + + async def reset_deployments(self): + + deploymentFilter = DeploymentFilterTags(all_=['stir']) + + deps = await read_deployments(self.session, deployment_filter=deploymentFilter) + + print(deps) + + for dep in deps: + a = await delete_deployment(self.session, dep.id) + print(a) + + return + + async def close_session(self): + + await self.session.close() + + return diff --git a/organization/__init__.py b/organization/__init__.py new file mode 100644 index 0000000..e69de29 From 448b7686c66df63226a09c69322bc5129c607483 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Fri, 3 Mar 2023 11:44:45 +0530 Subject: [PATCH 04/22] scheduling cron in deployments from config --- main.py | 25 +++++-- organization/Organization.py | 127 +++++++++++++++++++++-------------- 2 files changed, 98 insertions(+), 54 deletions(-) diff --git a/main.py b/main.py index 3e905a5..11829ff 100644 --- a/main.py +++ b/main.py @@ -3,7 +3,7 @@ from pathlib import Path import asyncio from sqlalchemy import create_engine -from prefect.settings import PREFECT_API_DATABASE_CONNECTION_URL +from prefect.settings import PREFECT_API_DATABASE_CONNECTION_URL, PREFECT_API_URL from sqlalchemy.ext.asyncio import create_async_engine, async_session, AsyncSession from sqlalchemy.orm import sessionmaker @@ -31,14 +31,27 @@ async def main(): help='please enter the name of the NGO', metavar='' ) + parser.add_argument( + '--reset', + required=False, + choices=['yes', 'no'], + default='no', + help='resetting the deployments will remove all deployments and create fresh ones', + metavar='' + ) args = parser.parse_args() if args.deploy not in config: raise Exception(f'Config for {args.deploy} org not found') + # cli args org_name = args.deploy + reset = args.reset + + # config params connection_ids = config[org_name]['connection_ids'] dbt_dir = config[org_name]['dbt_dir'] + schedule = config[org_name]['schedule'] # create a datbase session url = PREFECT_API_DATABASE_CONNECTION_URL.value() @@ -47,10 +60,14 @@ async def main(): echo=True, ) session = sessionmaker(engine, class_=AsyncSession) - - organization = Organization(org_name, connection_ids, dbt_dir, session()) - await organization.reset_deployments() + organization = Organization(org_name, connection_ids, dbt_dir, session(), schedule) + + match reset: + case 'yes': + await organization.reset_deployments() + case 'no': + await organization.deploy() await organization.close_session() diff --git a/organization/Organization.py b/organization/Organization.py index 1336b55..fb2801f 100644 --- a/organization/Organization.py +++ b/organization/Organization.py @@ -1,11 +1,14 @@ import os +import requests from typing import List from pydantic import BaseModel from prefect.deployments import Deployment from prefect.server.models.deployments import read_deployments, delete_deployment -from prefect.server.schemas.filters import DeploymentFilter, DeploymentFilterTags -import asyncio -from sqlalchemy.orm import sessionmaker, Session +from prefect.server.models.flows import read_flows, delete_flow +from prefect.server.schemas.filters import DeploymentFilterTags +from prefect.server.schemas.schedules import CronSchedule +from prefect.settings import PREFECT_UI_API_URL +from sqlalchemy.orm import Session from tasks.Dbt import Dbt from tasks.Airbyte import Airbyte @@ -16,72 +19,96 @@ class Organization(BaseModel): connection_ids: List[str] = [] dbt_dir: str = None session: Session = None + schedule: str = None class Config: arbitrary_types_allowed=True - def __init__(self, name: str, connection_ids: List[str], dbt_dir: str, session: Session): + def __init__(self, name: str, connection_ids: List[str], dbt_dir: str, session: Session, schedule: str): super().__init__() self.name = name self.connection_ids = connection_ids self.dbt_dir = dbt_dir + self.schedule = schedule self.session = session - def deploy(self): - airbyte_objs = [] - for connection_id in self.connection_ids: - airbyte = Airbyte(connection_id=connection_id) - airbyte_objs.append(airbyte) - - dbt_obj = Dbt(self.dbt_dir, os.getenv('DBT_VENV')) - - for airbyte_obj in airbyte_objs: - - flow = Flow(airbyte=airbyte_obj, dbt=dbt_obj, org_name=self.name) - - # Deploy a dbt flow - Deployment.build_from_flow( - flow=flow.dbt_flow.with_options(name=f'{self.name}_dbt_flow'), - name=f"{self.name} - dbt", - work_queue_name="ddp", - tags = [self.name], - apply=True - ) - - # Deploy a airbyte flow - Deployment.build_from_flow( - flow=flow.airbyte_flow.with_options(name=f'{self.name}_airbyte_flow'), - name=f"{self.name} - airbyte", - work_queue_name="ddp", - tags = [airbyte.connection_id, self.name], - apply=True, - ) - - # Deploy a airbyte + dbt flow - Deployment.build_from_flow( - flow=flow.airbyte_dbt_flow.with_options(name=f'{self.name}_airbyte_dbt_flow'), - name=f"{self.name} - airbyte + dbt", - work_queue_name="ddp", - tags = [airbyte.connection_id, self.name], - apply=True - ) + async def deploy(self): + try: + airbyte_objs: List[Airbyte] = [] + for connection_id in self.connection_ids: + airbyte = Airbyte(connection_id=connection_id) + airbyte_objs.append(airbyte) + + dbt_obj = Dbt(self.dbt_dir, os.getenv('DBT_VENV')) + + for airbyte_obj in airbyte_objs: + + flow = Flow(airbyte=airbyte_obj, dbt=dbt_obj, org_name=self.name) + + # Deploy a dbt flow + deployment = await Deployment.build_from_flow( + flow=flow.dbt_flow.with_options(name=f'{self.name}_dbt_flow'), + name=f"{self.name} - dbt", + work_queue_name="ddp", + tags = [self.name], + ) + deployment.schedule = CronSchedule(cron = self.schedule) + await deployment.apply() + + # Deploy a airbyte flow + deployment = await Deployment.build_from_flow( + flow=flow.airbyte_flow.with_options(name=f'{self.name}_airbyte_flow'), + name=f"{self.name} - airbyte", + work_queue_name="ddp", + tags = [airbyte_obj.connection_id, self.name], + ) + deployment.schedule = CronSchedule(cron = self.schedule) + await deployment.apply() + + # Deploy a airbyte + dbt flow + await Deployment.build_from_flow( + flow=flow.airbyte_dbt_flow.with_options(name=f'{self.name}_airbyte_dbt_flow'), + name=f"{self.name} - airbyte + dbt", + work_queue_name="ddp", + tags = [airbyte_obj.connection_id, self.name], + ) + deployment.schedule = CronSchedule(cron = self.schedule) + await deployment.apply() + + except Exception as e: + + print(e) + + await self.close_session() async def reset_deployments(self): - deploymentFilter = DeploymentFilterTags(all_=['stir']) + try: - deps = await read_deployments(self.session, deployment_filter=deploymentFilter) + deploymentFilter = DeploymentFilterTags(all_=[self.name]) - print(deps) + flows = await read_flows(self.session, deployment_filter=deploymentFilter) - for dep in deps: - a = await delete_deployment(self.session, dep.id) - print(a) + api_url = PREFECT_UI_API_URL.value() + + # delete a flow removes the deployments too + for flow in flows: + requests.delete(api_url + f'/flows/{flow.id}') + # the code below doesn't seem to work + # await delete_flow(self.session, flow.id) + + await self.deploy() + + return + + except Exception as e: + + print(e) + + await self.close_session() - return - async def close_session(self): await self.session.close() From f37699c6c7eb3ca7d6ce8c3ee7617eb5924fd727 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Fri, 3 Mar 2023 11:54:20 +0530 Subject: [PATCH 05/22] updated cron expression --- configExample.json | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/configExample.json b/configExample.json index dabc7b4..cf8c607 100644 --- a/configExample.json +++ b/configExample.json @@ -1,10 +1,12 @@ { "stir": { "dbt_dir": "/Path/to/dbt/code", - "connection_ids": ["list of connection ids"] + "connection_ids": ["list of connection ids"], + "schedule": "0 */2 * * *" }, "sneha": { "dbt_dir": "/Path/to/dbt/code", - "connection_ids": ["list of connection ids"] + "connection_ids": ["list of connection ids"], + "schedule": "crons expression" } } From 995deb7b569f9250e138cb361605c83e2f738b6b Mon Sep 17 00:00:00 2001 From: Ishankoradia <39583356+Ishankoradia@users.noreply.github.com> Date: Fri, 3 Mar 2023 12:00:04 +0530 Subject: [PATCH 06/22] Update README.md --- README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/README.md b/README.md index 5694f07..0ca7cff 100644 --- a/README.md +++ b/README.md @@ -1 +1,15 @@ # ddp_prefect_starter + +- Setup a virtual environment using ``` python3 -m venv ``` +- Activate the virtual env ``` source /bin/activate ``` +- Install the dependencies from requirements.txt ``` pip3 install -r requirements.txt ``` +- If you are using any new packages, install them in your virtual env and use this to update the requirements.txt ``` pip3 freeze > requirements.txt ``` +- Start the prefect orion UI server ``` prefect orion start ``` +- To execute your flow you need to run an agent in different terminal ``` prefect agent start -q default ```. This will run the default queue worker. + +# prefect cli tools + +- Run the help command to understand the options ``` python3 main.py -h ``` +- Setup the organization config in ```config.json``` . The boilerplate for this is available in ``` configExample.json ``` +- Run the main file to deploy your organization ``` python3 main.py --deploy ``` based on the config entered in ```config.json``` +- To reset the deployments after updates in config run ``` python3 main.py --deploy --reset yes ``` From 62a34c00cdab2ce7d6fda75eb2f965848e85f65c Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Fri, 3 Mar 2023 12:03:57 +0530 Subject: [PATCH 07/22] dependencies update --- requirements.txt | 123 ++++++++++++++++++----------------------------- 1 file changed, 47 insertions(+), 76 deletions(-) diff --git a/requirements.txt b/requirements.txt index a00a20e..ed33587 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,112 +1,83 @@ -agate==1.6.3 -aiosqlite==0.17.0 -alembic==1.9.0 +aiosqlite==0.18.0 +alembic==1.9.4 anyio==3.6.2 -apprise==1.2.0 +apprise==1.3.0 asgi-lifespan==2.0.0 asyncpg==0.27.0 -attrs==22.1.0 -Babel==2.11.0 -cachetools==5.2.0 +attrs==22.2.0 +cachetools==5.3.0 certifi==2022.12.7 cffi==1.15.1 -charset-normalizer==2.1.1 +charset-normalizer==3.0.1 click==8.1.3 -cloudpickle==2.2.0 -colorama==0.4.5 -commonmark==0.9.1 -coolname==2.1.0 +cloudpickle==2.2.1 +colorama==0.4.6 +coolname==2.2.0 croniter==1.3.8 -cryptography==38.0.4 -dbt-bigquery==1.3.0 -dbt-core==1.3.1 -dbt-extractor==0.4.1 -dbt-postgres==1.3.1 +cryptography==39.0.1 +dateparser==1.1.7 docker==6.0.1 -fastapi==0.88.0 -fsspec==2022.11.0 -future==0.18.2 -google-api-core==2.11.0 -google-auth==2.15.0 -google-cloud-bigquery==2.34.4 -google-cloud-core==2.3.2 -google-cloud-dataproc==5.2.0 -google-cloud-storage==2.7.0 -google-crc32c==1.5.0 -google-resumable-media==2.4.0 -googleapis-common-protos==1.58.0 -greenlet==2.0.1 -griffe==0.25.0 -grpcio==1.51.1 -grpcio-status==1.48.2 +fastapi==0.92.0 +fsspec==2023.1.0 +google-auth==2.16.1 +greenlet==2.0.2 +griffe==0.25.5 h11==0.14.0 h2==4.1.0 -hologram==0.0.15 hpack==4.0.0 -httpcore==0.16.2 -httpx==0.23.1 +httpcore==0.16.3 +httpx==0.23.3 hyperframe==6.0.1 idna==3.4 -importlib-metadata==5.1.0 -isodate==0.6.1 Jinja2==3.1.2 jsonpatch==1.32 jsonpointer==2.3 -jsonschema==3.2.0 -kubernetes==25.3.0 -leather==0.3.4 -Logbook==1.5.3 +jsonschema==4.17.3 +kubernetes==26.1.0 Mako==1.2.4 Markdown==3.4.1 -MarkupSafe==2.1.1 -mashumaro==3.0.4 -minimal-snowplow-tracker==0.0.2 -msgpack==1.0.4 -networkx==2.8.8 +markdown-it-py==2.2.0 +MarkupSafe==2.1.2 +mdurl==0.1.2 oauthlib==3.2.2 -orjson==3.8.3 -packaging==21.3 -parsedatetime==2.4 -pathspec==0.9.0 +orjson==3.8.6 +packaging==23.0 +pathspec==0.11.0 pendulum==2.1.2 -prefect==2.7.2 -prefect-airbyte==0.1.3 -prefect-dbt==0.2.6 -prefect-shell==0.1.3 -proto-plus==1.22.2 -protobuf==3.20.3 -psycopg2-binary==2.9.5 +prefect==2.8.3 +prefect-airbyte==0.2.0 +prefect-shell==0.1.5 pyasn1==0.4.8 pyasn1-modules==0.2.8 pycparser==2.21 -pydantic==1.10.2 -Pygments==2.13.0 -pyparsing==3.0.9 -pyrsistent==0.19.2 +pydantic==1.10.5 +Pygments==2.14.0 +pyrsistent==0.19.3 python-dateutil==2.8.2 -python-dotenv==0.21.0 -python-slugify==7.0.0 -pytimeparse==1.1.8 -pytz==2022.6 +python-dotenv==1.0.0 +python-slugify==8.0.1 +pytz==2022.7.1 +pytz-deprecation-shim==0.1.0.post0 pytzdata==2020.1 PyYAML==6.0 readchar==4.0.3 -requests==2.28.1 +regex==2022.10.31 +requests==2.28.2 requests-oauthlib==1.3.1 rfc3986==1.5.0 -rich==12.6.0 +rich==13.3.1 rsa==4.9 six==1.16.0 sniffio==1.3.0 -SQLAlchemy==1.4.45 -sqlparse==0.4.3 -starlette==0.22.0 +SQLAlchemy==1.4.46 +starlette==0.25.0 text-unidecode==1.3 toml==0.10.2 typer==0.7.0 -typing_extensions==4.4.0 -urllib3==1.26.13 +typing_extensions==4.5.0 +tzdata==2022.7 +tzlocal==4.2 +urllib3==1.26.14 uvicorn==0.20.0 -websocket-client==1.4.2 -Werkzeug==2.2.2 -zipp==3.11.0 +websocket-client==1.5.1 +websockets==10.4 From a5c80741d60742395ae86ed32f07886231bbf394 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Fri, 3 Mar 2023 12:33:22 +0530 Subject: [PATCH 08/22] minor changes in flow logic --- organization/Organization.py | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/organization/Organization.py b/organization/Organization.py index fb2801f..4c8b058 100644 --- a/organization/Organization.py +++ b/organization/Organization.py @@ -43,19 +43,22 @@ async def deploy(self): dbt_obj = Dbt(self.dbt_dir, os.getenv('DBT_VENV')) + flow = Flow(airbyte=None, dbt=dbt_obj, org_name=self.name) + + # Deploy a dbt flow + deployment = await Deployment.build_from_flow( + flow=flow.dbt_flow.with_options(name=f'{self.name}_dbt_flow'), + name=f"{self.name} - dbt", + work_queue_name="ddp", + tags = [self.name], + ) + if self.schedule: + deployment.schedule = CronSchedule(cron = self.schedule) + await deployment.apply() + for airbyte_obj in airbyte_objs: - flow = Flow(airbyte=airbyte_obj, dbt=dbt_obj, org_name=self.name) - - # Deploy a dbt flow - deployment = await Deployment.build_from_flow( - flow=flow.dbt_flow.with_options(name=f'{self.name}_dbt_flow'), - name=f"{self.name} - dbt", - work_queue_name="ddp", - tags = [self.name], - ) - deployment.schedule = CronSchedule(cron = self.schedule) - await deployment.apply() + flow.airbyte = airbyte_obj # Deploy a airbyte flow deployment = await Deployment.build_from_flow( @@ -64,17 +67,19 @@ async def deploy(self): work_queue_name="ddp", tags = [airbyte_obj.connection_id, self.name], ) - deployment.schedule = CronSchedule(cron = self.schedule) + if self.schedule: + deployment.schedule = CronSchedule(cron = self.schedule) await deployment.apply() # Deploy a airbyte + dbt flow - await Deployment.build_from_flow( + deployment = await Deployment.build_from_flow( flow=flow.airbyte_dbt_flow.with_options(name=f'{self.name}_airbyte_dbt_flow'), name=f"{self.name} - airbyte + dbt", work_queue_name="ddp", tags = [airbyte_obj.connection_id, self.name], ) - deployment.schedule = CronSchedule(cron = self.schedule) + if self.schedule: + deployment.schedule = CronSchedule(cron = self.schedule) await deployment.apply() except Exception as e: From c381f33dd98c67c812f6acbf9af24a12d12d5557 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Fri, 3 Mar 2023 14:49:26 +0530 Subject: [PATCH 09/22] take the prefect worker queue from env --- organization/Organization.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/organization/Organization.py b/organization/Organization.py index 4c8b058..48bc833 100644 --- a/organization/Organization.py +++ b/organization/Organization.py @@ -49,7 +49,7 @@ async def deploy(self): deployment = await Deployment.build_from_flow( flow=flow.dbt_flow.with_options(name=f'{self.name}_dbt_flow'), name=f"{self.name} - dbt", - work_queue_name="ddp", + work_queue_name=os.getenv('PREFECT_WORK_QUEUE'), tags = [self.name], ) if self.schedule: @@ -64,7 +64,7 @@ async def deploy(self): deployment = await Deployment.build_from_flow( flow=flow.airbyte_flow.with_options(name=f'{self.name}_airbyte_flow'), name=f"{self.name} - airbyte", - work_queue_name="ddp", + work_queue_name=os.getenv('PREFECT_WORK_QUEUE'), tags = [airbyte_obj.connection_id, self.name], ) if self.schedule: @@ -75,7 +75,7 @@ async def deploy(self): deployment = await Deployment.build_from_flow( flow=flow.airbyte_dbt_flow.with_options(name=f'{self.name}_airbyte_dbt_flow'), name=f"{self.name} - airbyte + dbt", - work_queue_name="ddp", + work_queue_name=os.getenv('PREFECT_WORK_QUEUE'), tags = [airbyte_obj.connection_id, self.name], ) if self.schedule: From 671c12aac4de95d3568a36f1dd8c3b846cd07668 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Fri, 3 Mar 2023 14:50:21 +0530 Subject: [PATCH 10/22] added an env exmaple for config --- .env.example | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .env.example diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..02f2ff1 --- /dev/null +++ b/.env.example @@ -0,0 +1,3 @@ +# CONFIG +DBT_VENV=/path/to/dbt/virtual/env +PREFECT_WORK_QUEUE=ddp \ No newline at end of file From 3fe3ff1640936165dcc896b60671f0457b9722a5 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Sun, 5 Mar 2023 16:37:09 +0530 Subject: [PATCH 11/22] Work in progress config validation --- flows/Flow.py | 15 ++-- main.py | 14 +--- organization/Organization.py | 142 ++++++++++++++++------------------- 3 files changed, 76 insertions(+), 95 deletions(-) diff --git a/flows/Flow.py b/flows/Flow.py index 22065c6..5dbceac 100644 --- a/flows/Flow.py +++ b/flows/Flow.py @@ -1,29 +1,31 @@ from prefect import flow +from typing import List from tasks.Airbyte import Airbyte from tasks.Dbt import Dbt from pydantic import BaseModel class Flow(BaseModel): - airbyte: Airbyte = None + airbytes: List[Airbyte] = None dbt: Dbt = None org_name: str = None class Config: arbitrary_types_allowed=True - def __init__(self, airbyte: Airbyte, dbt: Dbt, org_name: str): + def __init__(self, airbyte_arr: List[Airbyte], dbt: Dbt, org_name: str): super().__init__() - self.airbyte = airbyte + self.airbytes = airbyte_arr self.dbt = dbt self.org_name = org_name @flow(name=f'{org_name} airbyte_flow') def airbyte_flow(self): - self.airbyte.sync() + for airbyte in self.airbytes: + airbyte.sync() @flow(name='dbt_flow') def dbt_flow(self): - self.dbt.pull_dbt_repo + self.dbt.pull_dbt_repo() self.dbt.dbt_deps() self.dbt.dbt_source_snapshot_freshness() self.dbt.dbt_run() @@ -32,7 +34,8 @@ def dbt_flow(self): @flow(name='airbyte_dbt_flow') def airbyte_dbt_flow(self): - self.airbyte.sync() + for airbyte in self.airbytes: + airbyte.sync() self.dbt.pull_dbt_repo() self.dbt.dbt_deps() diff --git a/main.py b/main.py index 11829ff..20a774f 100644 --- a/main.py +++ b/main.py @@ -48,11 +48,6 @@ async def main(): org_name = args.deploy reset = args.reset - # config params - connection_ids = config[org_name]['connection_ids'] - dbt_dir = config[org_name]['dbt_dir'] - schedule = config[org_name]['schedule'] - # create a datbase session url = PREFECT_API_DATABASE_CONNECTION_URL.value() engine = create_async_engine( @@ -61,14 +56,9 @@ async def main(): ) session = sessionmaker(engine, class_=AsyncSession) - organization = Organization(org_name, connection_ids, dbt_dir, session(), schedule) - - match reset: - case 'yes': - await organization.reset_deployments() - case 'no': - await organization.deploy() + organization = Organization(name=org_name, session=session(), pipelines=config[org_name]) + organization.deploy(reset == 'yes') await organization.close_session() except Exception as e: diff --git a/organization/Organization.py b/organization/Organization.py index 48bc833..5152b3c 100644 --- a/organization/Organization.py +++ b/organization/Organization.py @@ -1,6 +1,6 @@ import os import requests -from typing import List +from typing import List, Mapping, Union from pydantic import BaseModel from prefect.deployments import Deployment from prefect.server.models.deployments import read_deployments, delete_deployment @@ -16,98 +16,86 @@ class Organization(BaseModel): name: str = None - connection_ids: List[str] = [] - dbt_dir: str = None + pipelines: List[Mapping[str, Union[str, List[str]]]] session: Session = None - schedule: str = None class Config: arbitrary_types_allowed=True - def __init__(self, name: str, connection_ids: List[str], dbt_dir: str, session: Session, schedule: str): + def __init__(self, name: str, session: Session, pipelines: List[Mapping[str, Union[str, List[str]]]]): super().__init__() self.name = name - self.connection_ids = connection_ids - self.dbt_dir = dbt_dir - self.schedule = schedule - + self.pipelines = pipelines self.session = session - async def deploy(self): + async def deploy(self, reset = False): try: - airbyte_objs: List[Airbyte] = [] - for connection_id in self.connection_ids: - airbyte = Airbyte(connection_id=connection_id) - airbyte_objs.append(airbyte) - - dbt_obj = Dbt(self.dbt_dir, os.getenv('DBT_VENV')) - - flow = Flow(airbyte=None, dbt=dbt_obj, org_name=self.name) - - # Deploy a dbt flow - deployment = await Deployment.build_from_flow( - flow=flow.dbt_flow.with_options(name=f'{self.name}_dbt_flow'), - name=f"{self.name} - dbt", - work_queue_name=os.getenv('PREFECT_WORK_QUEUE'), - tags = [self.name], - ) - if self.schedule: - deployment.schedule = CronSchedule(cron = self.schedule) - await deployment.apply() + + if reset: + + deploymentFilter = DeploymentFilterTags(all_=[self.name]) + flows = await read_flows(self.session, deployment_filter=deploymentFilter) + api_url = PREFECT_UI_API_URL.value() + + # delete a flow removes the deployments too + for flow in flows: + requests.delete(api_url + f'/flows/{flow.id}') + # the code below doesn't seem to work + # await delete_flow(self.session, flow.id) + + for pipeline in self.pipelines: + + # Each pipeline will deployed and run as a flow as per the schedule + + airbyte_objs = [] + dbt_obj = None + flow_name = '' + deployment_name = '' + flow_function = None + tags = [self.name] + work_queue_name = os.getenv('PREFECT_WORK_QUEUE'), + + has_airbyte = False + has_dbt = False + + # Check if airbyte connections are part of pipeline + if 'connection_ids' in pipeline and len(pipeline['connection_ids']) > 0: + has_airbyte = True + for connection_id in pipeline['connection_ids']: + airbyte = Airbyte(connection_id=connection_id) + airbyte_objs.append(airbyte) + + # Check if dbt transformation is part of repo + if 'dbt_dir' in pipeline: + has_dbt = True + dbt_obj = Dbt(pipeline['dbt_dir'], os.getenv('DBT_VENV')) + + flow = Flow(airbyte_arr=airbyte_objs, dbt=dbt_obj, org_name=self.name) + + if has_airbyte and has_dbt: + flow_name = f'{self.name}_airbyte_dbt_flow' + deployment_name = f'{self.name}_airbyte_dbt_deploy' + flow_function = getattr(flow, 'airbyte_dbt_flow') # Callable + elif has_airbyte: + flow_name = f'{self.name}_airbyte_flow' + deployment_name = f'{self.name}_airbyte_deploy' + flow_function = getattr(flow, 'airbyte_flow') # Callable + elif has_dbt: + flow_name = f'{self.name}_dbt_flow' + deployment_name = f'{self.name}_dbt_deploy' + flow_function = getattr(flow, 'dbt_flow') # Callable - for airbyte_obj in airbyte_objs: - - flow.airbyte = airbyte_obj - - # Deploy a airbyte flow deployment = await Deployment.build_from_flow( - flow=flow.airbyte_flow.with_options(name=f'{self.name}_airbyte_flow'), - name=f"{self.name} - airbyte", - work_queue_name=os.getenv('PREFECT_WORK_QUEUE'), - tags = [airbyte_obj.connection_id, self.name], + flow=flow_function.with_options(name=flow_name), + name=deployment_name, + work_queue_name=work_queue_name, + tags = tags, ) - if self.schedule: - deployment.schedule = CronSchedule(cron = self.schedule) + if 'schedule' in pipeline: + deployment.schedule = CronSchedule(cron = pipeline['schedule']) await deployment.apply() - # Deploy a airbyte + dbt flow - deployment = await Deployment.build_from_flow( - flow=flow.airbyte_dbt_flow.with_options(name=f'{self.name}_airbyte_dbt_flow'), - name=f"{self.name} - airbyte + dbt", - work_queue_name=os.getenv('PREFECT_WORK_QUEUE'), - tags = [airbyte_obj.connection_id, self.name], - ) - if self.schedule: - deployment.schedule = CronSchedule(cron = self.schedule) - await deployment.apply() - - except Exception as e: - - print(e) - - await self.close_session() - - async def reset_deployments(self): - - try: - - deploymentFilter = DeploymentFilterTags(all_=[self.name]) - - flows = await read_flows(self.session, deployment_filter=deploymentFilter) - - api_url = PREFECT_UI_API_URL.value() - - # delete a flow removes the deployments too - for flow in flows: - requests.delete(api_url + f'/flows/{flow.id}') - # the code below doesn't seem to work - # await delete_flow(self.session, flow.id) - - await self.deploy() - - return - except Exception as e: print(e) From 4d60c4e54537f95b8a7117b50fd7f67c0135f026 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Mon, 6 Mar 2023 09:58:56 +0530 Subject: [PATCH 12/22] defining pipelines in config --- configExample.json | 22 ++++++++++++---------- main.py | 3 ++- organization/Organization.py | 9 +++++---- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/configExample.json b/configExample.json index cf8c607..797c186 100644 --- a/configExample.json +++ b/configExample.json @@ -1,12 +1,14 @@ { - "stir": { - "dbt_dir": "/Path/to/dbt/code", - "connection_ids": ["list of connection ids"], - "schedule": "0 */2 * * *" - }, - "sneha": { - "dbt_dir": "/Path/to/dbt/code", - "connection_ids": ["list of connection ids"], - "schedule": "crons expression" - } + "stir": [ + { + "connection_ids": [""], + "dbt_dir": null, + "schedule": "0 */2 * * *" + }, + { + "connection_ids": [""], + "dbt_dir": "/Path/to/dbt/code/repo", + "schedule": "0 */2 * * *" + } + ] } diff --git a/main.py b/main.py index 20a774f..93fe2d6 100644 --- a/main.py +++ b/main.py @@ -58,7 +58,8 @@ async def main(): organization = Organization(name=org_name, session=session(), pipelines=config[org_name]) - organization.deploy(reset == 'yes') + await organization.deploy(reset == 'yes') + await organization.close_session() except Exception as e: diff --git a/organization/Organization.py b/organization/Organization.py index 5152b3c..dfca384 100644 --- a/organization/Organization.py +++ b/organization/Organization.py @@ -16,13 +16,13 @@ class Organization(BaseModel): name: str = None - pipelines: List[Mapping[str, Union[str, List[str]]]] + pipelines: List = None session: Session = None class Config: arbitrary_types_allowed=True - def __init__(self, name: str, session: Session, pipelines: List[Mapping[str, Union[str, List[str]]]]): + def __init__(self, name: str, session: Session, pipelines: List): super().__init__() self.name = name @@ -44,6 +44,8 @@ async def deploy(self, reset = False): # the code below doesn't seem to work # await delete_flow(self.session, flow.id) + work_queue_name = os.getenv('PREFECT_WORK_QUEUE') + for pipeline in self.pipelines: # Each pipeline will deployed and run as a flow as per the schedule @@ -54,7 +56,6 @@ async def deploy(self, reset = False): deployment_name = '' flow_function = None tags = [self.name] - work_queue_name = os.getenv('PREFECT_WORK_QUEUE'), has_airbyte = False has_dbt = False @@ -67,7 +68,7 @@ async def deploy(self, reset = False): airbyte_objs.append(airbyte) # Check if dbt transformation is part of repo - if 'dbt_dir' in pipeline: + if 'dbt_dir' in pipeline and pipeline['dbt_dir'] is not None: has_dbt = True dbt_obj = Dbt(pipeline['dbt_dir'], os.getenv('DBT_VENV')) From 833e9b8636c053d596f2e836d4cbcc20843f1086 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Mon, 6 Mar 2023 13:43:45 +0530 Subject: [PATCH 13/22] flows dont need organization name --- flows/Flow.py | 10 ++++------ organization/Organization.py | 6 ++++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/flows/Flow.py b/flows/Flow.py index 5dbceac..b37e35a 100644 --- a/flows/Flow.py +++ b/flows/Flow.py @@ -7,23 +7,21 @@ class Flow(BaseModel): airbytes: List[Airbyte] = None dbt: Dbt = None - org_name: str = None class Config: arbitrary_types_allowed=True - def __init__(self, airbyte_arr: List[Airbyte], dbt: Dbt, org_name: str): + def __init__(self, airbyte_arr: List[Airbyte], dbt: Dbt): super().__init__() self.airbytes = airbyte_arr self.dbt = dbt - self.org_name = org_name - @flow(name=f'{org_name} airbyte_flow') + @flow def airbyte_flow(self): for airbyte in self.airbytes: airbyte.sync() - @flow(name='dbt_flow') + @flow def dbt_flow(self): self.dbt.pull_dbt_repo() self.dbt.dbt_deps() @@ -31,7 +29,7 @@ def dbt_flow(self): self.dbt.dbt_run() self.dbt.dbt_test() - @flow(name='airbyte_dbt_flow') + @flow def airbyte_dbt_flow(self): for airbyte in self.airbytes: diff --git a/organization/Organization.py b/organization/Organization.py index dfca384..afb3283 100644 --- a/organization/Organization.py +++ b/organization/Organization.py @@ -72,7 +72,7 @@ async def deploy(self, reset = False): has_dbt = True dbt_obj = Dbt(pipeline['dbt_dir'], os.getenv('DBT_VENV')) - flow = Flow(airbyte_arr=airbyte_objs, dbt=dbt_obj, org_name=self.name) + flow = Flow(airbyte_arr=airbyte_objs, dbt=dbt_obj) if has_airbyte and has_dbt: flow_name = f'{self.name}_airbyte_dbt_flow' @@ -93,10 +93,12 @@ async def deploy(self, reset = False): work_queue_name=work_queue_name, tags = tags, ) - if 'schedule' in pipeline: + if 'schedule' in pipeline and len(pipeline['schedule']) > 3: deployment.schedule = CronSchedule(cron = pipeline['schedule']) await deployment.apply() + return deployment + except Exception as e: print(e) From 8cbf36201e5a36e757ef13e7b45ee2dc58af4755 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Mon, 6 Mar 2023 17:34:35 +0530 Subject: [PATCH 14/22] fixed the deployment from flow issue --- flows/Flow.py | 54 +++++++++++++++--------------------- organization/Organization.py | 17 +++++++----- tasks/Airbyte.py | 2 +- 3 files changed, 34 insertions(+), 39 deletions(-) diff --git a/flows/Flow.py b/flows/Flow.py index b37e35a..07242a2 100644 --- a/flows/Flow.py +++ b/flows/Flow.py @@ -5,38 +5,30 @@ from pydantic import BaseModel class Flow(BaseModel): - airbytes: List[Airbyte] = None - dbt: Dbt = None - class Config: - arbitrary_types_allowed=True + airbytes: List[Airbyte] | None + dbt: Dbt | None - def __init__(self, airbyte_arr: List[Airbyte], dbt: Dbt): - super().__init__() +@flow +def airbyte_flow(flow: Flow): + for airbyte in flow.airbytes: + airbyte.sync() - self.airbytes = airbyte_arr - self.dbt = dbt +@flow +def dbt_flow(flow: Flow): + flow.dbt.pull_dbt_repo() + flow.dbt.dbt_deps() + flow.dbt.dbt_source_snapshot_freshness() + flow.dbt.dbt_run() + flow.dbt.dbt_test() - @flow - def airbyte_flow(self): - for airbyte in self.airbytes: - airbyte.sync() +@flow +def airbyte_dbt_flow(flow: Flow): + + for airbyte in flow.airbytes: + airbyte.sync() - @flow - def dbt_flow(self): - self.dbt.pull_dbt_repo() - self.dbt.dbt_deps() - self.dbt.dbt_source_snapshot_freshness() - self.dbt.dbt_run() - self.dbt.dbt_test() - - @flow - def airbyte_dbt_flow(self): - - for airbyte in self.airbytes: - airbyte.sync() - - self.dbt.pull_dbt_repo() - self.dbt.dbt_deps() - self.dbt.dbt_source_snapshot_freshness() - self.dbt.dbt_run() - self.dbt.dbt_test() \ No newline at end of file + flow.dbt.pull_dbt_repo() + flow.dbt.dbt_deps() + flow.dbt.dbt_source_snapshot_freshness() + flow.dbt.dbt_run() + flow.dbt.dbt_test() \ No newline at end of file diff --git a/organization/Organization.py b/organization/Organization.py index afb3283..5d54749 100644 --- a/organization/Organization.py +++ b/organization/Organization.py @@ -1,5 +1,6 @@ import os import requests +from prefect import flow from typing import List, Mapping, Union from pydantic import BaseModel from prefect.deployments import Deployment @@ -12,7 +13,7 @@ from tasks.Dbt import Dbt from tasks.Airbyte import Airbyte -from flows.Flow import Flow +from flows.Flow import airbyte_flow, dbt_flow, airbyte_dbt_flow class Organization(BaseModel): name: str = None @@ -61,7 +62,7 @@ async def deploy(self, reset = False): has_dbt = False # Check if airbyte connections are part of pipeline - if 'connection_ids' in pipeline and len(pipeline['connection_ids']) > 0: + if 'connection_ids' in pipeline and pipeline['connection_ids'] is not None and len(pipeline['connection_ids']) > 0: has_airbyte = True for connection_id in pipeline['connection_ids']: airbyte = Airbyte(connection_id=connection_id) @@ -72,20 +73,21 @@ async def deploy(self, reset = False): has_dbt = True dbt_obj = Dbt(pipeline['dbt_dir'], os.getenv('DBT_VENV')) - flow = Flow(airbyte_arr=airbyte_objs, dbt=dbt_obj) - if has_airbyte and has_dbt: flow_name = f'{self.name}_airbyte_dbt_flow' deployment_name = f'{self.name}_airbyte_dbt_deploy' - flow_function = getattr(flow, 'airbyte_dbt_flow') # Callable + # flow_function = getattr(flow, 'airbyte_dbt_flow') # Callable + flow_function = airbyte_dbt_flow elif has_airbyte: flow_name = f'{self.name}_airbyte_flow' deployment_name = f'{self.name}_airbyte_deploy' - flow_function = getattr(flow, 'airbyte_flow') # Callable + # flow_function = getattr(flow, 'airbyte_flow') # Callable + flow_function = airbyte_flow elif has_dbt: flow_name = f'{self.name}_dbt_flow' deployment_name = f'{self.name}_dbt_deploy' - flow_function = getattr(flow, 'dbt_flow') # Callable + # flow_function = getattr(flow, 'dbt_flow') # Callable + flow_function = dbt_flow deployment = await Deployment.build_from_flow( flow=flow_function.with_options(name=flow_name), @@ -93,6 +95,7 @@ async def deploy(self, reset = False): work_queue_name=work_queue_name, tags = tags, ) + deployment.parameters = {'flow': {'airbytes': airbyte_objs, 'dbt': dbt_obj}} if 'schedule' in pipeline and len(pipeline['schedule']) > 3: deployment.schedule = CronSchedule(cron = pipeline['schedule']) await deployment.apply() diff --git a/tasks/Airbyte.py b/tasks/Airbyte.py index f68279f..0451ae5 100644 --- a/tasks/Airbyte.py +++ b/tasks/Airbyte.py @@ -13,6 +13,6 @@ def sync(self) -> None: trigger_sync( connection_id=self.connection_id, - poll_interval_s=3, + poll_interval_s=15, status_updates=True ) \ No newline at end of file From 827b5227ba8ab8d1c33e87ce3149c1e652feb517 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Mon, 6 Mar 2023 17:40:42 +0530 Subject: [PATCH 15/22] updated the typing in flow class --- flows/Flow.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flows/Flow.py b/flows/Flow.py index 07242a2..f484ead 100644 --- a/flows/Flow.py +++ b/flows/Flow.py @@ -1,12 +1,12 @@ from prefect import flow -from typing import List +from typing import List, Union from tasks.Airbyte import Airbyte from tasks.Dbt import Dbt from pydantic import BaseModel class Flow(BaseModel): - airbytes: List[Airbyte] | None - dbt: Dbt | None + airbytes: Union[List[Airbyte], None] + dbt: Union[Dbt, None] @flow def airbyte_flow(flow: Flow): From 9d8c100f39448f6b43389f9320c3031c2a747f03 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Mon, 6 Mar 2023 17:59:36 +0530 Subject: [PATCH 16/22] clean up --- tasks/Dbt.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tasks/Dbt.py b/tasks/Dbt.py index 9824442..88584c2 100644 --- a/tasks/Dbt.py +++ b/tasks/Dbt.py @@ -23,16 +23,16 @@ def pull_dbt_repo(self) -> None: shell_run_command(command=f'git pull', cwd=self.dbt_code_path) def dbt_deps(self) -> None: - shell_run_command(helper_command= f'source ${self.dbt_venv_path}/bin/activate', command=f'dbt deps', cwd=self.dbt_code_path) + shell_run_command(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt deps', cwd=self.dbt_code_path) def dbt_source_snapshot_freshness(self): - shell_run_command(helper_command= f'source ${self.dbt_venv_path}/bin/activate', command=f'dbt source snapshot-freshness', cwd=self.dbt_code_path) + shell_run_command(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt source snapshot-freshness', cwd=self.dbt_code_path) def dbt_run(self) -> None: - shell_run_command(helper_command= f'source ${self.dbt_venv_path}/bin/activate', command=f'dbt run', cwd=self.dbt_code_path) + shell_run_command(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt run', cwd=self.dbt_code_path) def dbt_test(self) -> None: - shell_run_command(helper_command= f'source ${self.dbt_venv_path}/bin/activate', command=f'dbt test', cwd=self.dbt_code_path) + shell_run_command(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt test', cwd=self.dbt_code_path) def dbt_docs_generate(self) -> None: - shell_run_command(helper_command= f'source ${self.dbt_venv_path}/bin/activate', command=f'dbt docs generate', cwd=self.dbt_code_path) \ No newline at end of file + shell_run_command(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt docs generate', cwd=self.dbt_code_path) \ No newline at end of file From f89a77c8c681147e85df7ad3b52b1cbe7ddf2476 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Mon, 6 Mar 2023 18:12:44 +0530 Subject: [PATCH 17/22] scheudle should be null in config if not used --- organization/Organization.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/organization/Organization.py b/organization/Organization.py index 5d54749..be4b10e 100644 --- a/organization/Organization.py +++ b/organization/Organization.py @@ -96,7 +96,7 @@ async def deploy(self, reset = False): tags = tags, ) deployment.parameters = {'flow': {'airbytes': airbyte_objs, 'dbt': dbt_obj}} - if 'schedule' in pipeline and len(pipeline['schedule']) > 3: + if 'schedule' in pipeline and pipeline['schedule'] is not None and len(pipeline['schedule']) > 3: deployment.schedule = CronSchedule(cron = pipeline['schedule']) await deployment.apply() From b6476e1dece826dc42acf0fc0a969fa09d62cdd8 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Tue, 7 Mar 2023 16:48:38 +0530 Subject: [PATCH 18/22] setting up clear flow run names and task run names for readibility on UI --- flows/Flow.py | 6 +++--- tasks/Airbyte.py | 3 ++- tasks/Dbt.py | 12 ++++++------ 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/flows/Flow.py b/flows/Flow.py index f484ead..077309e 100644 --- a/flows/Flow.py +++ b/flows/Flow.py @@ -8,12 +8,12 @@ class Flow(BaseModel): airbytes: Union[List[Airbyte], None] dbt: Union[Dbt, None] -@flow +@flow(flow_run_name='airbyte_flow') def airbyte_flow(flow: Flow): for airbyte in flow.airbytes: airbyte.sync() -@flow +@flow(flow_run_name='dbt_flow') def dbt_flow(flow: Flow): flow.dbt.pull_dbt_repo() flow.dbt.dbt_deps() @@ -21,7 +21,7 @@ def dbt_flow(flow: Flow): flow.dbt.dbt_run() flow.dbt.dbt_test() -@flow +@flow(flow_run_name='airbyte_dbt_flow') def airbyte_dbt_flow(flow: Flow): for airbyte in flow.airbytes: diff --git a/tasks/Airbyte.py b/tasks/Airbyte.py index 0451ae5..fd470cd 100644 --- a/tasks/Airbyte.py +++ b/tasks/Airbyte.py @@ -1,3 +1,4 @@ +from prefect import task, flow from pydantic import BaseModel from prefect_airbyte.connections import trigger_sync @@ -11,7 +12,7 @@ def __init__(self, connection_id: str) -> None: def sync(self) -> None: - trigger_sync( + trigger_sync.with_options(name="airbyte_sync")( connection_id=self.connection_id, poll_interval_s=15, status_updates=True diff --git a/tasks/Dbt.py b/tasks/Dbt.py index 88584c2..1580d04 100644 --- a/tasks/Dbt.py +++ b/tasks/Dbt.py @@ -20,19 +20,19 @@ def __init__(self, dbt_code_path: str, dbt_venv_path: str) -> None: raise Exception('Dbt virtual environment is not setup') def pull_dbt_repo(self) -> None: - shell_run_command(command=f'git pull', cwd=self.dbt_code_path) + shell_run_command.with_options(name='pull_dbt_repo')(command=f'git pull', cwd=self.dbt_code_path) def dbt_deps(self) -> None: - shell_run_command(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt deps', cwd=self.dbt_code_path) + shell_run_command.with_options(name='dbt_deps')(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt deps', cwd=self.dbt_code_path) def dbt_source_snapshot_freshness(self): - shell_run_command(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt source snapshot-freshness', cwd=self.dbt_code_path) + shell_run_command.with_options(name='dbt_source_snapshot_freshness')(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt source snapshot-freshness', cwd=self.dbt_code_path) def dbt_run(self) -> None: - shell_run_command(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt run', cwd=self.dbt_code_path) + shell_run_command.with_options(name='dbt_run')(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt run', cwd=self.dbt_code_path) def dbt_test(self) -> None: - shell_run_command(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt test', cwd=self.dbt_code_path) + shell_run_command.with_options(name='dbt_test')(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt test', cwd=self.dbt_code_path) def dbt_docs_generate(self) -> None: - shell_run_command(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt docs generate', cwd=self.dbt_code_path) \ No newline at end of file + shell_run_command.with_options(name='dbt_test')(helper_command= f'source {self.dbt_venv_path}/bin/activate', command=f'dbt docs generate', cwd=self.dbt_code_path) \ No newline at end of file From 15f66f0e91cb9f1dcea8ee58766da142e9fc912b Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Tue, 4 Apr 2023 11:58:55 +0530 Subject: [PATCH 19/22] discord package for notifications --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index ed33587..e8be5f9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -45,6 +45,7 @@ packaging==23.0 pathspec==0.11.0 pendulum==2.1.2 prefect==2.8.3 +prefect-2-discord==0.1.1 prefect-airbyte==0.2.0 prefect-shell==0.1.5 pyasn1==0.4.8 From 51dd5fe45546f5b3f243d6cfc0621b5126af3a1b Mon Sep 17 00:00:00 2001 From: vinod-rajasekaran Date: Wed, 5 Apr 2023 21:01:38 +0530 Subject: [PATCH 20/22] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 0ca7cff..66bbde4 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ - Activate the virtual env ``` source /bin/activate ``` - Install the dependencies from requirements.txt ``` pip3 install -r requirements.txt ``` - If you are using any new packages, install them in your virtual env and use this to update the requirements.txt ``` pip3 freeze > requirements.txt ``` -- Start the prefect orion UI server ``` prefect orion start ``` +- Start the prefect orion UI server ``` prefect server start ``` - To execute your flow you need to run an agent in different terminal ``` prefect agent start -q default ```. This will run the default queue worker. # prefect cli tools From ee305a56ecacb5557b9c620523e0d5fe6c2e5b3c Mon Sep 17 00:00:00 2001 From: vinod-rajasekaran Date: Wed, 5 Apr 2023 21:02:16 +0530 Subject: [PATCH 21/22] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 66bbde4..026431e 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ - Activate the virtual env ``` source /bin/activate ``` - Install the dependencies from requirements.txt ``` pip3 install -r requirements.txt ``` - If you are using any new packages, install them in your virtual env and use this to update the requirements.txt ``` pip3 freeze > requirements.txt ``` -- Start the prefect orion UI server ``` prefect server start ``` +- Start the prefect orion UI server ``` screen -S prefect-server -dm prefect server start ``` - To execute your flow you need to run an agent in different terminal ``` prefect agent start -q default ```. This will run the default queue worker. # prefect cli tools From 9cefcc2d5a79ccda06720e0563b4a92d207e274f Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Tue, 20 Jun 2023 18:28:30 +0530 Subject: [PATCH 22/22] added shri --- main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.py b/main.py index 93fe2d6..731eb5f 100644 --- a/main.py +++ b/main.py @@ -27,7 +27,7 @@ async def main(): parser.add_argument( '--deploy', required=True, - choices=['stir', 'sneha'], + choices=['stir', 'sneha', 'shri'], help='please enter the name of the NGO', metavar='' )