Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 176 additions & 0 deletions tap_github/repository_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import requests
from singer_sdk import typing as th # JSON Schema typing helpers
from singer_sdk.helpers.jsonpath import extract_jsonpath

from tap_github.client import GitHubRestStream

Expand Down Expand Up @@ -1407,3 +1408,178 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]:
th.Property("type", th.StringType),
th.Property("site_admin", th.BooleanType),
).to_dict()


class WorkflowsStream(GitHubRestStream):
"""Defines 'workflows' stream."""

MAX_PER_PAGE = 100

name = "workflows"
path = "/repos/{org}/{repo}/actions/workflows"
primary_keys = ["id"]
replication_key = "updated_at"
parent_stream_type = RepositoryStream
ignore_parent_replication_key = True
state_partitioning_keys = ["repo", "org"]
records_jsonpath = "$.workflows[*]"

schema = th.PropertiesList(
# Parent keys
th.Property("repo", th.StringType),
th.Property("org", th.StringType),
# PR keys
th.Property("id", th.IntegerType),
th.Property("node_id", th.StringType),
th.Property("name", th.StringType),
th.Property("path", th.StringType),
th.Property("state", th.StringType),
th.Property("created_at", th.DateTimeType),
th.Property("updated_at", th.DateTimeType),
th.Property("url", th.StringType),
th.Property("html_url", th.StringType),
th.Property("badge_url", th.StringType),
).to_dict()

def parse_response(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result rows."""
yield from extract_jsonpath(self.records_jsonpath, input=response.json())


class WorkflowRunsStream(GitHubRestStream):
"""Defines 'workflow_runs' stream."""

name = "workflow_runs"
path = "/repos/{org}/{repo}/actions/runs"
primary_keys = ["id"]
replication_key = "updated_at"
parent_stream_type = RepositoryStream
ignore_parent_replication_key = False
state_partitioning_keys = ["repo", "org"]
records_jsonpath = "$.workflow_runs[*]"

schema = th.PropertiesList(
# Parent keys
th.Property("repo", th.StringType),
th.Property("org", th.StringType),
# PR keys
th.Property("id", th.IntegerType),
th.Property("name", th.StringType),
th.Property("node_id", th.StringType),
th.Property("head_branch", th.StringType),
th.Property("head_sha", th.StringType),
th.Property("run_number", th.IntegerType),
th.Property("event", th.StringType),
th.Property("status", th.StringType),
th.Property("conclusion", th.StringType),
th.Property("workflow_id", th.IntegerType),
th.Property("url", th.StringType),
th.Property("html_url", th.StringType),
th.Property(
"pull_requests",
th.ArrayType(
th.ObjectType(
th.Property("id", th.IntegerType),
th.Property("number", th.IntegerType),
)
),
),
th.Property("created_at", th.DateTimeType),
th.Property("updated_at", th.DateTimeType),
th.Property("jobs_url", th.StringType),
th.Property("logs_url", th.StringType),
th.Property("check_suite_url", th.StringType),
th.Property("artifacts_url", th.StringType),
th.Property("cancel_url", th.StringType),
th.Property("rerun_url", th.StringType),
th.Property("workflow_url", th.StringType),
).to_dict()

def parse_response(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result rows."""
yield from extract_jsonpath(self.records_jsonpath, input=response.json())

def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
"""Return a child context object from the record and optional provided context.
By default, will return context if provided and otherwise the record dict.
Developers may override this behavior to send specific information to child
streams for context.
"""
return {
"org": context["org"] if context else None,
"repo": context["repo"] if context else None,
"run_id": record["id"],
}


class WorkflowRunJobsStream(GitHubRestStream):
"""Defines 'workflow_run_jobs' stream."""

MAX_PER_PAGE = 100

name = "workflow_run_jobs"
path = "/repos/{org}/{repo}/actions/runs/{run_id}/jobs"
primary_keys = ["id"]
parent_stream_type = WorkflowRunsStream
ignore_parent_replication_key = False
state_partitioning_keys = ["repo", "org", "run_id"]
records_jsonpath = "$.jobs[*]"

schema = th.PropertiesList(
# Parent keys
th.Property("repo", th.StringType),
th.Property("org", th.StringType),
# PR keys
th.Property("id", th.IntegerType),
th.Property("run_id", th.IntegerType),
th.Property("run_url", th.StringType),
th.Property("node_id", th.StringType),
th.Property("head_sha", th.StringType),
th.Property("url", th.StringType),
th.Property("html_url", th.StringType),
th.Property("status", th.StringType),
th.Property("conclusion", th.StringType),
th.Property("started_at", th.DateTimeType),
th.Property("completed_at", th.DateTimeType),
th.Property("name", th.StringType),
th.Property(
"steps",
th.ArrayType(
th.ObjectType(
th.Property("name", th.StringType),
th.Property("status", th.StringType),
th.Property("conclusion", th.StringType),
th.Property("number", th.IntegerType),
th.Property("started_at", th.DateTimeType),
th.Property("completed_at", th.DateTimeType),
)
),
),
th.Property("check_run_url", th.StringType),
th.Property("labels", th.ArrayType(th.StringType)),
th.Property("runner_id", th.IntegerType),
th.Property("runner_name", th.StringType),
th.Property("runner_group_id", th.IntegerType),
th.Property("runner_group_name", th.StringType),
).to_dict()

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._schema_emitted = False

def parse_response(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result rows."""
yield from extract_jsonpath(self.records_jsonpath, input=response.json())

def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
params = super().get_url_params(context, next_page_token)
params["filter"] = "all"
return params

def _write_schema_message(self):
"""Write out a SCHEMA message with the stream schema."""
if not self._schema_emitted:
super()._write_schema_message()
self._schema_emitted = True
6 changes: 6 additions & 0 deletions tap_github/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
RepositoryStream,
StargazersStream,
StatsContributorsStream,
WorkflowsStream,
WorkflowRunJobsStream,
WorkflowRunsStream,
)
from tap_github.user_streams import (
StarredStream,
Expand Down Expand Up @@ -114,6 +117,9 @@ def discover_streams(self) -> List[Stream]:
RepositoryStream(tap=self),
StargazersStream(tap=self),
StatsContributorsStream(tap=self),
WorkflowsStream(tap=self),
WorkflowRunJobsStream(tap=self),
WorkflowRunsStream(tap=self),
]


Expand Down