diff --git a/tap_github/repository_streams.py b/tap_github/repository_streams.py index 95e310c4..617c1d12 100644 --- a/tap_github/repository_streams.py +++ b/tap_github/repository_streams.py @@ -4,6 +4,7 @@ import requests from singer_sdk import typing as th # JSON Schema typing helpers +from singer_sdk.helpers.jsonpath import extract_jsonpath from tap_github.client import GitHubRestStream @@ -1407,3 +1408,178 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: th.Property("type", th.StringType), th.Property("site_admin", th.BooleanType), ).to_dict() + + +class WorkflowsStream(GitHubRestStream): + """Defines 'workflows' stream.""" + + MAX_PER_PAGE = 100 + + name = "workflows" + path = "/repos/{org}/{repo}/actions/workflows" + primary_keys = ["id"] + replication_key = "updated_at" + parent_stream_type = RepositoryStream + ignore_parent_replication_key = True + state_partitioning_keys = ["repo", "org"] + records_jsonpath = "$.workflows[*]" + + schema = th.PropertiesList( + # Parent keys + th.Property("repo", th.StringType), + th.Property("org", th.StringType), + # PR keys + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("name", th.StringType), + th.Property("path", th.StringType), + th.Property("state", th.StringType), + th.Property("created_at", th.DateTimeType), + th.Property("updated_at", th.DateTimeType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("badge_url", th.StringType), + ).to_dict() + + def parse_response(self, response: requests.Response) -> Iterable[dict]: + """Parse the response and return an iterator of result rows.""" + yield from extract_jsonpath(self.records_jsonpath, input=response.json()) + + +class WorkflowRunsStream(GitHubRestStream): + """Defines 'workflow_runs' stream.""" + + name = "workflow_runs" + path = "/repos/{org}/{repo}/actions/runs" + primary_keys = ["id"] + replication_key = "updated_at" + parent_stream_type = RepositoryStream + ignore_parent_replication_key = False + state_partitioning_keys = ["repo", "org"] + records_jsonpath = "$.workflow_runs[*]" + + schema = th.PropertiesList( + # Parent keys + th.Property("repo", th.StringType), + th.Property("org", th.StringType), + # PR keys + th.Property("id", th.IntegerType), + th.Property("name", th.StringType), + th.Property("node_id", th.StringType), + th.Property("head_branch", th.StringType), + th.Property("head_sha", th.StringType), + th.Property("run_number", th.IntegerType), + th.Property("event", th.StringType), + th.Property("status", th.StringType), + th.Property("conclusion", th.StringType), + th.Property("workflow_id", th.IntegerType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property( + "pull_requests", + th.ArrayType( + th.ObjectType( + th.Property("id", th.IntegerType), + th.Property("number", th.IntegerType), + ) + ), + ), + th.Property("created_at", th.DateTimeType), + th.Property("updated_at", th.DateTimeType), + th.Property("jobs_url", th.StringType), + th.Property("logs_url", th.StringType), + th.Property("check_suite_url", th.StringType), + th.Property("artifacts_url", th.StringType), + th.Property("cancel_url", th.StringType), + th.Property("rerun_url", th.StringType), + th.Property("workflow_url", th.StringType), + ).to_dict() + + def parse_response(self, response: requests.Response) -> Iterable[dict]: + """Parse the response and return an iterator of result rows.""" + yield from extract_jsonpath(self.records_jsonpath, input=response.json()) + + def get_child_context(self, record: dict, context: Optional[dict]) -> dict: + """Return a child context object from the record and optional provided context. + By default, will return context if provided and otherwise the record dict. + Developers may override this behavior to send specific information to child + streams for context. + """ + return { + "org": context["org"] if context else None, + "repo": context["repo"] if context else None, + "run_id": record["id"], + } + + +class WorkflowRunJobsStream(GitHubRestStream): + """Defines 'workflow_run_jobs' stream.""" + + MAX_PER_PAGE = 100 + + name = "workflow_run_jobs" + path = "/repos/{org}/{repo}/actions/runs/{run_id}/jobs" + primary_keys = ["id"] + parent_stream_type = WorkflowRunsStream + ignore_parent_replication_key = False + state_partitioning_keys = ["repo", "org", "run_id"] + records_jsonpath = "$.jobs[*]" + + schema = th.PropertiesList( + # Parent keys + th.Property("repo", th.StringType), + th.Property("org", th.StringType), + # PR keys + th.Property("id", th.IntegerType), + th.Property("run_id", th.IntegerType), + th.Property("run_url", th.StringType), + th.Property("node_id", th.StringType), + th.Property("head_sha", th.StringType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("status", th.StringType), + th.Property("conclusion", th.StringType), + th.Property("started_at", th.DateTimeType), + th.Property("completed_at", th.DateTimeType), + th.Property("name", th.StringType), + th.Property( + "steps", + th.ArrayType( + th.ObjectType( + th.Property("name", th.StringType), + th.Property("status", th.StringType), + th.Property("conclusion", th.StringType), + th.Property("number", th.IntegerType), + th.Property("started_at", th.DateTimeType), + th.Property("completed_at", th.DateTimeType), + ) + ), + ), + th.Property("check_run_url", th.StringType), + th.Property("labels", th.ArrayType(th.StringType)), + th.Property("runner_id", th.IntegerType), + th.Property("runner_name", th.StringType), + th.Property("runner_group_id", th.IntegerType), + th.Property("runner_group_name", th.StringType), + ).to_dict() + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._schema_emitted = False + + def parse_response(self, response: requests.Response) -> Iterable[dict]: + """Parse the response and return an iterator of result rows.""" + yield from extract_jsonpath(self.records_jsonpath, input=response.json()) + + def get_url_params( + self, context: Optional[dict], next_page_token: Optional[Any] + ) -> Dict[str, Any]: + params = super().get_url_params(context, next_page_token) + params["filter"] = "all" + return params + + def _write_schema_message(self): + """Write out a SCHEMA message with the stream schema.""" + if not self._schema_emitted: + super()._write_schema_message() + self._schema_emitted = True diff --git a/tap_github/tap.py b/tap_github/tap.py index 8affb5f1..b0eefb35 100644 --- a/tap_github/tap.py +++ b/tap_github/tap.py @@ -21,6 +21,9 @@ RepositoryStream, StargazersStream, StatsContributorsStream, + WorkflowsStream, + WorkflowRunJobsStream, + WorkflowRunsStream, ) from tap_github.user_streams import ( StarredStream, @@ -114,6 +117,9 @@ def discover_streams(self) -> List[Stream]: RepositoryStream(tap=self), StargazersStream(tap=self), StatsContributorsStream(tap=self), + WorkflowsStream(tap=self), + WorkflowRunJobsStream(tap=self), + WorkflowRunsStream(tap=self), ]