Dev #2

Status: Open. Wants to merge 24 commits into base: master.

Commits (24):
- 2543988 update to agree with drain result API (potash, Oct 17, 2017)
- 45cabc9 update data.py for result API (potash, Oct 18, 2017)
- 1957d57 update requirements.txt to use drain from github (potash, Nov 7, 2017)
- 0e7ca29 Merge pull request #3 from potash/issue/result-api (geneorama, Nov 8, 2017)
- cd73362 Merge pull request #4 from potash/update/drain-master (geneorama, Nov 8, 2017)
- ef00eb4 Predict on new addresses (#5) (potash, Dec 21, 2017)
- 6e1f6ce save models in historical runs, too (potash, Feb 5, 2018)
- 37e6141 fix investigation inspected, complied features (potash, Mar 7, 2018)
- 407de07 cull unnecessary kid features and dead code (potash, Mar 7, 2018)
- 9fb3df3 combine paint violations features (potash, Mar 7, 2018)
- 66993a6 allow external left sources, e.g. alliance (potash, Mar 7, 2018)
- 06edaf3 use 2000 trees (potash, Mar 7, 2018)
- 2e81de7 rm wic aggregations, use fewer building permits and violations aggreg… (potash, Mar 7, 2018)
- add079d no kid and n_estimators experiments (potash, Mar 7, 2018)
- 61cd534 predict all addresses (potash, Mar 9, 2018)
- 6f19a88 move experiments out of workflows (potash, Mar 9, 2018)
- 2a89bc9 no kid features, hardcode community area and ward ranges (potash, Mar 9, 2018)
- b930bda predict addresses today (potash, Mar 9, 2018)
- caeb119 cleanup (potash, Mar 12, 2018)
- 48777b4 cleanup and updates to drain 0.0.6, update README and requirements (potash, Mar 13, 2018)
- eca0d23 predict on new addresses using geographies (potash, Mar 13, 2018)
- 3f0d5fb Merge pull request #10 from potash/address-only (geneorama, Mar 21, 2018)
- a50e0f6 fix max year (potash, Oct 4, 2018)
- 763b509 Merge pull request #11 from Chicago/fix/max_year (geneorama, Oct 4, 2018)
README.md (6 changes: 3 additions & 3 deletions)

@@ -54,11 +54,11 @@ pip install -r requirements.txt
 ### 3. Run models using `drain`.
 To fit a current model and make predictions change to `./lead` and run:
 ```
-drain execute lead.model.workflows::bll6_forest_today ...
+drain execute -w lead.model.workflows.address_predictions_today ...
 ```
-Here `lead.model.workflows.bll6_forest_today` is a drain workflow, i.e. a function taking no arguments that returns collection of drain steps.
+Here `lead.model.workflows.address_predictions_today()` is a drain workflow, i.e. a function taking no arguments that returns a collection of drain steps.
 
-For temporal cross validation use the `bll6_forest` workflow.
+For temporal cross validation use the `kid_predictions_past` workflow in the same module.
 
 # License
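The workflow contract the README describes is simple enough to sketch. Below is a minimal illustration of that contract; only the zero-argument, returns-a-collection-of-steps shape comes from the README, and the `Step` stand-in and step names are hypothetical placeholders, not drain's actual API:

```
class Step:
    """Stand-in for drain.step.Step; the real class comes from the drain library."""
    def __init__(self, **kwargs):
        self.kwargs = kwargs

def address_predictions_today():
    """A drain workflow: a function taking no arguments that returns drain steps."""
    fit = Step(name='fit_forest', n_estimators=2000)        # hypothetical step spec
    predict = Step(name='predict_addresses', inputs=[fit])  # hypothetical step spec
    return [predict]

steps = address_predictions_today()  # roughly what `drain execute -w ...` resolves and runs
```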
lead/features/aggregations.py (36 changes: 20 additions & 16 deletions)

@@ -25,32 +25,36 @@
     'tract':'census_tract_id',
 }
 
-def get_deltas():
-    return {
-        'address': ['1y', '2y', '5y', '10y', 'all'],
-        #'complex': ['1y', '2y', '5y', '10y', 'all'],
-        'block': ['1y','2y','5y'],
-        'tract': ['1y','2y','3y']
-    }
+deltas = {
+    'address': ['1y', '2y', '5y', '10y', 'all'],
+    #'complex': ['1y', '2y', '5y', '10y', 'all'],
+    'block': ['1y','2y','5y'],
+    'tract': ['1y','2y','3y']
+}
+
+# short list of deltas used for building permits and violations
+deltas_thin = {
+    'address': ['1y', '5y', 'all'],
+}
 
-wic = {'kid': ['all']}
+#wic = {'kid': ['all']}
 
-def get_args(deltas):
+def get_args():
     return dict(
         buildings = ['building', 'complex', 'block', 'tract'],
         assessor = ['address', 'building', 'complex', 'block', 'tract'],
         tests = deltas,
         investigations = deltas,
         #events = deltas,
-        permits = deltas,
-        kids = dict(kid=['all'], **deltas),
-        violations = util.dict_subset(deltas, ('address', 'block')),
-        wic_enroll = wic,
-        wic_birth = wic,
-        wic_prenatal = wic,
+        permits = deltas_thin,
+        kids = deltas,
+        violations = deltas_thin,
+        #wic_enroll = wic,
+        #wic_birth = wic,
+        #wic_prenatal = wic,
     )
 
-args = get_args(get_deltas())
+args = get_args()
 
 @lru_cache(maxsize=10)
 def all_dict(dates=None, lag=None, parallel=True):
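A rough sense of how much this thins the feature space for violations, counting (spatial level, look-back window) aggregation cells from the dicts above; the cell-count reading is an inference from the surrounding code, not an official drain calculation:

```
# Violations previously used the address and block entries of the full deltas dict;
# they now use deltas_thin (values copied from the diff above).
before = {'address': ['1y', '2y', '5y', '10y', 'all'], 'block': ['1y', '2y', '5y']}
after = {'address': ['1y', '5y', 'all']}

n_before = sum(len(windows) for windows in before.values())  # 8 aggregation cells
n_after = sum(len(windows) for windows in after.values())    # 3 aggregation cells
print(n_before, n_after)
```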
lead/features/events.py (2 changes: 1 addition & 1 deletion)

@@ -9,7 +9,7 @@
 from drain.step import Step
 from drain.aggregate import Aggregate, Count
 from drain.aggregation import SpacetimeAggregation
-from drain.data import FromSQL, Merge
+from drain.data import FromSQL
 
 day = np.timedelta64(1, 'D')
lead/features/investigations.py (6 changes: 3 additions & 3 deletions)

@@ -12,7 +12,7 @@
 from drain.data import FromSQL
 
 day = np.timedelta64(1, 'D')
-CLOSURE_CODES = range(1,12+1)
+CLOSURE_CODES = [1,4,12] # complied, court, state attorney
 DATE_COLUMNS = ['referral_date', 'init_date', 'comply_date', 'closure_date']
 DATE_NAMES = ['referral', 'inspection', 'compliance', 'closure']
 
@@ -72,8 +72,8 @@ def get_aggregates(self, date, delta):
 
         aggregates = [
             Count(),
-            Aggregate('inspected', 'max', fname=False),
-            Aggregate('complied', 'max', fname=False),
+            Aggregate(lambda i: i.inspected.fillna(0), 'max', name='inspected', fname=False),
+            Aggregate(lambda i: i.complied.fillna(0), 'max', name='complied', fname=False),
 
             Count('hazard_int', prop=True), Count('hazard_ext', prop=True),
             Count('hazard', prop=True), Count('hazard_both', prop=True),
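The `fillna(0)` in the fixed aggregates matters because a group with no recorded outcomes would otherwise produce a missing feature rather than a zero. A quick pandas illustration with toy data (not project data):

```
import numpy as np
import pandas as pd

# Toy data: a group with no recorded inspection outcomes at all.
inspected = pd.Series([np.nan, np.nan])

print(inspected.max())            # nan: 'max' over all-missing values stays missing
print(inspected.fillna(0).max())  # 0.0: missing is now treated as "not inspected"
```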
lead/features/kids.py (38 changes: 8 additions & 30 deletions)

@@ -1,7 +1,7 @@
 from drain import data
-from drain.data import FromSQL, Merge, Revise
+from drain.data import FromSQL, Revise
 from drain.util import day
-from drain.step import Step
+from drain.step import Step, Call, MapResults
 from drain.aggregation import SpacetimeAggregation
 from drain.aggregate import Fraction, Count, Aggregate, Aggregator, days
 
@@ -38,7 +38,7 @@ def revise_kid_addresses(date):
     for i in kid_addresses.inputs[0].inputs: i.target = True
     for i in kids.inputs[0].inputs: i.target = True
 
-    return Merge(inputs=[kids, kid_addresses], on='kid_id')
+    return Call(kids, 'merge', [MapResults(kid_addresses, 'right')], on='kid_id')
 
 class KidsAggregation(SpacetimeAggregation):
     """
@@ -55,8 +55,7 @@ def __init__(self, spacedeltas, dates, parallel=False):
         kid_addresses = revise_kid_addresses(date=dates[0])
         addresses = FromSQL(table='output.addresses')
         addresses.target = True
-        self.inputs = [Merge(inputs=[kid_addresses, addresses],
-                on='address_id')]
+        self.inputs = [Call(kid_addresses, 'merge', [MapResults(addresses, 'right')], on='address_id')]
 
     def get_aggregator(self, date, index, delta):
         df = self.get_data(date, delta)
@@ -72,17 +72,6 @@ def get_aggregates(self, date, index, delta):
         Aggregate(['test_address_count', 'address_count', 'test_count'],
                 'max', fname=False),
         Aggregate(['max_bll'], 'max', fname=False),
-        # Comment out this and all other wic aggregates because they can't be lagged
-        # and they're not useful for predicting poisoning
-        #Aggregate(lambda k: k.last_wic_date == k.address_wic_max_date,
-        #        'any', 'last_wic_address', fname=False),
-        #Aggregate(['address_wic_mother', 'address_wic_infant'], 'any', fname=False),
-        #Aggregate([days('address_wic_max_date', date),
-        #        days('address_wic_min_date', date),
-        #        days('last_wic_date', date),
-        #        days('first_wic_date', date)],
-        #        ['max'], ['address_wic_min_date', 'address_wic_max_date',
-        #        'last_wic_date', 'first_wic_date'], fname=False)
     ]
 
     sample_2y = lambda k: ((k.last_sample_date - k.date_of_birth)/day > 365*2) | (k.max_bll >= 6)
@@ -91,29 +79,20 @@ def get_aggregates(self, date, index, delta):
     aggregates = [
         counts,
         Aggregate(['test_address_count', 'test_count', 'address_count'],
-                ['median', 'mean', 'min', 'max']),
+                ['mean', 'max']),
 
         Count([lambda k: k.address_test_min_date.notnull(),
                 lambda k: k.first_sample_date.notnull()], prop=True,
                 name=['tested_here', 'tested_ever']),
 
-
-        #Count(lambda k: k.first_wic_date.notnull(), prop=True, name='wic'),
-
-        #Count([lambda k: k.address_wic_min_date.notnull() & k.address_test_min_date.notnull(),
-        #        lambda k: k.address_wic_min_date.notnull() & k.first_sample_date.notnull()],
-        #        name=['wic_tested_here', 'wic_tested_ever'],
-        #        prop=lambda k: k.first_wic_date.notnull(), prop_name='wic'),
-        Aggregate([days('address_min_date', 'address_max_date'),
-            #days('address_wic_min_date', 'address_wic_max_date'),
-            days('address_test_min_date', 'address_test_max_date')],
-            ['mean'], ['address_total_time', #'address_wic_time',
-            'address_test_time']),
+        Aggregate([days('address_min_date', 'address_max_date')],
+            ['mean'], ['address_total_time']),
 
         # the first of these are kid level, not address-kid level
         # that means kids get double counted when aggregated to above the address level
         # if they lived in multiple addresses on that e.g. census tract. oh well.
-        Aggregate(['max_bll', 'avg_bll', 'cumulative_bll', 'avg_cumulative_bll',
+        Aggregate(['max_bll', 'avg_cumulative_bll',
             'mean_bll', 'address_max_bll', 'address_mean_bll'],
            ['mean', 'median', 'min', 'max']),
 
@@ -143,7 +122,6 @@ def get_aggregates(self, date, index, delta):
     ]
     if delta == 'all':
         aggregates.extend([
-            #Aggregate(days('address_wic_min_date', date), ['min', 'max'], 'days_since_wic'),
            Aggregate(days('date_of_birth', date), ['min', 'max', 'mean'], 'date_of_birth'),
        ])
 
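Throughout this PR the `Merge` step is replaced by `Call(left, 'merge', [MapResults(right, 'right')], on=...)`, which appears to defer a plain `DataFrame.merge` on the upstream results until run time. A rough plain-pandas sketch of the operation being deferred; the frames are toys, and this reading of drain's `Call`/`MapResults` semantics is an inference from the diff, not documented API behavior:

```
import pandas as pd

kids = pd.DataFrame({'kid_id': [1, 2], 'max_bll': [3, 11]})
kid_addresses = pd.DataFrame({'kid_id': [1, 2], 'address_id': [10, 20]})

# What Call(kids, 'merge', [MapResults(kid_addresses, 'right')], on='kid_id')
# presumably computes once both upstream steps have produced their results:
merged = kids.merge(kid_addresses, on='kid_id')
print(merged)
```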
lead/features/tests.py (17 changes: 8 additions & 9 deletions)

@@ -1,22 +1,21 @@
 from drain import data
 from drain.util import day
-from drain.data import FromSQL, Merge
-from drain.step import Step
+from drain.data import FromSQL
+from drain.step import Step, Call
 from drain.aggregation import SpacetimeAggregation
 from drain.aggregate import Count, Fraction, Aggregate, days
 
 import pandas as pd
 
-tests = Merge(inputs=[
-    Merge(inputs=[
-        FromSQL(table='output.tests'),
-        FromSQL(table='output.addresses')], on='address_id'),
+tests = Call(
+    Call(FromSQL(table='output.tests'), 'merge',
+        [FromSQL(table='output.addresses')], on='address_id'),
+    'merge',
     # get kid first bll6 and bll10 counts to calculate incidences
-    FromSQL("""
+    [FromSQL("""
     select kid_id, first_bll6_sample_date, first_bll10_sample_date
     from output.kids
-    """)],
-    on='kid_id')
+    """)], on='kid_id')
 tests.target = True
 
 class TestsAggregation(SpacetimeAggregation):
lead/features/violations.py (14 changes: 8 additions & 6 deletions)

@@ -3,14 +3,16 @@
 from drain.aggregation import SpacetimeAggregation
 from itertools import product
 
-KEYWORDS = ['water', 'paint', 'window', 'wall', 'porch', 'chip', 'flak', 'peel']
+KEYWORDS = (['water','window', 'wall', 'porch', '(paint|chip|flak|peel)'],
+            ['water','window','wall','porch','paint'])
 
 STATUS = (['OPEN', 'COMPLIED', 'NO ENTRY'],
     ['open', 'complied', 'no_entry'])
 
 KEYWORD_COLUMNS = str.join(', ',
     ("violation_description ~* '{0}' "
-     "or violation_inspector_comments ~* '{0}' AS {0}".format(k)
-     for k in KEYWORDS))
+     "or violation_inspector_comments ~* '{0}' AS {1}".format(*k)
+     for k in zip(*KEYWORDS)))
 
 STATUS_COLUMNS = str.join(', ',
     ("violation_status = '{0}' AS {1}".format(*s)
@@ -37,11 +39,11 @@ def __init__(self, spacedeltas, dates, parallel=False):
     def get_aggregates(self, date, data):
         aggregates = [
             Count(),
-            Count(KEYWORDS, prop=True),
+            Count(KEYWORDS[1], prop=True),
             Count(STATUS[1], prop=True),
             Count([lambda v,k=k,s=s: v[k] & v[s]
-                for k,s in product(KEYWORDS, STATUS[1])], prop=True,
-                name=['%s_%s' % p for p in product(KEYWORDS, STATUS[1])]
+                for k,s in product(KEYWORDS[1], STATUS[1])], prop=True,
+                name=['%s_%s' % p for p in product(KEYWORDS[1], STATUS[1])]
             )
         ]
 
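The paired `KEYWORDS` lists are zipped into (regex pattern, column name) pairs, so paint, chip, flake, and peel matches now fold into a single `paint` column, matching the "combine paint violations features" commit. Running the generator expression from the diff shows the select-list items it builds:

```
KEYWORDS = (['water', 'window', 'wall', 'porch', '(paint|chip|flak|peel)'],
            ['water', 'window', 'wall', 'porch', 'paint'])

for pattern, name in zip(*KEYWORDS):
    print("violation_description ~* '{0}' "
          "or violation_inspector_comments ~* '{0}' AS {1}".format(pattern, name))
# The last item matches any of paint/chip/flak/peel but lands in one `paint` column.
```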
lead/features/wic.py (18 changes: 9 additions & 9 deletions)

@@ -7,7 +7,7 @@
 from drain import util, aggregate, data
 from drain.aggregate import Aggregate, Count, aggregate_counts, days
 from drain.aggregation import SpacetimeAggregation
-from drain.step import Construct
+from drain.step import Call
 from drain.data import FromSQL, binarize, binarize_sets, select_regexes
 from drain.util import list_filter_none, union
 
@@ -30,9 +30,9 @@
     from enroll
 """, tables=['aux.kid_wics', 'aux.kid_mothers'], parse_dates=['register_d', 'last_upd_d'])
 
-enroll2 = Construct(binarize, inputs=[enroll], category_classes=['employment', 'occupation', 'clinic'], min_freq=100)
+enroll2 = Call(binarize, inputs=[enroll], category_classes=['employment', 'occupation', 'clinic'], min_freq=100)
 
-enroll3 = Construct(binarize_sets, inputs=[enroll2], columns=['assistance', 'language'], cast=True, min_freq=100)
+enroll3 = Call(binarize_sets, inputs=[enroll2], columns=['assistance', 'language'], cast=True, min_freq=100)
 enroll3.target=True
 
 class EnrollAggregation(SpacetimeAggregation):
@@ -46,7 +46,7 @@ def __init__(self, spacedeltas, dates, parallel=False):
         parallel=parallel)
 
     def get_aggregates(self, date, delta):
-        enroll = self.inputs[0].get_result()
+        enroll = self.inputs[0].result
         aggregates = [
             Aggregate('medical_risk', 'any', fname=False),
             Aggregate(['household_size', 'household_income'],
@@ -74,8 +74,8 @@ def get_aggregates(self, date, delta):
 """, tables=['aux.kids', 'aux.kid_mothers'], parse_dates=['date_of_birth'])
 
 
-births2 = Construct(binarize, inputs=[births], category_classes=['place_type', 'disposition'])
-births3 = Construct(binarize_sets, inputs=[births2], columns=['complication'], cast=True)
+births2 = Call(binarize, inputs=[births], category_classes=['place_type', 'disposition'])
+births3 = Call(binarize_sets, inputs=[births2], columns=['complication'], cast=True)
 births3.target = True
 
 class BirthAggregation(SpacetimeAggregation):
@@ -89,7 +89,7 @@ def __init__(self, spacedeltas, dates, parallel=False):
         parallel=parallel)
 
     def get_aggregates(self, date, delta):
-        births = self.inputs[0].get_result()
+        births = self.inputs[0].result
         aggregates = [
             Aggregate('length', 'max', fname=False),
             Aggregate('weight', 'max', fname=False),
@@ -112,7 +112,7 @@ def get_aggregates(self, date, delta):
 """, tables=['aux.kids', 'aux.kid_mothers'], parse_dates=['date_of_birth', 'visit_d'])
 prenatal.target = True
 
-prenatal2 = Construct(binarize, inputs=[prenatal], category_classes=['service'])
+prenatal2 = Call(binarize, inputs=[prenatal], category_classes=['service'])
 
 class PrenatalAggregation(SpacetimeAggregation):
     def __init__(self, spacedeltas, dates, parallel=False):
@@ -125,7 +125,7 @@ def __init__(self, spacedeltas, dates, parallel=False):
         parallel=parallel)
 
     def get_aggregates(self, date, delta):
-        prenatal = self.inputs[0].get_result()
+        prenatal = self.inputs[0].result
 
         aggregates = [
             Count(),
lead/model/address.py (35 changes: 33 additions & 2 deletions)

@@ -1,12 +1,43 @@
 from drain.step import Step
 from drain.util import timestamp, cross_join
-from drain.data import FromSQL, Merge
+from drain.data import FromSQL
 
 import pandas as pd
 import numpy as np
 import logging
 
-addresses = FromSQL(table='output.addresses')
+# in addition to all addresses, we add all cells in the partition
+# created by intersecting blocks, wards and communities
+# in anticipation of any new addresses in deployment
+addresses = FromSQL("""
+with blocks as (
+    select
+        b.geoid10::double precision census_block_id,
+        substring(b.geoid10 for 11)::double precision census_tract_id,
+        c.area_numbe::int community_area_id,
+        w.ward::int ward_id
+    from input.census_blocks b
+    join input.community_areas c
+        on st_intersects(b.geom, c.geom)
+    join input.wards w
+        on st_intersects(b.geom, w.geom) and st_intersects(c.geom, w.geom)
+    group by 1,2,3,4
+)
+select
+    null address,
+    null address_lat,
+    null address_lng,
+    null as address_id,
+    null as building_id,
+    null as complex_id, *
+from blocks
+UNION ALL
+select address, address_lat, address_lng,
+    address_id, building_id, complex_id,
+    census_block_id, census_tract_id,
+    community_area_id, ward_id
+from output.addresses
+""", tables=['output.addresses', 'input.census_blocks', 'input.census_tracts', 'input.community_areas', 'input.wards'])
 addresses.target = True
 
 class LeadAddressLeft(Step):
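The union gives every block/ward/community cell one geography-only row with null address identifiers, alongside the known addresses. A small pandas sketch, with hypothetical values, of how a brand-new address seen only in deployment could still pick up features by joining on geography alone:

```
import pandas as pd

# Hypothetical rows mirroring the union above: one geography-only partition cell
# (null address fields) plus one known address in the same cell.
addresses = pd.DataFrame([
    {'address': None, 'address_id': None,
     'census_block_id': 170318391001000.0, 'community_area_id': 32, 'ward_id': 4},
    {'address': '123 Main St', 'address_id': 42,
     'census_block_id': 170318391001000.0, 'community_area_id': 32, 'ward_id': 4},
])

# A new address geocoded to the same block, absent from output.addresses:
new = pd.DataFrame([{'census_block_id': 170318391001000.0,
                     'community_area_id': 32, 'ward_id': 4}])

# Joining on the geography columns matches the partition row even though
# no address-level row exists for the new address.
cells = addresses[addresses['address_id'].isnull()]
scored = new.merge(cells, on=['census_block_id', 'community_area_id', 'ward_id'])
print(scored)
```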