Skip to content

Commit

Permalink
[WAYANG-#8] Seed creation of Platforms/python
Browse files Browse the repository at this point in the history
Signed-off-by: bertty <[email protected]>
  • Loading branch information
Bertty Contreras-Rojas authored and berttty committed Apr 8, 2022
1 parent fc94a5f commit 2894e54
Show file tree
Hide file tree
Showing 16 changed files with 236 additions and 26 deletions.
6 changes: 3 additions & 3 deletions python/src/pywayang/dataquanta.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from pywayang.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO)
from pywayang.operator.base import (BaseOperator)
from pywayang.operator.base import (WyOperator)
from pywayang.operator.unary import (FilterOperator, MapOperator, FlatmapOperator)


class DataQuanta(GenericTco):
"""
Represents an intermediate result/data flow edge in a [[WayangPlan]].
"""
previous : BaseOperator = None
previous : WyOperator = None

def __init__(self, operator: BaseOperator):
def __init__(self, operator: WyOperator):
self.operator = operator


Expand Down
34 changes: 31 additions & 3 deletions python/src/pywayang/operator/base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from typing import (TypeVar, Optional, List)
from typing import (TypeVar, Optional, List, Set)
from pywayang.platforms.python.channels import ChannelDescriptor


class BaseOperator:
class WyOperator:

inputSlot : List[TypeVar]
inputChannel : ChannelDescriptor
inputs : int
outputSlot : List[TypeVar]
OutputChannel: ChannelDescriptor
outputs: int

def __init__(self,
Expand All @@ -21,6 +23,32 @@ def __init__(self,
self.outputSlot = output
self.outputs = output_lenght

def validateInputs(self, vec):
if len(vec) != self.inputs:
raise Exception(
"the inputs channel contains {} elements and need to have {}".format(
len(vec),
self.inputs
)
)
def validateOutputs(self, vec):
if len(vec) != self.outputs:
raise Exception(
"the output channel contains {} elements and need to have {}".format(
len(vec),
self.inputs
)
)
def validateChannels(self, input, output):
self.validateInputs(input)
self.validateOutputs(output)

def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
pass

def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
pass

def __str__(self):
return "BaseOperator: \n\t- name: {}\n\t- inputs: {} {}\n\t- outputs: {} {} \n".format(
str(self.name),
Expand Down
4 changes: 2 additions & 2 deletions python/src/pywayang/operator/source.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from pywayang.operator.base import BaseOperator
from pywayang.operator.base import WyOperator

class SourceUnaryOperator(BaseOperator):
class SourceUnaryOperator(WyOperator):

def __init__(self, name:str):
super().__init__(name, None, str, 0, 1)
Expand Down
16 changes: 5 additions & 11 deletions python/src/pywayang/operator/unary.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from pywayang.operator.base import BaseOperator
from pywayang.operator.base import WyOperator
from pywayang.types import (
GenericTco,
GenericUco,
Expand All @@ -12,7 +12,7 @@
from itertools import chain


class UnaryToUnaryOperator(BaseOperator):
class UnaryToUnaryOperator(WyOperator):

def __init__(self, name:str, input:GenericTco, output:GenericUco):
super().__init__(name, input, output, 1, 1)
Expand All @@ -30,16 +30,10 @@ class FilterOperator(UnaryToUnaryOperator):
predicate: Predicate

def __init__(self, predicate: Predicate):
type = getTypePredicate(predicate)
type = getTypePredicate(predicate) if predicate else None
super().__init__("FilterOperator", type, type)
self.predicate = predicate

def getWrapper(self):
udf = self.predicate
def func(iterator):
return filter(udf, iterator)
return func

def __str__(self):
return super().__str__()

Expand All @@ -51,7 +45,7 @@ class MapOperator(UnaryToUnaryOperator):
function: Function

def __init__(self, function: Function):
types = getTypeFunction(function)
types = getTypeFunction(function) if function else (None, None)
super().__init__("MapOperator", types[0], types[1])
self.function = function

Expand All @@ -73,7 +67,7 @@ class FlatmapOperator(UnaryToUnaryOperator):
fmfunction: FlatmapFunction

def __init__(self, fmfunction: FlatmapFunction):
types = getTypeFlatmapFunction(fmfunction)
types = getTypeFlatmapFunction(fmfunction) if fmfunction else (None, None)
super().__init__("FlatmapOperator", types[0], types[1])
self.fmfunction = fmfunction

Expand Down
Empty file.
Empty file.
60 changes: 60 additions & 0 deletions python/src/pywayang/platforms/python/channels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from typing import ( Iterable, Callable )

class Channel:

def __init__(self):
pass

def getchannel(self) -> 'Channel':
return self

def gettype(self):
return type(self)

class ChannelDescriptor:

def __init__(self, channelType: type, isReusable: bool, isSuitableForBreakpoint: bool):
self.channelType = channelType
self.isReusable = isReusable
self.isSuitableForBreakpoint = isSuitableForBreakpoint

def create_instance(self) -> Channel:
return self.channelType()


class PyIteratorChannel(Channel):

iterable : Iterable

def __init__(self):
Channel.__init__(self)

def provide_iterable(self) -> Iterable:
return self.iterable

def accept_iterable(self, iterable) -> 'PyIteratorChannel':
self.iterable = iterable
return self

class PyCallableChannel(Channel):

udf : Callable

def __init__(self):
Channel.__init__(self)

def provide_callable(self) -> Callable:
return self.udf

def accept_callable(self, udf: Callable) -> 'PyCallableChannel':
self.udf = udf
return self

@staticmethod
def concatenate(function_a: Callable, function_b: Callable):
def executable(iterable):
return function_a(function_b(iterable))
return executable

PyIteratorChannelDescriptor = ChannelDescriptor(type(PyIteratorChannel()), False, False)
PyCallableChannelDescriptor = ChannelDescriptor(type(PyCallableChannel()), False, False)
Empty file.
Empty file.
35 changes: 35 additions & 0 deletions python/src/pywayang/platforms/python/mappings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from typing import Dict

from pywayang.operator.base import WyOperator
from pywayang.platforms.python.operators import *

class Mapping:
mappings: Dict[str, type]

def __init__(self):
self.mappings = {}

def add_mapping(self, operator: PythonExecutionOperator):
self.mappings[operator.name] = type(operator)

def get_instanceof(self, operator: WyOperator):
template = self.mappings[operator.name]
if template is None:
raise Exception(
"the operator {} does not have valid mapping".format(
operator.name
)
)
return template(operator)


def __str__(self):
return str(self.mappings)

def __repr__(self):
return self.__str__()

OperatorMappings = Mapping()

OperatorMappings.add_mapping(PyFilterOperator())

43 changes: 43 additions & 0 deletions python/src/pywayang/platforms/python/operators/PyFilterOperator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from pywayang.operator.unary import FilterOperator
from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
from pywayang.platforms.python.channels import (Channel, ChannelDescriptor, PyIteratorChannel,
PyIteratorChannelDescriptor, PyCallableChannelDescriptor,
PyCallableChannel)
from typing import Set

class PyFilterOperator(FilterOperator, PythonExecutionOperator):

def __init__(self, origin: FilterOperator = None):
predicate = None if origin is None else origin.predicate
super().__init__(predicate)
pass

def execute(self, inputs: Channel, outputs: Channel):
self.validateChannels(inputs, outputs)
udf = self.predicate
if isinstance(inputs[0], PyIteratorChannel) :
py_in_iter_channel: PyIteratorChannel = inputs[0]
py_out_iter_channel: PyIteratorChannel = outputs[0]
py_out_iter_channel.accept_iterable(filter(udf, py_in_iter_channel.provide_iterable()))
elif isinstance(inputs[0], PyCallableChannel) :
py_in_call_channel: PyCallableChannel = inputs[0]
py_out_call_channel: PyCallableChannel = outputs[0]

def func(iterator):
return filter(udf, iterator)

py_out_call_channel.accept_callable(
PyCallableChannel.concatenate(
func,
py_in_call_channel.provide_callable()
)
)
else:
raise Exception("Channel Type does not supported")


def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
return {PyIteratorChannelDescriptor, PyCallableChannelDescriptor}

def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
return {PyIteratorChannelDescriptor, PyCallableChannelDescriptor}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from pywayang.operator.base import WyOperator
from pywayang.platforms.python.channels import Channel

class PythonExecutionOperator(WyOperator):

def execute(self, inputs: Channel, output: Channel):
pass
7 changes: 7 additions & 0 deletions python/src/pywayang/platforms/python/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
from pywayang.platforms.python.operators.PyFilterOperator import PyFilterOperator

__ALL__ = [
PythonExecutionOperator,
PyFilterOperator
]
Empty file.
Empty file.
50 changes: 43 additions & 7 deletions python/src/pywayang/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@

from pywayang.platform import Platform
from pywayang.context import WayangContext
from pywayang.platforms.python.channels import Channel
from pywayang.plugin import java, spark
from pywayang.operator.unary import *

p = Platform("nana")
print(p)
print("LALA "+str(p))
pt = type(p)
print(pt)
p2 = pt("chao")
print(p2)
print(type(p2))


print(str(WayangContext().register(java, spark)))
Expand All @@ -31,8 +37,8 @@ def fmfunc(i:int) -> str:
.textFile("here")\

filterop: FilterOperator = fileop.filter(pre).getOperator()
fop_pre = filterop.getWrapper()
fop_pre_res = fop_pre(["la", "lala"])
#fop_pre = filterop.getWrapper()
#fop_pre_res = fop_pre(["la", "lala"])
#for i in fop_pre_res:
# print(i)

Expand All @@ -55,7 +61,37 @@ def executable(iterable):
return function_b(function_a(iterable))
return executable

res = concatenate(concatenate(fop_pre, mop_func), fmop_func)
res_pro = res(["la", "lala"])
for i in res_pro:
print(i)
#res = concatenate(concatenate(fop_pre, mop_func), fmop_func)
#res_pro = res(["la", "lala"])
#for i in res_pro:
# print(i)

from pywayang.platforms.python.mappings import OperatorMappings
from pywayang.platforms.python.operators import *

print(OperatorMappings)

pyF = PyFilterOperator()
print(pyF)
print(pyF.getInputChannelDescriptors())
print(type(pyF.getInputChannelDescriptors().pop().create_instance()))

qq : Channel = pyF.getInputChannelDescriptors().pop().create_instance()
print(qq)
print(type(qq))
print("ads")


def pre_lala(a:str):
print("executed")
return len(a) > 3

ou1 = filter(pre_lala, ["la", "lala"])
print(ou1)

for i in ou1:
print(i)

pyFM = OperatorMappings.get_instanceof(filterop)
print(pyFM)
print(type(pyFM))

0 comments on commit 2894e54

Please sign in to comment.