Skip to content

Commit

Permalink
Wayang 8 (#89)
Browse files Browse the repository at this point in the history
* [WAYANG-8][API-PYTHON] Creation of functions to be consumed by MapPartitionsDescriptor

* [WAYANG-8][API-PYTHON] Included PythonProcessCaller that manages the python process execution and Java - Python connection

* [WAYANG-8][API-PYTHON] POM fixes plus minor test

* [WAYANG-8][API-PYTHON] Python connection through TCP socket enabled

* [WAYANG-8][API-PYTHON] Writing from Java to Python. Not taking into care about Iterator Datatypes.

* [WAYANG-8][API-PYTHON] Java Socket Writter improvements

* [WAYANG-8][API-PYTHON] Python UTF8 Deserializer included

* [WAYANG-8][API-PYTHON] Python UTF8 Reading Stream

* [WAYANG-8][API-PYTHON] Getting results from Python and continue processing

* [WAYANG-8][API-PYTHON] Config files for pywayang

* [WAYANG-8][API-PYTHON] Structures to save the plan with functional fashion plus most basic operators

* [WAYANG-8][API-PYTHON] Main program to test plan executions locally

* [WAYANG-8][API-PYTHON] Minor comments and
TODOs

* [WAYANG-8][API-PYTHON] Most basic test for protobuff communication with java

* [WAYANG-8][API-PYTHON] Addjacency list from PyWayang Plan

* [WAYANG-8][API-PYTHON] Graph traversal implementation with visitor pattern

* [WAYANG-8][API-PYTHON] Protobuf python message generator

* [WAYANG-8][API-PYTHON] Wayang Web Service project structure

* [WAYANG-8][API-PYTHON] Protobuf message generation fixes

* [WAYANG-8][API-PYTHON] Wayang Web Service executes most basic plans directly

* [WAYANG-8][API-PYTHON] Receiving Base64 passing to byte array and unpickling

* [WAYANG-8][API-PYTHON] Updated classes to process a single Serialized UDF

* [WAYANG-8][API-PYTHON] New test with single UDF

* [WAYANG-8][API-PYTHON] Protobuf command

* [WAYANG-8][API-PYTHON] Protobuf message template updated

* [WAYANG-8][API-PYTHON] POM fixes

* [WAYANG-8][API-PYTHON] License comments added

* [WAYANG-8][API-PYTHON] Correction on missing licenses

* [WAYANG-8][API-PYTHON] Serializable module creation

* [WAYANG-8][API-PYTHON] adding protoc to travis

* [WAYANG-8][API-PYTHON] protoc executable path correction

* [WAYANG-8][API-PYTHON] Commenting objc_class_prefix

* [WAYANG-8][API-PYTHON] Obtaining pipelines

* [WAYANG-8][API-PYTHON] Dataquanta writing message

* [WAYANG-8][API-PYTHON] Plan writer pipeline based adjustments

* [WAYANG-8][API-PYTHON] Operator Python executable indicator

* [WAYANG-8][API-PYTHON] Plan writer improved to use less sockets

* [WAYANG-8][API-PYTHON] New version of Wayang protobuf message

* [WAYANG-8][API-PYTHON] Wayang REST improved to allow multi pipelined executions

* [WAYANG-8][API-PYTHON] More test programs

* [WAYANG-8][API-PYTHON] Commentaries and logging for Graph module

* [WAYANG-8][API-PYTHON] Commentaries and logging for Orchestrator module

* [WAYANG-8][API-PYTHON] Commentaries and logging for Protobuf module

* [WAYANG-8][API-PYTHON] Fix usage of relative paths

* [WAYANG-8][API-PYTHON] Scripts to compile protobuf has been deleted. Now Maven executes them

* [WAYANG-8][API-PYTHON] Execution Log configuration

* [WAYANG-8][API-PYTHON] Fix - Python Map partition with single operator

* [WAYANG-8][API-PYTHON] Unitary Testing preparing the Wayang Plan

* [WAYANG-8][API-PYTHON] Plugin selection through Plan Descriptor

* [WAYANG-8][API-PYTHON] Unitary Testing preparing the Wayang Plan with Spark Execution

* [WAYANG-8][API-PYTHON] Pywayang sends protobuf message in API request as bytes using base64

* [WAYANG-8][API-PYTHON] New Operators Flatmap group by, reduce and Reduce By Key. Only Python Side.

* [WAYANG-8][API-PYTHON] Protobuf Wayang Plan message updated to allow more Complex Java-Python Operators

* [WAYANG-8][API-PYTHON] Adding TPC-H 1st Test

* [WAYANG-8][API-PYTHON] Last changes, not working

* [WAYANG-8] Fixing errors with dependencies

* [WAYANG-8] Fix to Pom versions problem

* [WAYANG-8] Protoc path updated

* [WAYANG-8] Correction in the pom.xml for flags

Signed-off-by: bertty <[email protected]>

Co-authored-by: Bertty Contreras-Rojas <[email protected]>
Signed-off-by: bertty <[email protected]>
  • Loading branch information
2 people authored and berttty committed Apr 8, 2022
1 parent 70f41b1 commit edab66a
Show file tree
Hide file tree
Showing 21 changed files with 1,979 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,8 @@
<exclude>**/README.md</exclude>
<exclude>**/general-todos.md</exclude>
<exclude>**/scala_1*</exclude>

<exclude>**/*pb2.py</exclude>
</excludes>
</configuration>
</plugin>
Expand Down
20 changes: 20 additions & 0 deletions pywayang/config/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from config.config_reader import get_source_types
from config.config_reader import get_sink_types
from config.config_reader import get_boundary_types
51 changes: 51 additions & 0 deletions pywayang/config/config_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import configparser
import os


def get_boundary_types():
config = configparser.ConfigParser()
config.sections()
config.read('../config/pywayang_config.ini')
boundary_types = dict(config.items('BOUNDARY_TYPES'))
boundary_types.pop("variable_to_access")
return boundary_types.values()


def get_source_types():
config = configparser.ConfigParser()
#print("path: ", os.getcwd())
config.read("../config/pywayang_config.ini")
source_types = dict(config.items('SOURCE_TYPES'))
source_types.pop("variable_to_access")
return source_types.values()
#sections_list = config.sections()
#for section in sections_list:
# print(section)
#print("source_types")
#for x in source_types.values():
# print(x)

def get_sink_types():
config = configparser.ConfigParser()
#print("path: ", os.getcwd())
config.read("../config/pywayang_config.ini")
sink_types = dict(config.items('SINK_TYPES'))
sink_types.pop("variable_to_access")
return sink_types.values()
38 changes: 38 additions & 0 deletions pywayang/config/pywayang_config.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

[DEFAULT]
variable_to_access = value

[INPUT]
txnname_mod = string1
txnmemo_mod = string2

[MODIFY]
txnname_mod = string3
txnmemo_mod = string4

[BOUNDARY_TYPES]
boundary_type_1 = union

[SOURCE_TYPES]
source_type_1 = source
source_type_2 = text

[SINK_TYPES]
sink_type_1 = sink
sink_type_2 = sonk
19 changes: 19 additions & 0 deletions pywayang/graph/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import graph.graph
import graph.node
71 changes: 71 additions & 0 deletions pywayang/graph/graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from graph.node import Node
import logging


# Adjacency Matrix used to analise the plan
class Graph:
def __init__(self):
self.graph = {}
self.nodes_no = 0
self.nodes = []

# Fills the Graph
def populate(self, sinks):
for sink in iter(sinks):
self.process_operator(sink)

# Add current operator and set dependencies
def process_operator(self, operator):
self.add_node(operator.operator_type, operator.id, operator)

if len(operator.previous) > 0:
for parent in operator.previous:
if parent:
self.add_node(parent.operator_type, parent.id, parent)
self.add_link(operator.id, parent.id, 1)
self.process_operator(parent)

def add_node(self, name, id, operator):
if id in self.nodes:
return

self.nodes_no += 1
self.nodes.append(id)
new_node = Node(name, id, operator)

self.graph[id] = new_node

def add_link(self, id_child, id_parent, e):
if id_child in self.nodes:
if id_parent in self.nodes:
self.graph[id_child].add_predecessor(id_parent, e)
self.graph[id_parent].add_successor(id_child, e)

def print_adjlist(self):

for key in self.graph:
logging.debug("Node: ", self.graph[key].operator_type, " - ", key)
for key2 in self.graph[key].predecessors:
logging.debug("- Parent: ", self.graph[key2].operator_type, " - ", self.graph[key].predecessors[key2], " - ", key2)
for key2 in self.graph[key].successors:
logging.debug("- Child: ", self.graph[key2].operator_type, " - ", self.graph[key].successors[key2], " - ", key2)

def get_node(self, id):
return self.graph[id]
48 changes: 48 additions & 0 deletions pywayang/graph/node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import abc


class Element(metaclass=abc.ABCMeta):
@abc.abstractmethod
def accept(self, visitor, udf, orientation, last_iter):
pass


# Describes an Operator in the Graph
class Node(Element):
def __init__(self, operator_type, id, operator):
self.operator_type = operator_type
self.id = id
self.predecessors = {}
self.successors = {}
self.python_exec = operator.python_exec

# Temporal
self.operator = operator

def add_predecessor(self, id_parent, e):
self.predecessors[id_parent] = e

def add_successor(self, id_child, e):
self.successors[id_child] = e

# Nodes are visited by objects of class Visitant.
# Visitants are being used to execute a UDF through the Graph
def accept(self, visitor, udf, orientation, last_iter):
visitor.visit_node(self, udf, orientation, last_iter)
51 changes: 51 additions & 0 deletions pywayang/graph/traversal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from graph.visitant import Visitant
import logging


# Defines how a UDF will be applied over the Graph
class Traversal:

def __init__(self, graph, origin, udf):
self.graph = graph
self.origin = origin
self.udf = udf
self.app = Visitant(graph, [])

# Starting from Sinks or Sources sets an specific orientation
if origin[0].source:
self.orientation = "successors"
elif origin[0].sink:
self.orientation = "predecessors"
else:
logging.error("Origin point to traverse the plan wrongly defined")
return

for operator in iter(origin):
logging.debug("operator origin: " + str(operator.id))
node = graph.get_node(operator.id)
self.app.visit_node(
node=node,
udf=self.udf,
orientation=self.orientation,
last_iter=None
)

def get_collected_data(self):
return self.app.get_collection()
52 changes: 52 additions & 0 deletions pywayang/graph/visitant.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import abc
import logging


class Visitor(metaclass=abc.ABCMeta):
@abc.abstractmethod
def visit_node(self, node, udf, orientation, last_iter):
pass


# Applies a UDF in current Node
class Visitant(Visitor):

def __init__(self, graph, results):
self.collection = results
self.graph = graph

# UDF can store results in ApplyFunction.collection whenever its requires.
# last_iter has the generated current value obtained in the previous iteration
def visit_node(self, node, udf, orientation, last_iter):
logging.debug("Applying UDf" + str(orientation))
current_value = udf(node, last_iter, self.collection)
logging.debug("orientation result " + str(getattr(node, orientation)))
next_iter = getattr(node, orientation)
if len(next_iter) > 0:
for next_iter_id in next_iter:
if next_iter_id:
logging.debug("next_id: " + str(next_iter_id))
next_iter_node = self.graph.get_node(next_iter_id)
logging.debug("next_iter_node: " + next_iter_node.operator_type + " " + str(next_iter_node.id))
next_iter_node.accept(visitor=self, udf=udf, orientation=orientation, last_iter=current_value)
pass

def get_collection(self):
return self.collection
20 changes: 20 additions & 0 deletions pywayang/orchestrator/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import orchestrator.plan
import orchestrator.dataquanta
import graph.graph
Loading

0 comments on commit edab66a

Please sign in to comment.