9
9
from collections import OrderedDict
10
10
from copy import deepcopy
11
11
from datetime import datetime , timedelta
12
+ from distutils .version import LooseVersion
12
13
import json
13
14
import logging
14
15
from multiprocessing .pool import ThreadPool
@@ -899,8 +900,8 @@ def resolve_postagg(postagg, post_aggs, agg_names, visited_postaggs, metrics_dic
899
900
missing_postagg , post_aggs , agg_names , visited_postaggs , metrics_dict )
900
901
post_aggs [postagg .metric_name ] = DruidDatasource .get_post_agg (postagg .json_obj )
901
902
902
- @classmethod
903
- def metrics_and_post_aggs (cls , metrics , metrics_dict ):
903
+ @staticmethod
904
+ def metrics_and_post_aggs (metrics , metrics_dict , druid_version = None ):
904
905
# Separate metrics into those that are aggregations
905
906
# and those that are post aggregations
906
907
saved_agg_names = set ()
@@ -920,9 +921,13 @@ def metrics_and_post_aggs(cls, metrics, metrics_dict):
920
921
for postagg_name in postagg_names :
921
922
postagg = metrics_dict [postagg_name ]
922
923
visited_postaggs .add (postagg_name )
923
- cls .resolve_postagg (
924
+ DruidDatasource .resolve_postagg (
924
925
postagg , post_aggs , saved_agg_names , visited_postaggs , metrics_dict )
925
- aggs = cls .get_aggregations (metrics_dict , saved_agg_names , adhoc_agg_configs )
926
+ aggs = DruidDatasource .get_aggregations (
927
+ metrics_dict ,
928
+ saved_agg_names ,
929
+ adhoc_agg_configs ,
930
+ )
926
931
return aggs , post_aggs
927
932
928
933
def values_for_column (self ,
@@ -997,11 +1002,12 @@ def _add_filter_from_pre_query_data(self, df, dimensions, dim_filter):
997
1002
998
1003
@staticmethod
def druid_type_from_adhoc_metric(adhoc_metric):
    """Return the aggregation type string for an adhoc metric.

    The result is derived from the metric's aggregate and column type,
    both compared case-insensitively:

    * ``count``          -> ``'count'``
    * ``count_distinct`` -> ``'cardinality'``
    * anything else      -> lower-cased column type followed by the
      capitalized aggregate (e.g. ``DOUBLE`` + ``sum`` -> ``'doubleSum'``)

    NOTE(review): the returned string is presumably used as the Druid
    aggregator ``type`` -- confirm against the caller.

    :param adhoc_metric: dict with a string ``'aggregate'`` entry and a
        ``'column'`` dict that has a string ``'type'`` entry
    :returns: the aggregation type name as a string
    :raises KeyError: if ``'column'``, ``'type'`` or ``'aggregate'`` is
        missing from ``adhoc_metric``
    """
    # Subscript access (not ``.get``) so a malformed metric fails with a
    # KeyError naming the missing key, not an AttributeError on ``None``.
    column_type = adhoc_metric['column']['type'].lower()
    aggregate = adhoc_metric['aggregate'].lower()

    # These two aggregates map to fixed names regardless of column type.
    if aggregate == 'count':
        return 'count'
    if aggregate == 'count_distinct':
        return 'cardinality'

    return column_type + aggregate.capitalize()
@@ -1132,6 +1138,17 @@ def run_query( # noqa / druid
1132
1138
metrics_dict = {m .metric_name : m for m in self .metrics }
1133
1139
columns_dict = {c .column_name : c for c in self .columns }
1134
1140
1141
+ if (
1142
+ self .cluster and
1143
+ LooseVersion (self .cluster .get_druid_version ()) < LooseVersion ('0.11.0' )
1144
+ ):
1145
+ for metric in metrics :
1146
+ if (
1147
+ utils .is_adhoc_metric (metric ) and
1148
+ metric ['column' ]['type' ].upper () == 'FLOAT'
1149
+ ):
1150
+ metric ['column' ]['type' ] = 'DOUBLE'
1151
+
1135
1152
aggregations , post_aggs = DruidDatasource .metrics_and_post_aggs (
1136
1153
metrics ,
1137
1154
metrics_dict )
@@ -1187,7 +1204,7 @@ def run_query( # noqa / druid
1187
1204
pre_qry = deepcopy (qry )
1188
1205
if timeseries_limit_metric :
1189
1206
order_by = timeseries_limit_metric
1190
- aggs_dict , post_aggs_dict = self .metrics_and_post_aggs (
1207
+ aggs_dict , post_aggs_dict = DruidDatasource .metrics_and_post_aggs (
1191
1208
[timeseries_limit_metric ],
1192
1209
metrics_dict )
1193
1210
if phase == 1 :
@@ -1256,7 +1273,7 @@ def run_query( # noqa / druid
1256
1273
1257
1274
if timeseries_limit_metric :
1258
1275
order_by = timeseries_limit_metric
1259
- aggs_dict , post_aggs_dict = self .metrics_and_post_aggs (
1276
+ aggs_dict , post_aggs_dict = DruidDatasource .metrics_and_post_aggs (
1260
1277
[timeseries_limit_metric ],
1261
1278
metrics_dict )
1262
1279
if phase == 1 :
0 commit comments