From e31f8cb0e4d6cf6161812989572feb063948adea Mon Sep 17 00:00:00 2001 From: Josh Reini Date: Wed, 25 Sep 2024 18:00:52 -0400 Subject: [PATCH] Revert "fixes" This reverts commit f88a89273f3fe0c2e591093a35d6435a8a11f1a6. --- .../trulens/core/database/connector/base.py | 7 +- src/core/trulens/core/database/sqlalchemy.py | 6 +- src/core/trulens/core/utils/python.py | 157 ++++-------------- .../unit/static/golden/api.trulens.3.11.yaml | 2 - 4 files changed, 41 insertions(+), 131 deletions(-) diff --git a/src/core/trulens/core/database/connector/base.py b/src/core/trulens/core/database/connector/base.py index 5500e87c6..75970e8e4 100644 --- a/src/core/trulens/core/database/connector/base.py +++ b/src/core/trulens/core/database/connector/base.py @@ -100,7 +100,6 @@ def add_record_nowait( self.batch_record_queue.put(record) def _batch_loop(self): - apps = {} while True: time.sleep(self.RECORDS_BATCH_TIMEOUT_IN_SEC) records = [] @@ -122,12 +121,10 @@ def _batch_loop(self): ) continue feedback_results = [] + apps = {} for record in records: app_id = record.app_id - if app_id not in apps: - apps[app_id] = self.get_app(app_id=app_id) - app = apps[app_id] - + app = apps.setdefault(app_id, self.get_app(app_id=app_id)) feedback_definitions = app.get("feedback_definitions", []) # TODO(Dave): Modify this to add only client side feedback results for feedback_definition_id in feedback_definitions: diff --git a/src/core/trulens/core/database/sqlalchemy.py b/src/core/trulens/core/database/sqlalchemy.py index 4a014e9d2..8b3d632bd 100644 --- a/src/core/trulens/core/database/sqlalchemy.py +++ b/src/core/trulens/core/database/sqlalchemy.py @@ -375,7 +375,7 @@ def batch_insert_record( self.orm.Record.parse(r, redact_keys=self.redact_keys) for r in records ] - session.add_all(records_list) + session.bulk_save_objects(records_list) logger.info(f"{UNICODE_CHECK} added record batch") # return record ids from orm objects return [r.record_id for r in records_list] @@ -550,7 +550,7 @@ def batch_insert_feedback( self.orm.FeedbackResult.parse(f, redact_keys=self.redact_keys) for f in feedback_results ] - session.add_all(feedback_results_list) + session.bulk_save_objects(feedback_results_list) return [f.feedback_result_id for f in feedback_results_list] def _feedback_query( @@ -807,7 +807,7 @@ def batch_insert_ground_truth( ) ground_truths_to_insert.append(new_ground_truth) - session.add_all(ground_truths_to_insert) + session.bulk_save_objects(ground_truths_to_insert) return [gt.ground_truth_id for gt in ground_truths] def get_ground_truth( diff --git a/src/core/trulens/core/utils/python.py b/src/core/trulens/core/utils/python.py index fec05f2e1..f051da3d7 100644 --- a/src/core/trulens/core/utils/python.py +++ b/src/core/trulens/core/utils/python.py @@ -23,7 +23,6 @@ Generator, Generic, Hashable, - Iterable, Iterator, List, Optional, @@ -32,7 +31,6 @@ TypeVar, Union, ) -import weakref T = TypeVar("T") @@ -374,8 +372,7 @@ def superstack() -> Iterator[FrameType]: across Tasks and threads. """ - frames = stack_with_tasks() - next(iter(frames)) # skip this method itself + frames = stack_with_tasks()[1:] # + 1 to skip this method itself # NOTE: skipping offset frames is done below since the full stack may need # to be reconstructed there. @@ -406,8 +403,8 @@ def caller_module_name(offset=0) -> str: """ Get the caller's (of this function) module name. """ - frame = caller_frame(offset=offset + 1) - return frame.f_globals["__name__"] + + return inspect.stack()[offset + 1].frame.f_globals["__name__"] def caller_module(offset=0) -> ModuleType: @@ -423,32 +420,8 @@ def caller_frame(offset=0) -> FrameType: Get the caller's (of this function) frame. See https://docs.python.org/3/reference/datamodel.html#frame-objects . """ - caller_frame = inspect.currentframe() - for _ in range(offset + 1): - if caller_frame is None: - raise RuntimeError("No current frame found.") - caller_frame = caller_frame.f_back - - if caller_frame is None: - raise RuntimeError("No caller frame found.") - - return caller_frame - - -def external_caller_frame(offset=0) -> FrameType: - """Get the caller's (of this function) frame that is not in the trulens - namespace. - - Raises: - RuntimeError: If no such frame is found. - """ - frame = inspect.currentframe() - gen = stack_generator(frame=frame, offset=offset + 2) - for f_info in gen: - if not f_info.f_globals["__name__"].startswith("trulens"): - return f_info - raise RuntimeError("No external caller frame found.") + return inspect.stack()[offset + 1].frame def caller_frameinfo( @@ -464,11 +437,11 @@ def caller_frameinfo( skip_module: Skip frames from the given module. Default is "trulens". """ - for f_info in inspect.stack(0)[offset + 1 :]: + for finfo in inspect.stack()[offset + 1 :]: if skip_module is None: - return f_info - if not f_info.frame.f_globals["__name__"].startswith(skip_module): - return f_info + return finfo + if not finfo.frame.f_globals["__name__"].startswith(skip_module): + return finfo return None @@ -485,8 +458,7 @@ def task_factory_with_stack(loop, coro, *args, **kwargs) -> Sequence[FrameType]: parent_task = asyncio.current_task(loop=loop) task = asyncio.tasks.Task(coro=coro, loop=loop, *args, **kwargs) - frame = inspect.currentframe() - stack = stack_generator(frame=frame, offset=3) + stack = [fi.frame for fi in inspect.stack()[2:]] if parent_task is not None: stack = merge_stacks(stack, parent_task.get_stack()[::-1]) @@ -527,7 +499,7 @@ def get_task_stack(task: asyncio.Task) -> Sequence[FrameType]: def merge_stacks( - s1: Iterable[FrameType], s2: Sequence[FrameType] + s1: Sequence[FrameType], s2: Sequence[FrameType] ) -> Sequence[FrameType]: """ Assuming `s1` is a subset of `s2`, combine the two stacks in presumed call @@ -536,110 +508,56 @@ def merge_stacks( ret = [] - for f in s1: + while len(s1) > 1: + f = s1[0] + s1 = s1[1:] + ret.append(f) try: s2i = s2.index(f) for _ in range(s2i): ret.append(s2[0]) s2 = s2[1:] + except Exception: pass return ret -def stack_generator( - frame: Optional[FrameType] = None, offset: int = 0 -) -> Iterable[FrameType]: - if frame is None: - frame = inspect.currentframe() - for _ in range(offset): - if frame is None: - raise ValueError("No frame found.") - frame = frame.f_back - while frame is not None: - yield frame - frame = frame.f_back - - -def stack_with_tasks() -> Iterable[FrameType]: - """Get the current stack (not including this function) with frames reaching +def stack_with_tasks() -> Sequence[FrameType]: + """ + Get the current stack (not including this function) with frames reaching across Tasks. """ - frame = inspect.currentframe() - frame_gen = stack_generator(frame=frame, offset=1) + + ret = [fi.frame for fi in inspect.stack()[1:]] # skip stack_with_task_stack + try: task_stack = get_task_stack(asyncio.current_task()) - return merge_stacks(frame_gen, task_stack) + return merge_stacks(ret, task_stack) except Exception: - return frame_gen - + return ret -class _Wrap(Generic[T]): - """Wrap an object. - See WeakWrapper for explanation. +def _future_target_wrapper(stack, context, func, *args, **kwargs): """ - - def __init__(self, obj: T): - self.obj: T = obj - - def get(self) -> T: - return self.obj - - -@dataclasses.dataclass -class WeakWrapper(Generic[T]): - """Wrap an object with a weak reference. - - This is to be able to use weakref.ref on objects like lists which are - otherwise not weakly referencable. The goal of this class is to generalize - weakref.ref to work with any object.""" - - obj: weakref.ReferenceType[Union[_Wrap[T], T]] - - def __init__(self, obj: Union[weakref.ReferenceType[T], WeakWrapper[T], T]): - if isinstance(obj, weakref.ReferenceType): - self.obj = obj - - else: - if isinstance(obj, WeakWrapper): - obj = obj.get() - - try: - # Try to make reference to obj directly. - self.obj = weakref.ref(obj) - - except Exception: - # If its a list or other non-weakly referencable object, wrap it. - self.obj = weakref.ref(_Wrap(obj)) - - def get(self) -> T: - """Get the wrapped object.""" - - temp = self.obj() # undo weakref.ref - if isinstance(temp, _Wrap): - return temp.get() # undo _Wrap if needed - else: - return temp - - -def _future_target_wrapper(stack, func, *args, **kwargs): - """Wrapper for a function that is started by threads. - - This is needed to record the call stack prior to thread creation as in - python threads do not inherit the stack. Our instrumentation, however, - relies on walking the stack and need to do this to the frames prior to - thread starts. + Wrapper for a function that is started by threads. This is needed to + record the call stack prior to thread creation as in python threads do + not inherit the stack. Our instrumentation, however, relies on walking + the stack and need to do this to the frames prior to thread starts. """ + # TODO: See if threading.stack_size([size]) can be used instead. + # Keep this for looking up via get_first_local_in_call_stack . - pre_start_stack = WeakWrapper(stack) # noqa: F841 # pylint: disable=W0612 + pre_start_stack = stack # noqa: F841 + + for var, value in context.items(): + var.set(value) - # with with_context(context): return func(*args, **kwargs) @@ -680,16 +598,13 @@ def get_all_local_in_call_stack( [TP][trulens.core.utils.threading.TP]. """ - frames_gen = stack_with_tasks() + frames = stack_with_tasks()[1:] # + 1 to skip this method itself # NOTE: skipping offset frames is done below since the full stack may need # to be reconstructed there. # Using queue for frames as additional frames may be added due to handling threads. q = queue.Queue() - for i, f in enumerate(frames_gen): - if i == 0: - # skip this method itself - continue + for f in frames: q.put(f) while not q.empty(): diff --git a/tests/unit/static/golden/api.trulens.3.11.yaml b/tests/unit/static/golden/api.trulens.3.11.yaml index b8310dd35..28c3d527d 100644 --- a/tests/unit/static/golden/api.trulens.3.11.yaml +++ b/tests/unit/static/golden/api.trulens.3.11.yaml @@ -1356,8 +1356,6 @@ trulens.core.utils.python: safe_hasattr: builtins.function safe_issubclass: builtins.function safe_signature: builtins.function - set_context_vars_or_values: builtins.function - stack_generator: builtins.function stack_with_tasks: builtins.function superstack: builtins.function task_factory_with_stack: builtins.function