Skip to content

Commit

Permalink
[WAYANG-#8] Change structure for PywyPlan
Browse files Browse the repository at this point in the history
Signed-off-by: bertty <[email protected]>
  • Loading branch information
Bertty Contreras-Rojas authored and berttty committed Apr 8, 2022
1 parent 2205368 commit 8b20bc4
Show file tree
Hide file tree
Showing 12 changed files with 250 additions and 147 deletions.
35 changes: 0 additions & 35 deletions python/src/pywy/context.py

This file was deleted.

50 changes: 45 additions & 5 deletions python/src/pywy/dataquanta.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,68 @@
from pywy.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO)
from pywy.wayangplan import *
from pywy.wayangplan.wayang import PywyPlan
from pywy.platforms.basic.plugin import Plugin

class WayangContext:
"""
This is the entry point for users to work with Wayang.
"""
def __init__(self):
self.plugins = set()

"""
add a :class:`Plugin` to the :class:`Context`
"""
def register(self, *p: Plugin):
self.plugins.add(p)
return self

"""
remove a :class:`Plugin` from the :class:`Context`
"""
def unregister(self, p: Plugin):
self.plugins.remove(p)
return self

def textFile(self, file_path: str) -> 'DataQuanta[str]':
return DataQuanta(self, TextFileSource(file_path))

def __str__(self):
return "Plugins: {} \n".format(str(self.plugins))

def __repr__(self):
return self.__str__()

class DataQuanta(GenericTco):
"""
Represents an intermediate result/data flow edge in a [[WayangPlan]].
"""
previous : WyOperator = None

def __init__(self, operator: WyOperator):
def __init__(self, context:WayangContext, operator: WyOperator):
self.operator = operator
self.context = context

def filter(self: "DataQuanta[T]", p: Predicate) -> "DataQuanta[T]" :
return DataQuanta(FilterOperator(p))
return DataQuanta(self.context, self.__connect(FilterOperator(p)))

def map(self: "DataQuanta[I]", f: Function) -> "DataQuanta[O]" :
return DataQuanta(MapOperator(f))
return DataQuanta(self.context,self.__connect(MapOperator(f)))

def flatmap(self: "DataQuanta[I]", f: FlatmapFunction) -> "DataQuanta[IterableO]" :
return DataQuanta(FlatmapOperator(f))
return DataQuanta(self.context,self.__connect(FlatmapOperator(f)))

def storeTextFile(self: "DataQuanta[I]", path: str) :
last = DataQuanta(TextFileSink(path))
last = self.__connect(TextFileSink(path))
plan = PywyPlan(self.context.plugins, [last])
plan.print()

# TODO add the logic to execute the plan

def __connect(self, op:WyOperator, port_op: int = 0) -> WyOperator:
self.operator.connect(0, op, port_op)
return op

def getOperator(self):
return self.operator

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def __init__(self, origin: FilterOperator = None):
pass

def execute(self, inputs: Channel, outputs: Channel):
self.validateChannels(inputs, outputs)
self.validate_channels(inputs, outputs)
udf = self.predicate
if isinstance(inputs[0], PyIteratorChannel) :
py_in_iter_channel: PyIteratorChannel = inputs[0]
Expand All @@ -36,8 +36,8 @@ def func(iterator):
raise Exception("Channel Type does not supported")


def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
return {PyIteratorChannelDescriptor, PyCallableChannelDescriptor}

def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
return {PyIteratorChannelDescriptor, PyCallableChannelDescriptor}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def __init__(self, origin: TextFileSink = None):
pass

def execute(self, inputs: Channel, outputs: Channel):
self.validateChannels(inputs, outputs)
self.validate_channels(inputs, outputs)
if isinstance(inputs[0], PyIteratorChannel) :
file = open(self.path,'w')
py_in_iter_channel: PyIteratorChannel = inputs[0]
Expand All @@ -29,8 +29,8 @@ def execute(self, inputs: Channel, outputs: Channel):
raise Exception("Channel Type does not supported")


def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
return {PyIteratorChannelDescriptor}

def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
raise Exception("The PyTextFileSource does not support Output Channels")
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def __init__(self, origin: TextFileSource = None):
pass

def execute(self, inputs: Channel, outputs: Channel):
self.validateChannels(inputs, outputs)
self.validate_channels(inputs, outputs)
if isinstance(outputs[0], PyIteratorChannel) :
py_out_iter_channel: PyIteratorChannel = outputs[0]
py_out_iter_channel.accept_iterable(
Expand All @@ -31,8 +31,8 @@ def execute(self, inputs: Channel, outputs: Channel):
raise Exception("Channel Type does not supported")


def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
raise Exception("The PyTextFileSource does not support Input Channels")

def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
return {PyIteratorChannelDescriptor}
122 changes: 61 additions & 61 deletions python/src/pywy/test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from pywy.platforms.basic.platform import Platform
from pywy.context import WayangContext
from pywy.dataquanta import WayangContext
from pywy.platforms.python.channels import Channel
from pywy.platforms.basic.plugin import java, spark
from pywy.plugins import java, spark
from pywy.wayangplan.unary import *

p = Platform("nana")
Expand Down Expand Up @@ -36,62 +36,62 @@ def fmfunc(i:int) -> str:
.filter(pre)\
.storeTextFile("/Users/bertty/databloom/blossom/python/resources/test.output")

filterop: FilterOperator = fileop.filter(pre).getOperator()
#fop_pre = filterop.getWrapper()
#fop_pre_res = fop_pre(["la", "lala"])
#for i in fop_pre_res:
# print(i)


mapop: MapOperator = fileop.map(func).getOperator()
mop_func = mapop.getWrapper()
mop_func_res = mop_func(["la", "lala"])
#for i in mop_func_res:
# print(i)


fmop: FlatmapOperator = fileop.flatmap(fmfunc).getOperator()
fmop_func = fmop.getWrapper()
fmop_func_res = fmop_func([2, 3])
#for i in fmop_func_res:
# print(i)

def concatenate(function_a, function_b):
def executable(iterable):
return function_b(function_a(iterable))
return executable

#res = concatenate(concatenate(fop_pre, mop_func), fmop_func)
#res_pro = res(["la", "lala"])
#for i in res_pro:
# print(i)

from pywy.platforms.python.mappings import PywyOperatorMappings
from pywy.platforms.python.operators import *

print(PywyOperatorMappings)

pyF = PyFilterOperator()
print(pyF)
print(pyF.getInputChannelDescriptors())
print(type(pyF.getInputChannelDescriptors().pop().create_instance()))

qq : Channel = pyF.getInputChannelDescriptors().pop().create_instance()
print(qq)
print(type(qq))
print("ads")


def pre_lala(a:str):
print("executed")
return len(a) > 3

ou1 = filter(pre_lala, ["la", "lala"])
print(ou1)

for i in ou1:
print(i)

pyFM = PywyOperatorMappings.get_instanceof(filterop)
print(pyFM)
print(type(pyFM))
# filterop: FilterOperator = fileop.filter(pre).getOperator()
# #fop_pre = filterop.getWrapper()
# #fop_pre_res = fop_pre(["la", "lala"])
# #for i in fop_pre_res:
# # print(i)
#
#
# mapop: MapOperator = fileop.map(func).getOperator()
# mop_func = mapop.getWrapper()
# mop_func_res = mop_func(["la", "lala"])
# #for i in mop_func_res:
# # print(i)
#
#
# fmop: FlatmapOperator = fileop.flatmap(fmfunc).getOperator()
# fmop_func = fmop.getWrapper()
# fmop_func_res = fmop_func([2, 3])
# #for i in fmop_func_res:
# # print(i)
#
# def concatenate(function_a, function_b):
# def executable(iterable):
# return function_b(function_a(iterable))
# return executable
#
# #res = concatenate(concatenate(fop_pre, mop_func), fmop_func)
# #res_pro = res(["la", "lala"])
# #for i in res_pro:
# # print(i)
#
# from pywy.platforms.python.mappings import PywyOperatorMappings
# from pywy.platforms.python.operators import *
#
# print(PywyOperatorMappings)
#
# pyF = PyFilterOperator()
# print(pyF)
# print(pyF.get_input_channeldescriptors())
# print(type(pyF.get_input_channeldescriptors().pop().create_instance()))
#
# qq : Channel = pyF.get_input_channeldescriptors().pop().create_instance()
# print(qq)
# print(type(qq))
# print("ads")
#
#
# def pre_lala(a:str):
# print("executed")
# return len(a) > 3
#
# ou1 = filter(pre_lala, ["la", "lala"])
# print(ou1)
#
# for i in ou1:
# print(i)
#
# pyFM = PywyOperatorMappings.get_instanceof(filterop)
# print(pyFM)
# print(type(pyFM))
4 changes: 2 additions & 2 deletions python/src/pywy/translate/translator.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from pywy.platforms.basic.plugin import Plugin
from pywy.wayangplan.wayang import WayangPlan
from pywy.wayangplan.wayang import PywyPlan
from pywy.platforms.basic.mapping import Mapping

class Translator:

def __init__(self, plugin: Plugin, plan: WayangPlan):
def __init__(self, plugin: Plugin, plan: PywyPlan):
self.plugin = plugin
self.plan = plan

Expand Down
16 changes: 7 additions & 9 deletions python/src/pywy/wayangplan/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
from pywy.wayangplan.wayang import WayangPlan
from pywy.wayangplan.base import WyOperator
from pywy.wayangplan.sink import TextFileSink
from pywy.wayangplan.source import TextFileSource
from pywy.wayangplan.unary import FilterOperator, MapOperator, FlatmapOperator

#
__ALL__= [
WayangPlan,
WyOperator,
TextFileSink,
TextFileSource,
FilterOperator,
MapOperator,
FlatmapOperator
WyOperator,
TextFileSink,
TextFileSource,
FilterOperator,
# MapOperator,
# FlatmapOperator
]
24 changes: 16 additions & 8 deletions python/src/pywy/wayangplan/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ class WyOperator:

inputSlot : List[TypeVar]
inputChannel : ChannelDescriptor
inputOperator: List['WyOperator']
inputs : int
outputSlot : List[TypeVar]
OutputChannel: ChannelDescriptor
outputChannel: ChannelDescriptor
outputOperator: List['WyOperator']
outputs: int

def __init__(self,
Expand All @@ -22,31 +24,37 @@ def __init__(self,
self.inputs = input_lenght
self.outputSlot = output
self.outputs = output_lenght
self.inputOperator = [None] * self.inputs
self.outputOperator = [None] * self.outputs

def validateInputs(self, vec):
def validate_inputs(self, vec):
if len(vec) != self.inputs:
raise Exception(
"the inputs channel contains {} elements and need to have {}".format(
len(vec),
self.inputs
)
)
def validateOutputs(self, vec):
def validate_outputs(self, vec):
if len(vec) != self.outputs:
raise Exception(
"the output channel contains {} elements and need to have {}".format(
len(vec),
self.inputs
)
)
def validateChannels(self, input, output):
self.validateInputs(input)
self.validateOutputs(output)
def validate_channels(self, input, output):
self.validate_inputs(input)
self.validate_outputs(output)

def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
def connect(self, port:int, that: 'WyOperator', port_that:int):
self.outputOperator[port] = that
that.inputOperator[port_that] = self

def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
pass

def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
pass

def __str__(self):
Expand Down
Loading

0 comments on commit 8b20bc4

Please sign in to comment.