Observability SDK
This guide shows you how to use the Agenta observability SDK to instrument workflows in your application.
If you're exclusively using a framework like LangChain, you can use the auto-instrumentation packages to automatically instrument your application.
However, if you need more flexibility, you can use the SDK to:
- Instrument custom functions in your workflow
- Add additional metadata to the spans
- Link traces to applications, variants, and environments in Agenta
Setup
1. Install the Agenta SDK
pip install -U agenta
2. Set environment variables
Agenta Cloud or Enterprise:
- Visit the Agenta API Keys page.
- Click on Create New API Key and follow the prompts.
import os
os.environ["AGENTA_API_KEY"] = "YOUR_AGENTA_API_KEY"
os.environ["AGENTA_HOST"] = "https://cloud.agenta.ai"
Agenta OSS Running Locally:
import os
os.environ["AGENTA_HOST"] = "http://localhost"
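Before initializing the SDK, it can help to confirm that the variables above are actually set. The following is a minimal sketch in plain Python, not part of the Agenta SDK: the helper name `missing_agenta_settings` is hypothetical, and it assumes (following the two setups above) that a cloud or enterprise host needs both variables while a local OSS instance needs only `AGENTA_HOST`.

```python
import os
from typing import List, Mapping, Optional


def missing_agenta_settings(
    env: Optional[Mapping[str, str]] = None, local: bool = False
) -> List[str]:
    """Return the names of expected environment variables that are unset.

    Hypothetical helper, not an Agenta API. Assumes a local OSS setup
    only needs AGENTA_HOST, as in the example above.
    """
    env = os.environ if env is None else env
    required = ["AGENTA_HOST"] if local else ["AGENTA_API_KEY", "AGENTA_HOST"]
    # Treat empty strings the same as missing values.
    return [name for name in required if not env.get(name)]
```

Calling `missing_agenta_settings()` with no arguments inspects `os.environ`; an empty list means every expected variable is present.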
Instrumenting functions with the decorator
To instrument a function, add the @ag.instrument() decorator. This automatically captures the function's inputs and outputs.
The decorator has a spankind argument to categorize each span in the UI. Available types are: agent, chain, workflow, tool, embedding, query, completion, chat, rerank.
The instrument decorator should be the top-most decorator on a function (i.e. the outermost one, which Python applies last).
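import agenta as ag
from openai import OpenAI  # assumption: `client` is an OpenAI client

ag.init()
client = OpenAI()

@ag.instrument(spankind="task")
def myllmcall(country: str):
    prompt = f"What is the capital of {country}"
    response = client.chat.completions.create(
        model='gpt-4',
        messages=[
            {'role': 'user', 'content': prompt},
        ],
    )
    # Chat completions expose the generated text under message.content
    return response.choices[0].message.content

@ag.instrument(spankind="workflow")
def generate(country: str):
    return myllmcall(country)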
Agenta automatically determines the parent span based on the function call and nests the spans accordingly.
Modify a span's metadata
You can add information to a span's metadata using ag.tracing.store_meta(). This function accesses the active span from the context and adds the key-value pairs to its metadata.
@ag.instrument(spankind="task")
def compile_prompt(country: str):
    # Plain string (not an f-string) so the placeholder survives until format()
    prompt = "What is the capital of {country}"
    ag.tracing.store_meta({"prompt_template": prompt})
    formatted_prompt = prompt.format(country=country)
    return formatted_prompt
Metadata is displayed in the span's raw data view.
Linking spans to applications, variants, and environments
You can link a span to an application, variant, and environment by calling ag.tracing.store_refs().
Applications, variants, and environments can be referenced by their slugs, IDs, versions, and commit IDs (for specific versions). You can link a span to an application and environment like this:
@ag.instrument(spankind="workflow")
def generate(country: str):
    # Plain string template; the placeholder is filled in by format() below
    prompt = "What is the capital of {country}"
    formatted_prompt = prompt.format(country=country)
    completion = client.chat.completions.create(
        model='gpt-4',
        messages=[
            {'role': 'user', 'content': formatted_prompt},
        ],
    )
    ag.tracing.store_refs(
        {
            "application.slug": "capital-app",
            "environment.slug": "production",
        }
    )
    return completion.choices[0].message.content
ag.tracing.store_refs() takes a dict with keys from application.slug, application.id, application.version, variant.slug, variant.id, variant.version, environment.slug, environment.id and environment.commit_id, with the values being the slug, ID, version, or commit ID of the application, variant, or environment respectively.
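Because the references are passed as a plain dict, a typo in a key is easy to miss. The sketch below checks a dict against the keys listed above before it is handed to `ag.tracing.store_refs()`. `ALLOWED_REF_KEYS` and `unknown_ref_keys` are hypothetical names for this example, not part of the SDK, and how the SDK itself treats unrecognized keys is not covered here.

```python
from typing import Dict, List

# Keys listed in this guide for ag.tracing.store_refs().
ALLOWED_REF_KEYS = frozenset(
    {
        "application.slug",
        "application.id",
        "application.version",
        "variant.slug",
        "variant.id",
        "variant.version",
        "environment.slug",
        "environment.id",
        "environment.commit_id",
    }
)


def unknown_ref_keys(refs: Dict[str, str]) -> List[str]:
    """Return the keys in `refs` that are not in the documented list, sorted."""
    return sorted(key for key in refs if key not in ALLOWED_REF_KEYS)


refs = {"application.slug": "capital-app", "environment.slug": "production"}
print(unknown_ref_keys(refs))  # []
```

A non-empty result points at keys that are worth double-checking before storing the references.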
Storing Internals
Internals are additional data stored in the span. Compared to metadata, internals have the following differences:
- Internals are saved within the span data and are searchable with plain text queries.
- Internals are shown by default in the span view in a collapsible section, while metadata is only shown as part of the JSON file with the raw data (i.e. better visibility with internals).
- Internals can be used for evaluation. For instance, you can save the retrieved context in the internals and then use it to evaluate the factuality of the response.
As a rule of thumb, use metadata for additional information that is neither used for evaluation nor essential to understanding the span; otherwise, use internals.
Internals can be stored similarly to metadata:
@ag.instrument(spankind="workflow")
def rag_workflow(query: str):
    # retrieve_context is your own retrieval function
    context = retrieve_context(query)
    ag.tracing.store_internals({"context": context})
    prompt = f"Answer the following question {query} based on the context: {context}"
    completion = client.chat.completions.create(
        model='gpt-4',
        messages=[
            {'role': 'user', 'content': prompt},
        ],
    )
    return completion.choices[0].message.content
Excluding Inputs/Outputs Information from Capture
In some cases, you may want to exclude parts of the inputs or outputs due to privacy concerns or because the data is too large to be stored in the span.
You can achieve this by setting the ignore_inputs and/or ignore_outputs arguments to True in the instrument decorator.
@ag.instrument(
    spankind="workflow",
    ignore_inputs=True,
    ignore_outputs=True,
)
def rag_workflow(query: str):
    ...
If you need finer control, you can specify which parts of the inputs and outputs you want to exclude.
@ag.instrument(
    spankind="workflow",
    ignore_inputs=["user_id"],
    ignore_outputs=["pii"],
)
def rag_workflow(query: str, user_id: str):
    ...
    return {
        "result": ...,
        "pii": ...
    }
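The effect of the two forms above can be pictured with a small stand-alone function. This is a sketch of the intended outcome, not Agenta's code: `apply_ignore` is a hypothetical name, and representing a fully ignored payload as an empty dict is an assumption made for the illustration.

```python
from typing import Any, Dict, List, Union


def apply_ignore(
    data: Dict[str, Any], ignore: Union[bool, List[str]]
) -> Dict[str, Any]:
    """Mimic the effect of ignore_inputs / ignore_outputs on a payload."""
    if ignore is True:
        # Everything is excluded (ASSUMPTION: shown here as an empty dict).
        return {}
    if not ignore:
        # False or an empty list: nothing is excluded.
        return dict(data)
    # A list of names: drop only those keys.
    return {key: value for key, value in data.items() if key not in ignore}


inputs = {"query": "capital of France?", "user_id": "u-123"}
print(apply_ignore(inputs, ["user_id"]))  # {'query': 'capital of France?'}
print(apply_ignore(inputs, True))         # {}
```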
If you need even finer control, you can instead specify a custom redact() callback, along with redact_on_error, which sets the behavior when the callback raises an error.
def my_redact(name, field, data):
    if name == "rag_workflow":
        if field == "inputs":
            del data["user_id"]
        if field == "outputs":
            del data["pii"]
    return data

@ag.instrument(
    spankind="workflow",
    redact=my_redact,
    redact_on_error=False,
)
def rag_workflow(query: str, user_id: str):
    ...
    return {
        "result": ...,
        "pii": ...
    }
Finally, if you want global guardrails, you can specify a global redact() callback that will be applied on top of everything else.
from typing import Any, Dict

def global_redact(
    name: str,
    field: str,
    data: Dict[str, Any],
):
    if "pii" in data:
        del data["pii"]
    return data

ag.init(
    redact=global_redact,
    redact_on_error=True,
)

def local_redact(
    name: str,
    field: str,
    data: Dict[str, Any],
):
    if name == "rag_workflow":
        if field == "inputs":
            del data["user_id"]
    return data

@ag.instrument(
    spankind="workflow",
    redact=local_redact,
    redact_on_error=False,
)
def rag_workflow(query: str, user_id: str):
    ...
    return {
        "result": ...,
        "pii": ...
    }
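The layering described above (a per-function callback with the global callback applied on top) can be sketched in plain Python. This is an illustration under stated assumptions, not Agenta's implementation: `run_redact` and `layered_redact` are hypothetical names, and the error handling assumes that `redact_on_error=True` drops the whole payload when a callback raises while `False` keeps it unredacted. Check the SDK for the actual semantics.

```python
from typing import Any, Callable, Dict, Optional

RedactFn = Callable[[str, str, Dict[str, Any]], Dict[str, Any]]


def run_redact(
    redact: Optional[RedactFn],
    redact_on_error: bool,
    name: str,
    field: str,
    data: Dict[str, Any],
) -> Dict[str, Any]:
    """Apply one redact callback to a copy of `data` (illustration only)."""
    if redact is None:
        return data
    try:
        # Work on a shallow copy so a failing callback cannot half-edit `data`.
        return redact(name, field, dict(data))
    except Exception:
        # ASSUMPTION: True drops the payload, False keeps it unredacted.
        return {} if redact_on_error else data


def layered_redact(
    name: str,
    field: str,
    data: Dict[str, Any],
    local_fn: Optional[RedactFn] = None,
    local_on_error: bool = False,
    global_fn: Optional[RedactFn] = None,
    global_on_error: bool = True,
) -> Dict[str, Any]:
    """Run the per-function callback first, then the global one on top."""
    data = run_redact(local_fn, local_on_error, name, field, data)
    return run_redact(global_fn, global_on_error, name, field, data)


def local_redact(name: str, field: str, data: Dict[str, Any]) -> Dict[str, Any]:
    if name == "rag_workflow" and field == "inputs":
        del data["user_id"]
    return data


def global_redact(name: str, field: str, data: Dict[str, Any]) -> Dict[str, Any]:
    if "pii" in data:
        del data["pii"]
    return data


outputs = layered_redact(
    "rag_workflow",
    "outputs",
    {"result": "Paris", "pii": "secret"},
    local_fn=local_redact,
    global_fn=global_redact,
)
print(outputs)  # {'result': 'Paris'}
```

Here the local callback leaves the outputs alone and the global callback still strips the `pii` key, which is the guardrail behavior a global redact callback is meant to provide.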