Skip to content

Commit

Permalink
feat(core): Support higher-order operators (eosphoros-ai#1984)
Browse files Browse the repository at this point in the history
Co-authored-by: 谨欣 <[email protected]>
  • Loading branch information
fangyinc and 谨欣 authored Sep 9, 2024
1 parent f6d5fc4 commit 65c875d
Show file tree
Hide file tree
Showing 62 changed files with 6,281 additions and 386 deletions.
5 changes: 5 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
.env
.git/
./.mypy_cache/
models/
plugins/
pilot/data
pilot/message
logs/
venv/
web/node_modules/
web/.next/
web/.env
docs/node_modules/
build/
docs/build/
Expand Down
4 changes: 4 additions & 0 deletions dbgpt/_private/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ def __init__(self) -> None:
os.getenv("MULTI_INSTANCE", "False").lower() == "true"
)

self.SCHEDULER_ENABLED = (
os.getenv("SCHEDULER_ENABLED", "True").lower() == "true"
)

@property
def local_db_manager(self) -> "ConnectorManager":
from dbgpt.datasource.manages import ConnectorManager
Expand Down
14 changes: 13 additions & 1 deletion dbgpt/app/component_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def initialize_components(
system_app.register(
DefaultExecutorFactory, max_workers=param.default_thread_pool_size
)
system_app.register(DefaultScheduler)
system_app.register(DefaultScheduler, scheduler_enable=CFG.SCHEDULER_ENABLED)
system_app.register_instance(controller)
system_app.register(ConnectorManager)

Expand All @@ -60,6 +60,7 @@ def initialize_components(
_initialize_openapi(system_app)
# Register serve apps
register_serve_apps(system_app, CFG, param.port)
_initialize_operators()


def _initialize_model_cache(system_app: SystemApp, port: int):
Expand Down Expand Up @@ -128,3 +129,14 @@ def _initialize_openapi(system_app: SystemApp):
from dbgpt.app.openapi.api_v1.editor.service import EditorService

system_app.register(EditorService)


def _initialize_operators():
from dbgpt.app.operators.converter import StringToInteger
from dbgpt.app.operators.datasource import (
HODatasourceExecutorOperator,
HODatasourceRetrieverOperator,
)
from dbgpt.app.operators.llm import HOLLMOperator, HOStreamingLLMOperator
from dbgpt.app.operators.rag import HOKnowledgeOperator
from dbgpt.serve.agent.resource.datasource import DatasourceResource
4 changes: 3 additions & 1 deletion dbgpt/app/initialization/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ def __init__(
system_app: SystemApp,
scheduler_delay_ms: int = 5000,
scheduler_interval_ms: int = 1000,
scheduler_enable: bool = True,
):
super().__init__(system_app)
self.system_app = system_app
self._scheduler_interval_ms = scheduler_interval_ms
self._scheduler_delay_ms = scheduler_delay_ms
self._stop_event = threading.Event()
self._scheduler_enable = scheduler_enable

def init_app(self, system_app: SystemApp):
self.system_app = system_app
Expand All @@ -39,7 +41,7 @@ def before_stop(self):

def _scheduler(self):
time.sleep(self._scheduler_delay_ms / 1000)
while not self._stop_event.is_set():
while self._scheduler_enable and not self._stop_event.is_set():
try:
schedule.run_pending()
except Exception as e:
Expand Down
4 changes: 4 additions & 0 deletions dbgpt/app/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Operators package.
This package contains all higher-order operators that are used to build workflows.
"""
186 changes: 186 additions & 0 deletions dbgpt/app/operators/converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
"""Type Converter Operators."""

from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import (
TAGS_ORDER_HIGH,
IOField,
OperatorCategory,
Parameter,
ViewMetadata,
)
from dbgpt.util.i18n_utils import _

_INPUTS_STRING = IOField.build_from(
_("String"),
"string",
str,
description=_("The string to be converted to other types."),
)
_INPUTS_INTEGER = IOField.build_from(
_("Integer"),
"integer",
int,
description=_("The integer to be converted to other types."),
)
_INPUTS_FLOAT = IOField.build_from(
_("Float"),
"float",
float,
description=_("The float to be converted to other types."),
)
_INPUTS_BOOLEAN = IOField.build_from(
_("Boolean"),
"boolean",
bool,
description=_("The boolean to be converted to other types."),
)

_OUTPUTS_STRING = IOField.build_from(
_("String"),
"string",
str,
description=_("The string converted from other types."),
)
_OUTPUTS_INTEGER = IOField.build_from(
_("Integer"),
"integer",
int,
description=_("The integer converted from other types."),
)
_OUTPUTS_FLOAT = IOField.build_from(
_("Float"),
"float",
float,
description=_("The float converted from other types."),
)
_OUTPUTS_BOOLEAN = IOField.build_from(
_("Boolean"),
"boolean",
bool,
description=_("The boolean converted from other types."),
)


class StringToInteger(MapOperator[str, int]):
"""Converts a string to an integer."""

metadata = ViewMetadata(
label=_("String to Integer"),
name="default_converter_string_to_integer",
description=_("Converts a string to an integer."),
category=OperatorCategory.TYPE_CONVERTER,
parameters=[],
inputs=[_INPUTS_STRING],
outputs=[_OUTPUTS_INTEGER],
tags={"order": TAGS_ORDER_HIGH},
)

def __init__(self, **kwargs):
"""Create a new StringToInteger operator."""
super().__init__(map_function=lambda x: int(x), **kwargs)


class StringToFloat(MapOperator[str, float]):
"""Converts a string to a float."""

metadata = ViewMetadata(
label=_("String to Float"),
name="default_converter_string_to_float",
description=_("Converts a string to a float."),
category=OperatorCategory.TYPE_CONVERTER,
parameters=[],
inputs=[_INPUTS_STRING],
outputs=[_OUTPUTS_FLOAT],
tags={"order": TAGS_ORDER_HIGH},
)

def __init__(self, **kwargs):
"""Create a new StringToFloat operator."""
super().__init__(map_function=lambda x: float(x), **kwargs)


class StringToBoolean(MapOperator[str, bool]):
"""Converts a string to a boolean."""

metadata = ViewMetadata(
label=_("String to Boolean"),
name="default_converter_string_to_boolean",
description=_("Converts a string to a boolean, true: 'true', '1', 'y'"),
category=OperatorCategory.TYPE_CONVERTER,
parameters=[
Parameter.build_from(
_("True Values"),
"true_values",
str,
optional=True,
default="true,1,y",
description=_("Comma-separated values that should be treated as True."),
)
],
inputs=[_INPUTS_STRING],
outputs=[_OUTPUTS_BOOLEAN],
tags={"order": TAGS_ORDER_HIGH},
)

def __init__(self, true_values: str = "true,1,y", **kwargs):
"""Create a new StringToBoolean operator."""
true_values_list = true_values.split(",")
true_values_list = [x.strip().lower() for x in true_values_list]
super().__init__(map_function=lambda x: x.lower() in true_values_list, **kwargs)


class IntegerToString(MapOperator[int, str]):
"""Converts an integer to a string."""

metadata = ViewMetadata(
label=_("Integer to String"),
name="default_converter_integer_to_string",
description=_("Converts an integer to a string."),
category=OperatorCategory.TYPE_CONVERTER,
parameters=[],
inputs=[_INPUTS_INTEGER],
outputs=[_OUTPUTS_STRING],
tags={"order": TAGS_ORDER_HIGH},
)

def __init__(self, **kwargs):
"""Create a new IntegerToString operator."""
super().__init__(map_function=lambda x: str(x), **kwargs)


class FloatToString(MapOperator[float, str]):
"""Converts a float to a string."""

metadata = ViewMetadata(
label=_("Float to String"),
name="default_converter_float_to_string",
description=_("Converts a float to a string."),
category=OperatorCategory.TYPE_CONVERTER,
parameters=[],
inputs=[_INPUTS_FLOAT],
outputs=[_OUTPUTS_STRING],
tags={"order": TAGS_ORDER_HIGH},
)

def __init__(self, **kwargs):
"""Create a new FloatToString operator."""
super().__init__(map_function=lambda x: str(x), **kwargs)


class BooleanToString(MapOperator[bool, str]):
"""Converts a boolean to a string."""

metadata = ViewMetadata(
label=_("Boolean to String"),
name="default_converter_boolean_to_string",
description=_("Converts a boolean to a string."),
category=OperatorCategory.TYPE_CONVERTER,
parameters=[],
inputs=[_INPUTS_BOOLEAN],
outputs=[_OUTPUTS_STRING],
tags={"order": TAGS_ORDER_HIGH},
)

def __init__(self, **kwargs):
"""Create a new BooleanToString operator."""
super().__init__(map_function=lambda x: str(x), **kwargs)
Loading

0 comments on commit 65c875d

Please sign in to comment.