Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions ddpui/ddpairbyte/airbytehelpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ def create_airbyte_deployment(org: Org, org_task: OrgTask, server_block: OrgPref

def create_connection(org: Org, payload: AirbyteConnectionCreate):
"""creates an airbyte connection and tracking objects in the database"""

warehouse = OrgWarehouse.objects.filter(org=org).first()
if warehouse is None:
return None, "need to set up a warehouse first"
Expand All @@ -253,6 +254,10 @@ def create_connection(org: Org, payload: AirbyteConnectionCreate):
if clear_task is None:
return None, "clear task not supported"

# Validate description length
if payload.description and len(payload.description) > 100:
return None, "Description cannot exceed 100 characters"

airbyte_conn = airbyte_service.create_connection(org.airbyte_workspace_id, payload)

try:
Expand All @@ -279,7 +284,9 @@ def create_connection(org: Org, payload: AirbyteConnectionCreate):
sync_dataflow.save()

ConnectionMeta.objects.create(
connection_id=airbyte_conn["connectionId"], connection_name=payload.name
connection_id=airbyte_conn["connectionId"],
connection_name=payload.name,
description=payload.description,
)

except Exception as err:
Expand All @@ -302,6 +309,7 @@ def create_connection(org: Org, payload: AirbyteConnectionCreate):
"deploymentId": sync_dataflow.deployment_id,
"resetConnDeploymentId": None,
"clearConnDeploymentId": clear_dataflow.deployment_id,
"description": payload.description,
}
return res, None

Expand Down Expand Up @@ -387,6 +395,11 @@ def get_connections(org: Org) -> Tuple[List[AirbyteGetConnectionsResponse], None

airbyte_connections = airbyte_service.get_webbackend_connections(org.airbyte_workspace_id)

# Fetch all connection descriptions for this org's connections
connection_ids = [conn["connectionId"] for conn in airbyte_connections]
connection_metas = ConnectionMeta.objects.filter(connection_id__in=connection_ids)
description_map = {meta.connection_id: meta.description for meta in connection_metas}

res: list[dict] = []
connections_to_clean_up: list[str] = []
redisclient = RedisClient.get_instance()
Expand Down Expand Up @@ -502,6 +515,7 @@ def ensure_only_one_add_across_parallel_requests(connection_id: str):
if lock and lock["status"] == TaskLockStatus.QUEUED
else None
),
"description": description_map.get(connection["connectionId"]),
}
)

Expand Down Expand Up @@ -581,6 +595,10 @@ def get_one_connection(org: Org, connection_id: str):
reset_lock = TaskLock.objects.filter(orgtask=reset_dataflow_orgtask.orgtask).first()
lock = fetch_orgtask_lock_v1(reset_dataflow_orgtask.orgtask, reset_lock)

# Fetch the description from ConnectionMeta
connection_meta = ConnectionMeta.objects.filter(connection_id=connection_id).first()
description = connection_meta.description if connection_meta else None

res = {
"name": airbyte_conn["name"],
"connectionId": airbyte_conn["connectionId"],
Expand All @@ -599,6 +617,7 @@ def get_one_connection(org: Org, connection_id: str):
),
"lock": lock,
"resetConnDeploymentId": (reset_dataflow.deployment_id if reset_dataflow else None),
"description": description,
}

return res, None
Expand All @@ -624,6 +643,10 @@ def update_connection(org: Org, connection_id: str, payload: AirbyteConnectionUp
if len(payload.streams) == 0:
return None, "must specify stream names"

# Validate description length
if payload.description and len(payload.description) > 100:
return None, "Description cannot exceed 100 characters"

# fetch connection by id from airbyte
connection = airbyte_service.get_connection(org.airbyte_workspace_id, connection_id)
if connection["status"] == AIRBYTE_CONNECTION_DEPRECATED:
Expand All @@ -642,10 +665,15 @@ def update_connection(org: Org, connection_id: str, payload: AirbyteConnectionUp
# update the airbyte connection
res = airbyte_service.update_connection(org.airbyte_workspace_id, payload, connection)

# Update ConnectionMeta with name and description
update_fields = {}
if payload.name:
ConnectionMeta.objects.filter(connection_id=connection_id).update(
connection_name=connection["name"]
)
update_fields["connection_name"] = connection["name"]
if payload.description:
update_fields["description"] = payload.description

if update_fields:
ConnectionMeta.objects.filter(connection_id=connection_id).update(**update_fields)

return res, None

Expand Down
4 changes: 4 additions & 0 deletions ddpui/ddpairbyte/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class AirbyteConnectionCreate(Schema):
syncCatalog: dict
destinationId: str = None
destinationSchema: str = None
description: str = None


class AirbyteConnectionCreateResponse(Schema):
Expand All @@ -84,6 +85,7 @@ class AirbyteConnectionCreateResponse(Schema):
lock: Optional[dict | None]
isRunning: bool = False
resetConnDeploymentId: str = None
description: str = None


class AirbyteGetConnectionsResponse(Schema):
Expand All @@ -102,6 +104,7 @@ class AirbyteGetConnectionsResponse(Schema):
resetConnDeploymentId: str = None
clearConnDeploymentId: str = None
queuedFlowRunWaitTime: DeploymentCurrentQueueTime = None
description: str = None


class AirbyteConnectionUpdate(Schema):
Expand All @@ -113,6 +116,7 @@ class AirbyteConnectionUpdate(Schema):
catalogId: str
destinationId: str = None
destinationSchema: str = None
description: str = None


# response schemas
Expand Down
22 changes: 22 additions & 0 deletions ddpui/migrations/0128_add_description_to_connection_meta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Generated by Django 4.2 on 2025-08-18 04:45

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("ddpui", "0127_alter_orgwarehouse_name"),
]

operations = [
migrations.AddField(
model_name="connectionmeta",
name="description",
field=models.CharField(
max_length=100,
blank=True,
help_text="Description of the connection (max 100 characters)",
null=True,
),
),
]
6 changes: 6 additions & 0 deletions ddpui/models/org.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ class ConnectionMeta(models.Model):

connection_id = models.CharField(max_length=36, null=False)
connection_name = models.CharField(max_length=100, null=True)
description = models.CharField(
max_length=100,
null=True,
blank=True,
help_text="Description of the connection (max 100 characters)",
)


class ConnectionJob(models.Model):
Expand Down