Creating Tasks
Tasks in Patronus experiments are functions that process each dataset example and produce outputs that will be evaluated. This page covers how to create and use tasks effectively.
Task Function Basics
A task function receives a dataset row and produces an output. The simplest task functions look like this:
def simple_task(row, **kwargs):
    # Process the input from the row
    input_text = row.task_input

    # Generate an output (typically a score between 0 and 1)
    quality_score = 0.85

    # Return the output as a float
    return quality_score
The framework automatically converts numeric outputs to TaskResult objects.
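To see a task in action, pass it to the experiment runner. The snippet below is a minimal sketch, not the only way to wire things up: it assumes run_experiment from patronus.experiments accepts an inline list-of-dicts dataset and an evaluators list, and the field names simply mirror the Row attributes used throughout this page.

import patronus
from patronus.experiments import run_experiment

patronus.init()

# Hypothetical inline dataset; keys mirror the Row fields used on this page.
dataset = [
    {"task_input": "Summarize the meeting notes.", "gold_answer": "A short summary."},
]

run_experiment(
    dataset=dataset,
    task=simple_task,
    evaluators=[],  # attach evaluators here; they are covered on the next page
)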
Task Function Parameters
Task functions always receive these parameters:
row
: Row - The dataset example to process

parent
: EvalParent - Information from previous chain stages (if any)

tags
: Tags - Tags associated with the experiment and dataset

**kwargs
: Additional keyword arguments
Here's a more complete task function:
from patronus.datasets import Row
from patronus.experiments.types import EvalParent

def complete_task(
    row: Row,
    parent: EvalParent = None,
    tags: dict[str, str] = None,
    **kwargs
):
    # Access dataset fields
    input_text = row.task_input
    context = row.task_context
    system_prompt = row.system_prompt
    gold_answer = row.gold_answer

    # Access parent information (from previous chain steps)
    previous_output = None
    if parent and parent.task:
        previous_output = parent.task.output

    # Access tags
    model_name = tags.get("model_name", "default")

    # Generate output (in real usage, this would call an LLM)
    output = f"Model {model_name} processed: {input_text}"

    # Return the output
    return output
Return Types
Task functions can return several types:
String Output
String outputs work well for classification tasks, where the task returns a category label:
def classify_sentiment(row: Row, **kwargs) -> str:
    # Extract the text to classify
    text = row.task_input

    # Simple rule-based sentiment classifier
    positive_words = ["good", "great", "excellent", "happy", "positive"]
    negative_words = ["bad", "terrible", "awful", "sad", "negative"]

    text_lower = text.lower()
    positive_count = sum(word in text_lower for word in positive_words)
    negative_count = sum(word in text_lower for word in negative_words)

    # Classify based on word counts
    if positive_count > negative_count:
        return "positive"
    elif negative_count > positive_count:
        return "negative"
    else:
        return "neutral"
The returned string is the predicted category label, a common pattern in text classification tasks.
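For a quick local check outside of an experiment run, you can call the task directly. The snippet below uses a hypothetical stand-in object rather than a real Row, since the function only reads the task_input attribute:

from types import SimpleNamespace

# Hypothetical stand-in for a Row; only the attribute the task reads is provided.
sample = SimpleNamespace(task_input="The results were excellent and everyone was happy")
print(classify_sentiment(sample))  # "positive"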
Numeric Output (Float/Int)
For score-based outputs:
def score_task(row: Row, **kwargs) -> float:
    # Calculate a relevance score between 0 and 1
    return 0.92
TaskResult Object
For more control, return a TaskResult object:
from patronus.experiments.types import TaskResult

def task_result(row: Row, **kwargs) -> TaskResult:
    # Generate output
    output = f"Processed: {row.task_input}"

    # Include metadata about the processing
    metadata = {
        "processing_time_ms": 42,
        "confidence": 0.95,
        "tokens_used": 150
    }

    # Add tags for filtering and organization
    tags = {
        "model": "gpt-4",
        "temperature": "0.7"
    }

    # Return a complete TaskResult
    return TaskResult(
        output=output,
        metadata=metadata,
        tags=tags
    )
None / Skipping Examples
Return None to skip processing an example:
def selective_task(row: Row, **kwargs) -> str | None:
    # Skip examples without the required fields
    if not row.task_input or not row.gold_answer:
        return None

    # Process valid examples
    return f"Processed: {row.task_input}"
Calling LLMs
A common use of tasks is to generate outputs using Large Language Models:
from openai import OpenAI
from patronus.datasets import Row
from patronus.experiments.types import TaskResult

oai = OpenAI()

def openai_task(row: Row, **kwargs) -> TaskResult:
    # Prepare the input for the model
    system_message = row.system_prompt or "You are a helpful assistant."
    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": row.task_input}
    ]

    # Call the OpenAI API
    response = oai.chat.completions.create(
        model="gpt-4",
        messages=messages,
        temperature=0.7,
        max_tokens=150
    )

    # Extract the output
    output = response.choices[0].message.content

    # Include metadata about the call
    metadata = {
        "model": response.model,
        "tokens": {
            "prompt": response.usage.prompt_tokens,
            "completion": response.usage.completion_tokens,
            "total": response.usage.total_tokens
        }
    }

    return TaskResult(
        output=output,
        metadata=metadata
    )
Async Tasks
For better performance, especially with API calls, you can use async tasks:
from openai import AsyncOpenAI
from patronus.datasets import Row
from patronus.experiments.types import EvalParent, TaskResult

oai = AsyncOpenAI()

async def async_openai_task(
    row: Row,
    parent: EvalParent = None,
    tags: dict[str, str] = None,
    **kwargs
) -> TaskResult:
    # Prepare the input
    system_message = row.system_prompt or "You are a helpful assistant."
    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": row.task_input}
    ]

    # Call the OpenAI API asynchronously
    response = await oai.chat.completions.create(
        model="gpt-4",
        messages=messages,
        temperature=0.7,
        max_tokens=150
    )

    # Extract and return the output
    output = response.choices[0].message.content
    return TaskResult(
        output=output,
        metadata={"model": response.model}
    )
The Patronus framework automatically handles both synchronous and asynchronous tasks.
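Both styles are passed to the experiment runner in the same way. The sketch below assumes the dataset, evaluators, and the two task functions defined above are in scope, and that the runner detects and awaits coroutine tasks internally:

from patronus.experiments import run_experiment

# Sync and async tasks are interchangeable from the caller's point of view.
run_experiment(dataset=dataset, task=openai_task, evaluators=evaluators)
run_experiment(dataset=dataset, task=async_openai_task, evaluators=evaluators)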
Using Parent Information
In multi-stage chains, tasks can access the results of previous stages:
from patronus.datasets import Row
from patronus.experiments.types import EvalParent

def second_stage_task(
    row: Row,
    parent: EvalParent,
    tags: dict[str, str] = None,
    **kwargs
) -> str:
    # Access previous task output
    if parent and parent.task:
        previous_output = parent.task.output
        return f"Building on previous output: {previous_output}"

    # Fallback if no previous output
    return f"Starting fresh: {row.task_input}"
Error Handling
Task functions should handle exceptions appropriately:
from patronus import get_logger
from patronus.datasets import Row

def robust_task(row: Row, **kwargs):
    try:
        # Attempt to process
        if row.task_input:
            return f"Processed: {row.task_input}"
        else:
            # Skip if input is missing
            return None
    except Exception as e:
        # Log the error
        get_logger().exception(f"Error processing row {row.sid}: {e}")
        # Skip this example
        return None
If an unhandled exception occurs, the experiment will log the error and skip that example.
Task Tracing
Tasks are automatically traced with the Patronus tracing system. You can add additional tracing:
from patronus.tracing import start_span
from patronus.datasets import Row

def traced_task(row: Row, **kwargs):
    # Outer span is created automatically by the framework

    # Create spans for subtasks
    with start_span("Preprocessing"):
        # Preprocessing logic...
        preprocessed = preprocess(row.task_input)

    with start_span("Model Call"):
        # Model call logic...
        output = call_model(preprocessed)

    with start_span("Postprocessing"):
        # Postprocessing logic...
        final_output = postprocess(output)

    return final_output
This helps with debugging and performance analysis.
Best Practices
When creating task functions:
- Handle missing data gracefully: Check for required fields and return None to skip incomplete examples
- Include useful metadata: Add information about processing steps, model parameters, etc.
- Use async for API calls: Async tasks significantly improve performance for API-dependent workflows
- Add explanatory tags: Tags help with filtering and analyzing results
- Add tracing spans: For complex processing, add spans to help with debugging and optimization
- Keep functions focused: Tasks should have a clear purpose; use chains for multi-step processes
Next, we'll explore how to use evaluators in experiments to assess task outputs.