Skip to content

Commit

Permalink
Revert "fixes"
Browse files Browse the repository at this point in the history
This reverts commit f88a892.
  • Loading branch information
Josh Reini committed Sep 25, 2024
1 parent 4e183b8 commit e31f8cb
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 131 deletions.
7 changes: 2 additions & 5 deletions src/core/trulens/core/database/connector/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ def add_record_nowait(
self.batch_record_queue.put(record)

def _batch_loop(self):
apps = {}
while True:
time.sleep(self.RECORDS_BATCH_TIMEOUT_IN_SEC)
records = []
Expand All @@ -122,12 +121,10 @@ def _batch_loop(self):
)
continue
feedback_results = []
apps = {}
for record in records:
app_id = record.app_id
if app_id not in apps:
apps[app_id] = self.get_app(app_id=app_id)
app = apps[app_id]

app = apps.setdefault(app_id, self.get_app(app_id=app_id))
feedback_definitions = app.get("feedback_definitions", [])
# TODO(Dave): Modify this to add only client side feedback results
for feedback_definition_id in feedback_definitions:
Expand Down
6 changes: 3 additions & 3 deletions src/core/trulens/core/database/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ def batch_insert_record(
self.orm.Record.parse(r, redact_keys=self.redact_keys)
for r in records
]
session.add_all(records_list)
session.bulk_save_objects(records_list)
logger.info(f"{UNICODE_CHECK} added record batch")
# return record ids from orm objects
return [r.record_id for r in records_list]
Expand Down Expand Up @@ -550,7 +550,7 @@ def batch_insert_feedback(
self.orm.FeedbackResult.parse(f, redact_keys=self.redact_keys)
for f in feedback_results
]
session.add_all(feedback_results_list)
session.bulk_save_objects(feedback_results_list)
return [f.feedback_result_id for f in feedback_results_list]

def _feedback_query(
Expand Down Expand Up @@ -807,7 +807,7 @@ def batch_insert_ground_truth(
)
ground_truths_to_insert.append(new_ground_truth)

session.add_all(ground_truths_to_insert)
session.bulk_save_objects(ground_truths_to_insert)
return [gt.ground_truth_id for gt in ground_truths]

def get_ground_truth(
Expand Down
157 changes: 36 additions & 121 deletions src/core/trulens/core/utils/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
Generator,
Generic,
Hashable,
Iterable,
Iterator,
List,
Optional,
Expand All @@ -32,7 +31,6 @@
TypeVar,
Union,
)
import weakref

T = TypeVar("T")

Expand Down Expand Up @@ -374,8 +372,7 @@ def superstack() -> Iterator[FrameType]:
across Tasks and threads.
"""

frames = stack_with_tasks()
next(iter(frames)) # skip this method itself
frames = stack_with_tasks()[1:] # + 1 to skip this method itself
# NOTE: skipping offset frames is done below since the full stack may need
# to be reconstructed there.

Expand Down Expand Up @@ -406,8 +403,8 @@ def caller_module_name(offset=0) -> str:
"""
Get the caller's (of this function) module name.
"""
frame = caller_frame(offset=offset + 1)
return frame.f_globals["__name__"]

return inspect.stack()[offset + 1].frame.f_globals["__name__"]


def caller_module(offset=0) -> ModuleType:
Expand All @@ -423,32 +420,8 @@ def caller_frame(offset=0) -> FrameType:
Get the caller's (of this function) frame. See
https://docs.python.org/3/reference/datamodel.html#frame-objects .
"""
caller_frame = inspect.currentframe()
for _ in range(offset + 1):
if caller_frame is None:
raise RuntimeError("No current frame found.")
caller_frame = caller_frame.f_back

if caller_frame is None:
raise RuntimeError("No caller frame found.")

return caller_frame


def external_caller_frame(offset=0) -> FrameType:
"""Get the caller's (of this function) frame that is not in the trulens
namespace.
Raises:
RuntimeError: If no such frame is found.
"""
frame = inspect.currentframe()
gen = stack_generator(frame=frame, offset=offset + 2)
for f_info in gen:
if not f_info.f_globals["__name__"].startswith("trulens"):
return f_info

raise RuntimeError("No external caller frame found.")
return inspect.stack()[offset + 1].frame


def caller_frameinfo(
Expand All @@ -464,11 +437,11 @@ def caller_frameinfo(
skip_module: Skip frames from the given module. Default is "trulens".
"""

for f_info in inspect.stack(0)[offset + 1 :]:
for finfo in inspect.stack()[offset + 1 :]:
if skip_module is None:
return f_info
if not f_info.frame.f_globals["__name__"].startswith(skip_module):
return f_info
return finfo
if not finfo.frame.f_globals["__name__"].startswith(skip_module):
return finfo

return None

Expand All @@ -485,8 +458,7 @@ def task_factory_with_stack(loop, coro, *args, **kwargs) -> Sequence[FrameType]:
parent_task = asyncio.current_task(loop=loop)
task = asyncio.tasks.Task(coro=coro, loop=loop, *args, **kwargs)

frame = inspect.currentframe()
stack = stack_generator(frame=frame, offset=3)
stack = [fi.frame for fi in inspect.stack()[2:]]

if parent_task is not None:
stack = merge_stacks(stack, parent_task.get_stack()[::-1])
Expand Down Expand Up @@ -527,7 +499,7 @@ def get_task_stack(task: asyncio.Task) -> Sequence[FrameType]:


def merge_stacks(
s1: Iterable[FrameType], s2: Sequence[FrameType]
s1: Sequence[FrameType], s2: Sequence[FrameType]
) -> Sequence[FrameType]:
"""
Assuming `s1` is a subset of `s2`, combine the two stacks in presumed call
Expand All @@ -536,110 +508,56 @@ def merge_stacks(

ret = []

for f in s1:
while len(s1) > 1:
f = s1[0]
s1 = s1[1:]

ret.append(f)
try:
s2i = s2.index(f)
for _ in range(s2i):
ret.append(s2[0])
s2 = s2[1:]

except Exception:
pass

return ret


def stack_generator(
frame: Optional[FrameType] = None, offset: int = 0
) -> Iterable[FrameType]:
if frame is None:
frame = inspect.currentframe()
for _ in range(offset):
if frame is None:
raise ValueError("No frame found.")
frame = frame.f_back
while frame is not None:
yield frame
frame = frame.f_back


def stack_with_tasks() -> Iterable[FrameType]:
"""Get the current stack (not including this function) with frames reaching
def stack_with_tasks() -> Sequence[FrameType]:
"""
Get the current stack (not including this function) with frames reaching
across Tasks.
"""
frame = inspect.currentframe()
frame_gen = stack_generator(frame=frame, offset=1)

ret = [fi.frame for fi in inspect.stack()[1:]] # skip stack_with_task_stack

try:
task_stack = get_task_stack(asyncio.current_task())

return merge_stacks(frame_gen, task_stack)
return merge_stacks(ret, task_stack)

except Exception:
return frame_gen

return ret

class _Wrap(Generic[T]):
"""Wrap an object.

See WeakWrapper for explanation.
def _future_target_wrapper(stack, context, func, *args, **kwargs):
"""

def __init__(self, obj: T):
self.obj: T = obj

def get(self) -> T:
return self.obj


@dataclasses.dataclass
class WeakWrapper(Generic[T]):
"""Wrap an object with a weak reference.
This is to be able to use weakref.ref on objects like lists which are
otherwise not weakly referencable. The goal of this class is to generalize
weakref.ref to work with any object."""

obj: weakref.ReferenceType[Union[_Wrap[T], T]]

def __init__(self, obj: Union[weakref.ReferenceType[T], WeakWrapper[T], T]):
if isinstance(obj, weakref.ReferenceType):
self.obj = obj

else:
if isinstance(obj, WeakWrapper):
obj = obj.get()

try:
# Try to make reference to obj directly.
self.obj = weakref.ref(obj)

except Exception:
# If its a list or other non-weakly referencable object, wrap it.
self.obj = weakref.ref(_Wrap(obj))

def get(self) -> T:
"""Get the wrapped object."""

temp = self.obj() # undo weakref.ref
if isinstance(temp, _Wrap):
return temp.get() # undo _Wrap if needed
else:
return temp


def _future_target_wrapper(stack, func, *args, **kwargs):
"""Wrapper for a function that is started by threads.
This is needed to record the call stack prior to thread creation as in
python threads do not inherit the stack. Our instrumentation, however,
relies on walking the stack and need to do this to the frames prior to
thread starts.
Wrapper for a function that is started by threads. This is needed to
record the call stack prior to thread creation as in python threads do
not inherit the stack. Our instrumentation, however, relies on walking
the stack and need to do this to the frames prior to thread starts.
"""

# TODO: See if threading.stack_size([size]) can be used instead.

# Keep this for looking up via get_first_local_in_call_stack .
pre_start_stack = WeakWrapper(stack) # noqa: F841 # pylint: disable=W0612
pre_start_stack = stack # noqa: F841

for var, value in context.items():
var.set(value)

# with with_context(context):
return func(*args, **kwargs)


Expand Down Expand Up @@ -680,16 +598,13 @@ def get_all_local_in_call_stack(
[TP][trulens.core.utils.threading.TP].
"""

frames_gen = stack_with_tasks()
frames = stack_with_tasks()[1:] # + 1 to skip this method itself
# NOTE: skipping offset frames is done below since the full stack may need
# to be reconstructed there.

# Using queue for frames as additional frames may be added due to handling threads.
q = queue.Queue()
for i, f in enumerate(frames_gen):
if i == 0:
# skip this method itself
continue
for f in frames:
q.put(f)

while not q.empty():
Expand Down
2 changes: 0 additions & 2 deletions tests/unit/static/golden/api.trulens.3.11.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1356,8 +1356,6 @@ trulens.core.utils.python:
safe_hasattr: builtins.function
safe_issubclass: builtins.function
safe_signature: builtins.function
set_context_vars_or_values: builtins.function
stack_generator: builtins.function
stack_with_tasks: builtins.function
superstack: builtins.function
task_factory_with_stack: builtins.function
Expand Down

0 comments on commit e31f8cb

Please sign in to comment.