Skip to content

Commit 486f515

Browse files
committed
Checkpoint
1 parent 6252ebd commit 486f515

File tree

4 files changed

+261
-21
lines changed

4 files changed

+261
-21
lines changed

modin/core/dataframe/algebra/default2pandas/default.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -202,9 +202,9 @@ def args_cast(self, *args, **kwargs):
202202
203203
Cast all Modin objects that function arguments contain to its pandas representation.
204204
"""
205-
args = try_cast_to_pandas(args)
206-
kwargs = try_cast_to_pandas(kwargs)
207-
return wrapper(self, *args, **kwargs)
205+
args1 = try_cast_to_pandas(args)
206+
kwargs1 = try_cast_to_pandas(kwargs)
207+
return wrapper(self, *args1, **kwargs1)
208208

209209
return args_cast
210210

modin/core/storage_formats/base/query_compiler.py

+50
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from __future__ import annotations
2121

2222
import abc
23+
from enum import Enum
2324
import warnings
2425
from functools import cached_property
2526
from typing import TYPE_CHECKING, Hashable, List, Literal, Optional
@@ -107,6 +108,43 @@ def axis_setter(self, labels):
107108
return axis_setter
108109

109110

111+
class QCCoercionCost(Enum):
    """
    Coercion costs between query compilers.

    Costs are expressed as integers in the range -1 to 1000, where 1000 is
    considered impossible. Since coercion costs can be a function of many
    variables (dataset size, partitioning, network throughput, and query
    time) we define a set range of cost values to simplify comparisons
    between two query compilers / engines in a unified way.

    COST_UNKNOWN means we do not know the cost associated with changing
    query compilers.

    COST_ZERO means there is no cost associated, or that the query compilers
    are the same.

    COST_IMPOSSIBLE means the coercion is effectively impossible, which can
    occur if the target system is unable to store the data as a result
    of the coercion.
    """

    COST_UNKNOWN = -1
    COST_ZERO = 0
    COST_LOW = 250
    COST_MEDIUM = 500
    COST_HIGH = 750
    COST_IMPOSSIBLE = 1000

    @staticmethod
    def validate_coercsion_cost(cost: "QCCoercionCost | int") -> None:
        """
        Check that a cost lies between COST_UNKNOWN and COST_IMPOSSIBLE.

        Parameters
        ----------
        cost : QCCoercionCost or int
            Any value convertible with ``int()``.

        Raises
        ------
        ValueError
            If the integer value of `cost` is outside the allowed range.
        """
        # Declared static so that it behaves the same whether it is reached
        # through the class or through one of the members.
        lowest = QCCoercionCost.COST_UNKNOWN.value
        highest = QCCoercionCost.COST_IMPOSSIBLE.value
        if not lowest <= int(cost) <= highest:
            raise ValueError("Query compiler coercsion cost out of range")

    def __int__(self) -> int:
        """Return the integer value of this cost."""
        return self.value

    def __add__(self, other) -> int:
        """Add this cost to another cost or integer, yielding a plain int."""
        return int(self) + int(other)

    def __radd__(self, other) -> int:
        """Support ``int + cost`` (and therefore ``sum()`` over costs)."""
        return int(other) + int(self)
147+
110148
# FIXME: many of the BaseQueryCompiler methods are hiding actual arguments
111149
# by using *args and **kwargs. They should be spread into actual parameters.
112150
# Currently actual arguments are placed in the methods docstrings, but since they're
@@ -247,6 +285,18 @@ def default_to_pandas(self, pandas_op, *args, **kwargs) -> Self:
247285
return [self.__wrap_in_qc(obj) for obj in result]
248286
return self.__wrap_in_qc(result)
249287

288+
def qc_engine_switch_cost(self, other_qc) -> "dict[type, QCCoercionCost]":
    """
    Coercion costs to and from other_qc.

    This provides a mechanism for the query compilers to provide information
    to modin on the cost of moving data to another query compiler (or the
    other way).

    Parameters
    ----------
    other_qc : BaseQueryCompiler
        The query compiler the data could be moved to or from.

    Returns
    -------
    dict
        A map of type to QCCoercionCost, where type is the type we are
        casting to. This default implementation only reports a zero cost for
        its own type when `self` is an instance of ``type(other_qc)``, and
        otherwise reports nothing.
    """
    # Compare the instance (not its class object) against the other
    # compiler's type; isinstance(type(self), ...) would test the metaclass.
    if isinstance(self, type(other_qc)):
        return {type(self): QCCoercionCost.COST_ZERO}
    return {}
299+
250300
# Abstract Methods and Fields: Must implement in children classes
251301
# In some cases, there you may be able to use the same implementation for
252302
# some of these abstract methods, but for the sake of generality they are

modin/core/storage_formats/pandas/query_compiler_caster.py

+116-18
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,70 @@
2121

2222
import functools
2323
import inspect
24+
from itertools import combinations
2425
from types import FunctionType, MethodType
2526
from typing import Any, Dict, Tuple, TypeVar
2627

2728
from pandas.core.indexes.frozen import FrozenList
2829

29-
from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler
30+
from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler, QCCoercionCost
3031

3132
Fn = TypeVar("Fn", bound=Any)
3233

33-
34+
class QueryCompilerCasterCalculator:
    """
    Choose the query compiler type that an operation's arguments are cast to.

    Query compilers are registered with ``add_query_compiler``. ``calculate``
    asks every pair of registered instances for their switching costs via
    ``qc_engine_switch_cost``, sums the reported costs per target type, and
    selects the type with the lowest total.
    """

    def __init__(self):
        # target type -> accumulated integer cost of coercing to that type
        self._caster_costing_map = {}
        # type -> ``_modin_frame`` of the most recently registered instance
        self._data_cls_map = {}
        # registered query compiler instances (used to query costs)
        self._qc_list = []
        # types of everything registered, in registration order
        self._qc_cls_list = []
        # cached outcome of ``calculate``
        self._result_type = None

    def add_query_compiler(self, query_compiler):
        """
        Register a query compiler class or instance as a candidate.

        Parameters
        ----------
        query_compiler : type or object
            A class is only recorded as a candidate type. An instance is
            additionally kept for cost queries, and its ``_modin_frame`` is
            remembered for ``result_data_frame``.
        """
        if isinstance(query_compiler, type):
            qc_type = query_compiler
        else:
            qc_type = type(query_compiler)
            self._qc_list.append(query_compiler)
            self._data_cls_map[qc_type] = query_compiler._modin_frame
        self._qc_cls_list.append(qc_type)

    def calculate(self):
        """
        Return the query compiler type with the lowest total coercion cost.

        Returns
        -------
        type
            The single registered type when only one was registered;
            otherwise the type with the smallest summed cost (the earliest
            recorded one on ties). When no cost information was reported at
            all, the first registered type is used.

        Raises
        ------
        ValueError
            If no query compiler has been registered.
        """
        if self._result_type is not None:
            return self._result_type
        if not self._qc_cls_list:
            raise ValueError("No query compilers registered")
        if len(self._qc_cls_list) == 1:
            return self._qc_cls_list[0]

        for qc_1, qc_2 in combinations(self._qc_list, 2):
            costs_1 = qc_1.qc_engine_switch_cost(qc_2)
            costs_2 = qc_2.qc_engine_switch_cost(qc_1)
            self._add_cost_data(costs_1)
            self._add_cost_data(costs_2)

        if self._caster_costing_map:
            # min() over the keys returns the first key holding the minimum.
            self._result_type = min(
                self._caster_costing_map, key=self._caster_costing_map.get
            )
        else:
            # Nothing reported a usable cost; min() on an empty map would
            # raise, so fall back to the first registered type.
            self._result_type = self._qc_cls_list[0]
        return self._result_type

    def _add_cost_data(self, costs: dict):
        """
        Accumulate reported costs for types taking part in this operation.

        Parameters
        ----------
        costs : dict
            Map of target type to coercion cost, as returned by
            ``qc_engine_switch_cost``.
        """
        for qc_type, cost in costs.items():
            # filter out any extraneous query compilers not in this operation
            if qc_type not in self._qc_cls_list:
                continue
            QCCoercionCost.validate_coercsion_cost(cost)
            # Store plain ints so totals are always mutually comparable.
            previous = self._caster_costing_map.get(qc_type, 0)
            self._caster_costing_map[qc_type] = previous + int(cost)

    def result_data_frame(self):
        """Return the ``_modin_frame`` recorded for the selected type."""
        qc_type = self.calculate()
        return self._data_cls_map[qc_type]
86+
87+
3488
class QueryCompilerCaster:
3589
"""Cast all query compiler arguments of the member function to current query compiler."""
3690

@@ -55,7 +109,9 @@ def __init_subclass__(
55109
apply_argument_cast(cls)
56110

57111

58-
def cast_nested_args_to_current_qc_type(arguments, current_qc):
112+
def visit_nested_args(arguments,
113+
current_qc:BaseQueryCompiler,
114+
fn:callable):
59115
"""
60116
Cast all arguments in nested fashion to current query compiler.
61117
@@ -70,33 +126,25 @@ def cast_nested_args_to_current_qc_type(arguments, current_qc):
70126
Returns args and kwargs with all query compilers casted to current_qc.
71127
"""
72128

73-
def cast_arg_to_current_qc(arg):
74-
current_qc_type = type(current_qc)
75-
if isinstance(arg, BaseQueryCompiler) and not isinstance(arg, current_qc_type):
76-
data_cls = current_qc._modin_frame
77-
return current_qc_type.from_pandas(arg.to_pandas(), data_cls)
78-
else:
79-
return arg
80-
81129
imutable_types = (FrozenList, tuple)
82130
if isinstance(arguments, imutable_types):
83131
args_type = type(arguments)
84132
arguments = list(arguments)
85-
arguments = cast_nested_args_to_current_qc_type(arguments, current_qc)
133+
arguments = visit_nested_args(arguments, current_qc, fn)
86134

87135
return args_type(arguments)
88136
if isinstance(arguments, list):
89137
for i in range(len(arguments)):
90138
if isinstance(arguments[i], (list, dict)):
91-
cast_nested_args_to_current_qc_type(arguments[i], current_qc)
139+
visit_nested_args(arguments[i], current_qc, fn)
92140
else:
93-
arguments[i] = cast_arg_to_current_qc(arguments[i])
141+
arguments[i] = fn(arguments[i])
94142
elif isinstance(arguments, dict):
95143
for key in arguments:
96144
if isinstance(arguments[key], (list, dict)):
97-
cast_nested_args_to_current_qc_type(arguments[key], current_qc)
145+
visit_nested_args(arguments[key], current_qc, fn)
98146
else:
99-
arguments[key] = cast_arg_to_current_qc(arguments[key])
147+
arguments[key] = fn(arguments[key])
100148
return arguments
101149

102150

@@ -116,6 +164,9 @@ def apply_argument_cast(obj: Fn) -> Fn:
116164
if isinstance(obj, type):
117165
all_attrs = dict(inspect.getmembers(obj))
118166
all_attrs.pop("__abstractmethods__")
167+
all_attrs.pop("__init__")
168+
all_attrs.pop("qc_engine_switch_cost")
169+
all_attrs.pop("from_pandas")
119170

120171
# This is required because inspect converts class methods to member functions
121172
current_class_attrs = vars(obj)
@@ -150,10 +201,57 @@ def cast_args(*args: Tuple, **kwargs: Dict) -> Any:
150201
-------
151202
Any
152203
"""
204+
if len(args) == 0 and len(kwargs) == 0:
205+
return
206+
print(f"Adding wrapper {obj}\n")
153207
current_qc = args[0]
208+
calculator = QueryCompilerCasterCalculator()
209+
calculator.add_query_compiler(current_qc)
210+
211+
def arg_needs_casting(arg):
    """Return True when `arg` is a query compiler of a different type than `current_qc`."""
    own_type = type(current_qc)
    is_query_compiler = isinstance(arg, BaseQueryCompiler)
    return is_query_compiler and not isinstance(arg, own_type)
218+
219+
def register_query_compilers(arg):
    """Register `arg` with the calculator if it needs casting; return `arg` unchanged."""
    if arg_needs_casting(arg):
        calculator.add_query_compiler(arg)
    return arg
224+
225+
def cast_to_qc(arg):
    """
    Cast `arg` to the query compiler type selected by the calculator.

    Arguments that do not need casting, or that already have the selected
    type, are returned unchanged. Otherwise the argument is converted through
    pandas using the selected type's ``from_pandas``.
    """
    if not arg_needs_casting(arg):
        return arg
    qc_type = calculator.calculate()
    # Identity checks: None is a singleton and types are compared by identity.
    if qc_type is None or qc_type is type(arg):
        return arg
    frame_data = calculator.result_data_frame()
    return qc_type.from_pandas(arg.to_pandas(), frame_data)
234+
235+
154236
if isinstance(current_qc, BaseQueryCompiler):
155-
kwargs = cast_nested_args_to_current_qc_type(kwargs, current_qc)
156-
args = cast_nested_args_to_current_qc_type(args, current_qc)
237+
visit_nested_args(kwargs, current_qc, register_query_compilers)
238+
visit_nested_args(args, current_qc, register_query_compilers)
239+
240+
args = visit_nested_args(args, current_qc, cast_to_qc)
241+
kwargs = visit_nested_args(kwargs, current_qc, cast_to_qc)
242+
243+
244+
qc = calculator.calculate()
245+
246+
if qc == None or qc == type(current_qc):
247+
return obj(*args, **kwargs)
248+
249+
#breakpoint()
250+
# we need to cast current_qc to a new query compiler
251+
if qc != current_qc:
252+
data_cls = current_qc._modin_frame
253+
return qc.from_pandas(current_qc.to_pandas(), data_cls)
254+
# need to find the new function for obj
157255
return obj(*args, **kwargs)
158256

159257
return cast_args
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
2+
3+
import pandas
4+
from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler, QCCoercionCost
5+
from modin.core.storage_formats.pandas.native_query_compiler import NativeQueryCompiler
6+
from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler
7+
from modin.utils import _inherit_docstrings
8+
9+
10+
class CloudQC(NativeQueryCompiler):
    """Represents a cloud-hosted query compiler."""

    def __init__(self, pandas_frame):
        self._modin_frame = pandas_frame
        super().__init__(pandas_frame)

    def qc_engine_switch_cost(self, other_qc):
        """Return the fixed per-type switching costs reported by a cloud compiler."""
        switch_costs = {
            CloudQC: QCCoercionCost.COST_ZERO,
            ClusterQC: QCCoercionCost.COST_MEDIUM,
            LocalMachineQC: QCCoercionCost.COST_HIGH,
            PicoQC: QCCoercionCost.COST_IMPOSSIBLE,
        }
        return switch_costs
21+
22+
class ClusterQC(NativeQueryCompiler):
    """Represents a local network cluster query compiler."""

    def __init__(self, pandas_frame):
        self._modin_frame = pandas_frame
        super().__init__(pandas_frame)

    def qc_engine_switch_cost(self, other_qc):
        """Return the fixed per-type switching costs reported by a cluster compiler."""
        switch_costs = {
            CloudQC: QCCoercionCost.COST_MEDIUM,
            ClusterQC: QCCoercionCost.COST_ZERO,
            LocalMachineQC: QCCoercionCost.COST_MEDIUM,
            PicoQC: QCCoercionCost.COST_HIGH,
        }
        return switch_costs
33+
34+
class LocalMachineQC(NativeQueryCompiler):
    """Represents a local machine query compiler."""

    def __init__(self, pandas_frame):
        self._modin_frame = pandas_frame
        super().__init__(pandas_frame)

    def qc_engine_switch_cost(self, other_qc):
        """Return the fixed per-type switching costs reported by a local-machine compiler."""
        switch_costs = {
            CloudQC: QCCoercionCost.COST_MEDIUM,
            ClusterQC: QCCoercionCost.COST_LOW,
            LocalMachineQC: QCCoercionCost.COST_ZERO,
            PicoQC: QCCoercionCost.COST_MEDIUM,
        }
        return switch_costs
45+
46+
class PicoQC(NativeQueryCompiler):
    """Represents a query compiler with very few resources."""

    def __init__(self, pandas_frame):
        self._modin_frame = pandas_frame
        super().__init__(pandas_frame)

    def qc_engine_switch_cost(self, other_qc):
        """Return the fixed per-type switching costs reported by a pico compiler."""
        switch_costs = {
            CloudQC: QCCoercionCost.COST_LOW,
            ClusterQC: QCCoercionCost.COST_LOW,
            LocalMachineQC: QCCoercionCost.COST_LOW,
            PicoQC: QCCoercionCost.COST_ZERO,
        }
        return switch_costs
57+
58+
def test_two_same_qc_types_noop():
    """Concatenating two PicoQC compilers yields a result of the second operand's type."""
    left = PicoQC(pandas.DataFrame([0, 1, 2]))
    right = PicoQC(pandas.DataFrame([0, 1, 2]))
    combined = left.concat(axis=1, other=right)
    assert type(combined) == type(right)
63+
64+
def test_two_two_qc_types_rhs():
    """A PicoQC concatenated with a ClusterQC argument yields the ClusterQC's type."""
    pico = PicoQC(pandas.DataFrame([0, 1, 2]))
    cluster = ClusterQC(pandas.DataFrame([0, 1, 2]))
    combined = pico.concat(axis=1, other=cluster)
    assert type(combined) == type(cluster)
69+
70+
def test_two_two_qc_types_lhs():
    """A ClusterQC concatenated with a PicoQC argument keeps the ClusterQC's type."""
    pico = PicoQC(pandas.DataFrame([0, 1, 2]))
    cluster = ClusterQC(pandas.DataFrame([0, 1, 2]))
    combined = cluster.concat(axis=1, other=pico)
    assert type(combined) == type(cluster)  # should move to cluster
75+
76+
def test_three_two_qc_types_rhs():
    """Placeholder test: the body is intentionally empty and checks nothing yet."""
78+
79+
def test_three_two_qc_types_lhs():
    """Placeholder test: the body is intentionally empty and checks nothing yet."""
81+
82+
def test_three_two_qc_types_middle():
    """Placeholder test: the body is intentionally empty and checks nothing yet."""
84+
85+
def test_three_three_qc_types_rhs():
    """Placeholder test: the body is intentionally empty and checks nothing yet."""
87+
88+
def test_three_three_qc_types_lhs():
    """Placeholder test: the body is intentionally empty and checks nothing yet."""
90+
91+
def test_three_three_qc_types_middle():
    """Placeholder test: the body is intentionally empty and checks nothing yet."""

0 commit comments

Comments
 (0)