diff --git a/nemoguardrails/actions/llm/generation.py b/nemoguardrails/actions/llm/generation.py index a230e5ce3..67ab83ef9 100644 --- a/nemoguardrails/actions/llm/generation.py +++ b/nemoguardrails/actions/llm/generation.py @@ -135,7 +135,7 @@ async def init(self): self._init_flows_index(), ) - def _extract_user_message_example(self, flow: Flow) -> None: + def _extract_user_message_example(self, flow: Flow): """Heuristic to extract user message examples from a flow.""" elements = [ item diff --git a/nemoguardrails/colang/runtime.py b/nemoguardrails/colang/runtime.py index a70bd9648..770feafb5 100644 --- a/nemoguardrails/colang/runtime.py +++ b/nemoguardrails/colang/runtime.py @@ -34,28 +34,32 @@ def __init__(self, config: RailsConfig, verbose: bool = False): # Register the actions with the dispatcher. self.action_dispatcher = ActionDispatcher( config_path=config.config_path, - import_paths=list(config.imported_paths.values()), + import_paths=list( + config.imported_paths.values() if config.imported_paths else [] + ), ) if hasattr(self, "_run_output_rails_in_parallel_streaming"): self.action_dispatcher.register_action( - self._run_output_rails_in_parallel_streaming, + getattr(self, "_run_output_rails_in_parallel_streaming"), name="run_output_rails_in_parallel_streaming", ) if hasattr(self, "_run_flows_in_parallel"): self.action_dispatcher.register_action( - self._run_flows_in_parallel, name="run_flows_in_parallel" + getattr(self, "_run_flows_in_parallel"), name="run_flows_in_parallel" ) if hasattr(self, "_run_input_rails_in_parallel"): self.action_dispatcher.register_action( - self._run_input_rails_in_parallel, name="run_input_rails_in_parallel" + getattr(self, "_run_input_rails_in_parallel"), + name="run_input_rails_in_parallel", ) if hasattr(self, "_run_output_rails_in_parallel"): self.action_dispatcher.register_action( - self._run_output_rails_in_parallel, name="run_output_rails_in_parallel" + getattr(self, "_run_output_rails_in_parallel"), + name="run_output_rails_in_parallel", ) # The list of additional parameters that can be passed to the actions. diff --git a/nemoguardrails/colang/v1_0/lang/colang_parser.py b/nemoguardrails/colang/v1_0/lang/colang_parser.py index 4b00c17fd..727194245 100644 --- a/nemoguardrails/colang/v1_0/lang/colang_parser.py +++ b/nemoguardrails/colang/v1_0/lang/colang_parser.py @@ -16,7 +16,7 @@ import json import re from ast import literal_eval -from typing import List, Optional +from typing import Any, Dict, List, Optional import yaml @@ -126,7 +126,7 @@ def __init__( self.current_params_indentation = 1 # The current element i.e. user, bot, event, if ... - self.current_element = None + self.current_element: Optional[Dict[str, Any]] = None # The flows that have been parsed self.flows = {} @@ -264,7 +264,7 @@ def _normalize_line_text(self): flow_hash = string_hash(flow_text) - self.text += " anonymous-" + flow_hash + self.text += " anonymous-" + str(flow_hash) # Below are some more advanced normalizations @@ -313,8 +313,9 @@ def _create_namespace(self, namespace): # Now, append the new one self.current_namespaces.append(namespace) self.current_namespace = ".".join(self.current_namespaces) - self.current_indentation = self.next_line["indentation"] - self.current_indentations.append(self.next_line["indentation"]) + next_indentation = self.next_line["indentation"] if self.next_line else 0 + self.current_indentation = next_indentation + self.current_indentations.append(next_indentation) # Reset the branches and the ifs on a new flow self.branches = [] @@ -335,7 +336,11 @@ def _ignore_block_body(self): def _include_source_mappings(self): # Include the source mapping information if required if self.include_source_mapping: - if self.current_element and "_source_mapping" not in self.current_element: + if ( + self.current_element is not None + and isinstance(self.current_element, dict) + and "_source_mapping" not in self.current_element + ): self.current_element["_source_mapping"] = { "filename": self.filename, "line_number": self.current_line["number"], @@ -790,7 +795,7 @@ def _process_define(self): # If we're dealing with a topic, then we expand the flow definition if define_token == "topic": - self._insert_topic_flow_definition() + # TODO: Implement topic flow definition insertion return # Compute the symbol type @@ -957,14 +962,18 @@ def _extract_params(self, param_lines: Optional[List] = None): if isinstance(yaml_value, str): yaml_value = {"$0": yaml_value} - # self.current_element.update(yaml_value) - for k in yaml_value.keys(): - # if the key tarts with $, we remove it - param_name = k - if param_name[0] == "$": - param_name = param_name[1:] + if ( + self.current_element is not None + and isinstance(self.current_element, dict) + and yaml_value is not None + ): + for k in yaml_value.keys(): + # if the key tarts with $, we remove it + param_name = k + if param_name[0] == "$": + param_name = param_name[1:] - self.current_element[param_name] = yaml_value[k] + self.current_element[param_name] = yaml_value[k] def _is_test_flow(self): """Returns true if the current flow is a test one. @@ -1005,11 +1014,13 @@ def _is_sample_flow(self): def _parse_when(self): # TODO: deal with "when" after "else when" assert ( - self.next_line["indentation"] > self.current_line["indentation"] + self.next_line is not None + and self.next_line["indentation"] > self.current_line["indentation"] ), "Expected indented block after 'when' statement." # Create the new branch - new_branch = {"elements": [], "indentation": self.next_line["indentation"]} + next_indentation = self.next_line["indentation"] if self.next_line else 0 + new_branch = {"elements": [], "indentation": next_indentation} # # on else, we need to pop the previous branch # if self.main_token == "else when": @@ -1040,13 +1051,16 @@ def _parse_when(self): # continue # else # ... + next_indentation = ( + self.next_line["indentation"] if self.next_line else 0 + ) self.lines.insert( self.current_line_idx + 1, { "text": f"continue", # We keep the line mapping the same "number": self.current_line["number"], - "indentation": self.next_line["indentation"], + "indentation": next_indentation, }, ) self.lines.insert( @@ -1320,9 +1334,11 @@ def _parse_bot(self): "text": f"{utterance_text}", # We keep the line mapping the same "number": self.current_line["number"], - "indentation": self.current_indentation + 2 - if i == len(indented_lines) - else indented_lines[i]["indentation"], + "indentation": ( + self.current_indentation + 2 + if i == len(indented_lines) + else indented_lines[i]["indentation"] + ), }, ) @@ -1343,7 +1359,9 @@ def _parse_bot(self): if utterance_id is None: self.current_element["bot"] = { "_type": "element", - "text": utterance_text[1:-1], + "text": ( + utterance_text[1:-1] if utterance_text is not None else "" + ), } # if we have quick_replies, we move them in the element @@ -1361,7 +1379,13 @@ def _parse_bot(self): # If there was a bot message with a snippet, we also add an expect # TODO: can this be handled better? try: - if "snippet" in self.current_element["bot"]: + if ( + self.current_element is not None + and isinstance(self.current_element, dict) + and "bot" in self.current_element + and isinstance(self.current_element["bot"], dict) + and "snippet" in self.current_element["bot"] + ): self.branches[-1]["elements"].append( { "expect": "snippet", @@ -1425,7 +1449,8 @@ def _parse_do(self): # if we need to save the return values, we store the info if "=" in flow_name: - return_vars, flow_name = get_stripped_tokens(split_max(flow_name, "=", 1)) + stripped_tokens = get_stripped_tokens(split_max(flow_name, "=", 1)) + return_vars, flow_name = stripped_tokens[0], stripped_tokens[1] else: return_vars = None @@ -1475,8 +1500,9 @@ def _parse_meta(self): branch_elements.insert(0, {"meta": {}}) # Update the elements coming from the parameters - for k in self.current_element.keys(): - branch_elements[0]["meta"][k] = self.current_element[k] + if self.current_element is not None: + for k in self.current_element.keys(): + branch_elements[0]["meta"][k] = self.current_element[k] def _parse_generic(self): value = split_max(self.text, " ", 1)[1].strip() @@ -1545,7 +1571,9 @@ def _parse_if_branch(self, if_condition): self.ifs.append( { "element": self.current_element, - "indentation": self.next_line["indentation"], + "indentation": ( + self.next_line["indentation"] if self.next_line is not None else 0 + ), # We also record this to match it with the else "keyword_indentation": self.current_indentation, } @@ -1588,7 +1616,9 @@ def _parse_while(self): self.branches.append( { "elements": self.current_element["do"], - "indentation": self.next_line["indentation"], + "indentation": ( + self.next_line["indentation"] if self.next_line is not None else 0 + ), } ) @@ -1602,7 +1632,9 @@ def _parse_any(self): self.branches.append( { "elements": self.current_element["any"], - "indentation": self.next_line["indentation"], + "indentation": ( + self.next_line["indentation"] if self.next_line is not None else 0 + ), } ) @@ -1631,7 +1663,9 @@ def _parse_infer(self): self.branches.append( { "elements": self.current_element["infer"], - "indentation": self.next_line["indentation"], + "indentation": ( + self.next_line["indentation"] if self.next_line is not None else 0 + ), } ) @@ -1767,15 +1801,15 @@ def parse(self): exception = Exception(error) # Decorate the exception with where the parsing failed - exception.filename = self.filename - exception.line = self.current_line["number"] - exception.error = str(ex) + setattr(exception, "filename", self.filename) + setattr(exception, "line", self.current_line["number"]) + setattr(exception, "error", str(ex)) raise exception self.current_line_idx += 1 - result = {"flows": self.flows} + result: Dict[str, Any] = {"flows": self.flows} if self.imports: result["imports"] = self.imports @@ -1818,7 +1852,7 @@ def parse_snippets_and_imports(self): """ snippets = {} imports = [] - snippet = None + snippet: Optional[Dict[str, Any]] = None while self.current_line_idx < len(self.lines): self._fetch_current_line() @@ -1833,6 +1867,7 @@ def parse_snippets_and_imports(self): for k in self.current_line.keys(): d[k] = self.current_line[k] d["filename"] = self.filename + assert snippet is not None # Type checker hint snippet["lines"].append(d) self.current_line_idx += 1 diff --git a/nemoguardrails/colang/v1_0/lang/comd_parser.py b/nemoguardrails/colang/v1_0/lang/comd_parser.py index 56a695f3c..1c07cba86 100644 --- a/nemoguardrails/colang/v1_0/lang/comd_parser.py +++ b/nemoguardrails/colang/v1_0/lang/comd_parser.py @@ -362,21 +362,22 @@ def parse_md_file(file_name, content=None): continue # Make sure we have the type of the symbol in the name of the symbol - sym = _get_typed_symbol_name(sym, symbol_type) + if sym is not None: + sym = _get_typed_symbol_name(sym, symbol_type) - # For objects, we translate the "string" type to "kb:Object:prop|partial" - param_type = _get_param_type(parts[1]) - if symbol_type == "object" and param_type in ["string", "text"]: - object_name = split_max(sym, ":", 1)[1] - param_type = f"kb:{object_name}:{parts[0]}|partial" + # For objects, we translate the "string" type to "kb:Object:prop|partial" + param_type = _get_param_type(parts[1]) + if symbol_type == "object" and param_type in ["string", "text"]: + object_name = split_max(sym, ":", 1)[1] + param_type = f"kb:{object_name}:{parts[0]}|partial" - # TODO: figure out a cleaner way to deal with this - # For the "type:time" type, we transform it into "lookup:time" - if param_type == "type:time": - param_type = "lookup:time" + # TODO: figure out a cleaner way to deal with this + # For the "type:time" type, we transform it into "lookup:time" + if param_type == "type:time": + param_type = "lookup:time" - result["mappings"].append((f"{sym}:{parts[0]}", param_type)) - symbol_params.append(parts[0]) + result["mappings"].append((f"{sym}:{parts[0]}", param_type)) + symbol_params.append(parts[0]) elif line.startswith("-") or line.startswith("*"): if sym is None: diff --git a/nemoguardrails/colang/v1_0/lang/coyml_parser.py b/nemoguardrails/colang/v1_0/lang/coyml_parser.py index 029d260f8..5fc376729 100644 --- a/nemoguardrails/colang/v1_0/lang/coyml_parser.py +++ b/nemoguardrails/colang/v1_0/lang/coyml_parser.py @@ -421,14 +421,20 @@ def _extract_elements(items: List) -> List[dict]: # for `if` flow elements, we have to go recursively if element["_type"] == "if": if_element = element - then_elements = _extract_elements(if_element["then"]) - else_elements = _extract_elements(if_element["else"]) + then_items = ( + if_element["then"] if isinstance(if_element["then"], list) else [] + ) + else_items = ( + if_element["else"] if isinstance(if_element["else"], list) else [] + ) + then_elements = _extract_elements(then_items) + else_elements = _extract_elements(else_items) # Remove the raw info del if_element["then"] del if_element["else"] - if_element["_next_else"] = len(then_elements) + 1 + if_element["_next_else"] = str(len(then_elements) + 1) # Add the "if" elements.append(if_element) @@ -438,8 +444,10 @@ def _extract_elements(items: List) -> List[dict]: # if we have "else" elements, we need to adjust also add a jump if len(else_elements) > 0: - elements.append({"_type": "jump", "_next": len(else_elements) + 1}) - if_element["_next_else"] += 1 + elements.append( + {"_type": "jump", "_next": str(len(else_elements) + 1)} + ) + if_element["_next_else"] = str(int(if_element["_next_else"]) + 1) # Add the "else" elements elements.extend(else_elements) @@ -447,21 +455,24 @@ def _extract_elements(items: List) -> List[dict]: # WHILE elif element["_type"] == "while": while_element = element - do_elements = _extract_elements(while_element["do"]) + do_items = ( + while_element["do"] if isinstance(while_element["do"], list) else [] + ) + do_elements = _extract_elements(do_items) n = len(do_elements) # Remove the raw info del while_element["do"] # On break we have to skip n elements and 1 jump, hence we go to n+2 - while_element["_next_on_break"] = n + 2 + while_element["_next_on_break"] = str(n + 2) # We need to compute the jumps on break and on continue for each element for j in range(n): # however, we make sure we don't override an inner loop if "_next_on_break" not in do_elements[j]: - do_elements[j]["_next_on_break"] = n + 1 - j - do_elements[j]["_next_on_continue"] = -1 * j - 1 + do_elements[j]["_next_on_break"] = str(n + 1 - j) + do_elements[j]["_next_on_continue"] = str(-1 * j - 1) # Add the "while" elements.append(while_element) @@ -501,7 +512,7 @@ def _extract_elements(items: List) -> List[dict]: branch_element = { "_type": "branch", # these are the relative positions to the current position - "branch_heads": [], + "branch_heads": [], # type: ignore } branch_element_pos = len(elements) elements.append(branch_element) @@ -525,7 +536,7 @@ def _extract_elements(items: List) -> List[dict]: ] # Create the jump element - jump_element = {"_type": "jump", "_next": 1} + jump_element = {"_type": "jump", "_next": 1} # type: ignore # We compute how far we need to jump based on the remaining branches j = branch_idx + 1 diff --git a/nemoguardrails/colang/v1_0/lang/utils.py b/nemoguardrails/colang/v1_0/lang/utils.py index bde3e33d3..cde9cae51 100644 --- a/nemoguardrails/colang/v1_0/lang/utils.py +++ b/nemoguardrails/colang/v1_0/lang/utils.py @@ -88,11 +88,14 @@ def get_numbered_lines(content: str): current_comment = None multiline_string = False current_string = None + multiline_indentation = 0 while i < len(raw_lines): raw_line = raw_lines[i].strip() # handle multiline string if multiline_string: + if current_string is None: + current_string = "" current_string += "\n" + raw_line if raw_line.endswith('"'): multiline_string = False @@ -145,6 +148,8 @@ def get_numbered_lines(content: str): continue if multiline_comment: + if current_comment is None: + current_comment = "" if raw_line.endswith('"""'): current_comment += "\n" + raw_line[0:-3] multiline_comment = False diff --git a/nemoguardrails/colang/v1_0/runtime/flows.py b/nemoguardrails/colang/v1_0/runtime/flows.py index 9654c5029..16d206589 100644 --- a/nemoguardrails/colang/v1_0/runtime/flows.py +++ b/nemoguardrails/colang/v1_0/runtime/flows.py @@ -19,10 +19,14 @@ from dataclasses import dataclass, field from enum import Enum from time import time -from typing import Dict, List, Optional +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +if TYPE_CHECKING: + from nemoguardrails.rails.llm.config import RailsConfig from nemoguardrails.colang.v1_0.runtime.eval import eval_expression from nemoguardrails.colang.v1_0.runtime.sliding import slide +from nemoguardrails.rails.llm.config import RailsConfig from nemoguardrails.utils import new_event_dict, new_uuid @@ -92,7 +96,7 @@ class FlowState: status: FlowStatus = FlowStatus.ACTIVE # The UID of the flows that interrupted this one - interrupted_by = None + interrupted_by: Optional[str] = None @dataclass @@ -260,10 +264,10 @@ def _call_subflow(new_state: State, flow_state: FlowState) -> Optional[FlowState # Basic support for referring a subflow using a variable if subflow_id.startswith("$"): - subflow_id = eval_expression(subflow_id, new_state.context) + subflow_id = str(eval_expression(subflow_id, new_state.context)) # parameter support - if _flow_id_has_params(subflow_id): + if isinstance(subflow_id, str) and _flow_id_has_params(subflow_id): flow_params = _get_flow_params(subflow_id) subflow_id = _normalize_flow_id(subflow_id) new_state.context_updates.update(flow_params) @@ -316,7 +320,9 @@ def _slide_with_subflows(state: State, flow_state: FlowState) -> Optional[int]: should_continue = True while should_continue: should_continue = False - flow_state.head = slide(state, flow_config, flow_state.head) + new_head = slide(state, flow_config, flow_state.head) + if new_head is not None: + flow_state.head = new_head # We check if we reached a point where we need to call a subflow if flow_state.head >= 0: @@ -329,6 +335,8 @@ def _slide_with_subflows(state: State, flow_state: FlowState) -> Optional[int]: # And if we don't have a next step yet, we set it to the next element _record_next_step(state, flow_state, flow_config) + return None + def compute_next_state(state: State, event: dict) -> State: """Computes the next state of the flow-driven system. @@ -470,6 +478,8 @@ def compute_next_state(state: State, event: dict) -> State: # We try to slide first, just in case a flow starts with sliding logic start_head = slide(new_state, flow_config, 0) + if start_head is None: + start_head = 0 # If the first element matches the current event, # or, if the flow is explicitly started by a `start_flow` event, @@ -483,7 +493,7 @@ def compute_next_state(state: State, event: dict) -> State: uid=flow_uid, flow_id=flow_config.id, # When we have a match, we skip the element that was matched and move the head to the next one - head=start_head + (1 if _is_start_match else 0), + head=int(start_head) + (1 if _is_start_match else 0), ) if params := event.get("params"): new_state.context_updates.update(params) @@ -523,6 +533,7 @@ def compute_next_state(state: State, event: dict) -> State: # We are only interested when the extension flow actually decided, not just started. if ( decision_flow_config + and decision_flow_state is not None and decision_flow_config.is_extension and decision_flow_state.head > 1 ): @@ -610,7 +621,7 @@ def _step_to_event(step: dict) -> dict: def compute_next_steps( history: List[dict], flow_configs: Dict[str, FlowConfig], - rails_config: "RailsConfig", + rails_config: RailsConfig, processing_log: List[dict], ) -> List[dict]: """Computes the next step in a flow-driven system given a history of events. @@ -629,7 +640,7 @@ def compute_next_steps( ) # First, we process the history and apply any alterations e.g. 'hide_prev_turn' - actual_history = [] + actual_history: List[dict] = [] for event in history: if event["type"] == "hide_prev_turn": # we look up the last `UtteranceUserActionFinished` event and remove everything after @@ -709,14 +720,16 @@ def compute_context(history: List[dict]): Returns: dict: The computed context. """ - context = { + context: Dict[str, Any] = { "last_user_message": None, "last_bot_message": None, } for event in history: if event["type"] == "ContextUpdate": - context.update(event["data"]) + data = event.get("data") + if data is not None and isinstance(data, dict): + context.update(data) if event["type"] == "UserMessage": context["last_user_message"] = event["text"] @@ -730,7 +743,7 @@ def compute_context(history: List[dict]): return context -def _get_flow_params(flow_id: str) -> dict: +def _get_flow_params(flow_id: str) -> Dict[str, Optional[str]]: """Return the arguments in a flow id as a dictionary. Args: @@ -740,7 +753,7 @@ def _get_flow_params(flow_id: str) -> dict: A dictionary of arguments in the flow id. """ flow_id = flow_id.strip() - params = {} + params: Dict[str, Optional[str]] = {} if "(" in flow_id and ")" in flow_id: arg_string = flow_id.split("(")[1].split(")")[0] diff --git a/nemoguardrails/colang/v1_0/runtime/runtime.py b/nemoguardrails/colang/v1_0/runtime/runtime.py index f97c5bb3c..77b2daa6c 100644 --- a/nemoguardrails/colang/v1_0/runtime/runtime.py +++ b/nemoguardrails/colang/v1_0/runtime/runtime.py @@ -716,7 +716,8 @@ async def _process_start_action(self, events: List[dict]) -> List[dict]: if isinstance(result, ActionResult): return_value = result.return_value return_events = result.events - context_updates.update(result.context_updates) + if result.context_updates is not None: + context_updates.update(result.context_updates) # If we have an action result key, we also record the update. if action_result_key: @@ -794,10 +795,17 @@ async def _get_action_resp( ) except Exception as e: log.info(f"Exception {e} while making request to {action_name}") + if not isinstance(result, dict): + result = {"value": result} return result, status except Exception as e: log.info(f"Failed to get response from {action_name} due to exception {e}") + + # Ensure result is a dict as expected by the return type + if not isinstance(result, dict): + result = {"value": result} + return result, status async def _process_start_flow( diff --git a/nemoguardrails/colang/v1_0/runtime/sliding.py b/nemoguardrails/colang/v1_0/runtime/sliding.py index 6ec39c49a..ed31c7d8c 100644 --- a/nemoguardrails/colang/v1_0/runtime/sliding.py +++ b/nemoguardrails/colang/v1_0/runtime/sliding.py @@ -14,7 +14,10 @@ # limitations under the License. import logging -from typing import Optional +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from nemoguardrails.colang.v1_0.runtime.flows import FlowConfig, State from nemoguardrails.colang.v1_0.runtime.eval import eval_expression diff --git a/nemoguardrails/colang/v2_x/lang/colang_ast.py b/nemoguardrails/colang/v2_x/lang/colang_ast.py index 3fa841b62..ff489eceb 100644 --- a/nemoguardrails/colang/v2_x/lang/colang_ast.py +++ b/nemoguardrails/colang/v2_x/lang/colang_ast.py @@ -77,13 +77,17 @@ def get(self, key, default_value=None): def __eq__(self, other): if isinstance(other, self.__class__): - return self.__hash__() == other.__hash__() + return self.hash() == other.hash() return NotImplemented def hash(self): """Return the hash for the current object.""" return hash(_make_hashable(self)) + def __hash__(self): + """Return the hash for the current object.""" + return self.hash() + ElementType = Union[Element, dict] diff --git a/nemoguardrails/colang/v2_x/lang/expansion.py b/nemoguardrails/colang/v2_x/lang/expansion.py index 8479c761e..f7e94a917 100644 --- a/nemoguardrails/colang/v2_x/lang/expansion.py +++ b/nemoguardrails/colang/v2_x/lang/expansion.py @@ -114,7 +114,12 @@ def expand_elements( if e.args[0]: error = e.args[0] - if hasattr(element, "_source") and element._source: + if ( + not isinstance(element, dict) + and hasattr(element, "_source") + and element._source is not None + and hasattr(element._source, "line") + ): # TODO: Resolve source line to Colang file level raise ColangSyntaxError( error + f" on source line {element._source.line}" @@ -435,10 +440,15 @@ def _expand_match_element( for idx, element in enumerate(and_group["elements"]): new_elements.append(event_label_elements[idx]) + # Ensure element is valid for SpecOp + if isinstance(element, (dict, Spec)): + spec_element: Union[dict, Spec] = element + else: + spec_element = {} new_elements.append( SpecOp( op="match", - spec=element, + spec=spec_element, ) ) new_elements.append(goto_end_element) @@ -455,8 +465,8 @@ def _expand_match_element( else: # Multiple and-groups combined by or - fork_uid: str = new_var_uuid() - fork_element = ForkHead(fork_uid=fork_uid) + or_fork_uid: str = new_var_uuid() + fork_element = ForkHead(fork_uid=or_fork_uid) group_label_elements: List[Label] = [] failure_label_name = f"failure_label_{new_var_uuid()}" failure_label_element = Label(name=failure_label_name) @@ -485,12 +495,12 @@ def _expand_match_element( new_elements.append(failure_label_element) new_elements.append(WaitForHeads(number=len(or_group))) - new_elements.append(MergeHeads(fork_uid=fork_uid)) + new_elements.append(MergeHeads(fork_uid=or_fork_uid)) new_elements.append(CatchPatternFailure(label=None)) new_elements.append(Abort()) new_elements.append(end_label_element) - new_elements.append(MergeHeads(fork_uid=fork_uid)) + new_elements.append(MergeHeads(fork_uid=or_fork_uid)) new_elements.append(CatchPatternFailure(label=None)) else: diff --git a/nemoguardrails/colang/v2_x/lang/parser.py b/nemoguardrails/colang/v2_x/lang/parser.py index 8f03c451a..5795e211a 100644 --- a/nemoguardrails/colang/v2_x/lang/parser.py +++ b/nemoguardrails/colang/v2_x/lang/parser.py @@ -19,6 +19,7 @@ import textwrap import yaml +from lark import ParseTree from nemoguardrails.colang.v2_x.lang.colang_ast import Flow, Import from nemoguardrails.colang.v2_x.lang.grammar.load import load_lark_parser @@ -40,7 +41,7 @@ def __init__(self, include_source_mapping: bool = False): # Initialize the Lark Parser self._lark_parser = load_lark_parser(self.grammar_path) - def get_parsing_tree(self, content: str) -> dict: + def get_parsing_tree(self, content: str) -> ParseTree: """Helper to get only the parsing tree. Args: @@ -141,9 +142,10 @@ def parse_content( result["import_paths"].append(import_el.path) else: # If we have a package name, we need to translate it to a path - result["import_paths"].append( - os.path.join(*import_el.package.split(".")) - ) + if import_el.package: + result["import_paths"].append( + os.path.join(*import_el.package.split(".")) + ) return result diff --git a/nemoguardrails/colang/v2_x/lang/transformer.py b/nemoguardrails/colang/v2_x/lang/transformer.py index ed6ed922c..b30653850 100644 --- a/nemoguardrails/colang/v2_x/lang/transformer.py +++ b/nemoguardrails/colang/v2_x/lang/transformer.py @@ -174,15 +174,17 @@ def _flow_def(self, children: dict, meta: Meta) -> Flow: assert member_name_el["_type"] == "var_name" member_name = member_name_el["elements"][0][1:] - member_def = FlowReturnMemberDef(name=member_name) + return_member_def_obj = FlowReturnMemberDef(name=member_name) # If we have a default value, we also use that if len(return_member_def["elements"]) == 2: default_value_el = return_member_def["elements"][1] assert default_value_el["_type"] == "expr" - member_def.default_value_expr = default_value_el["elements"][0] + return_member_def_obj.default_value_expr = default_value_el[ + "elements" + ][0] - return_member_defs.append(member_def) + return_member_defs.append(return_member_def_obj) elements[0:0] = [ SpecOp( @@ -558,7 +560,7 @@ def _non_var_spec_and(self, children: list, meta: Meta) -> dict: val["_source"] = self.__source(meta) return val - def __default__(self, data, children: list, meta: Meta) -> dict: + def __default__(self, data, children: list, meta: Meta) -> Any: """Default function that is called if there is no attribute matching ``data`` Can be overridden. Defaults to creating diff --git a/nemoguardrails/colang/v2_x/lang/utils.py b/nemoguardrails/colang/v2_x/lang/utils.py index c1e6fad79..579f77aea 100644 --- a/nemoguardrails/colang/v2_x/lang/utils.py +++ b/nemoguardrails/colang/v2_x/lang/utils.py @@ -18,7 +18,7 @@ def dataclass_to_dict(obj: Any) -> Any: - if is_dataclass(obj): + if is_dataclass(obj) and not isinstance(obj, type): return {k: dataclass_to_dict(v) for k, v in asdict(obj).items()} elif isinstance(obj, list): return [dataclass_to_dict(v) for v in obj] diff --git a/nemoguardrails/colang/v2_x/runtime/eval.py b/nemoguardrails/colang/v2_x/runtime/eval.py index 4c5998e8c..56073fd15 100644 --- a/nemoguardrails/colang/v2_x/runtime/eval.py +++ b/nemoguardrails/colang/v2_x/runtime/eval.py @@ -216,7 +216,8 @@ def _regex_findall(pattern: str, string: str) -> List[str]: def _pretty_str(data: Any) -> str: if isinstance(data, (dict, list, set)): string = json.dumps(data, indent=4) - return SimplifyFormatter().format(string) + # SimplifyFormatter.format() accepts string as well as LogRecord + return str(SimplifyFormatter().format(string)) # type: ignore return str(data) @@ -263,27 +264,27 @@ def _get_type(val: Any) -> str: def _less_than_operator(v_ref: Any) -> ComparisonExpression: """Create less then comparison expression.""" - return ComparisonExpression(lambda val, v_ref=v_ref: val < v_ref, v_ref) + return ComparisonExpression(lambda val: val < v_ref, v_ref) def _equal_or_less_than_operator(v_ref: Any) -> ComparisonExpression: """Create equal or less than comparison expression.""" - return ComparisonExpression(lambda val, val_ref=v_ref: val <= val_ref, v_ref) + return ComparisonExpression(lambda val: val <= v_ref, v_ref) def _greater_than_operator(v_ref: Any) -> ComparisonExpression: """Create less then comparison expression.""" - return ComparisonExpression(lambda val, val_ref=v_ref: val > val_ref, v_ref) + return ComparisonExpression(lambda val: val > v_ref, v_ref) def _equal_or_greater_than_operator(v_ref: Any) -> ComparisonExpression: """Create equal or less than comparison expression.""" - return ComparisonExpression(lambda val, val_ref=v_ref: val >= val_ref, v_ref) + return ComparisonExpression(lambda val: val >= v_ref, v_ref) def _not_equal_to_operator(v_ref: Any) -> ComparisonExpression: """Create a not equal comparison expression.""" - return ComparisonExpression(lambda val, val_ref=v_ref: val != val_ref, v_ref) + return ComparisonExpression(lambda val: val != v_ref, v_ref) def _flows_info(state: State, flow_instance_uid: Optional[str] = None) -> dict: diff --git a/nemoguardrails/colang/v2_x/runtime/flows.py b/nemoguardrails/colang/v2_x/runtime/flows.py index 053f43e65..a36b1aa30 100644 --- a/nemoguardrails/colang/v2_x/runtime/flows.py +++ b/nemoguardrails/colang/v2_x/runtime/flows.py @@ -23,7 +23,20 @@ from dataclasses import dataclass, field from datetime import datetime from enum import Enum -from typing import Any, Callable, Deque, Dict, List, Optional, Tuple, Union +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Deque, + Dict, + List, + Optional, + Tuple, + Union, +) + +if TYPE_CHECKING: + from nemoguardrails.rails.llm.config import RailsConfig from dataclasses_json import dataclass_json diff --git a/nemoguardrails/colang/v2_x/runtime/runtime.py b/nemoguardrails/colang/v2_x/runtime/runtime.py index 20044b8a6..aee541d90 100644 --- a/nemoguardrails/colang/v2_x/runtime/runtime.py +++ b/nemoguardrails/colang/v2_x/runtime/runtime.py @@ -45,7 +45,8 @@ from nemoguardrails.rails.llm.config import RailsConfig from nemoguardrails.utils import new_event_dict, new_readable_uuid -langchain.debug = False +if hasattr(langchain, "debug"): + langchain.debug = False # pyright: ignore log = logging.getLogger(__name__) @@ -147,9 +148,13 @@ async def _remove_flows_action(self, state: "State", **args: dict) -> None: def _init_flow_configs(self) -> None: """Initializes the flow configs based on the config.""" - self.flow_configs = create_flow_configs_from_flow_list(self.config.flows) + # Type cast to satisfy the type checker - the function handles dicts internally + flows = self.config.flows # type: ignore + self.flow_configs = create_flow_configs_from_flow_list(flows) # type: ignore - async def generate_events(self, events: List[dict]) -> List[dict]: + async def generate_events( + self, events: List[dict], processing_log: Optional[List[dict]] = None + ) -> List[dict]: raise NotImplementedError("Stateless API not supported for Colang 2.x, yet.") @staticmethod @@ -271,7 +276,7 @@ async def _process_start_action( "I'm sorry, an internal error has occurred." ) - return_value = result + return_value: Any = result return_events: List[dict] = [] context_updates: dict = {} @@ -317,10 +322,10 @@ async def _get_action_resp( f"Got status code {resp.status} while getting response from {action_name}" ) - resp = await resp.json() + response_data = await resp.json() result, status = ( - resp.get("result", result), - resp.get("status", status), + response_data.get("result", result), + response_data.get("status", status), ) except Exception as e: log.info( @@ -385,12 +390,14 @@ async def _get_async_actions_finished_events( "Local action finished with an exception!", exc_info=True, ) + result = None # Set a default value for failed actions self.async_actions[main_flow_uid].remove(finished_task) # We need to create the corresponding action finished event - action_finished_event = self._get_action_finished_event(result) - action_finished_events.append(action_finished_event) + if result is not None: + action_finished_event = self._get_action_finished_event(result) + action_finished_events.append(action_finished_event) return action_finished_events, len(pending) @@ -429,8 +436,8 @@ async def process_events( state. """ - output_events = [] - input_events: List[Union[dict, InternalEvent]] = events.copy() + output_events: List[dict] = [] + input_events: List[Union[dict, InternalEvent]] = list(events) local_running_actions: List[asyncio.Task[dict]] = [] if state is None or state == {}: @@ -683,11 +690,14 @@ async def _run_action( k: v for k, v in start_action_event.items() if k not in ignore_keys } + # Filter events_history to only include dicts + dict_events = [event for event in events_history if isinstance(event, dict)] + return_value, new_events, context_updates = await self._process_start_action( action_name, action_params=action_params, context=state.context, - events=events_history, + events=dict_events, state=state, ) @@ -737,7 +747,7 @@ def create_flow_configs_from_flow_list(flows: List[Flow]) -> Dict[str, FlowConfi config = FlowConfig( id=flow.name, - elements=flow.elements, + elements=list(flow.elements), # Type cast to ensure compatibility decorators=convert_decorator_list_to_dictionary(flow.decorators), parameters=flow.parameters, return_members=flow.return_members, diff --git a/nemoguardrails/colang/v2_x/runtime/serialization.py b/nemoguardrails/colang/v2_x/runtime/serialization.py index 095bfbe0b..b11433295 100644 --- a/nemoguardrails/colang/v2_x/runtime/serialization.py +++ b/nemoguardrails/colang/v2_x/runtime/serialization.py @@ -224,9 +224,14 @@ def json_to_state(s: str) -> State: data = json.loads(s) state = decode_from_dict(data, refs={}) + # Ensure we have a State object + if not isinstance(state, State): + raise ValueError("Decoded object is not a State instance") + # Redo the callbacks. for flow_uid, flow_state in state.flow_states.items(): for head_id, head in flow_state.heads.items(): + # The partial creates a callback that expects just the head parameter head.position_changed_callback = partial( _flow_head_changed, state, flow_state ) diff --git a/nemoguardrails/colang/v2_x/runtime/statemachine.py b/nemoguardrails/colang/v2_x/runtime/statemachine.py index f213990bd..e88b4c050 100644 --- a/nemoguardrails/colang/v2_x/runtime/statemachine.py +++ b/nemoguardrails/colang/v2_x/runtime/statemachine.py @@ -88,14 +88,17 @@ def initialize_state(state: State) -> None: state.flow_states = dict() + # TODO: Think about where to put this + current_flow_config: Optional[FlowConfig] = None try: - # TODO: Think about where to put this for flow_config in state.flow_configs.values(): + current_flow_config = flow_config initialize_flow(state, flow_config) except Exception as e: - if e.args[0]: + if e.args[0] and current_flow_config is not None: raise ColangSyntaxError( - e.args[0] + f" in flow `{flow_config.id}` ({flow_config.source_file})" + e.args[0] + + f" in flow `{current_flow_config.id}` ({current_flow_config.source_file})" ) else: raise ColangSyntaxError() from e @@ -122,7 +125,8 @@ def initialize_flow(state: State, flow_config: FlowConfig) -> None: # Extract all the label elements for idx, element in enumerate(flow_config.elements): if isinstance(element, Label): - flow_config.element_labels.update({element["name"]: idx}) + if hasattr(element, "name") and element.name: + flow_config.element_labels[element.name] = idx def create_flow_instance( @@ -958,7 +962,12 @@ def _advance_head_front(state: State, heads: List[FlowHead]) -> List[FlowHead]: # In case there were any runtime error the flow will be aborted (fail) source_line = "unknown" element = flow_config.elements[head.position] - if hasattr(element, "_source") and element._source: + if ( + not isinstance(element, dict) + and hasattr(element, "_source") + and element._source is not None + and hasattr(element._source, "line") + ): source_line = str(element._source.line) log.warning( "Flow '%s' failed on line %s (%s) due to Colang runtime exception: %s", @@ -2200,9 +2209,19 @@ def get_event_name_from_element( ): if element_spec.members is None: raise ColangValueError("Missing event attributes!") - event_name = member["name"] - event = obj.get_event(event_name, {}) - return event.name + # Handle both dict and non-dict member types + if isinstance(member, dict) and "name" in member: + event_name = member["name"] + elif hasattr(member, "name"): + event_name = member.name + else: + # Fall back to string representation + event_name = str(member) + if event_name is not None: + event = obj.get_event(event_name, {}) + return event.name + else: + raise ColangRuntimeError("Unable to determine event name") else: raise ColangRuntimeError(f"Unsupported type '{type(obj)}'") @@ -2213,17 +2232,25 @@ def get_event_name_from_element( flow_config = state.flow_configs[element_spec.name] temp_flow_state = create_flow_instance(flow_config, "", "", {}) flow_event_name = element_spec.members[0]["name"] - flow_event: InternalEvent = temp_flow_state.get_event(flow_event_name, {}) - del flow_event.arguments["source_flow_instance_uid"] - del flow_event.arguments["flow_instance_uid"] - return flow_event.name + if flow_event_name is not None: + flow_event: InternalEvent = temp_flow_state.get_event( + flow_event_name, {} + ) + del flow_event.arguments["source_flow_instance_uid"] + del flow_event.arguments["flow_instance_uid"] + return flow_event.name + else: + raise ColangRuntimeError("Flow event name cannot be None") elif element_spec.spec_type == SpecType.ACTION: # Action object assert element_spec.name action = Action(element_spec.name, {}, flow_state.flow_id) event_name = element_spec.members[0]["name"] - action_event: ActionEvent = action.get_event(event_name, {}) - return action_event.name + if event_name is not None: + action_event: ActionEvent = action.get_event(event_name, {}) + return action_event.name + else: + raise ColangRuntimeError("Action event name cannot be None") else: raise ColangRuntimeError(f"Unsupported type '{element_spec.spec_type }'") else: @@ -2280,10 +2307,16 @@ def get_event_from_element( raise ColangValueError("Missing event attributes!") event_name = member["name"] event_arguments = member["arguments"] - event_arguments = _evaluate_arguments( - event_arguments, _get_eval_context(state, flow_state) - ) - event = obj.get_event(event_name, event_arguments) + if event_arguments is not None: + event_arguments = _evaluate_arguments( + event_arguments, _get_eval_context(state, flow_state) + ) + else: + event_arguments = {} + if event_name is not None: + event = obj.get_event(event_name, event_arguments) + else: + raise ColangRuntimeError("Event name cannot be None") if isinstance(event, InternalEvent) and isinstance(obj, FlowState): event.flow = obj @@ -2303,14 +2336,21 @@ def get_event_from_element( flow_config = state.flow_configs[element_spec.name] temp_flow_state = create_flow_instance(flow_config, "", "", {}) flow_event_name = element_spec.members[0]["name"] - flow_event_arguments = element_spec.arguments - flow_event_arguments.update(element_spec.members[0]["arguments"]) + flow_event_arguments = ( + element_spec.arguments if element_spec.arguments is not None else {} + ) + member_arguments = element_spec.members[0]["arguments"] + if member_arguments is not None: + flow_event_arguments.update(member_arguments) flow_event_arguments = _evaluate_arguments( flow_event_arguments, _get_eval_context(state, flow_state) ) - flow_event: InternalEvent = temp_flow_state.get_event( - flow_event_name, flow_event_arguments - ) + if flow_event_name is not None: + flow_event: InternalEvent = temp_flow_state.get_event( + flow_event_name, flow_event_arguments + ) + else: + raise ColangRuntimeError("Flow event name cannot be None") del flow_event.arguments["source_flow_instance_uid"] del flow_event.arguments["flow_instance_uid"] if element["op"] == "match": @@ -2319,17 +2359,28 @@ def get_event_from_element( return flow_event elif element_spec.spec_type == SpecType.ACTION: # Action object - action_arguments = _evaluate_arguments( - element_spec.arguments, _get_eval_context(state, flow_state) - ) + if element_spec.arguments is not None: + action_arguments = _evaluate_arguments( + element_spec.arguments, _get_eval_context(state, flow_state) + ) + else: + action_arguments = {} action = Action(element_spec.name, action_arguments, flow_state.flow_id) # TODO: refactor the following repetition of code (see above) event_name = element_spec.members[0]["name"] event_arguments = element_spec.members[0]["arguments"] - event_arguments = _evaluate_arguments( - event_arguments, _get_eval_context(state, flow_state) - ) - action_event: ActionEvent = action.get_event(event_name, event_arguments) + if event_arguments is not None: + event_arguments = _evaluate_arguments( + event_arguments, _get_eval_context(state, flow_state) + ) + else: + event_arguments = {} + if event_name is not None: + action_event: ActionEvent = action.get_event( + event_name, event_arguments + ) + else: + raise ColangRuntimeError("Action event name cannot be None") if element["op"] == "match": # Delete action_uid from event since the action is only a helper object action_event.action_uid = None @@ -2339,27 +2390,36 @@ def get_event_from_element( assert element_spec.name if element_spec.name.islower() or element_spec.name in InternalEvents.ALL: # Flow event - event_arguments = _evaluate_arguments( - element_spec.arguments, _get_eval_context(state, flow_state) - ) + if element_spec.arguments is not None: + event_arguments = _evaluate_arguments( + element_spec.arguments, _get_eval_context(state, flow_state) + ) + else: + event_arguments = {} flow_event = InternalEvent( name=element_spec.name, arguments=event_arguments ) return flow_event elif "Action" in element_spec.name: # Action event - event_arguments = _evaluate_arguments( - element_spec.arguments, _get_eval_context(state, flow_state) - ) + if element_spec.arguments is not None: + event_arguments = _evaluate_arguments( + element_spec.arguments, _get_eval_context(state, flow_state) + ) + else: + event_arguments = {} action_event = ActionEvent( name=element_spec.name, arguments=event_arguments ) return action_event else: # Event - event_arguments = _evaluate_arguments( - element_spec.arguments, _get_eval_context(state, flow_state) - ) + if element_spec.arguments is not None: + event_arguments = _evaluate_arguments( + element_spec.arguments, _get_eval_context(state, flow_state) + ) + else: + event_arguments = {} new_event = Event(name=element_spec.name, arguments=event_arguments) return new_event diff --git a/nemoguardrails/logging/processing_log.py b/nemoguardrails/logging/processing_log.py index decc50181..0a171473f 100644 --- a/nemoguardrails/logging/processing_log.py +++ b/nemoguardrails/logging/processing_log.py @@ -14,7 +14,7 @@ # limitations under the License. import contextvars -from typing import List +from typing import List, Optional from nemoguardrails.logging.explain import LLMCallInfo from nemoguardrails.rails.llm.options import ( @@ -24,7 +24,9 @@ ) # The processing log for the current async stack -processing_log_var = contextvars.ContextVar("processing_log", default=None) +processing_log_var: contextvars.ContextVar[ + Optional[List[dict]] +] = contextvars.ContextVar("processing_log", default=None) def compute_generation_log(processing_log: List[dict]) -> GenerationLog: