24 changes: 13 additions & 11 deletions buildstock_query/aggregate_query.py
@@ -34,9 +34,8 @@ def aggregate_annual(self, *,
         else:
             upgrade_id = self._bsq._validate_upgrade(params.upgrade_id)
         enduse_cols = self._bsq._get_enduse_cols(params.enduses, table='upgrade')
-
         total_weight = self._bsq._get_weight(weights)
-        enduse_selection = [safunc.sum(enduse * total_weight).label(self._bsq._simple_label(enduse.name))
+        enduse_selection = [self._bsq._agg_column(enduse, total_weight, params.agg_func)
                             for enduse in enduse_cols]
         if params.get_quartiles:
             enduse_selection += [sa.func.approx_percentile(enduse, [0, 0.02, 0.1, 0.25, 0.5, 0.75, 0.9, 0.98, 1]).label(
@@ -94,6 +93,7 @@ def _aggregate_timeseries_light(self,
             new_query = params.copy()
             new_query.enduses = [enduse.name]
             new_query.split_enduses = False
+            new_query.get_query_only = True
             query = self.aggregate_timeseries(params=new_query)
             batch_queries_to_submit.append(query)

@@ -109,14 +109,17 @@ def _aggregate_timeseries_light(self,
         result_dfs = self._bsq.get_batch_query_result(batch_id=batch_query_id, combine=False)
         logger.info("Joining the individual enduses result into a single DataFrame")
         group_by = self._bsq._clean_group_by(params.group_by)
-        for res in result_dfs:
-            res.set_index(group_by, inplace=True)
+        if not params.collapse_ts and 'time' not in group_by:
+            group_by.append('time')
+        for i, res in enumerate(result_dfs):
+            if group_by:
+                res.set_index(group_by, inplace=True)
+            if i > 0:
+                res.drop(columns=['sample_count', 'units_count'], inplace=True, errors='ignore')
         self.result_dfs = result_dfs
-        joined_enduses_df = result_dfs[0].drop(columns=['query_id'])
-        for enduse, res in list(zip(params.enduses, result_dfs))[1:]:
-            if not isinstance(enduse, str):
-                enduse = enduse.name
-            joined_enduses_df = joined_enduses_df.join(res[[enduse]])
+        joined_enduses_df = result_dfs[0]
+        for res in result_dfs[1:]:
+            joined_enduses_df = joined_enduses_df.join(res)
 
         logger.info("Joining Completed.")
         return joined_enduses_df.reset_index()
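Note: the rewritten join indexes every per-enduse frame on the cleaned `group_by` columns (plus `time` for non-collapsed timeseries), drops the duplicated `sample_count`/`units_count` columns from all but the first frame, and then index-joins. A standalone pandas sketch of that strategy, with hypothetical data and column names:

```python
import pandas as pd

res1 = pd.DataFrame({'state': ['CA', 'TX'], 'time': [1, 1],
                     'sample_count': [10, 12], 'units_count': [100, 120],
                     'electricity_kwh': [5.0, 6.0]})
res2 = pd.DataFrame({'state': ['CA', 'TX'], 'time': [1, 1],
                     'sample_count': [10, 12], 'units_count': [100, 120],
                     'natural_gas_therms': [1.0, 2.0]})

group_by = ['state', 'time']
result_dfs = [res1, res2]
for i, res in enumerate(result_dfs):
    res.set_index(group_by, inplace=True)
    if i > 0:  # keep the count columns only once
        res.drop(columns=['sample_count', 'units_count'], inplace=True, errors='ignore')

joined = result_dfs[0]
for res in result_dfs[1:]:
    joined = joined.join(res)  # aligns on the shared group_by index
print(joined.reset_index())
```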
@@ -136,8 +139,7 @@ def aggregate_timeseries(self, params: TSQuery):
         [self._bsq._get_table(jl[0]) for jl in params.join_list]  # ingress all tables in join list
         enduses_cols = self._bsq._get_enduse_cols(params.enduses, table='timeseries')
         total_weight = self._bsq._get_weight(params.weights)
-
-        enduse_selection = [safunc.sum(enduse * total_weight).label(self._bsq._simple_label(enduse.name))
+        enduse_selection = [self._bsq._agg_column(enduse, total_weight, params.agg_func)
                             for enduse in enduses_cols]
         group_by = list(params.group_by)
         if self._bsq.timestamp_column_name not in group_by and params.collapse_ts:
20 changes: 15 additions & 5 deletions buildstock_query/aggregate_query.pyi
@@ -1,4 +1,4 @@
-from typing import Optional, Union, Literal, Sequence
+from typing import Optional, Union, Literal, Sequence, Callable
 import pandas as pd
 import typing
 from buildstock_query.schema.query_params import TSQuery, AnnualQuery
@@ -24,6 +24,7 @@ class BuildStockAggregate:
                          avoid: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = [],
                          get_quartiles: bool = False,
                          get_nonzero_count: bool = False,
+                         agg_func: Optional[Union[str, Callable]] = 'sum'
                          ) -> str:
         ...

@@ -40,6 +41,7 @@ class BuildStockAggregate:
                          avoid: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = [],
                          get_quartiles: bool = False,
                          get_nonzero_count: bool = False,
+                         agg_func: Optional[Union[str, Callable]] = 'sum'
                          ) -> pd.DataFrame:
         ...

@@ -56,6 +58,7 @@ class BuildStockAggregate:
                          avoid: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = [],
                          get_quartiles: bool = False,
                          get_nonzero_count: bool = False,
+                         agg_func: Optional[Union[str, Callable]] = 'sum'
                          ) -> Union[pd.DataFrame, str]:
         """
         Aggregates the baseline annual result on select enduses.
@@ -93,6 +96,9 @@ class BuildStockAggregate:
             get_query_only: Skips submitting the query to Athena and just returns the query string. Useful for batch
                 submitting multiple queries or debugging
 
+            agg_func: The aggregation function to use. Defaults to 'sum'.
+                See other options at https://prestodb.io/docs/current/functions/aggregate.html
+
         Returns:
             if get_query_only is True, returns the query_string, otherwise returns the dataframe
         """
@@ -118,7 +124,8 @@ class BuildStockAggregate:
                              split_enduses: bool = False,
                              collapse_ts: bool = False,
                              timestamp_grouping_func: Optional[str] = None,
-                             limit: Optional[int] = None
+                             limit: Optional[int] = None,
+                             agg_func: Optional[Union[str, Callable]] = 'sum'
                              ) -> str:
         ...

@@ -136,7 +143,8 @@ class BuildStockAggregate:
                              collapse_ts: bool = False,
                              timestamp_grouping_func: Optional[str] = None,
                              get_query_only: Literal[False] = False,
-                             limit: Optional[int] = None
+                             limit: Optional[int] = None,
+                             agg_func: Optional[Union[str, Callable]] = 'sum'
                              ) -> pd.DataFrame:
         ...

@@ -154,7 +162,8 @@ class BuildStockAggregate:
                              split_enduses: bool = False,
                              collapse_ts: bool = False,
                              timestamp_grouping_func: Optional[str] = None,
-                             limit: Optional[int] = None
+                             limit: Optional[int] = None,
+                             agg_func: Optional[Union[str, Callable]] = 'sum'
                              ) -> Union[str, pd.DataFrame]:
         """
         Aggregates the timeseries result on select enduses.
@@ -188,7 +197,8 @@ class BuildStockAggregate:
             get_query_only: Skips submitting the query to Athena and just returns the query string. Useful for batch
                 submitting multiple queries or debugging
 
-
+            agg_func: The aggregation function to use. Defaults to 'sum'.
+                See other options at https://prestodb.io/docs/current/functions/aggregate.html
         Returns:
             if get_query_only is True, returns the query_string, otherwise, returns the DataFrame
 
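And the timeseries counterpart, reusing the `bsq` object from the sketch above (names again hypothetical):

```python
df_monthly_max = bsq.agg.aggregate_timeseries(enduses=['fuel_use_electricity_total_m_btu'],
                                              group_by=['state'],
                                              timestamp_grouping_func='month',
                                              agg_func='max')
```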
1 change: 1 addition & 0 deletions buildstock_query/db_schema/comstock_default.toml
@@ -16,6 +16,7 @@ timestamp = "time"
completed_status = "completed_status"
unmet_hours_cooling_hr = ""
unmet_hours_heating_hr = ""
upgrade = "apply_upgrade.upgrade"
fuel_totals = [
"simulation_output_report.total_site_electricity_kwh",
"simulation_output_report.total_site_energy_mbtu",
1 change: 1 addition & 0 deletions buildstock_query/db_schema/db_schema_model.py
@@ -17,6 +17,7 @@ class ColumnNames(BaseModel):
     sample_weight: str
     sqft: str
     timestamp: str
+    upgrade: str
     completed_status: str
     unmet_hours_cooling_hr: str
     unmet_hours_heating_hr: str
1 change: 1 addition & 0 deletions buildstock_query/db_schema/resstock_default.toml
@@ -16,6 +16,7 @@ timestamp = "time"
completed_status = "completed_status"
unmet_hours_cooling_hr = "report_simulation_output.unmet_hours_cooling_hr"
unmet_hours_heating_hr = "report_simulation_output.unmet_hours_heating_hr"
upgrade = "upgrade"
fuel_totals = [
'report_simulation_output.energy_use_total_m_btu',
'report_simulation_output.fuel_use_coal_total_m_btu',
1 change: 1 addition & 0 deletions buildstock_query/db_schema/resstock_oedi.toml
@@ -15,6 +15,7 @@ timestamp = "timestamp"
completed_status = "applicability"
unmet_hours_cooling_hr = ""
unmet_hours_heating_hr = ""
upgrade = "upgrade"
fuel_totals = ["out.electricity.total.energy_consumption",
"out.natural_gas.total.energy_consumption",
"out.fuel_oil.total.energy_consumption",
31 changes: 31 additions & 0 deletions buildstock_query/db_schema/resstock_oedi_vu.toml
@@ -0,0 +1,31 @@
+[table_suffix]
+baseline = "_metadata_state_vu"
+timeseries = "_by_state_vu"
+upgrades = "_metadata_state_vu"
+
+[column_prefix]
+characteristics = "in."
+output = "out."
+
+[column_names]
+building_id = "bldg_id"
+sample_weight = "weight"
+sqft = "in.sqft"
+timestamp = "timestamp"
+completed_status = "applicability"
+unmet_hours_cooling_hr = ""
+unmet_hours_heating_hr = ""
+fuel_totals = ["out.electricity.total.energy_consumption",
+               "out.natural_gas.total.energy_consumption",
+               "out.fuel_oil.total.energy_consumption",
+               "out.propane.total.energy_consumption",
+]
+upgrade = "upgrade"
+
+[completion_values]
+success = "true"
+fail = ""
+unapplicable = "false"
+
+[structure]
+unapplicables_have_ts = "true"
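Since `upgrade` is now a required field on `ColumnNames`, every schema TOML has to carry it. A hedged sanity check of the new file against the model, assuming the parsed `[column_names]` table can be fed to pydantic directly:

```python
import tomllib  # stdlib in Python 3.11+; use the 'toml' package on older interpreters

from buildstock_query.db_schema.db_schema_model import ColumnNames

with open("buildstock_query/db_schema/resstock_oedi_vu.toml", "rb") as f:
    schema = tomllib.load(f)

# Raises a ValidationError if a required key, such as the new 'upgrade', is missing
col_names = ColumnNames(**schema["column_names"])
print(col_names.upgrade)  # -> "upgrade"
```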
43 changes: 37 additions & 6 deletions buildstock_query/main.py
@@ -44,6 +44,7 @@ def __init__(self,
                  region_name: str = 'us-west-2',
                  execution_history: Optional[str] = None,
                  skip_reports: bool = False,
+                 skip_integrity_check: bool = True,
                  athena_query_reuse: bool = True,
                  **kwargs,
                  ) -> None:
@@ -69,6 +70,8 @@ def __init__(self,
                 custom filename.
             skip_reports (bool, optional): If true, skips report printing during initialization. If False (default),
                 prints report from `buildstock_query.report_query.BuildStockReport.get_success_report`.
+            skip_integrity_check (bool, optional): If true (default), skips the integrity check between the baseline
+                and timeseries tables during initialization. Pass False to run the check; most users don't need it.
             athena_query_reuse (bool, optional): When true, Athena will make use of its built-in 7 day query cache.
                 When false, it will not. Defaults to True. One use case to set this to False is when you have modified
                 the underlying s3 data or glue schema and want to make sure you are not using the cached results.
@@ -108,7 +111,7 @@ def __init__(self,
         if not skip_reports:
             logger.info("Getting Success counts...")
             print(self.report.get_success_report())
-        if self.ts_table is not None:
+        if self.ts_table is not None and not skip_integrity_check:
             self.report.check_ts_bs_integrity()
         self.save_cache()
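A hedged sketch of opting back in to the integrity check under the new default (run names hypothetical):

```python
from buildstock_query import BuildStockQuery

bsq = BuildStockQuery(workgroup='my_workgroup', db_name='my_db',
                      table_name='my_run', skip_integrity_check=False)
```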

@@ -164,10 +167,14 @@ def get_upgrade_names(self, get_query_only: Literal[True]) -> str:
     def get_upgrade_names(self, get_query_only: bool = False) -> Union[str, dict]:
         if self.up_table is None:
             raise ValueError("This run has no upgrades")
-        upgrade_table = self.up_table
+        if isinstance(self.up_table, sa.Table):
+            upgrade_table = self.up_table.name
+        else:
+            upgrade_table = self._compile(self.up_table)
+        upgrade_col = self.db_schema.column_names.upgrade
         query = f"""
-            Select cast(upgrade as integer) as upgrade, arbitrary("apply_upgrade.upgrade_name") as upgrade_name
-            from {upgrade_table}
+            Select cast(upgrade as integer) as upgrade, arbitrary("{upgrade_col}") as upgrade_name
+            from ({upgrade_table})
             group by 1 order by 1
         """
         if get_query_only:
@@ -300,6 +307,31 @@ def get_results_csv(self,
         result = self.execute(query)
         return result.set_index(self.bs_bldgid_column.name)
 
+    def _get_table_location(self, db_table_name: str) -> str:
+        table_info = self._aws_glue.get_table(DatabaseName=self.db_name, Name=db_table_name)['Table']
+        if table_info.get('TableType') != 'VIRTUAL_VIEW':
+            return table_info['StorageDescriptor']['Location']
+
+        try:
+            import base64
+            import re
+            import json
+            view_original_text = table_info.get('ViewOriginalText', '')
+            view_original_text = view_original_text[len('/* Presto View: '):-len(' */')]
+            decoded_json = json.loads(base64.b64decode(view_original_text).decode('utf-8'))
+            sql_text = decoded_json['originalSql']
+            match = re.search(r'FROM\s+([^\s,]+)', sql_text, re.IGNORECASE)
+            if not match:
+                raise ValueError(f"Could not find source table in view definition for {db_table_name}")
+            source_table = match.group(1)
+            source_table_info = self._aws_glue.get_table(
+                DatabaseName=self.db_name,  # the view's source table is assumed to be in the same database
+                Name=source_table
+            )['Table']
+            return source_table_info['StorageDescriptor']['Location']
+        except Exception as e:
+            raise ValueError(f"Failed to parse view definition for {db_table_name}: {str(e)}")
+
     def _download_results_csv(self) -> str:
         """Downloads the results csv from s3 and returns the path to the downloaded file.
         Returns:
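For reference, a self-contained sketch of the `ViewOriginalText` round-trip this method assumes; the payload below is fabricated, and the `/* Presto View: <base64 JSON> */` wrapper is the Athena/Presto convention being relied on:

```python
import base64
import json

payload = {"originalSql": "SELECT * FROM my_run_baseline", "catalog": "awsdatacatalog"}
view_original_text = '/* Presto View: ' + \
    base64.b64encode(json.dumps(payload).encode('utf-8')).decode('utf-8') + ' */'

# Same slicing and decoding as _get_table_location
encoded = view_original_text[len('/* Presto View: '):-len(' */')]
decoded = json.loads(base64.b64decode(encoded).decode('utf-8'))
print(decoded['originalSql'])  # SELECT * FROM my_run_baseline
```

Note that the `FROM\s+([^\s,]+)` regex captures only the first relation, so a view built over a join will resolve to whichever table appears first in its SQL.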
@@ -313,8 +345,7 @@ def _download_results_csv(self) -> str:
             db_table_name = f'{self.table_name}{self.db_schema.table_suffix.baseline}'
         else:
             db_table_name = self.table_name[0]
-        baseline_path = self._aws_glue.get_table(DatabaseName=self.db_name,
-                                                 Name=db_table_name)['Table']['StorageDescriptor']['Location']
+        baseline_path = self._get_table_location(db_table_name)
         bucket = baseline_path.split('/')[2]
         key = '/'.join(baseline_path.split('/')[3:])
         s3_data = self._aws_s3.list_objects(Bucket=bucket, Prefix=key)
38 changes: 29 additions & 9 deletions buildstock_query/query_core.py
@@ -5,10 +5,11 @@
 from pyathena.error import OperationalError
 from pyathena.sqlalchemy.base import AthenaDialect
 import sqlalchemy as sa
+from sqlalchemy.sql import func as safunc
 from pyathena.pandas.async_cursor import AsyncPandasCursor
 from pyathena.pandas.cursor import PandasCursor
 import os
-from typing import Union, Optional, Literal, Sequence
+from typing import Union, Optional, Literal, Sequence, Callable
 import typing
 import time
 import logging
@@ -155,7 +156,7 @@ def load_cache(self, path: Optional[str] = None):
         pickle_path = pathlib.Path(path) if path else self._get_cache_file_path()
         before_count = len(self._query_cache)
         saved_cache = load_pickle(pickle_path)
-        logger.info(f"{len(saved_cache)} queries cache read from {path}.")
+        logger.info(f"{len(saved_cache)} queries cache read from {pickle_path}.")
         self._query_cache.update(saved_cache)
         self.last_saved_queries = set(saved_cache)
         after_count = len(self._query_cache)
@@ -518,10 +519,12 @@ def get_pandas(future):
             return exe_id, AthenaFutureDf(result_future)
         else:
             if query not in self._query_cache:
-                self._query_cache[query] = self._conn.cursor().execute(query,
-                                                                       result_reuse_enable=self.athena_query_reuse,
-                                                                       result_reuse_minutes=60 * 24 * 7,
-                                                                       ).as_pandas()
+                cursor = self._conn.cursor()
+                self._query_cache[query] = cursor.execute(query,
+                                                          result_reuse_enable=self.athena_query_reuse,
+                                                          result_reuse_minutes=60 * 24 * 7,
+                                                          ).as_pandas()
+                self._log_execution_cost(cursor.query_id)
             return self._query_cache[query].copy()
 
     def print_all_batch_query_status(self) -> None:
@@ -967,9 +970,14 @@ def get_cols(self, table: AnyTableType, fuel_type=None) -> Sequence[DBColType]:
         tbl = self._get_table(table)
         return [col for col in tbl.columns]
 
-    def _simple_label(self, label: str):
+    def _simple_label(self, label: str, agg_func: Optional[Union[Callable, str]] = None):
         label = label.removeprefix(self.db_schema.column_prefix.characteristics)
         label = label.removeprefix(self.db_schema.column_prefix.output)
+
+        if callable(agg_func):
+            label += f"__{agg_func.__name__}"
+        elif isinstance(agg_func, str) and agg_func != 'sum':
+            label += f"__{agg_func}"
         return label
 
     def _add_restrict(self, query, restrict, *, bs_only=False):
@@ -1035,16 +1043,28 @@ def _add_order_by(self, query, order_by_selection):
             query = query.order_by(*a)
         return query
 
-    def _get_weight(self, weights):
+    def _get_weight(self, weight_cols):
         total_weight = self.sample_wt
-        for weight_col in weights:
+        for weight_col in weight_cols:
             if isinstance(weight_col, tuple):
                 tbl = self._get_table(weight_col[1])
                 total_weight *= tbl.c[weight_col[0]]
             else:
                 total_weight *= self._get_column(weight_col)
         return total_weight
 
+    def _agg_column(self, column: DBColType, weights, agg_func=None):
+        label = self._simple_label(column.name, agg_func)
+        if callable(agg_func):
+            return agg_func(column).label(label)
+        if agg_func is None or agg_func in ['sum']:
+            return safunc.sum(column * weights).label(label)
+        if agg_func in ['avg']:
+            return (safunc.sum(column * weights) / safunc.sum(weights)).label(label)
+        assert isinstance(agg_func, str), f"agg_func {agg_func} is not a string or callable"
+        agg_func = getattr(safunc, agg_func)
+        return agg_func(column).label(label)
+
     def delete_everything(self):
         """Deletes the athena tables and data in s3 for the run.
         """
3 changes: 3 additions & 0 deletions buildstock_query/savings_query.py
@@ -89,6 +89,9 @@ def savings_shape(
     ) -> Union[pd.DataFrame, str]:
         [self._bsq._get_table(jl[0]) for jl in params.join_list]  # ingress all tables in join list
 
+        if params.agg_func != 'sum':
+            raise ValueError("Only 'sum' is supported for savings_shape")
+
         upgrade_id = self._bsq._validate_upgrade(params.upgrade_id)
         if params.timestamp_grouping_func and \
                 params.timestamp_grouping_func not in ['hour', 'day', 'month']:
3 changes: 2 additions & 1 deletion buildstock_query/schema/query_params.py
@@ -1,5 +1,5 @@
 from pydantic import BaseModel, Field
-from typing import Optional, Union, Sequence
+from typing import Optional, Union, Sequence, Callable
 from typing import Literal
 from buildstock_query.schema.utilities import AnyTableType, AnyColType
 
@@ -17,6 +17,7 @@ class AnnualQuery(BaseModel):
     get_nonzero_count: bool = False
     get_query_only: bool = False
     limit: Optional[int] = None
+    agg_func: Optional[Union[str, Callable]] = 'sum'
 
     class Config:
         arbitrary_types_allowed = True
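A hypothetical construction of the params model with the new field, assuming `enduses` is the only field without a default:

```python
from buildstock_query.schema.query_params import AnnualQuery

params = AnnualQuery(enduses=['fuel_use_electricity_total_m_btu'], agg_func='avg')
assert params.agg_func == 'avg'
```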
2 changes: 0 additions & 2 deletions buildstock_query/tools/upgrades_visualizer/__init__.py

This file was deleted.
