Skip to content

Commit

Permalink
[WAYANG-#8] add TextFileSinkOperator
Browse files Browse the repository at this point in the history
Signed-off-by: bertty <[email protected]>
  • Loading branch information
Bertty Contreras-Rojas authored and berttty committed Apr 8, 2022
1 parent 6e04ecf commit 650e127
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 1 deletion.
5 changes: 5 additions & 0 deletions python/src/pywayang/dataquanta.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pywayang.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO)
from pywayang.operator.base import (WyOperator)
from pywayang.operator.unary import (FilterOperator, MapOperator, FlatmapOperator)
from pywayang.operator.sink import TextFileSink


class DataQuanta(GenericTco):
Expand All @@ -22,6 +23,10 @@ def map(self: "DataQuanta[I]", f: Function) -> "DataQuanta[O]" :
def flatmap(self: "DataQuanta[I]", f: FlatmapFunction) -> "DataQuanta[IterableO]" :
return DataQuanta(FlatmapOperator(f))

def storeTextFile(self: "DataQuanta[I]", path: str) :
last = DataQuanta(TextFileSink(path))
# TODO add the logic to execute the plan

def getOperator(self):
return self.operator

Expand Down
28 changes: 28 additions & 0 deletions python/src/pywayang/operator/sink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from pywayang.operator.base import WyOperator

class SinkUnaryOperator(WyOperator):

def __init__(self, name:str):
super().__init__(name, None, str, 0, 1)

def __str__(self):
return super().__str__()

def __repr__(self):
return super().__repr__()



class TextFileSink(SinkUnaryOperator):

path: str

def __init__(self, path: str):
super().__init__('TextFileSink')
self.path = path

def __str__(self):
return super().__str__()

def __repr__(self):
return super().__repr__()
1 change: 1 addition & 0 deletions python/src/pywayang/platforms/python/mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ def __repr__(self):

OperatorMappings.add_mapping(PyFilterOperator())
OperatorMappings.add_mapping(PyTextFileSourceOperator())
OperatorMappings.add_mapping(PyTextFileSinkOperator())

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from pywayang.operator.sink import TextFileSink
from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
from pywayang.platforms.python.channels import (
Channel,
ChannelDescriptor,
PyIteratorChannel,
PyIteratorChannelDescriptor
)
from typing import Set

class PyTextFileSinkOperator(TextFileSink, PythonExecutionOperator):

def __init__(self, origin: TextFileSink = None):
path = None if origin is None else origin.path
super().__init__(path)
pass

def execute(self, inputs: Channel, outputs: Channel):
self.validateChannels(inputs, outputs)
if isinstance(inputs[0], PyIteratorChannel) :
file = open(self.path,'w')
py_in_iter_channel: PyIteratorChannel = inputs[0]
iterable = py_in_iter_channel.provide_iterable();
for element in iterable:
file.write(str(element))
file.close()

else:
raise Exception("Channel Type does not supported")


def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
return {PyIteratorChannelDescriptor}

def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
raise Exception("The PyTextFileSource does not support Output Channels")
4 changes: 3 additions & 1 deletion python/src/pywayang/platforms/python/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
from pywayang.platforms.python.operators.PyFilterOperator import PyFilterOperator
from pywayang.platforms.python.operators.PyTextFileSourceOperator import PyTextFileSourceOperator
from pywayang.platforms.python.operators.PyTextFileSinkOperator import PyTextFileSinkOperator

__ALL__ = [
PythonExecutionOperator,
PyFilterOperator,
PyTextFileSourceOperator
PyTextFileSourceOperator,
PyTextFileSinkOperator
]

0 comments on commit 650e127

Please sign in to comment.