Skip to content

Commit

Permalink
Add configuration to python API for config ergonomics
Browse files Browse the repository at this point in the history
  • Loading branch information
juripetersen committed Jul 29, 2024
1 parent 6bbdcd4 commit 3526e7a
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 18 deletions.
6 changes: 4 additions & 2 deletions python/src/pywy/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.
#

from typing import Set, Iterable
from typing import Set, Iterable, Dict
import json
import base64
import cloudpickle
Expand Down Expand Up @@ -74,7 +74,7 @@ class PywyPlan:
"""
graph: WayangGraph

def __init__(self, plugins: Set[Plugin], sinks: Iterable[SinkOperator]):
def __init__(self, plugins: Set[Plugin], configuration: Dict[str, str], sinks: Iterable[SinkOperator]):
"""basic Constructor of PywyPlan
this constructor set the plugins and sinks element, and it prepares
Expand All @@ -88,6 +88,7 @@ def __init__(self, plugins: Set[Plugin], sinks: Iterable[SinkOperator]):
Description of `sinks`.
"""
self.plugins = plugins
self.configuration = configuration
self.sinks = sinks
self.set_graph()

Expand All @@ -105,6 +106,7 @@ def execute(self):
context = {}
context["origin"] = "python"
context["platforms"] = {}
context["configuration"] = self.configuration

if len(self.plugins) > 0:
context["platforms"] = list(map(lambda pl: next(iter(pl.platforms)).name, self.plugins))
Expand Down
19 changes: 16 additions & 3 deletions python/src/pywy/dataquanta.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.
#

from typing import Set, List, cast
from typing import Dict, Set, List, cast

from pywy.core.core import Plugin, PywyPlan
from pywy.operators.base import PO_T
Expand All @@ -25,14 +25,27 @@
from pywy.basic.model.option import Option
from pywy.basic.model.models import Model


class Configuration:
entries: Dict[str, str]

def __init__(self):
self.entries = {}

def set_property(self, key: str, value: str):
self.entries[key] = value


class WayangContext:
"""
This is the entry point for users to work with Wayang.
"""
plugins: Set[Plugin]
configuration: Configuration

def __init__(self):
def __init__(self, configuration: Configuration = Configuration()):
self.plugins = set()
self.configuration = configuration

"""
add a :class:`Plugin` to the :class:`Context`
Expand Down Expand Up @@ -172,7 +185,7 @@ def store_textfile(self: "DataQuanta[In]", path: str, input_type: GenericTco = N
)
]
#print(PywyPlan(self.context.plugins, last))
PywyPlan(self.context.plugins, last).execute()
PywyPlan(self.context.plugins, self.context.configuration.entries, last).execute()

def _connect(self, op: PO_T, port_op: int = 0) -> PywyOperator:
self.operator.connect(0, op, port_op)
Expand Down
7 changes: 5 additions & 2 deletions python/src/pywy/tests/word_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import unittest
from typing import Tuple, Callable, Iterable
from pywy.dataquanta import WayangContext
from pywy.dataquanta import WayangContext, Configuration
from unittest.mock import Mock
from pywy.platforms.java import JavaPlugin
from pywy.platforms.spark import SparkPlugin
Expand Down Expand Up @@ -53,8 +53,11 @@ def test_to_json(self):
.store_textfile("file:///var/www/html/data/wordcount-out-python.txt", (str, int))
self.assertEqual(True, True)

config = Configuration()
config.set_property("wayang.api.python.worker", "/var/www/html/python/src/pywy/execution/worker.py")

# named functions with signatures
ctx = WayangContext() \
ctx = WayangContext(config) \
.register({JavaPlugin, SparkPlugin}) \
.textfile("file:///var/www/html/README.md") \
.flatmap(fm_func) \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import scala.collection.JavaConverters._
class JsonPlanBuilder() {

var planBuilder: PlanBuilder = _
var configuration: Configuration = null
var operators: Map[Long, OperatorFromJson] = Map()
var plugins: List[Plugin] = List(
Java.basicPlugin,
Expand All @@ -78,39 +79,46 @@ class JsonPlanBuilder() {
def fromPlan(plan: PlanFromJson): JsonPlanBuilder = {
setPlatforms(plan.context.platforms)
setOrigin(plan.context.origin)
setConfiguration(plan.context.configuration)
setOperators(plan.operators)

this
}

def setOperators(operators: List[OperatorFromJson]): JsonPlanBuilder = {
setOperatorsRec(operators)

var configuration: Configuration = null

def setConfiguration(config: Map[String, String]): JsonPlanBuilder = {
// Check if a wayang.properties file is declared using env variables, otherwise try default location.
val wayangPropertiesFile: String = sys.env.getOrElse("WAYANG_PROPERTIES_FILE", "file:///wayang.properties")

if (Files.exists(Paths.get("wayang.properties"))) {
println(s"Loading configuration from $wayangPropertiesFile.")
try {
configuration = new Configuration(wayangPropertiesFile)
this.configuration = new Configuration(wayangPropertiesFile)
}
catch {
case _: WayangException =>
println(s"Could not load configuration from $wayangPropertiesFile. Using default Wayang configuration file.")
configuration = new Configuration()
this.configuration = new Configuration()
}
}

// If no wayang.properties file can be found, load default configuration.
else {
println("Using default Wayang configuration file.")
configuration = new Configuration()
this.configuration = new Configuration()

if (config.size == 0) {
println("Using default Wayang configuration file.")
} else {
config.foreach(prop => this.configuration.setProperty(prop._1, prop._2))
}
}

this
}

def setOperators(operators: List[OperatorFromJson]): JsonPlanBuilder = {
setOperatorsRec(operators)

// Create context with plugins
val wayangContext = new WayangContext(configuration)
val wayangContext = new WayangContext(this.configuration)
plugins.foreach(plugin => wayangContext.withPlugin(plugin))

// Check if there is a jdbc remote input. If yes, set configuration appropriately
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson,

class ContextFromJson(val platforms: List[String],
val origin: String,
val configuration: Map[String, String],
) extends Serializable {

//
Expand Down

0 comments on commit 3526e7a

Please sign in to comment.