Async Operations

Optimize performance with asynchronous operations for large-scale prompt evaluations and concurrent processing

The elluminate SDK provides a full-featured AsyncClient for asynchronous operations. This is particularly useful when:

  • Running Multiple Experiments Concurrently - Execute several experiments in parallel instead of sequentially
  • Integration with Async Frameworks - Use with FastAPI, aiohttp, or other async web frameworks
  • Large-Scale Batch Processing - Process hundreds or thousands of operations with better resource utilization
  • Non-Blocking Operations - Keep your application responsive during long-running evaluations

AsyncClient vs Client

The SDK provides two client classes with identical APIs:

  • Client - Synchronous operations (blocking I/O)
  • AsyncClient - Asynchronous operations (non-blocking I/O)

All public methods have the same signature; the only difference is that AsyncClient methods must be awaited:

from elluminate import Client, AsyncClient

# Synchronous
client = Client()
template = client.create_prompt_template(name="Test", template="...")

# Asynchronous (same signature, just add await)
async with AsyncClient() as client:
    template = await client.create_prompt_template(name="Test", template="...")

Basic Usage

Always use the async context manager to ensure proper resource cleanup:

import asyncio
from elluminate import AsyncClient

async def main():
    async with AsyncClient() as client:
        # Create resources
        template = await client.create_prompt_template(
            name="My Template",
            template="Explain {{concept}} in simple terms.",
        )

        collection = await client.create_collection(name="My Collection")
        await collection.aadd_many(variables=[{"concept": "recursion"}])

        # Run experiment
        experiment = await client.run_experiment(
            name="My Experiment",
            prompt_template=template,
            collection=collection,
        )

        print(f"Completed: {experiment.name}")

# Run the async function
asyncio.run(main())

Manual Resource Management

If you need manual control over the client lifecycle:

from elluminate import AsyncClient

async def main():
    client = AsyncClient()
    try:
        template = await client.create_prompt_template(...)
    finally:
        await client.close()  # Important: clean up resources

asyncio.run(main())

Complete Example

This example demonstrates the full async workflow:

"""Async version of example_sdk_usage.py using AsyncClient.

Demonstrates the async SDK workflow:
1. Create a prompt template
2. Generate evaluation criteria using AI (async rich model method)
3. Create a test collection and add variables (async rich model method)
4. Run an experiment (async)
5. Review results

The async API allows for concurrent operations when needed, e.g.:
- Running multiple experiments in parallel with asyncio.gather()
- Non-blocking API calls in async web frameworks (FastAPI, aiohttp, etc.)
"""

import asyncio

from dotenv import load_dotenv
from elluminate import AsyncClient

load_dotenv(override=True)


async def main():
    # Use async context manager for proper resource management
    async with AsyncClient() as client:
        # Create a prompt template with a placeholder
        template = await client.create_prompt_template(
            name="Scheme Concepts Async",
            messages="Explain how {{concept}} works in Scheme, providing a short but illustrative code example.",
        )
        print(f"✓ Created template: {template.name}")

        # Generate evaluation criteria using AI
        criteria = await template.agenerate_criteria()
        print(f"✓ Generated {len(criteria)} criteria using AI")

        # Get the criterion set that was created
        criterion_set = await client.get_criterion_set(name=template.name)

        # Create a collection and add test cases
        collection = await client.create_collection(name="Scheme Concepts Async")
        await collection.aadd_many(
            variables=[
                {"concept": "recursion"},
                {"concept": "closures"},
            ]
        )
        print(f"✓ Created collection and added {len(collection.variables)} test cases")

        # Run the experiment - creates responses and rates them
        experiment = await client.run_experiment(
            name="Scheme Concepts Analysis Async",
            prompt_template=template,
            collection=collection,
            criterion_set=criterion_set,
        )
        print(f"✓ Experiment completed: {experiment.name}")

        # Review the results
        print("\n=== Results ===")
        for response in experiment.responses():
            print(f"\nInput: {response.template_variables}")
            print(f"Output: {response.response_text[:100]}...")
            if response.ratings:
                for rating in response.ratings:
                    print(f"  - {rating.criterion.criterion_str}: {rating.rating}")


if __name__ == "__main__":
    asyncio.run(main())
Key points:

  1. Use the async context manager for proper resource management
  2. All client methods are async - use await
  3. Rich model methods (on schema objects) use an a prefix: agenerate_criteria(), aadd_many()
  4. Access results the same way as in synchronous code

Concurrent Execution

The real power of AsyncClient comes from running multiple operations concurrently with asyncio.gather():

"""Example demonstrating concurrent async operations with AsyncClient.

This example shows:
1. Using rich model async methods (aget_or_generate_criteria, aadd_many)
2. Running multiple experiments concurrently with asyncio.gather()
3. The key benefit of the async API: parallel execution

Use cases:
- A/B testing multiple LLM configs simultaneously
- Running the same test set across different models
- Parallelizing large evaluation workloads
"""

import asyncio

from dotenv import load_dotenv
from elluminate import AsyncClient

load_dotenv(override=True)


async def main():
    async with AsyncClient() as client:
        print("Setting up shared resources...")

        # Set up shared resources
        template, created = await client.get_or_create_prompt_template(
            name="Concurrent Test Template",
            messages="Write a haiku about {{topic}}.",
        )
        if created:
            print(f"✓ Created template: {template.name}")
        else:
            print(f"✓ Using existing template: {template.name}")

        # Generate or get criteria using rich model async method
        criteria, generated = await template.aget_or_generate_criteria()
        if generated:
            print(f"✓ Generated {len(criteria)} criteria using AI")
        else:
            print(f"✓ Using existing {len(criteria)} criteria")

        # Get the criterion set
        criterion_set = await client.get_criterion_set(name=template.name)

        # Create test collection
        collection, created = await client.get_or_create_collection(
            name="Concurrent Test Collection",
        )

        # Add test cases if collection was just created or is empty
        if created or collection.variables_count == 0:
            await collection.aadd_many(
                variables=[
                    {"topic": "programming"},
                    {"topic": "coffee"},
                    {"topic": "mountains"},
                ]
            )
            print(f"✓ Created collection and added {len(collection.variables)} test cases")
        else:
            print(f"✓ Using existing collection with {collection.variables_count} test cases")

        print("\n🚀 Running 3 experiments concurrently...")

        # Run multiple experiments concurrently using asyncio.gather
        # This is significantly faster than running them sequentially
        experiments = await asyncio.gather(
            client.run_experiment(
                name="Concurrent Test - Run 1",
                prompt_template=template,
                collection=collection,
                criterion_set=criterion_set,
            ),
            client.run_experiment(
                name="Concurrent Test - Run 2",
                prompt_template=template,
                collection=collection,
                criterion_set=criterion_set,
            ),
            client.run_experiment(
                name="Concurrent Test - Run 3",
                prompt_template=template,
                collection=collection,
                criterion_set=criterion_set,
            ),
        )

        # Compare results across runs
        print("\n=== Concurrent Experiment Results ===")
        for exp in experiments:
            print(f"\n{exp.name}:")
            print(f"  Responses: {len(exp.rated_responses)}")
            if exp.results:
                print(f"  Success rate: {exp.results.mean_all_ratings.yes:.2%}")
            else:
                print("  No ratings yet")


if __name__ == "__main__":
    asyncio.run(main())
Key points:

  1. Set up shared resources (template, criteria, collection)
  2. Run multiple experiments concurrently using asyncio.gather() - significantly faster than sequential execution
  3. Compare results across all experiments

Performance Comparison

For 3 experiments with 5 test cases each:

  • Sequential (sync): ~45 seconds (3 experiments × 15 seconds each)
  • Concurrent (async): ~15 seconds (all 3 run in parallel)

3x faster with concurrent execution!
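Exact timings depend on your models and test cases. If you want to verify the speedup on your own workload, here is a minimal timing sketch (assuming template, collection, and criterion_set have already been created as in the examples above; the experiment names are placeholders):

import asyncio
import time

async def time_runs(client, template, collection, criterion_set):
    # Sequential: each experiment waits for the previous one to finish
    start = time.perf_counter()
    for i in range(3):
        await client.run_experiment(
            name=f"Timing Sequential {i}",
            prompt_template=template,
            collection=collection,
            criterion_set=criterion_set,
        )
    print(f"Sequential: {time.perf_counter() - start:.1f}s")

    # Concurrent: all three experiments run at once
    start = time.perf_counter()
    await asyncio.gather(*[
        client.run_experiment(
            name=f"Timing Concurrent {i}",
            prompt_template=template,
            collection=collection,
            criterion_set=criterion_set,
        )
        for i in range(3)
    ])
    print(f"Concurrent: {time.perf_counter() - start:.1f}s")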

Async Method Reference

AsyncClient Public Methods

AsyncClient public methods do NOT use the a prefix:

async with AsyncClient() as client:
    # Prompt Templates
    await client.create_prompt_template(...)
    await client.get_prompt_template(...)
    await client.get_or_create_prompt_template(...)
    await client.list_prompt_templates()
    await client.delete_prompt_template(...)

    # Collections
    await client.create_collection(...)
    await client.get_collection(...)
    await client.get_or_create_collection(...)
    await client.list_collections()
    await client.delete_collection(...)

    # Experiments
    await client.create_experiment(...)
    await client.run_experiment(...)  # Creates and runs
    await client.get_experiment(...)
    await client.list_experiments()
    await client.delete_experiment(...)

    # Criterion Sets
    await client.create_criterion_set(...)
    await client.get_criterion_set(...)
    await client.get_or_create_criterion_set(...)
    await client.list_criterion_sets()

    # LLM Configs
    await client.create_llm_config(...)
    await client.get_llm_config(...)
    await client.get_or_create_llm_config(...)
    await client.list_llm_configs()

    # And more...

Rich Model Async Methods

Rich model methods (on schema objects) DO use the a prefix:

# PromptTemplate
template = await client.get_prompt_template(name="...")
criteria = await template.agenerate_criteria()
criteria, generated = await template.aget_or_generate_criteria()
all_criteria = await template.alist_criteria()
new_template = await template.anew_version(template="...")

# TemplateVariablesCollection
collection = await client.get_collection(name="...")
await collection.aadd_many(variables=[...])
await collection.aclear()
await collection.agenerate_variables(prompt_template)

# CriterionSet
criterion_set = await client.get_criterion_set(name="...")
await criterion_set.aadd_criterion(criterion="...")
await criterion_set.aadd_criteria(criteria=[...])
await criterion_set.aclear()
await criterion_set.alink_template(template)
await criterion_set.aunlink_template(template)

# Experiment
experiment = await client.get_experiment(name="...")
await experiment.arun()
await experiment.afetch_responses()
await experiment.aadd_responses(responses=[...], template_variables=[...])
await experiment.arate_responses()
new_exp = await experiment.aclone(name="...")

Common Patterns

Pattern 1: Concurrent A/B Testing

Run multiple prompt variations simultaneously:

async def ab_test():
    async with AsyncClient() as client:
        # Set up shared resources
        criterion_set = await client.create_criterion_set(name="Quality")
        await criterion_set.aadd_criteria([
            "Is the response helpful?",
            "Is it accurate?",
        ])

        collection = await client.create_collection(name="Test Cases")
        await collection.aadd_many(variables=[
            {"topic": "AI"},
            {"topic": "ML"},
        ])

        # Create templates
        template_a = await client.create_prompt_template(
            name="Style A",
            template="Explain {{topic}} briefly.",
        )
        template_b = await client.create_prompt_template(
            name="Style B", 
            template="Explain {{topic}} with examples.",
        )

        # Run both experiments concurrently
        exp_a, exp_b = await asyncio.gather(
            client.run_experiment(
                name="Test A",
                prompt_template=template_a,
                collection=collection,
                criterion_set=criterion_set,
            ),
            client.run_experiment(
                name="Test B",
                prompt_template=template_b,
                collection=collection,
                criterion_set=criterion_set,
            ),
        )

        # Compare results
        print(f"A: {exp_a.results.mean_all_ratings.yes:.1%}")
        print(f"B: {exp_b.results.mean_all_ratings.yes:.1%}")

asyncio.run(ab_test())

Pattern 2: Batch Processing Test Cases

Process multiple test case collections in parallel:

async def batch_process():
    async with AsyncClient() as client:
        template = await client.get_prompt_template(name="My Template")
        criterion_set = await client.get_criterion_set(name="My Criteria")

        # Get multiple collections
        collections = await asyncio.gather(
            client.get_collection(name="Collection 1"),
            client.get_collection(name="Collection 2"),
            client.get_collection(name="Collection 3"),
        )

        # Run experiments on all collections concurrently
        experiments = await asyncio.gather(*[
            client.run_experiment(
                name=f"Experiment {i+1}",
                prompt_template=template,
                collection=coll,
                criterion_set=criterion_set,
            )
            for i, coll in enumerate(collections)
        ])

        return experiments

results = asyncio.run(batch_process())

Pattern 3: FastAPI Integration

Use AsyncClient in a FastAPI endpoint:

from datetime import datetime, timezone

from fastapi import FastAPI
from elluminate import AsyncClient

app = FastAPI()

# Create a single AsyncClient instance for the application lifetime
async_client = AsyncClient()

@app.on_event("shutdown")
async def shutdown():
    await async_client.close()

@app.post("/evaluate")
async def evaluate_prompt(prompt: str, test_case: dict):
    # Unique suffix so each request gets its own collection and experiment
    timestamp = datetime.now(timezone.utc).isoformat()

    # Get or create template
    template, _ = await async_client.get_or_create_prompt_template(
        name="API Template",
        template=prompt,
    )

    # Generate criteria
    criteria, _ = await template.aget_or_generate_criteria()
    criterion_set = await async_client.get_criterion_set(name=template.name)

    # Create collection with single test case
    collection = await async_client.create_collection(name=f"Test {timestamp}")
    await collection.aadd_many(variables=[test_case])

    # Run experiment
    experiment = await async_client.run_experiment(
        name=f"Eval {timestamp}",
        prompt_template=template,
        collection=collection,
        criterion_set=criterion_set,
    )

    return {
        "experiment_id": experiment.id,
        "pass_rate": experiment.results.mean_all_ratings.yes if experiment.results else 0,
    }

Jupyter Notebook Support

AsyncClient works in Jupyter notebooks. Recent IPython versions support top-level await in cells natively; nest_asyncio is only needed when code in a cell calls asyncio.run() inside the notebook's already-running event loop:

# Install in notebook
!pip install elluminate nest-asyncio

# Enable nested event loops
import nest_asyncio
nest_asyncio.apply()

# Now you can use await in cells
from elluminate import AsyncClient

async with AsyncClient() as client:
    template = await client.create_prompt_template(...)
    experiment = await client.run_experiment(...)

Performance Tips

1. Use asyncio.gather() for Independent Operations

When operations don't depend on each other, run them concurrently:

# GOOD: Concurrent execution
template, collection, criterion_set = await asyncio.gather(
    client.get_prompt_template(name="..."),
    client.get_collection(name="..."),
    client.get_criterion_set(name="..."),
)

# BAD: Sequential execution (3x slower)
template = await client.get_prompt_template(name="...")
collection = await client.get_collection(name="...")
criterion_set = await client.get_criterion_set(name="...")

2. Limit Concurrency for Resource-Intensive Operations

Use semaphores to limit concurrent expensive operations:

import asyncio

from elluminate import AsyncClient

async def run_with_limit(semaphore, client, name, template, collection):
    async with semaphore:
        return await client.run_experiment(
            name=name,
            prompt_template=template,
            collection=collection,
        )

async def main():
    async with AsyncClient() as client:
        # Fetch the shared template and collection used by every run
        template = await client.get_prompt_template(name="My Template")
        collection = await client.get_collection(name="My Collection")

        # Limit to 5 concurrent experiments
        semaphore = asyncio.Semaphore(5)

        tasks = [
            run_with_limit(semaphore, client, f"Exp {i}", template, collection)
            for i in range(20)
        ]

        experiments = await asyncio.gather(*tasks)

asyncio.run(main())

3. Reuse Client Instances

Create one AsyncClient per application, not per request:

# GOOD: Single client instance
class App:
    def __init__(self):
        self.client = AsyncClient()

    async def process(self):
        return await self.client.run_experiment(...)

    async def cleanup(self):
        await self.client.close()

# BAD: New client per operation (connection overhead)
async def process():
    async with AsyncClient() as client:  # Creates new connection
        return await client.run_experiment(...)

Migration from Sync to Async

Converting synchronous code to async is straightforward:

Before (Synchronous)

from elluminate import Client

client = Client()

template = client.create_prompt_template(name="...", template="...")
collection = client.create_collection(name="...")
collection.add_many(variables=[...])

experiment = client.run_experiment(
    name="...",
    prompt_template=template,
    collection=collection,
)

After (Asynchronous)

from elluminate import AsyncClient
import asyncio

async def main():
    async with AsyncClient() as client:
        template = await client.create_prompt_template(name="...", template="...")
        collection = await client.create_collection(name="...")
        await collection.aadd_many(variables=[...])  # Note: aadd_many with 'a' prefix

        experiment = await client.run_experiment(
            name="...",
            prompt_template=template,
            collection=collection,
        )

asyncio.run(main())

Key changes:

  1. Import AsyncClient instead of Client
  2. Wrap code in async def main()
  3. Use the async with AsyncClient() context manager
  4. Add await before all client methods
  5. Rich model methods get an a prefix: add_many → aadd_many
  6. Run with asyncio.run(main())

Error Handling

Error handling works the same as synchronous code:

from elluminate import AsyncClient
from elluminate.exceptions import ConflictError, NotFoundError

async def handle_errors():
    async with AsyncClient() as client:
        try:
            template = await client.create_prompt_template(name="Existing", ...)
        except ConflictError:
            template = await client.get_prompt_template(name="Existing")

        try:
            experiment = await client.get_experiment(name="NonExistent")
        except NotFoundError:
            print("Experiment not found")

Real-Time Streaming

AsyncClient supports Server-Sent Events (SSE) streaming for real-time progress updates during long-running operations. This is particularly useful for:

  • Experiment Execution - Watch responses being generated and rated in real-time
  • Batch Operations - Track status of batch rating operations
  • Better UX - Show live progress instead of blocking spinners
  • Early Error Detection - See failures immediately, not after timeout

Streaming Experiment Execution

Stream real-time progress during experiment execution with stream_experiment():

from elluminate import AsyncClient
from elluminate.streaming import TaskStatus

async with AsyncClient() as client:
    async for event in client.stream_experiment(
        name="My Experiment",
        prompt_template=template,
        collection=collection,
        criteria=["Is it accurate?", "Is it helpful?"],
        polling_interval=0.5,  # Poll every 0.5 seconds
    ):
        if event.status == TaskStatus.STARTED:
            # Show live progress
            if event.progress:
                percent = event.progress.percent_complete
                print(f"Progress: {percent:.1f}%")
                print(f"Generated: {event.progress.responses_generated}/{event.progress.total_responses}")
                print(f"Rated: {event.progress.responses_rated}/{event.progress.total_responses}")

            # Show incremental logs
            if event.logs_delta:
                print(f"Log: {event.logs_delta}")

        elif event.status == TaskStatus.SUCCESS:
            print("✅ Complete!")
            experiment = event.result  # Final experiment with responses

        elif event.is_failure:
            print(f"❌ Failed: {event.error_msg}")
            break

Key Features:

  • Real-time progress: See responses generated, rated, and percentage complete
  • Incremental logs: Get log messages as they occur
  • Early termination: Stop immediately on failure
  • Final result: Access completed experiment on SUCCESS

Terminal States

Streaming operations can end in several terminal states. Always handle all possible outcomes:

| Status   | Meaning                    | When It Occurs                            | User Action                                  |
| -------- | -------------------------- | ----------------------------------------- | -------------------------------------------- |
| SUCCESS  | Completed successfully     | All responses generated and rated         | Process results from event.result            |
| FAILURE  | Operation failed           | LLM error, validation error, system error | Check event.error_msg, fix issue and retry   |
| TIMEOUT  | Exceeded time limit        | Operation runs longer than 10 hours       | Consider smaller batches or contact support  |
| REVOKED  | Task cancelled             | Manual cancellation or system shutdown    | Re-run if needed                             |
| REJECTED | Task rejected before start | Queue full or invalid configuration       | Check inputs and try again                   |

Example handling all terminal states:

async for event in client.stream_experiment(...):
    if event.status == TaskStatus.STARTED:
        # Handle progress updates
        pass

    elif event.status == TaskStatus.SUCCESS:
        print("✅ Complete!")
        experiment = event.result
        break

    elif event.status == TaskStatus.FAILURE:
        print(f"❌ Failed: {event.error_msg}")
        break

    elif event.status == TaskStatus.TIMEOUT:
        print(f"⏱️  Timed out after 10 hours: {event.error_msg}")
        break

    elif event.status in {TaskStatus.REVOKED, TaskStatus.REJECTED}:
        print(f"⚠️  Operation {event.status.lower()}: {event.error_msg}")
        break

Check is_complete Property

All terminal states have event.is_complete == True. Use this to detect when streaming ends:

if event.is_complete:
    # Handle terminal state
    if event.is_success:
        ...  # process results from event.result
    elif event.is_failure:
        ...  # handle error (covers FAILURE, TIMEOUT, REVOKED, REJECTED)

Streaming Batch Rating

Stream status updates for batch rating operations with stream_batch_rate():

async with AsyncClient() as client:
    # Get responses to rate
    responses = list(experiment.responses())

    async for event in client.stream_batch_rate(
        prompt_responses=responses,
        rating_mode=RatingMode.DETAILED,
    ):
        if event.status == TaskStatus.STARTED:
            print("Rating in progress...")

        elif event.status == TaskStatus.SUCCESS:
            ratings = event.result  # List[List[Rating]]
            print(f"Rated {len(ratings)} responses")

        elif event.is_failure:
            print(f"Failed: {event.error_msg}")
            break

Batch Rating Streaming Behavior

Available: All terminal states (SUCCESS, FAILURE, TIMEOUT, REVOKED, REJECTED) with error_msg when applicable.

Not Available: Progress metrics (event.progress is always None). The backend doesn't track how many responses have been rated during batch operations.

Why: Batch rating is typically fast (1-2 seconds per response), so progress tracking was deemed unnecessary. This may change in future versions.
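
Because event.progress is always None here, a simple elapsed-time display is a reasonable substitute for a progress bar. A sketch, continuing the example above:

import time

start = time.monotonic()
async for event in client.stream_batch_rate(
    prompt_responses=responses,
    rating_mode=RatingMode.DETAILED,
):
    if event.status == TaskStatus.STARTED:
        # No progress metrics for batch rating, so show elapsed time instead
        print(f"\rRating... {time.monotonic() - start:.0f}s elapsed", end="", flush=True)
    elif event.status == TaskStatus.SUCCESS:
        print(f"\nDone in {time.monotonic() - start:.1f}s")
        ratings = event.result
    elif event.is_failure:
        print(f"\nFailed: {event.error_msg}")
        break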

Complete Streaming Example

See a full example with progress bar and error handling:

"""Real-time streaming experiment with progress updates.

Demonstrates the streaming SDK workflow:
1. Create a prompt template and test collection
2. Stream experiment execution with real-time progress
3. Display live updates as responses are generated and rated
4. Access the completed experiment when finished

The streaming API provides real-time feedback during long-running experiments:
- Live progress tracking (responses generated, ratings completed)
- Per-epoch progress for multi-epoch experiments
- Incremental log messages
- Early error detection
- Better user experience for experiments with many test cases
"""

import asyncio

from dotenv import load_dotenv
from elluminate import AsyncClient
from elluminate.streaming import TaskStatus

load_dotenv(override=True)


async def main():
    # Use async context manager for proper resource management
    async with AsyncClient() as client:
        # Create a prompt template
        template = await client.create_prompt_template(
            name="Programming Concepts Streaming",
            messages="Explain {{concept}} in {{language}}, providing a clear code example.",
        )
        print(f"✓ Created template: {template.name}")

        # Create a collection with multiple test cases
        collection = await client.create_collection(name="Programming Concepts Streaming")
        await collection.aadd_many(
            variables=[
                {"concept": "recursion", "language": "Python"},
                {"concept": "closures", "language": "JavaScript"},
                {"concept": "async/await", "language": "Python"},
                {"concept": "generators", "language": "Python"},
                {"concept": "promises", "language": "JavaScript"},
            ]
        )
        print(f"✓ Created collection with {len(collection.variables)} test cases")

        # Stream the experiment with real-time progress
        print("\n=== Streaming Experiment ===")

        async for event in client.stream_experiment(
            name="Programming Concepts Analysis Streaming",
            prompt_template=template,
            collection=collection,
            criteria=[
                "Is the explanation clear and accurate?",
                "Is the code example correct and illustrative?",
            ],
            polling_interval=0.5,  # Poll every 0.5 seconds
        ):
            if event.status == TaskStatus.PENDING:
                print("⏳ Experiment queued, waiting to start...")

            elif event.status == TaskStatus.STARTED:
                if event.progress:
                    # Calculate and display progress
                    percent = event.progress.percent_complete
                    completed = event.progress.responses_generated
                    rated = event.progress.responses_rated
                    total = event.progress.total_responses

                    # Progress bar
                    bar_length = 40
                    filled = int(bar_length * percent / 100)
                    bar = "█" * filled + "░" * (bar_length - filled)

                    print(
                        f"\r🔄 [{bar}] {percent:.1f}% | Generated: {completed}/{total} | Rated: {rated}/{total}",
                        end="",
                        flush=True,
                    )

                # Show incremental logs if available
                if event.logs_delta:
                    print(f"\n📝 Log: {event.logs_delta}")

            elif event.status == TaskStatus.SUCCESS:
                print("\n✅ Experiment completed successfully!")

                # Access the final experiment
                experiment = event.result
                if experiment:
                    print(f"\n=== Results for '{experiment.name}' ===")
                    print(f"Total responses: {len(list(experiment.responses()))}")

                    # Show mean ratings
                    if experiment.results:
                        print(f"Mean rating: {experiment.results.mean_all_ratings}")

                    # Show first few responses
                    print("\nSample responses:")
                    for i, response in enumerate(list(experiment.responses())[:2], 1):
                        print(f"\n{i}. Input: {response.template_variables}")
                        print(f"   Output: {response.response_text[:150]}...")
                        if response.ratings:
                            print("   Ratings:")
                            for rating in response.ratings:
                                print(f"     - {rating.criterion.criterion_str}: {rating.rating}")

            elif event.status == TaskStatus.FAILURE:
                print(f"\n❌ Experiment failed: {event.error_msg}")
                break

            elif event.status == TaskStatus.TIMEOUT:
                print(f"\n⏱️  Experiment timed out: {event.error_msg}")
                break

            elif event.status in {TaskStatus.REVOKED, TaskStatus.REJECTED}:
                print(f"\n⚠️  Experiment {event.status.lower()}")
                break


if __name__ == "__main__":
    asyncio.run(main())

Streaming vs Blocking

| Feature                | run_experiment()   | stream_experiment()           |
| ---------------------- | ------------------ | ----------------------------- |
| Progress visibility    | ❌ None            | ✅ Real-time                  |
| Early error detection  | ❌ After timeout   | ✅ Immediate                  |
| UX for long operations | ❌ Loading spinner | ✅ Progress bar               |
| API calls              | ✅ Single          | ✅ SSE stream                 |
| Use case               | Quick experiments  | Long-running, many test cases |

When to use streaming:

  • Experiments with many test cases (>10)
  • Slow models (responses take >10 seconds each)
  • User-facing applications (need progress UI)
  • Debugging (want to see logs in real-time)

When to use blocking:

  • Quick experiments (<10 test cases)
  • Background processing (no progress needed)
  • Simple scripts

Best Practices

  1. Always use context manager: Ensures proper cleanup of resources
  2. Use asyncio.gather() for concurrency: Don't await in a loop
  3. Limit concurrent operations: Use semaphores for expensive operations
  4. Reuse client instances: One client per application, not per request
  5. Handle exceptions properly: Async exceptions work the same as sync
  6. Use streaming for long operations: Better UX with real-time progress
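
One async-specific detail for point 5: by default, asyncio.gather() raises the first exception it encounters, and the caller loses access to the other results. If you want every experiment's outcome even when some fail, pass return_exceptions=True and inspect the results. A minimal sketch, assuming template and collection already exist:

results = await asyncio.gather(
    client.run_experiment(name="Run 1", prompt_template=template, collection=collection),
    client.run_experiment(name="Run 2", prompt_template=template, collection=collection),
    return_exceptions=True,  # failures come back as values instead of raising
)

for result in results:
    if isinstance(result, Exception):
        print(f"Experiment failed: {result}")
    else:
        print(f"Experiment succeeded: {result.name}")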

Next Steps