
Commit 172bf1d

gustavocidornelas authored and whoseoyster committed
Completes OPEN-5690 Create Openlayer callback handler for LangChain support
1 parent a93115e commit 172bf1d

File tree

3 files changed: +341 −0 lines changed
Lines changed: 157 additions & 0 deletions
@@ -0,0 +1,157 @@
{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "2722b419",
   "metadata": {},
   "source": [
    "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/openlayer-ai/examples-gallery/blob/main/monitoring/llms/langchain/langchain_callback.ipynb)\n",
    "\n",
    "\n",
    "# <a id=\"top\">Openlayer LangChain callback handler</a>\n",
    "\n",
    "This notebook illustrates how to use Openlayer's callback handler to monitor LLMs/chains built with LangChain."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "020c8f6a",
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install openlayer"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "75c2a473",
   "metadata": {},
   "source": [
    "## 1. Set the environment variables"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f3f4fa13",
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import openai\n",
    "\n",
    "# OpenAI env variable\n",
    "os.environ[\"OPENAI_API_KEY\"] = \"YOUR_OPENAI_API_KEY_HERE\"\n",
    "\n",
    "# Openlayer env variables\n",
    "os.environ[\"OPENLAYER_API_KEY\"] = \"YOUR_OPENLAYER_API_KEY_HERE\"\n",
    "os.environ[\"OPENLAYER_PROJECT_NAME\"] = \"YOUR_PROJECT_NAME_HERE\""
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9758533f",
   "metadata": {},
   "source": [
    "## 2. Instantiate the `OpenlayerHandler`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e60584fa",
   "metadata": {},
   "outputs": [],
   "source": [
    "from openlayer.integrations import langchain_callback\n",
    "\n",
    "openlayer_handler = langchain_callback.OpenlayerHandler()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "72a6b954",
   "metadata": {},
   "source": [
    "## 3. Use LangChain"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "76a350b4",
   "metadata": {},
   "source": [
    "Now, you can pass the `openlayer_handler` as a callback to LLM or chain invocations."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e00c1c79",
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain_core.messages import HumanMessage\n",
    "from langchain_openai import ChatOpenAI"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "abaf6987-c257-4f0d-96e7-3739b24c7206",
   "metadata": {},
   "outputs": [],
   "source": [
    "chat = ChatOpenAI(max_tokens=25, callbacks=[openlayer_handler])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4123669f-aa28-47b7-8d46-ee898aba99e8",
   "metadata": {},
   "outputs": [],
   "source": [
    "chat.invoke([HumanMessage(content=\"What's the meaning of life?\")])"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9a702ad1-da68-4757-95a6-4661ddaef251",
   "metadata": {},
   "source": [
    "That's it! Now your data is being streamed to Openlayer after every invocation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a3092828-3fbd-4f12-bae7-8de7f7319ff0",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.18"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
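
The notebook only demonstrates a direct chat-model invocation, but the markdown cell above notes the handler also works for chain invocations. A minimal sketch of the chain case, assuming an LCEL prompt | model pipeline (the prompt text and chain below are illustrative, not part of the commit):

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# Hypothetical chain for illustration: a prompt template piped into the model.
prompt = ChatPromptTemplate.from_messages([("human", "Tell me a fact about {topic}.")])
chain = prompt | ChatOpenAI(max_tokens=25)

# Callbacks can also be supplied per call via the run config; this invocation
# is then streamed to Openlayer like the direct chat.invoke above.
chain.invoke({"topic": "whales"}, config={"callbacks": [openlayer_handler]})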

openlayer/integrations/__init__.py

Whitespace-only changes.
openlayer/integrations/langchain_callback.py

Lines changed: 184 additions & 0 deletions
@@ -0,0 +1,184 @@
"""Module with the Openlayer callback handler for LangChain."""

# pylint: disable=unused-argument
import time
from typing import Any, Dict, List, Optional, Union

from langchain import schema as langchain_schema
from langchain.callbacks.base import BaseCallbackHandler

from .. import constants
from ..tracing import tracer

LANGCHAIN_TO_OPENLAYER_PROVIDER_MAP = {"openai-chat": "OpenAI"}
PROVIDER_TO_STEP_NAME = {"OpenAI": "OpenAI Chat Completion"}


class OpenlayerHandler(BaseCallbackHandler):
    """LangChain callback handler that logs to Openlayer."""

    def __init__(self, **kwargs: Any) -> None:
        super().__init__()

        self.start_time: Optional[float] = None
        self.end_time: Optional[float] = None
        self.prompt: Optional[List[Dict[str, str]]] = None
        self.latency: Optional[float] = None
        self.provider: Optional[str] = None
        self.model: Optional[str] = None
        self.model_parameters: Optional[Dict[str, Any]] = None
        self.cost: Optional[float] = None
        self.prompt_tokens: Optional[int] = None
        self.completion_tokens: Optional[int] = None
        self.total_tokens: Optional[int] = None
        self.output: str = ""
        self.metadata: Dict[str, Any] = kwargs or {}

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> Any:
        """Run when LLM starts running."""

    def on_chat_model_start(
        self,
        serialized: Dict[str, Any],
        messages: List[List[langchain_schema.BaseMessage]],
        **kwargs: Any,
    ) -> Any:
        """Run when Chat Model starts running."""
        self.model_parameters = kwargs.get("invocation_params", {})

        provider = self.model_parameters.get("_type", None)
        if provider in LANGCHAIN_TO_OPENLAYER_PROVIDER_MAP:
            self.provider = LANGCHAIN_TO_OPENLAYER_PROVIDER_MAP[provider]
            self.model_parameters.pop("_type")

        self.model = self.model_parameters.get("model_name", None)
        self.output = ""
        self.prompt = self._langchain_messages_to_prompt(messages)
        self.start_time = time.time()

    @staticmethod
    def _langchain_messages_to_prompt(
        messages: List[List[langchain_schema.BaseMessage]],
    ) -> List[Dict[str, str]]:
        """Converts LangChain messages to the Openlayer prompt format
        (similar to OpenAI's)."""
        prompt = []
        for message in messages:
            for m in message:
                if m.type == "human":
                    prompt.append({"role": "user", "content": m.content})
                elif m.type == "system":
                    prompt.append({"role": "system", "content": m.content})
                elif m.type == "ai":
                    prompt.append({"role": "assistant", "content": m.content})
        return prompt

    def on_llm_new_token(self, token: str, **kwargs: Any) -> Any:
        """Run on new LLM token. Only available when streaming is enabled."""

    def on_llm_end(self, response: langchain_schema.LLMResult, **kwargs: Any) -> Any:
        """Run when LLM ends running."""
        self.end_time = time.time()
        self.latency = (self.end_time - self.start_time) * 1000

        if response.llm_output and "token_usage" in response.llm_output:
            self.prompt_tokens = response.llm_output["token_usage"].get(
                "prompt_tokens", 0
            )
            self.completion_tokens = response.llm_output["token_usage"].get(
                "completion_tokens", 0
            )
            self.cost = self._get_cost_estimate(
                num_input_tokens=self.prompt_tokens,
                num_output_tokens=self.completion_tokens,
            )
            self.total_tokens = response.llm_output["token_usage"].get(
                "total_tokens", 0
            )

        for generations in response.generations:
            for generation in generations:
                self.output += generation.text.replace("\n", " ")

        self._add_to_trace()

    def _get_cost_estimate(
        self, num_input_tokens: int, num_output_tokens: int
    ) -> Optional[float]:
        """Returns the cost estimate for a given model and number of tokens."""
        if self.model not in constants.OPENAI_COST_PER_TOKEN:
            return None
        cost_per_token = constants.OPENAI_COST_PER_TOKEN[self.model]
        return (
            cost_per_token["input"] * num_input_tokens
            + cost_per_token["output"] * num_output_tokens
        )

    def _add_to_trace(self) -> None:
        """Adds the chat completion step to the trace."""
        name = PROVIDER_TO_STEP_NAME.get(self.provider, "Chat Completion Model")
        tracer.add_openai_chat_completion_step_to_trace(
            name=name,
            provider=self.provider,
            inputs={"prompt": self.prompt},
            output=self.output,
            cost=self.cost,
            tokens=self.total_tokens,
            latency=self.latency,
            start_time=self.start_time,
            end_time=self.end_time,
            model=self.model,
            model_parameters=self.model_parameters,
            prompt_tokens=self.prompt_tokens,
            completion_tokens=self.completion_tokens,
            metadata=self.metadata,
        )

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when LLM errors."""

    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> Any:
        """Run when chain starts running."""

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> Any:
        """Run when chain ends running."""

    def on_chain_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when chain errors."""

    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
    ) -> Any:
        """Run when tool starts running."""

    def on_tool_end(self, output: str, **kwargs: Any) -> Any:
        """Run when tool ends running."""

    def on_tool_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when tool errors."""

    def on_text(self, text: str, **kwargs: Any) -> Any:
        """Run on arbitrary text."""

    def on_agent_action(
        self, action: langchain_schema.AgentAction, **kwargs: Any
    ) -> Any:
        """Run on agent action."""

    def on_agent_finish(
        self, finish: langchain_schema.AgentFinish, **kwargs: Any
    ) -> Any:
        """Run on agent end."""

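To make the handler's bookkeeping concrete, here is a small sketch of the message conversion and the cost arithmetic. The message classes come from langchain.schema, which the module already imports; the per-token rates are invented for illustration, since the real table lives in the package's constants.OPENAI_COST_PER_TOKEN:

from langchain.schema import HumanMessage, SystemMessage

# _langchain_messages_to_prompt flattens LangChain's nested message lists
# into OpenAI-style role/content dicts:
messages = [[
    SystemMessage(content="You are concise."),
    HumanMessage(content="What's the meaning of life?"),
]]
# -> [{"role": "system", "content": "You are concise."},
#     {"role": "user", "content": "What's the meaning of life?"}]

# _get_cost_estimate is a linear combination of the two token counts.
# Illustrative rates only, not the real values:
cost_per_token = {"input": 0.5e-6, "output": 1.5e-6}
prompt_tokens, completion_tokens = 14, 25
cost = (
    cost_per_token["input"] * prompt_tokens
    + cost_per_token["output"] * completion_tokens
)  # = 4.45e-05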