Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2de3ee0
cleanup
Ishankoradia24 Feb 28, 2023
b5a5ae3
modularized code to setup deployments for multiple organizations with…
Ishankoradia24 Mar 1, 2023
a361e06
resetting organizartion deployments WIP
Ishankoradia24 Mar 2, 2023
448b768
scheduling cron in deployments from config
Ishankoradia24 Mar 3, 2023
f37699c
updated cron expression
Ishankoradia24 Mar 3, 2023
995deb7
Update README.md
Ishankoradia Mar 3, 2023
62a34c0
dependencies update
Ishankoradia24 Mar 3, 2023
a5c8074
minor changes in flow logic
Ishankoradia24 Mar 3, 2023
c381f33
take the prefect worker queue from env
Ishankoradia24 Mar 3, 2023
671c12a
added an env exmaple for config
Ishankoradia24 Mar 3, 2023
3fe3ff1
Work in progress config validation
Ishankoradia24 Mar 5, 2023
4d60c4e
defining pipelines in config
Ishankoradia24 Mar 6, 2023
833e9b8
flows dont need organization name
Ishankoradia24 Mar 6, 2023
8cbf362
fixed the deployment from flow issue
Ishankoradia24 Mar 6, 2023
827b522
updated the typing in flow class
Ishankoradia24 Mar 6, 2023
9d8c100
clean up
Ishankoradia24 Mar 6, 2023
f89a77c
scheudle should be null in config if not used
Ishankoradia24 Mar 6, 2023
b6476e1
setting up clear flow run names and task run names for readibility on UI
Ishankoradia24 Mar 7, 2023
c1be4e7
Merge pull request #3 from DevDataPlatform/feature/tasks-run-name
Ishankoradia Mar 7, 2023
15f66f0
discord package for notifications
Ishankoradia24 Apr 4, 2023
51dd5fe
Update README.md
vinod-rajasekaran Apr 5, 2023
ee305a5
Update README.md
vinod-rajasekaran Apr 5, 2023
9cefcc2
added shri
Ishankoradia24 Jun 20, 2023
310411a
Merge remote-tracking branch 'origin/ddp-org' into ddp-org
Ishankoradia24 Jun 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# CONFIG
DBT_VENV=/path/to/dbt/virtual/env
PREFECT_WORK_QUEUE=ddp
10 changes: 8 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
logs
prefect-env
dbt-env
.env
.*env
*venv
dbt/

*.pyc

.*yaml
.*yml

.DS_store

# prefect artifacts
.prefectignore

*.json
config.json
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1 +1,15 @@
# ddp_prefect_starter

- Setup a virtual environment using ``` python3 -m venv <name> ```
- Activate the virtual env ``` source <name>/bin/activate ```
- Install the dependencies from requirements.txt ``` pip3 install -r requirements.txt ```
- If you are using any new packages, install them in your virtual env and use this to update the requirements.txt ``` pip3 freeze > requirements.txt ```
- Start the prefect orion UI server ``` screen -S prefect-server -dm prefect server start ```
- To execute your flow you need to run an agent in different terminal ``` prefect agent start -q default ```. This will run the default queue worker.

# prefect cli tools

- Run the help command to understand the options ``` python3 main.py -h ```
- Setup the organization config in ```config.json``` . The boilerplate for this is available in ``` configExample.json ```
- Run the main file to deploy your organization ``` python3 main.py --deploy <org_name> ``` based on the config entered in ```config.json```
- To reset the deployments after updates in config run ``` python3 main.py --deploy <org_name> --reset yes ```
14 changes: 14 additions & 0 deletions configExample.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"stir": [
{
"connection_ids": ["<connection_ids>"],
"dbt_dir": null,
"schedule": "0 */2 * * *"
},
{
"connection_ids": ["<connection_ids>"],
"dbt_dir": "/Path/to/dbt/code/repo",
"schedule": "0 */2 * * *"
}
]
}
Empty file added deployments/Deployment.py
Empty file.
Empty file added deployments/__init__.py
Empty file.
34 changes: 34 additions & 0 deletions flows/Flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from prefect import flow
from typing import List, Union
from tasks.Airbyte import Airbyte
from tasks.Dbt import Dbt
from pydantic import BaseModel

class Flow(BaseModel):
airbytes: Union[List[Airbyte], None]
dbt: Union[Dbt, None]

@flow(flow_run_name='airbyte_flow')
def airbyte_flow(flow: Flow):
for airbyte in flow.airbytes:
airbyte.sync()

@flow(flow_run_name='dbt_flow')
def dbt_flow(flow: Flow):
flow.dbt.pull_dbt_repo()
flow.dbt.dbt_deps()
flow.dbt.dbt_source_snapshot_freshness()
flow.dbt.dbt_run()
flow.dbt.dbt_test()

@flow(flow_run_name='airbyte_dbt_flow')
def airbyte_dbt_flow(flow: Flow):

for airbyte in flow.airbytes:
airbyte.sync()

flow.dbt.pull_dbt_repo()
flow.dbt.dbt_deps()
flow.dbt.dbt_source_snapshot_freshness()
flow.dbt.dbt_run()
flow.dbt.dbt_test()
Empty file added flows/__init__.py
Empty file.
104 changes: 58 additions & 46 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,74 @@
from prefect import flow, task
from prefect_airbyte.connections import trigger_sync
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect_shell import shell_run_command
import sys, getopt, argparse, json, os
from dotenv import load_dotenv
import os
from pathlib import Path
import asyncio
from sqlalchemy import create_engine
from prefect.settings import PREFECT_API_DATABASE_CONNECTION_URL, PREFECT_API_URL
from sqlalchemy.ext.asyncio import create_async_engine, async_session, AsyncSession
from sqlalchemy.orm import sessionmaker

from organization.Organization import Organization

# Load env
load_dotenv()

@flow(name="airbyte-sync")
def run_airbyte_sync():
trigger_sync(
connection_id=os.getenv('DOST_AIRBYTE_CONNECTION'),
poll_interval_s=3,
status_updates=True
)
return 1
# Load config
async def main():

@flow(name="github-pull")
def pull_dost_github_repo():
shell_run_command('rm -rf dbt && git clone '+ os.getenv('DOST_GITHUB_URL'))
return 1
try:
config = None
if not Path('config.json').is_file():
raise Exception('Config file not found')

@flow(name="dbt-transform")
def run_dbt_transform():
trigger_dbt_cli_command(
command="dbt deps", project_dir='dbt'
)
trigger_dbt_cli_command(
command="dbt run", project_dir='dbt'
)
return 1
with open('config.json') as f:
config = json.load(f)

@flow(name="orchestration-flow")
def run_flow():
parser = argparse.ArgumentParser(description='Prefect deployment of NGOs')
parser.add_argument(
'--deploy',
required=True,
choices=['stir', 'sneha', 'shri'],
help='please enter the name of the NGO',
metavar='<org_name>'
)
parser.add_argument(
'--reset',
required=False,
choices=['yes', 'no'],
default='no',
help='resetting the deployments will remove all deployments and create fresh ones',
metavar='<yes or no>'
)
args = parser.parse_args()

#syncing airbyte
run_airbyte_sync()
if args.deploy not in config:
raise Exception(f'Config for {args.deploy} org not found')

# cli args
org_name = args.deploy
reset = args.reset

# pull dbt repo
pull_dost_github_repo()
# create a datbase session
url = PREFECT_API_DATABASE_CONNECTION_URL.value()
engine = create_async_engine(
url,
echo=True,
)
session = sessionmaker(engine, class_=AsyncSession)

#dbt transform
run_dbt_transform()
organization = Organization(name=org_name, session=session(), pipelines=config[org_name])

@flow(name="orchestrate-airbyte")
def run_airbyte_flow():
await organization.deploy(reset == 'yes')

await organization.close_session()

#syncing airbyte
run_airbyte_sync()
except Exception as e:

@flow(name="orchestrate-dbt")
def run_dbt_flow():
print(e)

# pull dbt repo
pull_dost_github_repo()
if __name__ == "__main__":

#dbt transform
run_dbt_transform()
asyncio.run(main())


if __name__ == "__main__":
run_dbt_flow()

38 changes: 0 additions & 38 deletions orchestrate-airbyte-deployment.yaml

This file was deleted.

39 changes: 0 additions & 39 deletions orchestrate-dbt-deployment.yaml

This file was deleted.

38 changes: 0 additions & 38 deletions orchestration-flow-deployment.yaml

This file was deleted.

Loading