Zum Inhalt

Asynchrone Operationen

Optimieren Sie die Leistung mit asynchronen Operationen für umfangreiche Prompt-Evaluierungen und gleichzeitige Verarbeitung

Das elluminate SDK bietet einen vollständigen AsyncClient für asynchrone Operationen. Dies ist besonders nützlich bei:

  • Gleichzeitiger Ausführung mehrerer Experimente - Mehrere Experimente parallel statt nacheinander ausführen
  • Integration mit Async-Frameworks - Verwendung mit FastAPI, aiohttp oder anderen Async-Web-Frameworks
  • Umfangreiche Batch-Verarbeitung - Hunderte oder Tausende von Operationen mit besserer Ressourcennutzung verarbeiten
  • Non-Blocking-Operationen - Ihre Anwendung bleibt während langwieriger Evaluierungen reaktionsfähig

AsyncClient vs Client

Das SDK bietet zwei Client-Klassen mit identischen APIs:

  • Client - Synchrone Operationen (blockierendes I/O)
  • AsyncClient - Asynchrone Operationen (nicht-blockierendes I/O)

Alle öffentlichen Methoden haben die gleiche Signatur; der einzige Unterschied ist, dass AsyncClient-Methoden mit await verwendet werden müssen:

from elluminate import Client, AsyncClient

# Synchron
client = Client()
template = client.create_prompt_template(name="Test", template="...")

# Asynchron (gleiche Signatur, nur await hinzufügen)
async with AsyncClient() as client:
    template = await client.create_prompt_template(name="Test", template="...")

Grundlegende Verwendung

Context Manager (Empfohlen)

Verwenden Sie immer den Async-Context-Manager, um eine ordnungsgemäße Ressourcenbereinigung sicherzustellen:

import asyncio
from elluminate import AsyncClient

async def main():
    async with AsyncClient() as client:
        # Ressourcen erstellen
        template = await client.create_prompt_template(
            name="Meine Vorlage",
            template="Erkläre {{concept}} in einfachen Worten.",
        )

        # Experiment ausführen
        experiment = await client.run_experiment(
            name="Mein Experiment",
            prompt_template=template,
            collection=collection,
        )

        print(f"Abgeschlossen: {experiment.name}")

# Async-Funktion ausführen
asyncio.run(main())

Manuelle Ressourcenverwaltung

Falls Sie manuelle Kontrolle über den Client-Lebenszyklus benötigen:

from elluminate import AsyncClient

async def main():
    client = AsyncClient()
    try:
        template = await client.create_prompt_template(...)
    finally:
        await client.close()  # Wichtig: Ressourcen bereinigen

asyncio.run(main())

Vollständiges Beispiel

Dieses Beispiel demonstriert den vollständigen Async-Workflow:

"""Async version of example_sdk_usage.py using AsyncClient.

Demonstrates the async SDK workflow:
1. Create a prompt template
2. Generate evaluation criteria using AI (async rich model method)
3. Create a test collection and add variables (async rich model method)
4. Run an experiment (async)
5. Review results

The async API allows for concurrent operations when needed, e.g.:
- Running multiple experiments in parallel with asyncio.gather()
- Non-blocking API calls in async web frameworks
- Integration with async web frameworks (FastAPI, aiohttp, etc.)
"""

import asyncio

from dotenv import load_dotenv
from elluminate import AsyncClient

load_dotenv(override=True)


async def main():
    # Use async context manager for proper resource management
    async with AsyncClient() as client:
        # Create a prompt template with a placeholder
        template = await client.create_prompt_template(
            name="Scheme Concepts Async",
            messages="Explain how {{concept}} works in Scheme, providing a short but illustrative code example.",
        )
        print(f"✓ Created template: {template.name}")

        # Generate evaluation criteria using AI
        criteria = await template.agenerate_criteria()
        print(f"✓ Generated {len(criteria)} criteria using AI")

        # Get the criterion set that was created
        criterion_set = await client.get_criterion_set(name=template.name)

        # Create a collection and add test cases
        collection = await client.create_collection(name="Scheme Concepts Async")
        await collection.aadd_many(
            variables=[
                {"concept": "recursion"},
                {"concept": "closures"},
            ]
        )
        print(f"✓ Created collection and added {len(collection.variables)} test cases")

        # Run the experiment - creates responses and rates them
        experiment = await client.run_experiment(
            name="Scheme Concepts Analysis Async",
            prompt_template=template,
            collection=collection,
            criterion_set=criterion_set,
        )
        print(f"✓ Experiment completed: {experiment.name}")

        # Review the results
        print("\n=== Results ===")
        for response in experiment.responses():
            print(f"\nInput: {response.template_variables}")
            print(f"Output: {response.response_text[:100]}...")
            if response.ratings:
                for rating in response.ratings:
                    print(f"  - {rating.criterion.criterion_str}: {rating.rating}")


if __name__ == "__main__":
    asyncio.run(main())
  1. Verwenden Sie den Async-Context-Manager für ordnungsgemäße Ressourcenverwaltung
  2. Alle Client-Methoden sind asynchron - verwenden Sie await
  3. Rich-Model-Methoden (auf Schema-Objekten) verwenden das a-Präfix: agenerate_criteria(), aadd_many()
  4. Greifen Sie auf Ergebnisse auf die gleiche Weise wie bei synchronem Code zu

Gleichzeitige Ausführung

Die wahre Stärke von AsyncClient kommt von der gleichzeitigen Ausführung mehrerer Operationen mit asyncio.gather():

"""Example demonstrating concurrent async operations with AsyncClient.

This example shows:
1. Using rich model async methods (aget_or_generate_criteria, aadd_many)
2. Running multiple experiments concurrently with asyncio.gather()
3. The key benefit of the async API: parallel execution

Use cases:
- A/B testing multiple LLM configs simultaneously
- Running the same test set across different models
- Parallelizing large evaluation workloads
"""

import asyncio

from dotenv import load_dotenv
from elluminate import AsyncClient

load_dotenv(override=True)


async def main():
    async with AsyncClient() as client:
        print("Setting up shared resources...")

        # Set up shared resources
        template, created = await client.get_or_create_prompt_template(
            name="Concurrent Test Template",
            messages="Write a haiku about {{topic}}.",
        )
        if created:
            print(f"✓ Created template: {template.name}")
        else:
            print(f"✓ Using existing template: {template.name}")

        # Generate or get criteria using rich model async method
        criteria, generated = await template.aget_or_generate_criteria()
        if generated:
            print(f"✓ Generated {len(criteria)} criteria using AI")
        else:
            print(f"✓ Using existing {len(criteria)} criteria")

        # Get the criterion set
        criterion_set = await client.get_criterion_set(name=template.name)

        # Create test collection
        collection, created = await client.get_or_create_collection(
            name="Concurrent Test Collection",
        )

        # Add test cases if collection was just created or is empty
        if created or collection.variables_count == 0:
            await collection.aadd_many(
                variables=[
                    {"topic": "programming"},
                    {"topic": "coffee"},
                    {"topic": "mountains"},
                ]
            )
            print(f"✓ Created collection and added {len(collection.variables)} test cases")
        else:
            print(f"✓ Using existing collection with {collection.variables_count} test cases")

        print("\n🚀 Running 3 experiments concurrently...")

        # Run multiple experiments concurrently using asyncio.gather
        # This is significantly faster than running them sequentially
        experiments = await asyncio.gather(
            client.run_experiment(
                name="Concurrent Test - Run 1",
                prompt_template=template,
                collection=collection,
                criterion_set=criterion_set,
            ),
            client.run_experiment(
                name="Concurrent Test - Run 2",
                prompt_template=template,
                collection=collection,
                criterion_set=criterion_set,
            ),
            client.run_experiment(
                name="Concurrent Test - Run 3",
                prompt_template=template,
                collection=collection,
                criterion_set=criterion_set,
            ),
        )

        # Compare results across runs
        print("\n=== Concurrent Experiment Results ===")
        for exp in experiments:
            print(f"\n{exp.name}:")
            print(f"  Responses: {len(exp.rated_responses)}")
            if exp.results:
                print(f"  Success rate: {exp.results.mean_all_ratings.yes:.2%}")
            else:
                print("  No ratings yet")


if __name__ == "__main__":
    asyncio.run(main())
  1. Gemeinsame Ressourcen einrichten (Template, Kriterien, Collection)
  2. Mehrere Experimente gleichzeitig mit asyncio.gather() ausführen - deutlich schneller als sequentielle Ausführung
  3. Ergebnisse über alle Experimente vergleichen

Leistungsvergleich

Für 3 Experimente mit jeweils 5 Testfällen:

  • Sequenziell (sync): ~45 Sekunden (3 Experimente × 15 Sekunden)
  • Gleichzeitig (async): ~15 Sekunden (alle 3 laufen parallel)

3x schneller mit gleichzeitiger Ausführung!

Async-Methoden-Referenz

AsyncClient Öffentliche Methoden

AsyncClient öffentliche Methoden verwenden KEIN a-Präfix:

async with AsyncClient() as client:
    # Prompt-Templates
    await client.create_prompt_template(...)
    await client.get_prompt_template(...)
    await client.get_or_create_prompt_template(...)
    await client.list_prompt_templates()
    await client.delete_prompt_template(...)

    # Collections
    await client.create_collection(...)
    await client.get_collection(...)
    await client.get_or_create_collection(...)
    await client.list_collections()
    await client.delete_collection(...)

    # Experimente
    await client.create_experiment(...)
    await client.run_experiment(...)  # Erstellt und führt aus
    await client.get_experiment(...)
    await client.list_experiments()
    await client.delete_experiment(...)

    # Criterion Sets
    await client.create_criterion_set(...)
    await client.get_criterion_set(...)
    await client.get_or_create_criterion_set(...)
    await client.list_criterion_sets()

    # LLM Configs
    await client.create_llm_config(...)
    await client.get_llm_config(...)
    await client.get_or_create_llm_config(...)
    await client.list_llm_configs()

    # Und mehr...

Rich-Model Async-Methoden

Rich-Model-Methoden (auf Schema-Objekten) verwenden DAS a-Präfix:

# PromptTemplate
template = await client.get_prompt_template(name="...")
criteria = await template.agenerate_criteria()
criteria, generated = await template.aget_or_generate_criteria()
all_criteria = await template.alist_criteria()
new_template = await template.anew_version(template="...")

# TemplateVariablesCollection
collection = await client.get_collection(name="...")
await collection.aadd_many(variables=[...])
await collection.aclear()
await collection.agenerate_variables(prompt_template)

# CriterionSet
criterion_set = await client.get_criterion_set(name="...")
await criterion_set.aadd_criterion(criterion="...")
await criterion_set.aadd_criteria(criteria=[...])
await criterion_set.aclear()
await criterion_set.alink_template(template)
await criterion_set.aunlink_template(template)

# Experiment
experiment = await client.get_experiment(name="...")
await experiment.arun()
await experiment.afetch_responses()
await experiment.aadd_responses(responses=[...], template_variables=[...])
await experiment.arate_responses()
new_exp = await experiment.aclone(name="...")

Häufige Muster

Muster 1: Gleichzeitige A/B-Tests

Mehrere Prompt-Varianten gleichzeitig ausführen:

async def ab_test():
    async with AsyncClient() as client:
        # Gemeinsame Ressourcen einrichten
        criterion_set = await client.create_criterion_set(name="Qualität")
        await criterion_set.aadd_criteria([
            "Ist die Antwort hilfreich?",
            "Ist sie genau?",
        ])

        collection = await client.create_collection(name="Testfälle")
        await collection.aadd_many(variables=[
            {"topic": "KI"},
            {"topic": "ML"},
        ])

        # Templates erstellen
        template_a = await client.create_prompt_template(
            name="Stil A",
            template="Erkläre {{topic}} kurz.",
        )
        template_b = await client.create_prompt_template(
            name="Stil B",
            template="Erkläre {{topic}} mit Beispielen.",
        )

        # Beide Experimente gleichzeitig ausführen
        exp_a, exp_b = await asyncio.gather(
            client.run_experiment(
                name="Test A",
                prompt_template=template_a,
                collection=collection,
                criterion_set=criterion_set,
            ),
            client.run_experiment(
                name="Test B",
                prompt_template=template_b,
                collection=collection,
                criterion_set=criterion_set,
            ),
        )

        # Ergebnisse vergleichen
        print(f"A: {exp_a.results.mean_all_ratings.yes:.1%}")
        print(f"B: {exp_b.results.mean_all_ratings.yes:.1%}")

asyncio.run(ab_test())

Muster 2: Batch-Verarbeitung von Testfällen

Mehrere Testfall-Collections parallel verarbeiten:

async def batch_process():
    async with AsyncClient() as client:
        template = await client.get_prompt_template(name="Meine Vorlage")
        criterion_set = await client.get_criterion_set(name="Meine Kriterien")

        # Mehrere Collections abrufen
        collections = await asyncio.gather(
            client.get_collection(name="Collection 1"),
            client.get_collection(name="Collection 2"),
            client.get_collection(name="Collection 3"),
        )

        # Experimente auf allen Collections gleichzeitig ausführen
        experiments = await asyncio.gather(*[
            client.run_experiment(
                name=f"Experiment {i+1}",
                prompt_template=template,
                collection=coll,
                criterion_set=criterion_set,
            )
            for i, coll in enumerate(collections)
        ])

        return experiments

results = asyncio.run(batch_process())

Muster 3: FastAPI-Integration

AsyncClient in einem FastAPI-Endpoint verwenden:

from fastapi import FastAPI
from elluminate import AsyncClient

app = FastAPI()

# Eine einzelne AsyncClient-Instanz für die Anwendung erstellen
async_client = AsyncClient()

@app.on_event("startup")
async def startup():
    global async_client
    async_client = AsyncClient()

@app.on_event("shutdown")
async def shutdown():
    await async_client.close()

@app.post("/evaluate")
async def evaluate_prompt(prompt: str, test_case: dict):
    # Template abrufen oder erstellen
    template, _ = await async_client.get_or_create_prompt_template(
        name="API Template",
        template=prompt,
    )

    # Kriterien generieren
    criteria, _ = await template.aget_or_generate_criteria()
    criterion_set = await async_client.get_criterion_set(name=template.name)

    # Collection mit einzelnem Testfall erstellen
    collection = await async_client.create_collection(name=f"Test {timestamp}")
    await collection.aadd_many(variables=[test_case])

    # Experiment ausführen
    experiment = await async_client.run_experiment(
        name=f"Eval {timestamp}",
        prompt_template=template,
        collection=collection,
        criterion_set=criterion_set,
    )

    return {
        "experiment_id": experiment.id,
        "pass_rate": experiment.results.mean_all_ratings.yes if experiment.results else 0,
    }

Jupyter Notebook Unterstützung

AsyncClient funktioniert in Jupyter Notebooks mit nest_asyncio:

# Im Notebook installieren
!pip install elluminate nest-asyncio

# Verschachtelte Event-Loops aktivieren
import nest_asyncio
nest_asyncio.apply()

# Jetzt können Sie await in Zellen verwenden
from elluminate import AsyncClient

async with AsyncClient() as client:
    template = await client.create_prompt_template(...)
    experiment = await client.run_experiment(...)

Performance-Tipps

1. asyncio.gather() für unabhängige Operationen verwenden

Wenn Operationen nicht voneinander abhängen, führen Sie sie gleichzeitig aus:

# GUT: Gleichzeitige Ausführung
template, collection, criterion_set = await asyncio.gather(
    client.get_prompt_template(name="..."),
    client.get_collection(name="..."),
    client.get_criterion_set(name="..."),
)

# SCHLECHT: Sequenzielle Ausführung (3x langsamer)
template = await client.get_prompt_template(name="...")
collection = await client.get_collection(name="...")
criterion_set = await client.get_criterion_set(name="...")

2. Gleichzeitigkeit für ressourcenintensive Operationen begrenzen

Semaphoren verwenden, um gleichzeitige teure Operationen zu begrenzen:

import asyncio

async def run_with_limit(semaphore, client, name, template, collection):
    async with semaphore:
        return await client.run_experiment(
            name=name,
            prompt_template=template,
            collection=collection,
        )

async def main():
    async with AsyncClient() as client:
        # Auf 5 gleichzeitige Experimente begrenzen
        semaphore = asyncio.Semaphore(5)

        tasks = [
            run_with_limit(semaphore, client, f"Exp {i}", template, collection)
            for i in range(20)
        ]

        experiments = await asyncio.gather(*tasks)

asyncio.run(main())

3. Client-Instanzen wiederverwenden

Erstellen Sie einen AsyncClient pro Anwendung, nicht pro Anfrage:

# GUT: Einzelne Client-Instanz
class App:
    def __init__(self):
        self.client = AsyncClient()

    async def process(self):
        return await self.client.run_experiment(...)

    async def cleanup(self):
        await self.client.close()

# SCHLECHT: Neuer Client pro Operation (Verbindungs-Overhead)
async def process():
    async with AsyncClient() as client:  # Erstellt neue Verbindung
        return await client.run_experiment(...)

Migration von Sync zu Async

Die Konvertierung von synchronem zu asynchronem Code ist einfach:

Vorher (Synchron)

from elluminate import Client

client = Client()

template = client.create_prompt_template(name="...", template="...")
collection = client.create_collection(name="...")
collection.add_many(variables=[...])

experiment = client.run_experiment(
    name="...",
    prompt_template=template,
    collection=collection,
)

Nachher (Asynchron)

from elluminate import AsyncClient
import asyncio

async def main():
    async with AsyncClient() as client:
        template = await client.create_prompt_template(name="...", template="...")
        collection = await client.create_collection(name="...")
        await collection.aadd_many(variables=[...])  # Hinweis: aadd_many mit 'a'-Präfix

        experiment = await client.run_experiment(
            name="...",
            prompt_template=template,
            collection=collection,
        )

asyncio.run(main())

Wichtige Änderungen: 1. AsyncClient statt Client importieren 2. Code in async def main() verpacken 3. async with AsyncClient() Context Manager verwenden 4. await vor allen Client-Methoden hinzufügen 5. Rich-Model-Methoden erhalten a-Präfix: add_manyaadd_many 6. Mit asyncio.run(main()) ausführen

Fehlerbehandlung

Fehlerbehandlung funktioniert genauso wie bei synchronem Code:

from elluminate.exceptions import ConflictError, NotFoundError

async def handle_errors():
    async with AsyncClient() as client:
        try:
            template = await client.create_prompt_template(name="Existierend", ...)
        except ConflictError:
            template = await client.get_prompt_template(name="Existierend")

        try:
            experiment = await client.get_experiment(name="NichtExistent")
        except NotFoundError:
            print("Experiment nicht gefunden")

Echtzeit-Streaming

AsyncClient unterstützt Server-Sent Events (SSE) Streaming für Echtzeit-Fortschrittsaktualisierungen während langwieriger Operationen. Dies ist besonders nützlich für:

  • Experiment-Ausführung - Beobachten Sie die Generierung und Bewertung von Antworten in Echtzeit
  • Batch-Operationen - Verfolgen Sie den Status von Batch-Bewertungsoperationen
  • Bessere UX - Zeigen Sie Live-Fortschritt statt blockierender Spinner
  • Früherkennung von Fehlern - Sehen Sie Fehler sofort, nicht erst nach einem Timeout

Streaming-Experiment-Ausführung

Streamen Sie Echtzeit-Fortschritt während der Experiment-Ausführung mit stream_experiment():

from elluminate import AsyncClient
from elluminate.streaming import TaskStatus

async with AsyncClient() as client:
    async for event in client.stream_experiment(
        name="Mein Experiment",
        prompt_template=template,
        collection=collection,
        criteria=["Ist es genau?", "Ist es hilfreich?"],
        polling_interval=0.5,  # Alle 0,5 Sekunden abfragen
    ):
        if event.status == TaskStatus.STARTED:
            # Live-Fortschritt anzeigen
            if event.progress:
                percent = event.progress.percent_complete
                print(f"Fortschritt: {percent:.1f}%")
                print(f"Generiert: {event.progress.responses_generated}/{event.progress.total_responses}")
                print(f"Bewertet: {event.progress.responses_rated}/{event.progress.total_responses}")

            # Inkrementelle Logs anzeigen
            if event.logs_delta:
                print(f"Log: {event.logs_delta}")

        elif event.status == TaskStatus.SUCCESS:
            print("✅ Abgeschlossen!")
            experiment = event.result  # Finales Experiment mit Antworten

        elif event.is_failure:
            print(f"❌ Fehlgeschlagen: {event.error_msg}")
            break

Hauptfunktionen:

  • Echtzeit-Fortschritt: Sehen Sie generierte und bewertete Antworten sowie Fertigstellungsgrad
  • Inkrementelle Logs: Erhalten Sie Log-Nachrichten während sie auftreten
  • Frühzeitiger Abbruch: Stoppen Sie sofort bei Fehlern
  • Endergebnis: Greifen Sie bei SUCCESS auf das abgeschlossene Experiment zu

Terminal-Zustände

Streaming-Operationen können in verschiedenen Terminal-Zuständen enden. Behandeln Sie immer alle möglichen Ausgänge:

Status Bedeutung Wann es auftritt Benutzeraktion
SUCCESS Erfolgreich abgeschlossen Alle Antworten generiert und bewertet Ergebnisse aus event.result verarbeiten
FAILURE Operation fehlgeschlagen LLM-Fehler, Validierungsfehler, Systemfehler event.error_msg prüfen, Problem beheben und erneut versuchen
TIMEOUT Zeitlimit überschritten Operation läuft länger als 10 Stunden Kleinere Batches erwägen oder Support kontaktieren
REVOKED Task abgebrochen Manuelles Abbrechen oder System-Herunterfahren Bei Bedarf erneut ausführen
REJECTED Task vor Start abgelehnt Queue voll oder ungültige Konfiguration Eingaben prüfen und erneut versuchen

Beispiel für die Behandlung aller Terminal-Zustände:

async for event in client.stream_experiment(...):
    if event.status == TaskStatus.STARTED:
        # Fortschrittsaktualisierungen behandeln
        pass

    elif event.status == TaskStatus.SUCCESS:
        print("✅ Abgeschlossen!")
        experiment = event.result
        break

    elif event.status == TaskStatus.FAILURE:
        print(f"❌ Fehlgeschlagen: {event.error_msg}")
        break

    elif event.status == TaskStatus.TIMEOUT:
        print(f"⏱️  Timeout nach 10 Stunden: {event.error_msg}")
        break

    elif event.status in {TaskStatus.REVOKED, TaskStatus.REJECTED}:
        print(f"⚠️  Operation {event.status.lower()}: {event.error_msg}")
        break

Prüfen Sie die is_complete Eigenschaft

Alle Terminal-Zustände haben event.is_complete == True. Verwenden Sie dies, um zu erkennen, wann das Streaming endet:

if event.is_complete:
    # Terminal-Zustand behandeln
    if event.is_success:
        # Ergebnisse verarbeiten
    elif event.is_failure:
        # Fehler behandeln (umfasst FAILURE, TIMEOUT, REVOKED, REJECTED)

Streaming-Batch-Bewertung

Streamen Sie Statusaktualisierungen für Batch-Bewertungsoperationen mit stream_batch_rate():

async with AsyncClient() as client:
    # Antworten zum Bewerten abrufen
    responses = list(experiment.responses())

    async for event in client.stream_batch_rate(
        prompt_responses=responses,
        rating_mode=RatingMode.DETAILED,
    ):
        if event.status == TaskStatus.STARTED:
            print("Bewertung läuft...")

        elif event.status == TaskStatus.SUCCESS:
            ratings = event.result  # List[List[Rating]]
            print(f"{len(ratings)} Antworten bewertet")

        elif event.is_failure:
            print(f"Fehlgeschlagen: {event.error_msg}")
            break

Batch-Bewertungs-Streaming-Verhalten

Verfügbar: Alle Terminal-Zustände (SUCCESS, FAILURE, TIMEOUT, REVOKED, REJECTED) mit error_msg falls zutreffend.

Nicht verfügbar: Fortschrittsmetriken (event.progress ist immer None). Das Backend verfolgt nicht, wie viele Antworten während Batch-Operationen bewertet wurden.

Warum: Batch-Bewertung ist typischerweise schnell (1-2 Sekunden pro Antwort), daher wurde Fortschrittsverfolgung als unnötig erachtet. Dies kann sich in zukünftigen Versionen ändern.

Vollständiges Streaming-Beispiel

Siehe ein vollständiges Beispiel mit Fortschrittsbalken und Fehlerbehandlung:

"""Real-time streaming experiment with progress updates.

Demonstrates the streaming SDK workflow:
1. Create a prompt template and test collection
2. Stream experiment execution with real-time progress
3. Display live updates as responses are generated and rated
4. Access the completed experiment when finished

The streaming API provides real-time feedback during long-running experiments:
- Live progress tracking (responses generated, ratings completed)
- Per-epoch progress for multi-epoch experiments
- Incremental log messages
- Early error detection
- Better user experience for experiments with many test cases
"""

import asyncio

from dotenv import load_dotenv
from elluminate import AsyncClient
from elluminate.streaming import TaskStatus

load_dotenv(override=True)


async def main():
    # Use async context manager for proper resource management
    async with AsyncClient() as client:
        # Create a prompt template
        template = await client.create_prompt_template(
            name="Programming Concepts Streaming",
            messages="Explain {{concept}} in {{language}}, providing a clear code example.",
        )
        print(f"✓ Created template: {template.name}")

        # Create a collection with multiple test cases
        collection = await client.create_collection(name="Programming Concepts Streaming")
        await collection.aadd_many(
            variables=[
                {"concept": "recursion", "language": "Python"},
                {"concept": "closures", "language": "JavaScript"},
                {"concept": "async/await", "language": "Python"},
                {"concept": "generators", "language": "Python"},
                {"concept": "promises", "language": "JavaScript"},
            ]
        )
        print(f"✓ Created collection with {len(collection.variables)} test cases")

        # Stream the experiment with real-time progress
        print("\n=== Streaming Experiment ===")

        async for event in client.stream_experiment(
            name="Programming Concepts Analysis Streaming",
            prompt_template=template,
            collection=collection,
            criteria=[
                "Is the explanation clear and accurate?",
                "Is the code example correct and illustrative?",
            ],
            polling_interval=0.5,  # Poll every 0.5 seconds
        ):
            if event.status == TaskStatus.PENDING:
                print("⏳ Experiment queued, waiting to start...")

            elif event.status == TaskStatus.STARTED:
                if event.progress:
                    # Calculate and display progress
                    percent = event.progress.percent_complete
                    completed = event.progress.responses_generated
                    rated = event.progress.responses_rated
                    total = event.progress.total_responses

                    # Progress bar
                    bar_length = 40
                    filled = int(bar_length * percent / 100)
                    bar = "█" * filled + "░" * (bar_length - filled)

                    print(
                        f"\r🔄 [{bar}] {percent:.1f}% | Generated: {completed}/{total} | Rated: {rated}/{total}",
                        end="",
                        flush=True,
                    )

                # Show incremental logs if available
                if event.logs_delta:
                    print(f"\n📝 Log: {event.logs_delta}")

            elif event.status == TaskStatus.SUCCESS:
                print("\n✅ Experiment completed successfully!")

                # Access the final experiment
                experiment = event.result
                if experiment:
                    print(f"\n=== Results for '{experiment.name}' ===")
                    print(f"Total responses: {len(list(experiment.responses()))}")

                    # Show mean ratings
                    if experiment.results:
                        print(f"Mean rating: {experiment.results.mean_all_ratings}")

                    # Show first few responses
                    print("\nSample responses:")
                    for i, response in enumerate(list(experiment.responses())[:2], 1):
                        print(f"\n{i}. Input: {response.template_variables}")
                        print(f"   Output: {response.response_text[:150]}...")
                        if response.ratings:
                            print("   Ratings:")
                            for rating in response.ratings:
                                print(f"     - {rating.criterion.criterion_str}: {rating.rating}")

            elif event.status == TaskStatus.FAILURE:
                print(f"\n❌ Experiment failed: {event.error_msg}")
                break

            elif event.status == TaskStatus.TIMEOUT:
                print(f"\n⏱️  Experiment timed out: {event.error_msg}")
                break

            elif event.status in {TaskStatus.REVOKED, TaskStatus.REJECTED}:
                print(f"\n⚠️  Experiment {event.status.lower()}")
                break


if __name__ == "__main__":
    asyncio.run(main())

Streaming vs. Blockierend

Funktion run_experiment() stream_experiment()
Fortschrittssichtbarkeit ❌ Keine ✅ Echtzeit
Früherkennung von Fehlern ❌ Nach Timeout ✅ Sofort
UX für lange Operationen ❌ Lade-Spinner ✅ Fortschrittsbalken
API-Aufrufe ✅ Einzeln ✅ SSE-Stream
Anwendungsfall Schnelle Experimente Langwierig, viele Testfälle

Wann Streaming verwenden:

  • Experimente mit vielen Testfällen (>10)
  • Langsame Modelle (dauert >10 Sekunden)
  • Benutzer-orientierte Anwendungen (benötigen Fortschritts-UI)
  • Debugging (möchten Logs in Echtzeit sehen)

Wann Blockierend verwenden:

  • Schnelle Experimente (<10 Testfälle)
  • Hintergrundverarbeitung (kein Fortschritt benötigt)
  • Einfache Skripte

Best Practices

  1. Immer Context Manager verwenden: Stellt ordnungsgemäße Bereinigung der Ressourcen sicher
  2. asyncio.gather() für Gleichzeitigkeit verwenden: Nicht in einer Schleife awaiten
  3. Gleichzeitige Operationen begrenzen: Semaphoren für teure Operationen verwenden
  4. Client-Instanzen wiederverwenden: Ein Client pro Anwendung, nicht pro Anfrage
  5. Ausnahmen ordnungsgemäß behandeln: Async-Ausnahmen funktionieren genauso wie Sync
  6. Streaming für lange Operationen verwenden: Bessere UX mit Echtzeit-Fortschritt

Nächste Schritte

  • Erfahren Sie mehr über Batch Operations für effiziente Verarbeitung
  • Entdecken Sie Experiments für Evaluierungs-Workflows
  • Sehen Sie sich Collections für die Verwaltung von Testfällen an