Skip to content

Commit

Permalink
[WAYANG-#8] add dataquanta Tests and small corrections
Browse files Browse the repository at this point in the history
Signed-off-by: bertty <[email protected]>
  • Loading branch information
Bertty Contreras-Rojas authored and berttty committed Apr 8, 2022
1 parent d314760 commit e0da05b
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 23 deletions.
4 changes: 2 additions & 2 deletions python/old_code/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ def pre(a:str):
tic = time.perf_counter()
fileop = WayangContext()\
.register(python)\
.textFile("/Users/bertty/databloom/blossom/python/resources/tmp"+str(index))\
.textfile("/Users/bertty/databloom/blossom/python/resources/tmp" + str(index))\
.filter(pre)\
.storeTextFile("/Users/bertty/databloom/blossom/python/resources/out"+str(index))
.store_textfile("/Users/bertty/databloom/blossom/python/resources/out" + str(index))
toc = time.perf_counter()
print(f"Downloaded the tutorial in {toc - tic:0.4f} seconds")

Expand Down
38 changes: 19 additions & 19 deletions python/src/pywy/dataquanta.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from typing import Set
from typing import Set, List, cast

from pywy.core import Translator
from pywy.types import ( GenericTco, Predicate, Function, FlatmapFunction, IterableO )
from pywy.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO, T, I, O)
from pywy.operators import *
from pywy.core import PywyPlan
from pywy.core import Plugin


class WayangContext:
"""
This is the entry point for users to work with Wayang.
Expand All @@ -18,6 +19,7 @@ def __init__(self):
"""
add a :class:`Plugin` to the :class:`Context`
"""

def register(self, *plugins: Plugin):
for p in plugins:
self.plugins.add(p)
Expand All @@ -26,12 +28,13 @@ def register(self, *plugins: Plugin):
"""
remove a :class:`Plugin` from the :class:`Context`
"""

def unregister(self, *plugins: Plugin):
for p in plugins:
self.plugins.remove(p)
return self

def textFile(self, file_path: str) -> 'DataQuanta[str]':
def textfile(self, file_path: str) -> 'DataQuanta[str]':
return DataQuanta(self, TextFileSource(file_path))

def __str__(self):
Expand All @@ -40,43 +43,40 @@ def __str__(self):
def __repr__(self):
return self.__str__()


class DataQuanta(GenericTco):
"""
Represents an intermediate result/data flow edge in a [[WayangPlan]].
"""
previous : PywyOperator = None
context: WayangContext

def __init__(self, context:WayangContext, operator: PywyOperator):
def __init__(self, context: WayangContext, operator: PywyOperator):
self.operator = operator
self.context = context

def filter(self: "DataQuanta[T]", p: Predicate) -> "DataQuanta[T]" :
return DataQuanta(self.context, self.__connect(FilterOperator(p)))
def filter(self: "DataQuanta[T]", p: Predicate) -> "DataQuanta[T]":
return DataQuanta(self.context, self._connect(FilterOperator(p)))

def map(self: "DataQuanta[I]", f: Function) -> "DataQuanta[O]" :
return DataQuanta(self.context,self.__connect(MapOperator(f)))
def map(self: "DataQuanta[I]", f: Function) -> "DataQuanta[O]":
return DataQuanta(self.context, self._connect(MapOperator(f)))

def flatmap(self: "DataQuanta[I]", f: FlatmapFunction) -> "DataQuanta[IterableO]" :
return DataQuanta(self.context,self.__connect(FlatmapOperator(f)))
def flatmap(self: "DataQuanta[I]", f: FlatmapFunction) -> "DataQuanta[IterableO]":
return DataQuanta(self.context, self._connect(FlatmapOperator(f)))

def storeTextFile(self: "DataQuanta[I]", path: str) :
last = self.__connect(TextFileSink(path))
plan = PywyPlan(self.context.plugins, [last])
def store_textfile(self: "DataQuanta[I]", path: str):
last: List[SinkOperator] = [cast(SinkOperator, self._connect(TextFileSink(path)))]
plan = PywyPlan(self.context.plugins, last)

plug = self.context.plugins.pop()
trs: Translator = Translator(plug, plan)
trs: Translator = Translator(plug, plan)
new_plan = trs.translate()
plug.get_executor().execute(new_plan)
# TODO add the logic to execute the plan

def __connect(self, op:PywyOperator, port_op: int = 0) -> PywyOperator:
def _connect(self, op: PywyOperator, port_op: int = 0) -> PywyOperator:
self.operator.connect(0, op, port_op)
return op

def getOperator(self):
return self.operator

def __str__(self):
return str(self.operator)

Expand Down
3 changes: 2 additions & 1 deletion python/src/pywy/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pywy.operators.base import PywyOperator
from pywy.operators.sink import TextFileSink
from pywy.operators.sink import TextFileSink, SinkOperator
from pywy.operators.source import TextFileSource
from pywy.operators.unary import FilterOperator, MapOperator, FlatmapOperator
#
Expand All @@ -8,6 +8,7 @@
TextFileSink,
TextFileSource,
FilterOperator,
SinkOperator
# MapOperator,
# FlatmapOperator
]
2 changes: 1 addition & 1 deletion python/src/pywy/tests/unit/dataquanta/context_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def test_textfile_withoutPlugin(self):
self.assertIsInstance(context, WayangContext)
self.assertEqual(len(context.plugins), 0)

dataQuanta = context.textFile(path)
dataQuanta = context.textfile(path)

self.assertIsInstance(dataQuanta, DataQuanta)
self.assertIsInstance(dataQuanta.operator, TextFileSource)
Expand Down
134 changes: 134 additions & 0 deletions python/src/pywy/tests/unit/dataquanta/dataquanta_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import unittest
from typing import Callable, Iterator, Tuple
from unittest.mock import Mock

from pywy.dataquanta import WayangContext
from pywy.dataquanta import DataQuanta
from pywy.operators import *


class TestUnitCoreTranslator(unittest.TestCase):
    """Unit tests for :class:`DataQuanta` construction and operator chaining.

    Each test seeds a :class:`DataQuanta` with a bare :class:`PywyOperator`
    and a mocked context, applies one transformation, and checks that the
    resulting :class:`DataQuanta` wraps a new operator of the expected class
    that is wired to the seed operator.
    """

    context: WayangContext

    def setUp(self):
        """Give every test a fresh mocked context."""
        self.context = Mock()

    def build_seed(self) -> Tuple[PywyOperator, DataQuanta]:
        """Return a placeholder operator and the DataQuanta wrapping it."""
        operator = PywyOperator("Empty")
        dq = DataQuanta(self.context, operator)
        return operator, dq

    def _validate_unary(self, result: DataQuanta, operator: PywyOperator, expected_type: type):
        """Shared assertions for filter/map/flatmap results.

        :param result: the DataQuanta returned by the transformation
        :param operator: the seed operator the transformation was applied to
        :param expected_type: operator class that ``result`` must wrap
        """
        self.assertIsInstance(result, DataQuanta)
        self.assertIsInstance(result.operator, expected_type)
        # The context must be handed through unchanged.
        self.assertEqual(result.context, self.context)
        # A new operator is created and connected downstream of the seed.
        self.assertNotEqual(result.operator, operator)
        self.assertEqual(result.operator.inputOperator[0], operator)

    def test_create(self):
        """The constructor stores the context and the operator it is given."""
        (operator, dq) = self.build_seed()

        self.assertIsInstance(dq, DataQuanta)
        self.assertEqual(dq.context, self.context)
        self.assertEqual(dq.operator, operator)

    def test_connect(self):
        """``_connect`` links both operators and returns the downstream one."""
        operator = PywyOperator("Empty1")
        operator2 = PywyOperator("Empty2")
        dq = DataQuanta(self.context, operator)

        self.assertIsNone(operator2.inputOperator[0])
        after_operator2 = dq._connect(operator2)
        self.assertEqual(operator2, after_operator2)
        self.assertIsNotNone(operator2.inputOperator[0])
        self.assertEqual(operator, operator2.inputOperator[0])
        self.assertEqual(operator.outputOperator[0], operator2)

    def validate_filter(self, filtered: DataQuanta, operator: PywyOperator):
        """Check that ``filtered`` wraps a FilterOperator fed by ``operator``."""
        self._validate_unary(filtered, operator, FilterOperator)

    def test_filter_lambda(self):
        """``filter`` accepts a lambda predicate."""
        (operator, dq) = self.build_seed()
        pred: Callable = lambda x: "" in x
        filtered = dq.filter(pred)
        self.validate_filter(filtered, operator)

    def test_filter_func(self):
        """``filter`` accepts an annotated named function."""
        (operator, dq) = self.build_seed()

        def pred(x: str) -> bool:
            return "" in x

        filtered = dq.filter(pred)
        self.validate_filter(filtered, operator)

    def test_filter_func_lambda(self):
        """``filter`` accepts a lambda that delegates to a named function."""
        (operator, dq) = self.build_seed()

        def pred(x):
            return "" in x

        filtered = dq.filter(lambda x: pred(x))
        self.validate_filter(filtered, operator)

    def validate_map(self, mapped: DataQuanta, operator: PywyOperator):
        """Check that ``mapped`` wraps a MapOperator fed by ``operator``."""
        self._validate_unary(mapped, operator, MapOperator)

    def test_map_lambda(self):
        """``map`` accepts a lambda."""
        (operator, dq) = self.build_seed()
        func: Callable = lambda x: len(x)
        mapped = dq.map(func)
        self.validate_map(mapped, operator)

    def test_map_func(self):
        """``map`` accepts an annotated named function."""
        (operator, dq) = self.build_seed()

        def func(x: str) -> int:
            return len(x)

        mapped = dq.map(func)
        self.validate_map(mapped, operator)

    def test_map_func_lambda(self):
        """``map`` accepts a lambda that delegates to a named function."""
        (operator, dq) = self.build_seed()

        def func(x):
            return x == 0

        mapped = dq.map(lambda x: func(x))
        self.validate_map(mapped, operator)

    def validate_flatmap(self, flatted: DataQuanta, operator: PywyOperator):
        """Check that ``flatted`` wraps a FlatmapOperator fed by ``operator``."""
        self._validate_unary(flatted, operator, FlatmapOperator)

    def test_flatmap_lambda(self):
        """``flatmap`` accepts a lambda returning an iterable."""
        (operator, dq) = self.build_seed()
        func: Callable = lambda x: x.split(" ")
        flatted = dq.flatmap(func)
        self.validate_flatmap(flatted, operator)

    def test_flatmap_func(self):
        """``flatmap`` accepts an annotated generator function."""
        (operator, dq) = self.build_seed()

        # A generator function returns an iterator of str, not a single str.
        def fmfunc(i: str) -> Iterator[str]:
            for x in range(len(i)):
                yield str(x)

        flatted = dq.flatmap(fmfunc)
        self.validate_flatmap(flatted, operator)

    def test_flatmap_func_lambda(self):
        """``flatmap`` accepts a lambda that delegates to a generator function."""
        (operator, dq) = self.build_seed()

        def fmfunc(i):
            for x in range(len(i)):
                yield str(x)

        flatted = dq.flatmap(lambda x: fmfunc(x))
        self.validate_flatmap(flatted, operator)

0 comments on commit e0da05b

Please sign in to comment.