# Python Classes
This section describes the various Python classes and their methods used throughout this project. The goal is that future developers (or maintainers) can use this wiki to better understand the code and, consequently, add features and functionality as needed. Below you will find a detailed description of three classes and their methods.
## class Config

This class uses Airflow's `BaseHook` class to set connection details for the project's database, S3 bucket, and APIs. The `Config` class is currently only configured to accept API connection details from the data.mo.gov API; however, additional connections can be configured as needed.
All connections must be defined within Airflow; they can then be retrieved by running the following commands:

```python
from airflow.hooks.base_hook import BaseHook

DATABASE_CONN = BaseHook.get_connection('postgres_default')
```

The `_CONN` objects produce the following class variables:
- `DATABASE_USERNAME` *string*: PostgreSQL DB username
- `DATABASE_PASSWORD` *string*: PostgreSQL DB password
- `DATABASE_PORT` *numeric*: PostgreSQL DB port
- `DATABASE_NAME` *string*: PostgreSQL DB name
- `S3_BUCKET` *string*: AWS S3 bucket name
- `AWS_ACCESS_KEY_ID` *string*: AWS access key
- `AWS_SECRET_ACCESS_KEY` *string*: AWS secret access key
- `AWS_REGION_NAME` *string*: AWS region name
- `API_TOKEN` *string*: API access token
- `API_HOST` *string*: API base URL
- `API_USER_EMAIL` *string*: API user email
- `API_USER_PWD` *string*: API user password
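The `Config` class itself is not shown on this page, so the following is only a minimal sketch of how Airflow connection fields might map onto those class variables. The `Connection` dataclass here is a hypothetical stand-in for the object returned by `BaseHook.get_connection()`; in the real project the values come from Airflow.

```python
from dataclasses import dataclass

@dataclass
class Connection:
    """Stand-in for Airflow's Connection object (hypothetical)."""
    login: str
    password: str
    host: str
    port: int
    schema: str

class Config:
    def __init__(self, db_conn: Connection):
        # Airflow stores credentials on the Connection object;
        # a Config-style class simply re-exposes them as settings.
        self.DATABASE_USERNAME = db_conn.login
        self.DATABASE_PASSWORD = db_conn.password
        self.DATABASE_PORT = db_conn.port
        self.DATABASE_NAME = db_conn.schema

conn = Connection(login='user', password='secret',
                  host='localhost', port=5432, schema='mydb')
cfg = Config(conn)
print(cfg.DATABASE_NAME)  # mydb
```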
## class Scraper

This is a simple class built to extract and transform data. Originally designed to fetch data from a URL, this class also has methods that transform "static" files in an AWS S3 bucket. Its constructor takes two arguments:

- `url`: A URL string where the scraper will begin scraping data. This is a required argument; however, if you are using the scraper to transform static files in an S3 bucket (method: `s3_transform_to_s3`), the `url` parameter is not used and can be set to `None`.
- `Config`: An instance of the `Config` class. This is a required argument of the `Scraper` class and is used to define the following class variables:
  - `bucket_name` *string*: name of the S3 bucket (data source/sink) to connect to
  - `aws_access_key_id` *string*: AWS access key
  - `aws_secret_access_key` *string*: AWS secret access key
  - `api_token` *string*: API token for the API configured in the `Config` class
  - `api_user_email` *string*: email associated with the API
  - `api_user_pwd` *string*: password associated with the API
- `connect_s3_sink(self)`: Method to create the AWS S3 connection object `s3_conn`. Uses the AWS class variables to make the connection. This connection must be made before using any other class method.
- `url_to_s3(self, filename, filters=None, nullstr='')`: Method used to extract data from a URL and upload it as a `.csv` file to the S3 bucket.
  - Arguments:
    - `filename` *string*: name given to the data file uploaded to S3
    - `filters` *dictionary*: `{column <string>: accepted values <list>}`; used to filter a specific column to only include a set of values
    - `nullstr` *string*: string assigned to null values upon writing to the `.csv` file; only used when filtering
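The actual implementation is not shown here, but the `filters`/`nullstr` behavior described above can be illustrated with a small standalone sketch (the function name and row format are hypothetical): keep only rows whose value in each filtered column appears in the accepted list, and substitute the null string for missing values.

```python
def filter_rows(rows, filters, nullstr=''):
    """Illustrative sketch: rows is a list of dicts,
    filters is {column: [accepted values]}."""
    out = []
    for row in rows:
        # Keep the row only if every filtered column holds an accepted value
        if all(row.get(col) in accepted for col, accepted in filters.items()):
            # Replace missing values with the configured null string
            out.append({k: (nullstr if v is None else v)
                        for k, v in row.items()})
    return out

rows = [{'county': 'St. Louis', 'cases': 10},
        {'county': 'Boone', 'cases': None}]
print(filter_rows(rows, {'county': ['Boone']}, nullstr='NaN'))
# [{'county': 'Boone', 'cases': 'NaN'}]
```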
- `api_to_s3(self, filename, table_name, limit=2000)`: Method used to fetch data from an API and upload it as a `.csv` file to the S3 bucket. Designed for specific use with the data.mo.gov API.
  - Arguments:
    - `filename` *string*: name given to the data file uploaded to S3
    - `table_name` *string*: name of the data set fetched from the API
    - `limit` *numeric*: number of records to fetch from the API
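data.mo.gov serves data through a Socrata-style API, which pages results with `$limit` and `$offset` query parameters. The sketch below only illustrates how a method like `api_to_s3` might page through a table; the endpoint path, function name, and `total` parameter are assumptions, and real requests would also send the API token.

```python
def page_urls(host, table_name, total, limit=2000):
    """Hypothetical sketch: build one request URL per page of
    `limit` records until `total` records are covered."""
    urls = []
    for offset in range(0, total, limit):
        urls.append(f"{host}/resource/{table_name}.json"
                    f"?$limit={limit}&$offset={offset}")
    return urls

for url in page_urls('https://data.mo.gov', 'abcd-1234', total=4500):
    print(url)
```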
- `url_transform_to_s3(self, filename, transformer, sep='|')`: Method used to fetch data from a URL and transform it before uploading to the S3 bucket.
  - Arguments:
    - `filename` *string*: name given to the data file uploaded to S3
    - `transformer` *function*: transformation function applied to the data at the URL (functions defined in and called from `/scripts/url_transformers.py`)
    - `sep` *string*: CSV file delimiter
- `s3_transform_to_s3(self, data, output_filename, resource_path, transformer, sep='|')`: Method used to transform static data files within the S3 bucket. Uses the `data` argument to identify which files need transformation before uploading the transformed files back to the S3 bucket.
  - Arguments:
    - `data` *string*: file or folder name in which the data for transformation can be found
    - `output_filename` *string*: name given to the transformed data file uploaded to S3
    - `resource_path` *string*: path to the project's `/resources` directory; used to load JSON files as dictionary objects that can guide transformations
    - `transformer` *function*: transformation function applied to the data in S3 (functions defined in and called from `/scripts/s3_transformers.py`)
    - `sep` *string*: CSV file delimiter
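The project's real transformer functions live in `/scripts/url_transformers.py` and `/scripts/s3_transformers.py` and are not shown on this page. The example below is only a hypothetical illustration of the pattern implied by the `transformer` argument: a plain function that takes raw rows in and returns cleaned rows out, so the `Scraper` methods can stay generic.

```python
def normalize_county(rows):
    """Hypothetical transformer: trim whitespace and
    title-case county names in each row dict."""
    return [{**row, 'county': row['county'].strip().title()}
            for row in rows]

print(normalize_county([{'county': '  st. louis ', 'cases': 3}]))
# [{'county': 'St. Louis', 'cases': 3}]
```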
## class Database

A simple class used to upload `.csv` files from the project's S3 bucket to the PostgreSQL database. The main method of this class, `csv_to_table`, uses a SQL `COPY` command to quickly load data into the appropriate database tables. Its constructor takes one argument:

- `Config`: An instance of the `Config` class. This is a required argument of the `Database` class and is used to define the following class variables:
  - `host` *string*: PostgreSQL database connection string
  - `username` *string*: PostgreSQL database username
  - `password` *string*: PostgreSQL database password
  - `port` *numeric*: PostgreSQL database port
  - `dbname` *string*: PostgreSQL database name
  - `bucket_name` *string*: name of the S3 bucket (data source) to connect to
  - `aws_access_key_id` *string*: AWS access key
  - `aws_secret_access_key` *string*: AWS secret access key
- `connect(self)`: Method to create the database connection object `conn`. Uses the database class variables to make the connection. This connection must be made before loading data into the database.
- `connect_s3_source(self)`: Method to create the AWS S3 connection object `s3_conn`. Uses the AWS class variables to make the connection. This connection must be made before loading data into the database.
- `csv_to_table(self, filename, table_name, sep=',', nullstr='NaN')`: Main method used to load data from flat files within the S3 bucket into database tables. Uses a PostgreSQL `COPY` command to efficiently load data.
  - Arguments:
    - `filename` *string*: name of the data file within S3 to be uploaded to the database
    - `table_name` *string*: name of the database table to copy data to
    - `sep` *string*: CSV file delimiter
    - `nullstr` *string*: string assigned to null values in the CSV file
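To make the `COPY` step concrete: a load like this typically streams the CSV into a statement of the shape below (in psycopg2, via `cursor.copy_expert` with a file-like object read from S3). Building the SQL as a string here is only an illustration; the table name, function name, and exact options are assumptions, not the project's actual code.

```python
def build_copy_sql(table_name, sep=',', nullstr='NaN'):
    """Sketch of the COPY statement a csv_to_table-style load might run.
    The real method would execute this against the open connection."""
    return (f"COPY {table_name} FROM STDIN "
            f"WITH (FORMAT csv, DELIMITER '{sep}', NULL '{nullstr}')")

print(build_copy_sql('covid_cases', sep='|'))
# COPY covid_cases FROM STDIN WITH (FORMAT csv, DELIMITER '|', NULL 'NaN')
```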
- `close(self)`: Method that closes the PostgreSQL database connection. The AWS connection automatically closes after a set time.
# Python Callables

This section shows the functions (a.k.a. "callables") used by Airflow's Python operators throughout this workflow. The callables utilize the classes and methods described above. Tasks include downloading data from the web and saving it as a flat file in S3, copying flat files to database tables, and transforming static files to make the data more compatible with the database. There are five main callables; however, the number of these functions is expected to grow as additional data sources (with new input/output logic) are incorporated into the 211Dashboard pipeline.
```python
def scrape_file(**kwargs):
    '''Calls "url_to_s3" method of Scraper class.'''
    s = Scraper(kwargs['url'], Config)
    s.connect_s3_sink()
    s.url_to_s3(filename=kwargs['filename'],
                filters=kwargs['filters'],
                nullstr=kwargs['nullstr'])


def scrape_api(**kwargs):
    '''Calls "api_to_s3" method of Scraper class.'''
    s = Scraper(kwargs['url'], Config)
    s.connect_s3_sink()
    s.api_to_s3(filename=kwargs['filename'],
                table_name=kwargs['table_name'],
                limit=kwargs['limit'])


def scrape_transform(**kwargs):
    '''Calls "url_transform_to_s3" method of Scraper class.'''
    s = Scraper(kwargs['url'], Config)
    s.connect_s3_sink()
    s.url_transform_to_s3(filename=kwargs['filename'],
                          transformer=kwargs['transformer'],
                          sep=kwargs['sep'])


def transform_static_s3(**kwargs):
    '''Calls "s3_transform_to_s3" method of Scraper class.'''
    s = Scraper(None, Config)  # url is not used for static S3 transforms
    s.connect_s3_sink()
    s.s3_transform_to_s3(data=kwargs['data'],
                         output_filename=kwargs['filename'],
                         resource_path=kwargs['resource_path'],
                         transformer=kwargs['transformer'],
                         sep=kwargs['sep'])


def load_file(**kwargs):
    '''Calls "csv_to_table" method of Database class.'''
    # NOTE: table_name must be truncated beforehand
    db = Database(Config)
    db.connect()
    db.connect_s3_source()
    db.csv_to_table(filename=kwargs['filename'],
                    table_name=kwargs['table_name'],
                    sep=kwargs['sep'],
                    nullstr=kwargs['nullstr'])
    db.close()
```