Async Operations¶
Optimize performance with asynchronous operations for large-scale prompt evaluations and concurrent processing
The elluminate SDK provides a full-featured AsyncClient for asynchronous operations. This is particularly useful when:
- Running Multiple Experiments Concurrently - Execute several experiments in parallel instead of sequentially
- Integration with Async Frameworks - Use with FastAPI, aiohttp, or other async web frameworks
- Large-Scale Batch Processing - Process hundreds or thousands of operations with better resource utilization
- Non-Blocking Operations - Keep your application responsive during long-running evaluations
AsyncClient vs Client¶
The SDK provides two client classes with identical APIs:
- `Client` - Synchronous operations (blocking I/O)
- `AsyncClient` - Asynchronous operations (non-blocking I/O)
All public methods have the same signature; the only difference is that AsyncClient methods must be awaited:
```python
from elluminate import Client, AsyncClient

# Synchronous
client = Client()
template = client.create_prompt_template(name="Test", template="...")

# Asynchronous (same signature, just add await)
async with AsyncClient() as client:
    template = await client.create_prompt_template(name="Test", template="...")
```
Basic Usage¶
Context Manager (Recommended)¶
Always use the async context manager to ensure proper resource cleanup:
```python
import asyncio

from elluminate import AsyncClient


async def main():
    async with AsyncClient() as client:
        # Create resources
        template = await client.create_prompt_template(
            name="My Template",
            template="Explain {{concept}} in simple terms.",
        )

        # The experiment needs a collection of template variables
        collection = await client.create_collection(name="My Collection")
        await collection.aadd_many(variables=[{"concept": "recursion"}])

        # Run experiment
        experiment = await client.run_experiment(
            name="My Experiment",
            prompt_template=template,
            collection=collection,
        )
        print(f"Completed: {experiment.name}")


# Run the async function
asyncio.run(main())
```
Manual Resource Management¶
If you need manual control over the client lifecycle:
```python
import asyncio

from elluminate import AsyncClient


async def main():
    client = AsyncClient()
    try:
        template = await client.create_prompt_template(...)
    finally:
        await client.close()  # Important: clean up resources


asyncio.run(main())
```
Complete Example¶
This example demonstrates the full async workflow (see the sketch after this list):

- Use the async context manager for proper resource management
- All client methods are async - use `await`
- Rich model methods (on schema objects) use the `a` prefix: `agenerate_criteria()`, `aadd_many()`
- Access results the same way as in synchronous code
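A minimal sketch of that workflow; the template, collection, and experiment names are illustrative:

```python
import asyncio

from elluminate import AsyncClient


async def main():
    async with AsyncClient() as client:
        # All client methods are awaited
        template = await client.create_prompt_template(
            name="Concept Explainer",
            template="Explain {{concept}} in simple terms.",
        )

        # Rich model methods carry the `a` prefix
        await template.agenerate_criteria()

        collection = await client.create_collection(name="Concepts")
        await collection.aadd_many(variables=[
            {"concept": "recursion"},
            {"concept": "latency"},
        ])

        experiment = await client.run_experiment(
            name="Async Walkthrough",
            prompt_template=template,
            collection=collection,
        )

        # Results are accessed the same way as in synchronous code
        if experiment.results:
            print(f"Pass rate: {experiment.results.mean_all_ratings.yes:.1%}")


asyncio.run(main())
```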
Concurrent Execution¶
The real power of AsyncClient comes from running multiple operations concurrently with `asyncio.gather()`:

- Set up shared resources (template, criteria, collection)
- Run multiple experiments concurrently using `asyncio.gather()` - significantly faster than sequential execution
- Compare results across all experiments
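A compact sketch of that flow, assuming the template and collection already exist under these illustrative names:

```python
import asyncio

from elluminate import AsyncClient


async def main():
    async with AsyncClient() as client:
        # Shared resources (illustrative names)
        template = await client.get_prompt_template(name="My Template")
        collection = await client.get_collection(name="Test Cases")

        # Launch all three experiments at once; gather awaits them together
        experiments = await asyncio.gather(*[
            client.run_experiment(
                name=f"Run {i + 1}",
                prompt_template=template,
                collection=collection,
            )
            for i in range(3)
        ])

        # Compare results across all experiments
        for exp in experiments:
            if exp.results:
                print(f"{exp.name}: {exp.results.mean_all_ratings.yes:.1%}")


asyncio.run(main())
```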
Performance Comparison¶
For 3 experiments with 5 test cases each:
- Sequential (sync): ~45 seconds (3 experiments × 15 seconds each)
- Concurrent (async): ~15 seconds (all 3 run in parallel)
3x faster with concurrent execution!
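Actual numbers depend on model latency and test-case count; here is a rough way to measure the difference on your own workload (a sketch reusing `client`, `template`, and `collection` from the example above):

```python
import asyncio
import time


async def compare(client, template, collection, n=3):
    # Sequential: each experiment waits for the previous one to finish
    start = time.perf_counter()
    for i in range(n):
        await client.run_experiment(
            name=f"Sequential {i + 1}",
            prompt_template=template,
            collection=collection,
        )
    print(f"Sequential: {time.perf_counter() - start:.1f}s")

    # Concurrent: all experiments are in flight at once
    start = time.perf_counter()
    await asyncio.gather(*[
        client.run_experiment(
            name=f"Concurrent {i + 1}",
            prompt_template=template,
            collection=collection,
        )
        for i in range(n)
    ])
    print(f"Concurrent: {time.perf_counter() - start:.1f}s")
```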
Async Method Reference¶
AsyncClient Public Methods¶
AsyncClient public methods do NOT use the `a` prefix:

```python
async with AsyncClient() as client:
    # Prompt Templates
    await client.create_prompt_template(...)
    await client.get_prompt_template(...)
    await client.get_or_create_prompt_template(...)
    await client.list_prompt_templates()
    await client.delete_prompt_template(...)

    # Collections
    await client.create_collection(...)
    await client.get_collection(...)
    await client.get_or_create_collection(...)
    await client.list_collections()
    await client.delete_collection(...)

    # Experiments
    await client.create_experiment(...)
    await client.run_experiment(...)  # Creates and runs
    await client.get_experiment(...)
    await client.list_experiments()
    await client.delete_experiment(...)

    # Criterion Sets
    await client.create_criterion_set(...)
    await client.get_criterion_set(...)
    await client.get_or_create_criterion_set(...)
    await client.list_criterion_sets()

    # LLM Configs
    await client.create_llm_config(...)
    await client.get_llm_config(...)
    await client.get_or_create_llm_config(...)
    await client.list_llm_configs()

    # And more...
```
Rich Model Async Methods¶
Rich model methods (on schema objects) DO use the `a` prefix:

```python
# PromptTemplate
template = await client.get_prompt_template(name="...")
criteria = await template.agenerate_criteria()
criteria, generated = await template.aget_or_generate_criteria()
all_criteria = await template.alist_criteria()
new_template = await template.anew_version(template="...")

# TemplateVariablesCollection
collection = await client.get_collection(name="...")
await collection.aadd_many(variables=[...])
await collection.aclear()
await collection.agenerate_variables(prompt_template)

# CriterionSet
criterion_set = await client.get_criterion_set(name="...")
await criterion_set.aadd_criterion(criterion="...")
await criterion_set.aadd_criteria(criteria=[...])
await criterion_set.aclear()
await criterion_set.alink_template(template)
await criterion_set.aunlink_template(template)

# Experiment
experiment = await client.get_experiment(name="...")
await experiment.arun()
await experiment.afetch_responses()
await experiment.aadd_responses(responses=[...], template_variables=[...])
await experiment.arate_responses()
new_exp = await experiment.aclone(name="...")
```
Common Patterns¶
Pattern 1: Concurrent A/B Testing¶
Run multiple prompt variations simultaneously:
```python
import asyncio

from elluminate import AsyncClient


async def ab_test():
    async with AsyncClient() as client:
        # Set up shared resources
        criterion_set = await client.create_criterion_set(name="Quality")
        await criterion_set.aadd_criteria([
            "Is the response helpful?",
            "Is it accurate?",
        ])

        collection = await client.create_collection(name="Test Cases")
        await collection.aadd_many(variables=[
            {"topic": "AI"},
            {"topic": "ML"},
        ])

        # Create templates
        template_a = await client.create_prompt_template(
            name="Style A",
            template="Explain {{topic}} briefly.",
        )
        template_b = await client.create_prompt_template(
            name="Style B",
            template="Explain {{topic}} with examples.",
        )

        # Run both experiments concurrently
        exp_a, exp_b = await asyncio.gather(
            client.run_experiment(
                name="Test A",
                prompt_template=template_a,
                collection=collection,
                criterion_set=criterion_set,
            ),
            client.run_experiment(
                name="Test B",
                prompt_template=template_b,
                collection=collection,
                criterion_set=criterion_set,
            ),
        )

        # Compare results
        print(f"A: {exp_a.results.mean_all_ratings.yes:.1%}")
        print(f"B: {exp_b.results.mean_all_ratings.yes:.1%}")


asyncio.run(ab_test())
```
Pattern 2: Batch Processing Test Cases¶
Process multiple test case collections in parallel:
```python
import asyncio

from elluminate import AsyncClient


async def batch_process():
    async with AsyncClient() as client:
        template = await client.get_prompt_template(name="My Template")
        criterion_set = await client.get_criterion_set(name="My Criteria")

        # Get multiple collections
        collections = await asyncio.gather(
            client.get_collection(name="Collection 1"),
            client.get_collection(name="Collection 2"),
            client.get_collection(name="Collection 3"),
        )

        # Run experiments on all collections concurrently
        experiments = await asyncio.gather(*[
            client.run_experiment(
                name=f"Experiment {i + 1}",
                prompt_template=template,
                collection=coll,
                criterion_set=criterion_set,
            )
            for i, coll in enumerate(collections)
        ])
        return experiments


results = asyncio.run(batch_process())
```
Pattern 3: FastAPI Integration¶
Use AsyncClient in a FastAPI endpoint:
```python
from datetime import datetime, timezone

from fastapi import FastAPI

from elluminate import AsyncClient

app = FastAPI()

# A single AsyncClient instance for the application, created on startup
async_client: AsyncClient


@app.on_event("startup")
async def startup():
    global async_client
    async_client = AsyncClient()


@app.on_event("shutdown")
async def shutdown():
    await async_client.close()


@app.post("/evaluate")
async def evaluate_prompt(prompt: str, test_case: dict):
    # Timestamp used to give each collection/experiment a unique name
    timestamp = datetime.now(timezone.utc).isoformat()

    # Get or create template
    template, _ = await async_client.get_or_create_prompt_template(
        name="API Template",
        template=prompt,
    )

    # Generate criteria
    criteria, _ = await template.aget_or_generate_criteria()
    criterion_set = await async_client.get_criterion_set(name=template.name)

    # Create collection with single test case
    collection = await async_client.create_collection(name=f"Test {timestamp}")
    await collection.aadd_many(variables=[test_case])

    # Run experiment
    experiment = await async_client.run_experiment(
        name=f"Eval {timestamp}",
        prompt_template=template,
        collection=collection,
        criterion_set=criterion_set,
    )

    return {
        "experiment_id": experiment.id,
        "pass_rate": experiment.results.mean_all_ratings.yes if experiment.results else 0,
    }
```
Jupyter Notebook Support¶
AsyncClient works in Jupyter notebooks with nest_asyncio:
```python
# Install in notebook
!pip install elluminate nest-asyncio

# Enable nested event loops
import nest_asyncio
nest_asyncio.apply()

# Now you can use await in cells
from elluminate import AsyncClient

async with AsyncClient() as client:
    template = await client.create_prompt_template(...)
    experiment = await client.run_experiment(...)
```
Performance Tips¶
1. Use asyncio.gather() for Independent Operations¶
When operations don't depend on each other, run them concurrently:
```python
# GOOD: Concurrent execution
template, collection, criterion_set = await asyncio.gather(
    client.get_prompt_template(name="..."),
    client.get_collection(name="..."),
    client.get_criterion_set(name="..."),
)

# BAD: Sequential execution (3x slower)
template = await client.get_prompt_template(name="...")
collection = await client.get_collection(name="...")
criterion_set = await client.get_criterion_set(name="...")
```
2. Limit Concurrency for Resource-Intensive Operations¶
Use semaphores to limit concurrent expensive operations:
```python
import asyncio

from elluminate import AsyncClient


async def run_with_limit(semaphore, client, name, template, collection):
    async with semaphore:
        return await client.run_experiment(
            name=name,
            prompt_template=template,
            collection=collection,
        )


async def main():
    async with AsyncClient() as client:
        template = await client.get_prompt_template(name="My Template")
        collection = await client.get_collection(name="Test Cases")

        # Limit to 5 concurrent experiments
        semaphore = asyncio.Semaphore(5)
        tasks = [
            run_with_limit(semaphore, client, f"Exp {i}", template, collection)
            for i in range(20)
        ]
        experiments = await asyncio.gather(*tasks)


asyncio.run(main())
```
3. Reuse Client Instances¶
Create one AsyncClient per application, not per request:
```python
# GOOD: Single client instance
class App:
    def __init__(self):
        self.client = AsyncClient()

    async def process(self):
        return await self.client.run_experiment(...)

    async def cleanup(self):
        await self.client.close()


# BAD: New client per operation (connection overhead)
async def process():
    async with AsyncClient() as client:  # Creates a new connection
        return await client.run_experiment(...)
```
Migration from Sync to Async¶
Converting synchronous code to async is straightforward:
Before (Synchronous)¶
```python
from elluminate import Client

client = Client()
template = client.create_prompt_template(name="...", template="...")
collection = client.create_collection(name="...")
collection.add_many(variables=[...])
experiment = client.run_experiment(
    name="...",
    prompt_template=template,
    collection=collection,
)
```
After (Asynchronous)¶
```python
import asyncio

from elluminate import AsyncClient


async def main():
    async with AsyncClient() as client:
        template = await client.create_prompt_template(name="...", template="...")
        collection = await client.create_collection(name="...")
        await collection.aadd_many(variables=[...])  # Note: aadd_many with 'a' prefix
        experiment = await client.run_experiment(
            name="...",
            prompt_template=template,
            collection=collection,
        )


asyncio.run(main())
```
Key changes:

1. Import `AsyncClient` instead of `Client`
2. Wrap code in `async def main()`
3. Use the `async with AsyncClient()` context manager
4. Add `await` before all client methods
5. Rich model methods get the `a` prefix: `add_many` → `aadd_many`
6. Run with `asyncio.run(main())`
Error Handling¶
Error handling works the same as synchronous code:
```python
from elluminate import AsyncClient
from elluminate.exceptions import ConflictError, NotFoundError


async def handle_errors():
    async with AsyncClient() as client:
        try:
            template = await client.create_prompt_template(name="Existing", ...)
        except ConflictError:
            template = await client.get_prompt_template(name="Existing")

        try:
            experiment = await client.get_experiment(name="NonExistent")
        except NotFoundError:
            print("Experiment not found")
```
Real-Time Streaming¶
AsyncClient supports Server-Sent Events (SSE) streaming for real-time progress updates during long-running operations. This is particularly useful for:
- Experiment Execution - Watch responses being generated and rated in real-time
- Batch Operations - Track status of batch rating operations
- Better UX - Show live progress instead of blocking spinners
- Early Error Detection - See failures immediately, not after timeout
Streaming Experiment Execution¶
Stream real-time progress during experiment execution with stream_experiment():
```python
from elluminate import AsyncClient
from elluminate.streaming import TaskStatus

async with AsyncClient() as client:
    async for event in client.stream_experiment(
        name="My Experiment",
        prompt_template=template,
        collection=collection,
        criteria=["Is it accurate?", "Is it helpful?"],
        polling_interval=0.5,  # Poll every 0.5 seconds
    ):
        if event.status == TaskStatus.STARTED:
            # Show live progress
            if event.progress:
                percent = event.progress.percent_complete
                print(f"Progress: {percent:.1f}%")
                print(f"Generated: {event.progress.responses_generated}/{event.progress.total_responses}")
                print(f"Rated: {event.progress.responses_rated}/{event.progress.total_responses}")

            # Show incremental logs
            if event.logs_delta:
                print(f"Log: {event.logs_delta}")

        elif event.status == TaskStatus.SUCCESS:
            print("✅ Complete!")
            experiment = event.result  # Final experiment with responses

        elif event.is_failure:
            print(f"❌ Failed: {event.error_msg}")
            break
```
Key Features:
- Real-time progress: See responses generated, rated, and percentage complete
- Incremental logs: Get log messages as they occur
- Early termination: Stop immediately on failure
- Final result: Access completed experiment on SUCCESS
Terminal States¶
Streaming operations can end in several terminal states. Always handle all possible outcomes:
| Status | Meaning | When It Occurs | User Action |
|---|---|---|---|
| `SUCCESS` | Completed successfully | All responses generated and rated | Process results from `event.result` |
| `FAILURE` | Operation failed | LLM error, validation error, system error | Check `event.error_msg`, fix issue and retry |
| `TIMEOUT` | Exceeded time limit | Operation runs longer than 10 hours | Consider smaller batches or contact support |
| `REVOKED` | Task cancelled | Manual cancellation or system shutdown | Re-run if needed |
| `REJECTED` | Task rejected before start | Queue full or invalid configuration | Check inputs and try again |
Example handling all terminal states:
```python
async for event in client.stream_experiment(...):
    if event.status == TaskStatus.STARTED:
        # Handle progress updates
        pass
    elif event.status == TaskStatus.SUCCESS:
        print("✅ Complete!")
        experiment = event.result
        break
    elif event.status == TaskStatus.FAILURE:
        print(f"❌ Failed: {event.error_msg}")
        break
    elif event.status == TaskStatus.TIMEOUT:
        print(f"⏱️ Timed out after 10 hours: {event.error_msg}")
        break
    elif event.status in {TaskStatus.REVOKED, TaskStatus.REJECTED}:
        print(f"⚠️ Operation {event.status.lower()}: {event.error_msg}")
        break
```
Check the is_complete Property

All terminal states have `event.is_complete == True`. Use this to detect when streaming ends:
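A minimal sketch (the `...` stands in for the same experiment arguments as above):

```python
async for event in client.stream_experiment(...):
    if event.is_complete:
        # Terminal states: SUCCESS, FAILURE, TIMEOUT, REVOKED, REJECTED
        print(f"Finished with status: {event.status}")
        break
```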
Streaming Batch Rating¶
Stream status updates for batch rating operations with stream_batch_rate():
```python
async with AsyncClient() as client:
    # Get responses to rate
    responses = list(experiment.responses())

    async for event in client.stream_batch_rate(
        prompt_responses=responses,
        rating_mode=RatingMode.DETAILED,
    ):
        if event.status == TaskStatus.STARTED:
            print("Rating in progress...")
        elif event.status == TaskStatus.SUCCESS:
            ratings = event.result  # List[List[Rating]]
            print(f"Rated {len(ratings)} responses")
        elif event.is_failure:
            print(f"Failed: {event.error_msg}")
            break
```
Batch Rating Streaming Behavior
Available: All terminal states (SUCCESS, FAILURE, TIMEOUT, REVOKED, REJECTED) with error_msg when applicable.
Not Available: Progress metrics (event.progress is always None). The backend doesn't track how many responses have been rated during batch operations.
Why: Batch rating is typically fast (1-2 seconds per response), so progress tracking was deemed unnecessary. This may change in future versions.
Complete Streaming Example¶
See a full example with progress bar and error handling:
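A minimal sketch of such an example; the `render_bar` progress-bar helper is illustrative, not part of the SDK:

```python
import asyncio

from elluminate import AsyncClient
from elluminate.streaming import TaskStatus


def render_bar(percent: float, width: int = 30) -> str:
    """Render a text progress bar like [#####---------] 33.3%."""
    filled = int(width * percent / 100)
    return f"[{'#' * filled}{'-' * (width - filled)}] {percent:.1f}%"


async def main():
    async with AsyncClient() as client:
        template = await client.get_prompt_template(name="My Template")
        collection = await client.get_collection(name="Test Cases")

        async for event in client.stream_experiment(
            name="Streaming Demo",
            prompt_template=template,
            collection=collection,
            polling_interval=0.5,
        ):
            if event.status == TaskStatus.STARTED:
                # Redraw the progress bar in place on each update
                if event.progress:
                    print(render_bar(event.progress.percent_complete), end="\r")
            elif event.status == TaskStatus.SUCCESS:
                experiment = event.result
                print(f"\n✅ Complete: {experiment.name}")
            elif event.is_failure:
                print(f"\n❌ Failed: {event.error_msg}")
                break


asyncio.run(main())
```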
Streaming vs Blocking¶
| Feature | `run_experiment()` | `stream_experiment()` |
|---|---|---|
| Progress visibility | ❌ None | ✅ Real-time |
| Early error detection | ❌ After timeout | ✅ Immediate |
| UX for long operations | ❌ Loading spinner | ✅ Progress bar |
| API calls | ✅ Single | ✅ SSE stream |
| Use case | Quick experiments | Long-running, many test cases |
When to use streaming:
- Experiments with many test cases (>10)
- Slow models (each run takes >10 seconds)
- User-facing applications (need progress UI)
- Debugging (want to see logs in real-time)
When to use blocking:
- Quick experiments (<10 test cases)
- Background processing (no progress needed)
- Simple scripts
Best Practices¶
- Always use context manager: Ensures proper cleanup of resources
- Use asyncio.gather() for concurrency: Don't await in a loop
- Limit concurrent operations: Use semaphores for expensive operations
- Reuse client instances: One client per application, not per request
- Handle exceptions properly: Async exceptions work the same as sync
- Use streaming for long operations: Better UX with real-time progress
Next Steps¶
- Learn about Batch Operations for efficient processing
- Explore Experiments for evaluation workflows
- Check out Collections for managing test cases