This repository was archived by the owner on Jul 2, 2024. It is now read-only.

Commit b077692

Daniel Lennon authored and committed
Switch from asyncpg to psycopg2
(asyncpg was incompatible with Aircloak)
Overhaul bucketing logic
Build queries using psycopg2.sql instead of string interpolation
Enable export of bucketed data as pandas constructor args
Add a Jupyter notebook with basic examples, etc.
1 parent d92148a commit b077692

7 files changed: +284 -98 lines changed
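
A note on the headline change: query text is no longer assembled with f-strings but composed with psycopg2's sql module, so identifiers are quoted by the driver instead of being pasted into the string. A minimal sketch of the two patterns (not code from this commit; the table and column names are taken from the example usage in explorer/numeric.py below):

from psycopg2 import sql

table, column = 'loans', 'amount'

# Old pattern: identifiers interpolated straight into the query text.
old_query = f'SELECT {column}, count(*) FROM {table} GROUP BY 1'

# New pattern: compose typed parts; sql.Identifier values are quoted when the query is rendered.
new_query = sql.SQL('SELECT {column}, count(*) FROM {table} GROUP BY 1').format(
    column=sql.Identifier(column),
    table=sql.Identifier(table),
)
# new_query is a sql.Composed object that cursor.execute() accepts directly.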

.gitignore

+1
@@ -1,2 +1,3 @@
 .venv
 .vscode
+*.pyc

explorer/__init__.py

Whitespace-only changes.

explorer/buckets.py

+89-30
@@ -1,45 +1,104 @@
 import logging
 
+# assume for now that we want at least 20 values per bucket (valid?)
+# also that the smallest useful bucket size is at 1/100 of the total range
+MAX_BUCKETS = 100
+MIN_BUCKET_COUNT = 20
+
+'''
+A note about terminology:
+Bucket size: The range of values contained in a bucket
+Bucket count: The number of values contained in a bucket
+Bucket number: The number of buckets contained in a range
+
+For example for 1000 values in the range 0 - 100, we may have:
+Bucket size: 5
+Bucket count: 50 on average
+Bucket number: 20
+'''
+
 
 class Buckets():
     def __init__(self):
         self.buckets = sorted([base * (10 ** exponent)
                                for base in [1, 2, 5] for exponent in range(-4, 20)])
 
-    def estimate_bucket_size(self, lower, upper):
-        # If the lower bound is higher than the upper bound, prioritise the lower bound
-        # to avoid having too many useless buckets
-        #
-        if lower > upper:
-            return self._next_after(lower)
+    def estimate_bucket_size(self, value_range: float, value_count: int,
+                             num_buckets=MAX_BUCKETS, min_bucket_count=MIN_BUCKET_COUNT) -> int:
+        '''Estimate a suitable bucket size based on desired precision and size restrictions.
+
+        :param value_range: The size of the value range to be bucketed.
+        :param value_count: The number of values contained in the dataset.
+        :param num_buckets: The desired number of buckets for sufficient precision / resolution.
+        :param min_bucket_count: The lowest number of values desired in each bucket.
+        :returns: A suitable bucket size.
+
+        For example, the dataset contains 10_000 values in the range 2042 -> 5683.
+        The value_range is 5683 - 2042 = 3641.
+        If num_buckets is 100, the average bucket size is 36.4 for an estimated bucket count
+        of 100.
+        At the min_bucket_count of 20 we would have 10_000 / 20 = 500 buckets of size 3641 / 500 ≈ 7.3.
+        So we would like a bucket size of at least 7.3 and we are targeting 36.4 for sufficient precision.
+        In this range there are two suitable bucket sizes: 10 and 20.
+        >>> Buckets().estimate_bucket_size(5683 - 2042, 10_000)
+        20
+
+        Note:
+        - The returned size may not meet both of the desired criteria.
+        - The min_bucket_count takes priority.
+        '''
+        # Estimate lower and upper bounds for the bucket size
+        precision_bound = value_range / num_buckets
+        size_bound = value_range / (value_count / min_bucket_count)
+
+        bs_candidates = self.buckets_in_range(size_bound, precision_bound)
+
+        if len(bs_candidates) == 0:
+            # No bucket sizes within the range, prioritise the size bound
+            return self._next_after(size_bound)
         else:
-            bs_candidate_lower = self._next_after(lower)
-            bs_candidate_upper = self._first_before(upper)
-            if bs_candidate_lower == bs_candidate_upper:
-                return bs_candidate_lower
-            else:
-                # If both estimates fall outside the intended range, choose the closest
-                if bs_candidate_upper < lower and bs_candidate_lower > upper:
-                    diff_below = lower - bs_candidate_upper
-                    diff_above = bs_candidate_lower - upper
-                    if diff_below < diff_above:
-                        return bs_candidate_upper
-                    else:
-                        return bs_candidate_lower
-                # Otherwise if the lower estimate is within the bounds, choose it
-                elif bs_candidate_lower < upper:
-                    return bs_candidate_lower
-                # Otherwise check that the upper estimate is within bounds, if so, choose it
-                elif bs_candidate_upper > lower:
-                    return bs_candidate_upper
-                # If none of these conditions apply, something has gone wrong...
-                else:
-                    logging.error(
-                        f'Unable to estimate bucket size for range {lower} -> {upper}')
-                    return 0
+            # Otherwise choose the largest bucket size within the range
+            return max(bs_candidates)
+
+        # bs_candidate_lower = self._next_after(lower)
+        # bs_candidate_upper = self._first_before(upper)
+        # if bs_candidate_lower == bs_candidate_upper:
+        #     # There is only one bucket size that falls within the desired range
+        #     return bs_candidate_lower
+        # else:
+        #     # If both estimates fall outside the intended range, choose estimate
+        #     # based on the lower bound
+        #     return bs_candidate_lower
+        # # Otherwise if the lower estimate is within the bounds, choose it
+        # elif bs_candidate_lower < upper:
+        #     return bs_candidate_lower
+        # # Otherwise check that the upper estimate is within bounds, if so, choose it
+        # elif bs_candidate_upper > lower:
+        #     return bs_candidate_upper
+        # # If none of these conditions apply, something has gone wrong...
+        # else:
+        #     logging.error(
+        #         f'Unable to estimate bucket size for range {lower} -> {upper}')
+        #     return 0
 
     def _next_after(self, val):
         return next(v for v in self.buckets if v > val)
 
     def _first_before(self, val):
         return next(v for v in reversed(self.buckets) if v < val)
+
+    def buckets_smaller_than(self, val):
+        return (v for v in self.buckets if v < val)
+
+    def buckets_larger_than(self, val):
+        return (v for v in self.buckets if v > val)
+
+    def buckets_in_range(self, lo, hi) -> set:
+        smaller_than_hi = set(self.buckets_smaller_than(hi))
+        larger_than_lo = set(self.buckets_larger_than(lo))
+        return smaller_than_hi & larger_than_lo
+
+
+if __name__ == "__main__":
+    import doctest
+    doctest.testmod()
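
As a sanity check on the estimation logic above, the docstring's numbers can be reproduced step by step. A small sketch, assuming the explorer package from this commit is importable:

from explorer.buckets import Buckets

value_range = 5683 - 2042        # 3641, as in the docstring example
value_count = 10_000

precision_bound = value_range / 100            # ~36.4, from MAX_BUCKETS
size_bound = value_range / (value_count / 20)  # ~7.3, from MIN_BUCKET_COUNT

b = Buckets()
candidates = b.buckets_in_range(size_bound, precision_bound)
print(sorted(candidates))                                # [10, 20]
print(b.estimate_bucket_size(value_range, value_count))  # 20, the largest candidate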

explorer/connection.py

+20-12
@@ -1,23 +1,31 @@
-import asyncio
-import asyncpg
+import psycopg2
+from psycopg2.extras import DictCursor
 import logging
 
 
 class AircloakConnection():
-    def __init__(self, **kwargs):
+    def __init__(self, *, dbname):
         self.user = 'daniel-613C7ADF4535BB56DBCD'
         self.port = 9432
         self.host = 'attack.aircloak.com'
-        self.database = 'gda_banking'
+        self.dbname = dbname
 
-    async def connect(self):
-        self.conn = await asyncpg.connect(user=self.user, host=self.host, port=self.port, database=self.database)
+        logging.debug(
+            f'Connecting to Aircloak: user={self.user}, host={self.host}, port={self.port}, dbname={self.dbname}')
 
-    async def close(self):
-        await self.conn.close()
+        self.conn = psycopg2.connect(
+            user=self.user, host=self.host, port=self.port, dbname=self.dbname, cursor_factory=DictCursor)
+
+    def close(self):
+        self.conn.close()
+
+    def fetch(self, query):
+        logging.debug(f'Sending query: {query.as_string(self.conn)}')
+        with self.conn.cursor() as cur:
+            cur.execute(query)
+            result = {
+                'rows': cur.fetchall(),
+                'labels': [col.name for col in cur.description]
+            }
 
-    async def run_query(self, query_fn, **query_args):
-        query_str = query_fn(**query_args)
-        logging.debug(f'Querying: {query_str}')
-        result = await self.conn.fetch(query_str)
         return result
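
A hedged usage sketch of the new synchronous connection class; it assumes the explorer package is importable, the Aircloak endpoint above is reachable, and the gda_banking dataset exists (the same names used in explorer/numeric.py below):

import logging

from explorer import queries
from explorer.connection import AircloakConnection

logging.basicConfig(level=logging.DEBUG)

ac = AircloakConnection(dbname='gda_banking')
result = ac.fetch(queries.top_level_stats(table='loans', column='amount'))

print(result['labels'])   # column names taken from cursor.description
print(result['rows'][0])  # DictCursor rows can be read by index or by column name
ac.close()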

explorer/numeric.py

+40-46
@@ -1,68 +1,62 @@
-import asyncio
-import asyncpg
 import logging
 
-from buckets import Buckets
-from connection import AircloakConnection
+from .buckets import Buckets
+from .connection import AircloakConnection
 
-import queries
+from . import queries
 
-# assume for now that we want at least 20 values per bucket (valid?)
-# also that the smallest useful bucket size is at 1/100 of the total range
-MAX_BUCKETS = 100
-MIN_BUCKET_SIZE = 20
 
+class Explorer:
+    def __init__(self, *, dbname):
+        self.stats = {}
+        self.ac = AircloakConnection(dbname=dbname)
 
-async def explore_numeric_col(table: str, column: str, max_buckets=MAX_BUCKETS, min_bucket_size=MIN_BUCKET_SIZE):
-    ac = AircloakConnection()
-    await ac.connect()
+    def explore_numeric_col(self, *, table: str, column: str):
+        stats = self.ac.fetch(
+            queries.top_level_stats(table=table, column=column))
 
-    stats = await ac.run_query(queries.top_level_stats, table=table, column=column)
-    distincts = await ac.run_query(queries.top_level_distinct, table=table, column=column)
+        distincts = self.ac.fetch(queries.top_level_distinct(
+            table=table, column=column))
 
-    stats = stats[0]
-    count_total = stats['count']
-    suppresed_count = count_suppressed(distincts, 0)
+        stats = stats['rows'][0]
+        suppressed_count = count_suppressed(distincts['rows'], column)
 
-    suppressed_ratio = suppresed_count / count_total
+        suppressed_ratio = suppressed_count / stats['count']
 
-    if suppressed_ratio > 0.05:
-        # too many supressed values, lets drill down
-        value_range = stats['max'] - stats['min']
+        if suppressed_ratio > 0.05:
+            # too many suppressed values, let's drill down
+            value_range = stats['max'] - stats['min']
 
-        # Estimate lower and upper bounds for the bucket size
-        bs_lower_bound = value_range / max_buckets
-        bs_upper_bound = value_range / (count_total / min_bucket_size)
+            bucket_size = Buckets().estimate_bucket_size(
+                value_range, stats['count'])
 
-        bucket_size = Buckets().estimate_bucket_size(bs_lower_bound, bs_upper_bound)
+            self.stats[(table, column)] = self.ac.fetch(
+                queries.bucketed_stats(table=table, column=column, bucket_size=bucket_size))
 
-        bucketed_stats = await ac.run_query(queries.bucketed_stats, table=table, column=column, bucket_size=bucket_size)
+            # TODO: check quality of returned buckets and, if necessary, launch more queries with adjusted bucket size.
 
-    # TODO: check quality of returned buckets and, if necessary launch more queries.
+    def histogram(self, *, table, column):
+        stats = self.stats[(table, column)]['rows']
+        return [row['bucket'] for row in stats[1:]], [row['count'] for row in stats[1:]]
 
-    await ac.close()
+    def to_dataframe(self, *, table, column):
+        stats = self.stats[(table, column)]
+        return {
+            'data': stats['rows'],
+            'columns': stats['labels'],
+            'index': None,
+        }
 
-    return bucketed_stats
+    def __del__(self):
+        self.ac.close()
 
 
 def count_suppressed(rows, col, count_col='count'):
     return next(r[count_col] for r in rows if r[col] == None)
 
 
-def run_exp(exp):
-    loop = asyncio.get_event_loop()
-    loop.set_debug(True)
-    return loop.run_until_complete(exp)
-
-
-# if __name__ == "__main__":
-#     async def main():
-#         logging.basicConfig(level=logging.DEBUG)
-#         ac = AircloakConnection()
-#         await ac.connect()
-#         values = await ac.run_query('bucketed', table='loans', column='amount', bucket_size=10000)
-#         logging.debug(values)
-
-#     loop = asyncio.get_event_loop()
-#     loop.set_debug()
-#     loop.run_until_complete(main())
+if __name__ == "__main__":
+    e = Explorer(dbname='gda_banking')
+    e.explore_numeric_col(table='loans', column='amount')
+    x, y = e.histogram(table='loans', column='amount')
+    print(x, y)
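
The to_dataframe() method above returns exactly the keyword arguments that pandas.DataFrame accepts, which is what the commit message means by exporting bucketed data as pandas constructor args. A minimal sketch, assuming pandas is installed and the column actually gets bucketed (explore_numeric_col only stores stats when more than 5% of values are suppressed):

import pandas as pd

from explorer.numeric import Explorer

e = Explorer(dbname='gda_banking')
e.explore_numeric_col(table='loans', column='amount')

# Unpack the stored rows and labels straight into the DataFrame constructor.
df = pd.DataFrame(**e.to_dataframe(table='loans', column='amount'))
print(df.head())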

explorer/queries.py

+13-10
@@ -1,17 +1,20 @@
+import logging
+from psycopg2 import sql
 
-def top_level_distinct(table: str, column: str):
-    return f'''
+
+def top_level_distinct(*, table: str, column: str):
+    return sql.SQL('''
     SELECT
         {column}
         , count(*)
     FROM {table}
     GROUP BY 1
-    ORDER BY count DESC
-    '''
+    ORDER BY 2 DESC
+    ''').format(table=sql.Identifier(table), column=sql.Identifier(column))
 
 
-def top_level_stats(table: str, column: str):
-    return f'''
+def top_level_stats(*, table: str, column: str):
+    return sql.SQL('''
     SELECT
         min({column})
         , max({column})
@@ -20,11 +23,11 @@ def top_level_stats(table: str, column: str):
         , count(*)
         , count_noise(*)
     FROM {table}
-    '''
+    ''').format(table=sql.Identifier(table), column=sql.Identifier(column))
 
 
-def bucketed_stats(table: str, column: str, bucket_size: int):
-    return f'''
+def bucketed_stats(*, table: str, column: str, bucket_size: int):
+    return sql.SQL('''
     SELECT
         bucket({column} by {bucket_size})
         , {bucket_size} as bucket_size
@@ -36,4 +39,4 @@ def bucketed_stats(table: str, column: str, bucket_size: int):
         , count_noise(*)
     FROM {table}
     GROUP BY 1
-    '''
+    ''').format(table=sql.Identifier(table), column=sql.Identifier(column), bucket_size=sql.Literal(bucket_size))
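
These builders return psycopg2.sql.Composed objects rather than strings; they render to concrete SQL only against an open connection, which is what AircloakConnection.fetch logs. A small sketch, assuming conn is an already-open psycopg2 connection:

from explorer import queries

q = queries.bucketed_stats(table='loans', column='amount', bucket_size=10000)

# Identifiers come out double-quoted; the bucket size is inlined as a literal.
print(q.as_string(conn))

# The Composed object can also be passed to cursor.execute() without rendering it first.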
