Skip to content

Commit

Permalink
[WAYANG-#8] Structure the Python file inside of a module
Browse files Browse the repository at this point in the history
Signed-off-by: bertty <[email protected]>
  • Loading branch information
Bertty Contreras-Rojas authored and berttty committed Apr 8, 2022
1 parent 4aa733b commit fc94a5f
Show file tree
Hide file tree
Showing 11 changed files with 397 additions and 2 deletions.
35 changes: 35 additions & 0 deletions python/src/pywayang/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from pywayang.plugin import Plugin
from pywayang.dataquanta import DataQuanta
from pywayang.operator.source import TextFileSource

class WayangContext:
    """
    This is the entry point for users to work with Wayang.

    A context keeps the set of registered :class:`Plugin` objects and creates
    source :class:`DataQuanta` objects.
    """

    def __init__(self):
        # Registered plugins; being a set, registering the same plugin twice
        # keeps a single entry.
        self.plugins = set()

    def register(self, *p: "Plugin"):
        """
        Add one or more :class:`Plugin` objects to the context.

        :param p: the plugins to register; with no argument nothing changes
        :return: this context, so that calls can be chained
        """
        # ``p`` is the tuple of positional arguments: merge its elements into
        # the set. ``add(p)`` would store the tuple itself as a single entry,
        # which ``unregister(plugin)`` could then never find.
        self.plugins.update(p)
        return self

    def unregister(self, p: "Plugin"):
        """
        Remove a :class:`Plugin` from the context.

        :param p: the plugin to remove
        :return: this context, so that calls can be chained
        :raises KeyError: if ``p`` is not currently registered
        """
        self.plugins.remove(p)
        return self

    def textFile(self, file_path: str) -> "DataQuanta[str]":
        """
        Create a :class:`DataQuanta` wrapping a :class:`TextFileSource`.

        :param file_path: path handed to the :class:`TextFileSource`
        :return: a new :class:`DataQuanta` holding the source operator
        """
        return DataQuanta(TextFileSource(file_path))

    def __str__(self):
        return "Plugins: {} \n".format(str(self.plugins))

    def __repr__(self):
        return self.__str__()

32 changes: 32 additions & 0 deletions python/src/pywayang/dataquanta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from pywayang.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO)
from pywayang.operator.base import (BaseOperator)
from pywayang.operator.unary import (FilterOperator, MapOperator, FlatmapOperator)


class DataQuanta(GenericTco):
    """
    Represents an intermediate result/data flow edge in a [[WayangPlan]].

    Each instance wraps exactly one operator. The transformation methods
    return a new :class:`DataQuanta` wrapping a freshly built operator.
    """
    # Class-level default; nothing in this class assigns or reads it.
    previous : BaseOperator = None

    def __init__(self, operator: BaseOperator):
        """Wrap ``operator`` in this data quanta."""
        self.operator = operator

    # NOTE(review): the operators built by filter/map/flatmap are not linked to
    # ``self.operator`` here -- confirm whether the upstream link is meant to
    # be recorded (e.g. through ``previous``).

    def filter(self: "DataQuanta[T]", p: Predicate) -> "DataQuanta[T]" :
        """Return a new DataQuanta wrapping a FilterOperator built from ``p``."""
        filter_operator = FilterOperator(p)
        return DataQuanta(filter_operator)

    def map(self: "DataQuanta[I]", f: Function) -> "DataQuanta[O]" :
        """Return a new DataQuanta wrapping a MapOperator built from ``f``."""
        map_operator = MapOperator(f)
        return DataQuanta(map_operator)

    def flatmap(self: "DataQuanta[I]", f: FlatmapFunction) -> "DataQuanta[IterableO]" :
        """Return a new DataQuanta wrapping a FlatmapOperator built from ``f``."""
        flatmap_operator = FlatmapOperator(f)
        return DataQuanta(flatmap_operator)

    def getOperator(self):
        """Return the operator wrapped by this data quanta."""
        wrapped = self.operator
        return wrapped

    def __str__(self):
        # The textual form is delegated entirely to the wrapped operator.
        wrapped = self.operator
        return str(wrapped)

    def __repr__(self):
        return str(self)
File renamed without changes.
37 changes: 37 additions & 0 deletions python/src/pywayang/operator/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from typing import (TypeVar, Optional, List)


class BaseOperator:
    """
    Common description of an operator: a name plus its input and output slots.
    """

    inputSlot : List[TypeVar]
    inputs : int
    outputSlot : List[TypeVar]
    outputs: int

    def __init__(self,
                 name: str,
                 input: Optional[TypeVar] = None,
                 output: Optional[TypeVar] = None,
                 input_lenght: Optional[int] = 1,
                 output_lenght: Optional[int] = 1
                 ):
        """
        :param name: operator name, shown in the string form
        :param input: value stored as ``inputSlot``
        :param output: value stored as ``outputSlot``
        :param input_lenght: value stored as ``inputs``
        :param output_lenght: value stored as ``outputs``
        """
        self.name = name
        # Input side first, then output side.
        self.inputSlot, self.inputs = input, input_lenght
        self.outputSlot, self.outputs = output, output_lenght

    def __str__(self):
        # One entry per rendered line; the empty last entry yields the
        # trailing newline once the entries are joined.
        rendered_lines = (
            "BaseOperator: ",
            f"\t- name: {self.name!s}",
            f"\t- inputs: {self.inputs!s} {self.inputSlot!s}",
            f"\t- outputs: {self.outputs!s} {self.outputSlot!s} ",
            "",
        )
        return "\n".join(rendered_lines)

    def __repr__(self):
        return str(self)



28 changes: 28 additions & 0 deletions python/src/pywayang/operator/source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from pywayang.operator.base import BaseOperator

class SourceUnaryOperator(BaseOperator):
    """
    Operator declared with no input (``None``, count 0) and one ``str`` output.
    """

    def __init__(self, name:str):
        # Keyword names follow BaseOperator.__init__, including its
        # ``lenght`` spelling.
        super().__init__(
            name,
            input=None,
            output=str,
            input_lenght=0,
            output_lenght=1,
        )

    def __str__(self):
        # Nothing to add: reuse the parent rendering unchanged.
        return super().__str__()

    def __repr__(self):
        return super().__repr__()



class TextFileSource(SourceUnaryOperator):
    """
    Source operator named ``'TextFileSource'`` that stores a file path.
    """

    path: str

    def __init__(self, path: str):
        """
        :param path: path kept on the instance as ``self.path``
        """
        operator_name = 'TextFileSource'
        super().__init__(operator_name)
        self.path = path

    def __str__(self):
        # The path is not part of the rendering; the parent output is reused.
        return super().__str__()

    def __repr__(self):
        return super().__repr__()
90 changes: 90 additions & 0 deletions python/src/pywayang/operator/unary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from pywayang.operator.base import BaseOperator
from pywayang.types import (
GenericTco,
GenericUco,
Predicate,
getTypePredicate,
Function,
getTypeFunction,
FlatmapFunction,
getTypeFlatmapFunction
)
from itertools import chain


class UnaryToUnaryOperator(BaseOperator):
    """
    Operator declared with exactly one input slot and one output slot.
    """

    def __init__(self, name:str, input:GenericTco, output:GenericUco):
        """
        :param name: operator name
        :param input: type stored as the input slot
        :param output: type stored as the output slot
        """
        # Keyword names follow BaseOperator.__init__, including its
        # ``lenght`` spelling.
        super().__init__(
            name,
            input=input,
            output=output,
            input_lenght=1,
            output_lenght=1,
        )

    def __str__(self):
        # Nothing to add: reuse the parent rendering unchanged.
        return super().__str__()

    def __repr__(self):
        return super().__repr__()



class FilterOperator(UnaryToUnaryOperator):
    """
    Unary operator that keeps the elements for which a predicate holds.
    """

    predicate: Predicate

    def __init__(self, predicate: Predicate):
        """
        :param predicate: the user function used to select elements
        """
        # The same value is used for the input and the output slot.
        element_type = getTypePredicate(predicate)
        super().__init__("FilterOperator", element_type, element_type)
        self.predicate = predicate

    def getWrapper(self):
        """
        Return a function that applies the predicate to an iterable through
        the built-in ``filter``.
        """
        # Bind the predicate now, so the returned function keeps using it even
        # if ``self.predicate`` is reassigned later.
        selected_predicate = self.predicate

        def apply_filter(iterator):
            return filter(selected_predicate, iterator)

        return apply_filter

    def __str__(self):
        return super().__str__()

    def __repr__(self):
        return super().__repr__()

class MapOperator(UnaryToUnaryOperator):
    """
    Unary operator that applies a function to every element.
    """

    function: Function

    def __init__(self, function: Function):
        """
        :param function: the user function applied to each element
        """
        signature_types = getTypeFunction(function)
        # Index 0 is used as the input slot, index 1 as the output slot.
        input_type = signature_types[0]
        output_type = signature_types[1]
        super().__init__("MapOperator", input_type, output_type)
        self.function = function

    def getWrapper(self):
        """
        Return a function that applies the user function to an iterable
        through the built-in ``map``.
        """
        # Bind the function now, so the returned wrapper keeps using it even
        # if ``self.function`` is reassigned later.
        mapped_function = self.function

        def apply_map(iterator):
            return map(mapped_function, iterator)

        return apply_map

    def __str__(self):
        return super().__str__()

    def __repr__(self):
        return super().__repr__()


class FlatmapOperator(UnaryToUnaryOperator):
    """
    Unary operator that maps every element to an iterable and flattens the
    results into a single stream.
    """

    fmfunction: FlatmapFunction

    def __init__(self, fmfunction: FlatmapFunction):
        """
        :param fmfunction: the user function returning an iterable per element
        """
        signature_types = getTypeFlatmapFunction(fmfunction)
        # Index 0 is used as the input slot, index 1 as the output slot.
        input_type = signature_types[0]
        output_type = signature_types[1]
        super().__init__("FlatmapOperator", input_type, output_type)
        self.fmfunction = fmfunction

    def getWrapper(self):
        """
        Return a function that maps the user function over an iterable and
        chains the produced iterables together.
        """
        # Bind the function now, so the returned wrapper keeps using it even
        # if ``self.fmfunction`` is reassigned later.
        expanding_function = self.fmfunction

        def apply_flatmap(iterator):
            per_element_iterables = map(expanding_function, iterator)
            return chain.from_iterable(per_element_iterables)

        return apply_flatmap

    def __str__(self):
        return super().__str__()

    def __repr__(self):
        return super().__repr__()
8 changes: 6 additions & 2 deletions python/src/pywayang/orchestrator/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@
class Operator:

def __init__(
self, operator_type=None, udf=None, previous=None,
iterator=None, python_exec=False
self,
operator_type=None,
udf=None,
previous=None,
iterator=None,
python_exec=False
):

# Operator ID
Expand Down
24 changes: 24 additions & 0 deletions python/src/pywayang/platform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@

class Platform:
    """
    A platform describes an execution engine that is used to execute the
    wayang plan.

    Parameters
    ----------
    name: str
      platform name, used as its identification
    """

    name : str
    #configuration : dict[str, str]

    def __init__(self, name):
        self.name = name

    def __str__(self):
        # Only the name is rendered.
        return f"name: {self.name}"

    def __repr__(self):
        return str(self)

27 changes: 27 additions & 0 deletions python/src/pywayang/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from pywayang.platform import Platform

class Plugin:
    """
    A plugin contributes the following components to a :class:`Context`
      - mappings
      - channels
      - configurations
    In turn, it may require several :class:`Platform`s for its operation.
    """

    # Class-level default; __init__ always replaces it with a list owned by
    # the instance.
    platforms = []

    def __init__(self, *platform:Platform):
        """
        :param platform: the platforms this plugin is built from
        """
        # Copy the argument tuple into a fresh list for this instance.
        self.platforms = [*platform]

    def __str__(self):
        return f"Platforms: {self.platforms!s}"

    def __repr__(self):
        return str(self)


# Ready-made plugins, each built from a single Platform with the same name.
# They are module-level names so callers can import them directly.
java = Plugin(Platform('java'))
spark = Plugin(Platform('spark'))
flink = Plugin(Platform('flink'))
61 changes: 61 additions & 0 deletions python/src/pywayang/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from typing import Iterable

from pywayang.platform import Platform
from pywayang.context import WayangContext
from pywayang.plugin import java, spark
from pywayang.operator.unary import *

# Smoke check: build a Platform and print its string form.
p = Platform("nana")
print(p)


# Register two plugins in a single call and print the resulting context.
print(str(WayangContext().register(java, spark)))

from pywayang.types import Predicate, getTypePredicate

# Call getTypePredicate on an un-annotated lambda. The return value is
# discarded, so this only checks that the call completes without raising.
predicate : Predicate = lambda x: x % 2 == 0
getTypePredicate(predicate)

def pre(a:str):
    """Tell whether ``a`` is longer than three characters."""
    length_threshold = 3
    return len(a) > length_threshold

def func(s:str) -> int:
    """Return the length of ``s``."""
    length = len(s)
    return length

def fmfunc(i:int) -> str:
    """Yield ``str(x)`` for every ``x`` in ``range(i)``, in order."""
    # NOTE(review): this is a generator, so ``-> str`` presumably names the
    # element type rather than the returned object -- confirm how
    # getTypeFlatmapFunction reads it before changing the annotation.
    yield from map(str, range(i))

# Build a context with the java plugin and open a text-file source.
# The chain is parenthesised instead of using backslash continuations: the
# last backslash used to continue the statement onto the following blank line,
# so any statement added there would have been merged into this expression.
fileop = (
    WayangContext()
    .register(java)
    .textFile("here")
)

# Filter: take the operator out of the derived DataQuanta and get its wrapper.
filterop: FilterOperator = fileop.filter(pre).getOperator()
fop_pre = filterop.getWrapper()
# The wrapper returns a lazy ``filter`` object; it is not consumed here.
# Iterate over fop_pre_res to inspect the values.
fop_pre_res = fop_pre(["la", "lala"])


# Map: same steps with the map operator.
mapop: MapOperator = fileop.map(func).getOperator()
mop_func = mapop.getWrapper()
# The wrapper returns a lazy ``map`` object; it is not consumed here.
# Iterate over mop_func_res to inspect the values.
mop_func_res = mop_func(["la", "lala"])


# Flatmap: same steps with the flatmap operator.
fmop: FlatmapOperator = fileop.flatmap(fmfunc).getOperator()
fmop_func = fmop.getWrapper()
# The wrapper returns a lazy chained iterator; it is not consumed here.
# Iterate over fmop_func_res to inspect the values.
fmop_func_res = fmop_func([2, 3])

def concatenate(function_a, function_b):
    """
    Compose two single-argument callables.

    The returned callable feeds its argument to ``function_a`` and then passes
    that result to ``function_b``.
    """
    def composed(iterable):
        intermediate = function_a(iterable)
        return function_b(intermediate)

    return composed

# Chain the three wrappers in order: filter, then map, then flatmap.
res = concatenate(concatenate(fop_pre, mop_func), fmop_func)
res_pro = res(["la", "lala"])
# Iterating consumes the chained lazy iterators and prints each element.
for i in res_pro:
    print(i)
Loading

0 comments on commit fc94a5f

Please sign in to comment.