Skip to content

Commit

Permalink
Basic func tests (#7)
Browse files Browse the repository at this point in the history
### Summary

Adding in the basic function tests provided by
[dbt-tests-adapter](https://github.com/dbt-labs/dbt-core/tree/HEAD/tests/adapter).
This includes basic tests that every adapter plugin is expected to pass.

### Description

Merged the [rest-api-poc branch](https://github.com/dremio/dbt-dremio/tree/rest-api-poc) with
this branch, which is why API changes are included. Along with basic
tests, changes have been made to impl.py and connections.py so the
create_schema macro points to Python code. In this Python code, we use
the Dremio REST API to create a space and any required folders.

TODO: Implement drop_schema() and the delete call in API. 

### Related Issue

#6

### Additional Reviewers
@ArgusLi 
@jlarue26

Co-authored-by: Jared LaRue <[email protected]>
  • Loading branch information
ravjotbrar and Jared LaRue authored Oct 16, 2022
1 parent b9ced2f commit eb38160
Show file tree
Hide file tree
Showing 23 changed files with 1,381 additions and 312 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ MANIFEST
pip-log.txt
pip-delete-this-directory.txt

# Debug Logs
logs/

# Unit test / coverage reports
htmlcov/
.tox/
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/dremio/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.1.0b_odbc"
version = "1.1.0b"
11 changes: 8 additions & 3 deletions dbt/adapters/dremio/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
# __init__.py
#
# Public API surface of the dbt-dremio REST client package.
# NOTE(review): the scrape interleaved the pre-change single-line imports
# with the new grouped import; this is the consolidated post-commit form
# with no duplicate imports.
from .basic import login
from .endpoints import (
    delete_catalog,
    sql_endpoint,
    job_status,
    create_catalog_api,
    get_catalog_item,
)
112 changes: 93 additions & 19 deletions dbt/adapters/dremio/api/cursor.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,103 @@
from dbt.adapters.dremio.api.endpoints import sql_endpoint, job_status, job_results, job_cancel
import agate

from dbt.adapters.dremio.api.endpoints import (
sql_endpoint,
job_status,
job_results,
job_cancel_api,
)
from dbt.adapters.dremio.api.parameters import Parameters

from dbt.events import AdapterLogger

logger = AdapterLogger("dremio")


class DremioCursor:
def __init__(self, api_parameters: Parameters):
    """Create a cursor bound to the given Dremio API parameters.

    All per-query result state starts empty; execute() populates it and
    _initialize() resets it.
    """
    self._parameters = api_parameters
    self._closed = False

    # Per-query result state (reset by _initialize()).
    # The original assigned self._job_id = None twice; deduplicated here.
    self._job_id = None
    self._rowcount = -1
    self._job_results = None
    self._table_results: agate.Table = None

@property
def parameters(self):
    """The API connection parameters this cursor was created with."""
    return self._parameters

@property
def closed(self):
    """Whether this cursor has been closed."""
    return self._closed

@closed.setter
def closed(self, value):
    # Setter exists so dbt's connection layer can flag the cursor closed.
    self._closed = value

@property
def rowcount(self):
    """Rows affected by the last execute, or -1 when unknown."""
    return self._rowcount

@property
def table(self) -> agate.Table:
    """The agate.Table built from the last query's results (or None)."""
    return self._table_results

def job_results(self):
    """Return the raw job-results payload, fetching it on first access.

    Raises:
        Exception: "CursorClosed" if the cursor has been closed.
    """
    if self.closed:
        raise Exception("CursorClosed")
    # Lazily fetch and cache the payload; `is None` replaces the
    # original `== None` comparison (PEP 8).
    if self._job_results is None:
        self._populate_job_results()
    return self._job_results

def job_cancel(self):
    """Cancel the currently tracked job via the REST API.

    Calls job_cancel_api (the old remnant line returned `job_cancel(...)`,
    which would have recursed into this method).
    """
    logger.debug(f"Cancelling job {self._job_id}")
    return job_cancel_api(self._parameters, self._job_id)

def close(self):
    """Close the cursor, clearing all result state.

    Raises:
        Exception: "CursorClosed" if the cursor is already closed.
    """
    if self._closed:
        raise Exception("CursorClosed")
    self._initialize()
    self._closed = True

def execute(self, sql, bindings=None):
    """Run *sql* through the Dremio SQL endpoint and populate result state.

    Args:
        sql: SQL text to submit.
        bindings: Parameter bindings; must be None (not currently supported).

    Raises:
        Exception: "CursorClosed" if the cursor has been closed, or
            "Bindings not currently supported." if bindings are passed.
    """
    if self.closed:
        raise Exception("CursorClosed")
    if bindings is not None:
        raise Exception("Bindings not currently supported.")

    self._initialize()

    json_payload = sql_endpoint(
        self._parameters, sql, context=None, ssl_verify=True
    )
    self._job_id = json_payload["id"]

    # _populate_rowcount polls until the job reaches a terminal state,
    # so the results/table population below sees a finished job.
    self._populate_rowcount()
    self._populate_job_results()
    self._populate_results_table()

def fetchone(self):
    """Return the first row of the result table, or None if no results.

    Uses `is not None` instead of the original `!= None` (PEP 8).
    NOTE(review): always returns row 0 rather than advancing a cursor
    position — confirm callers only ever need the first row.
    """
    row = None
    if self._table_results is not None:
        row = self._table_results.rows[0]
    return row

def fetchall(self):
    """Return every row of the current result table."""
    rows = self._table_results.rows
    logger.debug(f"The fetch result is: {self._table_results.rows}")
    return rows

def _initialize(self):
    """Reset all per-query state to its pre-execute defaults."""
    self._job_id = None
    self._rowcount = -1
    self._job_results = None
    self._table_results = None

def _populate_rowcount(self):
if self.closed:
raise Exception("CursorClosed")
## keep checking job status until status is one of COMPLETE, CANCELLED or FAILED
Expand All @@ -60,21 +111,44 @@ def rowcount(self):
while True:
if job_status_state != last_job_state:
logger.debug(f"Job State = {job_status_state}")
if job_status_state == "COMPLETED" or job_status_state == "CANCELLED" or job_status_state == "FAILED":
if (
job_status_state == "COMPLETED"
or job_status_state == "CANCELLED"
or job_status_state == "FAILED"
):
break
last_job_state = job_status_state
job_status_response = job_status(self._parameters, job_id, ssl_verify=True)
job_status_state = job_status_response["jobState"]

#this is done as job status does not return a rowCount if there are no rows affected (even in completed job_state)
#pyodbc Cursor documentation states "[rowCount] is -1 if no SQL has been executed or if the number of rows is unknown.
# this is done as job status does not return a rowCount if there are no rows affected (even in completed job_state)
# pyodbc Cursor documentation states "[rowCount] is -1 if no SQL has been executed or if the number of rows is unknown.
# Note that it is not uncommon for databases to report -1 immediately after a SQL select statement for performance reasons."
if "rowCount" not in job_status_response:
rows = -1
logger.debug("rowCount does not exist in job_status payload")
else:
rows = job_status_response["rowCount"]

return rows


self._rowcount = rows

def _populate_job_results(self):
    """Fetch and cache the job-results payload if not already cached.

    Uses `is None` instead of the original `== None` (PEP 8).
    NOTE(review): only offset=0, limit=100 is fetched — confirm callers
    never need more than the first 100 rows.
    """
    if self._job_results is None:
        self._job_results = job_results(
            self._parameters, self._job_id, offset=0, limit=100, ssl_verify=True
        )

def _populate_results_table(self):
    """Build an agate.Table from the cached job-results payload.

    BIGINT columns are forced to agate.Number so agate's type inference
    does not mis-type large integers.
    """
    if self._job_results is None:
        return
    json_rows = self._job_results["rows"]
    # Collect EVERY BIGINT column. The original rebuilt TypeTester inside
    # the loop, so only the last BIGINT column's force was kept; it also
    # pointlessly pre-assigned self._table_results = json_rows, which was
    # immediately overwritten below.
    force_types = {
        col["name"]: agate.Number()
        for col in self._job_results["schema"]
        if col["type"]["name"] == "BIGINT"
    }
    tester = agate.TypeTester(force=force_types)
    self._table_results = agate.Table.from_object(
        json_rows, column_types=tester
    )
Loading

0 comments on commit eb38160

Please sign in to comment.