Skip to content

Commit

Permalink
threading robustness and feedback retrieval (#480)
Browse files Browse the repository at this point in the history
* split off work on threading issues

* work on dummy example

* prototyping various thread robustness solutions

* work

* working on threading and feedback results

* more ignores

* nevermind that last gitignore addition

* remove unneeded

* added feedback result retrieval into langchain quickstart

* don't use submit inside feedback functions

* what the last thing said
  • Loading branch information
piotrm0 authored Oct 18, 2023
1 parent f21e13c commit 4041684
Show file tree
Hide file tree
Showing 16 changed files with 676 additions and 253 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{
"python.formatting.provider": "yapf"
"python.formatting.provider": "yapf",
"python.analysis.typeCheckingMode": "basic"
}
164 changes: 149 additions & 15 deletions trulens_eval/examples/experimental/dummy_example.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,20 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dummy Example\n",
"# Dummy Provider Example and High Volume Robustness Testing\n",
"\n",
"This notebook shows the use of the dummy feedback function provider which\n",
"behaves like the huggingface provider except it does not actually perform any\n",
"network calls and just produces constant results. It can be used to prototype\n",
"feedback function wiring for your apps before invoking potentially slow (to\n",
"run/to load) feedback functions."
"This notebook has two purposes: \n",
"\n",
"- Demonstrate the dummy feedback function provider which behaves like the\n",
" huggingface provider except it does not actually perform any network calls and\n",
" just produces constant results. It can be used to prototype feedback function\n",
" wiring for your apps before invoking potentially slow (to run/to load)\n",
" feedback functions.\n",
"\n",
"- Test out high-volume record and feedback computation. To this end, we use the\n",
" custom app which is dummy in the sense that it produces useless answers without\n",
" making any API calls but otherwise behaves similarly to real apps, and the\n",
" dummy feedback function provider."
]
},
{
Expand All @@ -34,12 +41,28 @@
"metadata": {},
"outputs": [],
"source": [
"from concurrent.futures import as_completed\n",
"from time import sleep\n",
"\n",
"from examples.expositional.end2end_apps.custom_app.custom_app import CustomApp\n",
"from tqdm.auto import tqdm\n",
"\n",
"from trulens_eval import Feedback\n",
"from trulens_eval import Tru\n",
"from trulens_eval.feedback.provider.hugs import Dummy\n",
"from trulens_eval.schema import FeedbackMode\n",
"from trulens_eval.tru_custom_app import TruCustomApp\n",
"from trulens_eval.utils.threading import TP\n",
"\n",
"tp = TP()\n",
"\n",
"d = Dummy(\n",
" loading_prob=0.1,\n",
" freeze_prob=0.01,\n",
" error_prob=0.01,\n",
" overloaded_prob=0.1,\n",
" rpm=6000\n",
")\n",
"\n",
"tru = Tru()\n",
"\n",
Expand All @@ -57,24 +80,135 @@
"metadata": {},
"outputs": [],
"source": [
"f_dummy1 = Feedback(\n",
" d.language_match\n",
").on_input_output()\n",
"\n",
"f_dummy2 = Feedback(\n",
" d.positive_sentiment\n",
").on_output()\n",
"\n",
"f_dummy3 = Feedback(\n",
" d.positive_sentiment\n",
").on_input()\n",
"\n",
"\n",
"# Create custom app:\n",
"ca = CustomApp()\n",
"\n",
"# Create trulens wrapper:\n",
"ta = TruCustomApp(\n",
" ca,\n",
" app_id=\"customapp\"\n",
")\n",
" app_id=\"customapp\",\n",
" main_method = ca.respond_to_query,\n",
" feedbacks=[f_dummy1, f_dummy2, f_dummy3],\n",
" feedback_mode=FeedbackMode.WITH_APP_THREAD\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"with ta as recorder:\n",
" res = ca.respond_to_query(f\"hello there\")\n",
"\n",
"rec = recorder.get()\n",
"print(rec.feedback_results)\n",
"for res in as_completed(rec.feedback_results):\n",
" print(res.result())\n",
" \n",
"print(rec.feedback_results)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sequential app invocation.\n",
"\n",
"if True:\n",
" for i in tqdm(range(100), desc=\"invoking app\"):\n",
" with ta as recorder:\n",
" res = ca.respond_to_query(f\"hello {i}\")\n",
"\n",
" rec = recorder.get()\n",
" assert rec is not None"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Parallel feedback evaluation.\n",
"\n",
"futures = []\n",
"num_tests = 1000\n",
"good = 0\n",
"bad = 0\n",
"\n",
"def test_feedback(msg):\n",
" return msg, d.positive_sentiment(msg)\n",
"\n",
"for i in tqdm(range(num_tests), desc=\"starting feedback task\"):\n",
" futures.append(tp.submit(test_feedback, msg=f\"good\"))\n",
"\n",
"prog = tqdm(as_completed(futures), total=num_tests)\n",
"\n",
"for f in prog:\n",
" try:\n",
" res = f.result()\n",
" good += 1\n",
"\n",
" assert res[0] == \"good\"\n",
"\n",
" prog.set_description_str(f\"{good} / {bad}\")\n",
" except Exception as e:\n",
" bad += 1\n",
" prog.set_description_str(f\"{good} / {bad}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"✅ feedback result positive_sentiment DONE)\n",
"✅ feedback result positive_sentiment DONE)\n",
"✅ feedback result positive_sentiment DONE)\n",
"✅ feedback result language_match DONE)\n"
]
}
],
"source": [
"# Parallel app invocation.\n",
"\n",
"def run_query(q):\n",
"\n",
" with ta as recorder:\n",
" res = ca.respond_to_query(q)\n",
"\n",
"# Must be set after above constructor as the instrumentation changes what\n",
"# CustomApp.respond_to_query points to.\n",
"ta.main_method = CustomApp.respond_to_query\n",
"ta.main_async_method = CustomApp.arespond_to_query\n",
" rec = recorder.get()\n",
" assert rec is not None\n",
"\n",
"from trulens_eval.appui import AppUI\n",
" return f\"run_query {q} result\"\n",
"\n",
"aui = AppUI(app=ta)\n",
"aui.d"
"for i in tqdm(range(100), desc=\"starting app task\"):\n",
" print(\n",
" tp.completed_tasks, \n",
" end=\"\\r\"\n",
" )\n",
" tp.submit(run_query, q=f\"hello {i}\")"
]
},
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from asyncio import sleep
from concurrent.futures import wait

from examples.expositional.end2end_apps.custom_app.custom_llm import CustomLLM
from examples.expositional.end2end_apps.custom_app.custom_memory import \
Expand All @@ -8,6 +9,7 @@
CustomRetriever

from trulens_eval.tru_custom_app import instrument
from trulens_eval.utils.threading import ThreadPoolExecutor

instrument.method(CustomRetriever, "retrieve_chunks")
instrument.method(CustomMemory, "remember")
Expand Down Expand Up @@ -42,6 +44,18 @@ def retrieve_chunks(self, data):
@instrument
def respond_to_query(self, input):
chunks = self.retrieve_chunks(input)

# Creates a few threads to process chunks in parallel to test apps that
# make use of threads.
ex = ThreadPoolExecutor(max_workers=max(1, len(chunks)))

futures = list(
ex.submit(lambda chunk: chunk + " processed", chunk=chunk) for chunk in chunks
)

wait(futures)
chunks = list(future.result() for future in futures)

self.memory.remember(input)

answer = self.llm.generate(",".join(chunks))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,15 @@
"\n",
"def test_bot(selector, question):\n",
" print(selector, question)\n",
" app = get_or_make_app(cid=question + str(selector), selector=selector)#, feedback_mode=FeedbackMode.WITH_APP)\n",
" app = get_or_make_app(cid=question + str(selector), selector=selector, feedback_mode=FeedbackMode.DEFERRED)\n",
" answer = get_answer(app=app, question=question)\n",
" return answer\n",
"\n",
"results = []\n",
"\n",
"for s in selectors:\n",
" for m in messages:\n",
" # results.append(TP().promise(test_bot, selector=s, question=m))\n",
" # TP().finish()\n",
" test_bot(selector=s, question=m)\n"
" results.append(TP().submit(test_bot, selector=s, question=m))\n"
]
},
{
Expand All @@ -125,8 +123,7 @@
"metadata": {},
"outputs": [],
"source": [
"thread = Tru().start_evaluator(restart=True)\n",
"# TP().finish()"
"thread = Tru().start_evaluator(restart=True)"
]
},
{
Expand Down
45 changes: 44 additions & 1 deletion trulens_eval/examples/quickstart/langchain_quickstart.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"\n",
"# Imports main tools:\n",
"from trulens_eval import TruChain, Feedback, Huggingface, Tru\n",
"from trulens_eval.schema import FeedbackResult\n",
"tru = Tru()\n",
"\n",
"# Imports from langchain to build app. You may need to install langchain first\n",
Expand Down Expand Up @@ -184,6 +185,48 @@
"display(llm_response)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Retrieve records and feedback"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The record of the app invocation can be retrieved from the `recording`:\n",
"\n",
"rec = recording.get() # use .get if only one record\n",
"# recs = recording.records # use .records if multiple\n",
"\n",
"display(rec)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The results of the feedback functions can be retrieved from the record. These\n",
"# are `Future` instances (see `concurrent.futures`). You can use `as_completed`\n",
"# to wait until they have finished evaluating.\n",
"\n",
"from concurrent.futures import as_completed\n",
"\n",
"for feedback_future in as_completed(rec.feedback_results):\n",
" feedback, feedback_result = feedback_future.result()\n",
" \n",
" feedback: Feedback\n",
" feedback_result: FeedbackResult\n",
"\n",
" display(feedback.name, feedback_result.result)\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
Expand Down Expand Up @@ -285,7 +328,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.5"
"version": "3.8.16"
},
"vscode": {
"interpreter": {
Expand Down
Loading

0 comments on commit 4041684

Please sign in to comment.