Asynchrone Operationen¶
Optimieren Sie die Leistung mit asynchronen Operationen für umfangreiche Prompt-Evaluierungen und gleichzeitige Verarbeitung
Das elluminate SDK bietet einen vollständigen AsyncClient für asynchrone Operationen. Dies ist besonders nützlich bei:
- Gleichzeitiger Ausführung mehrerer Experimente - Mehrere Experimente parallel statt nacheinander ausführen
- Integration mit Async-Frameworks - Verwendung mit FastAPI, aiohttp oder anderen Async-Web-Frameworks
- Umfangreiche Batch-Verarbeitung - Hunderte oder Tausende von Operationen mit besserer Ressourcennutzung verarbeiten
- Non-Blocking-Operationen - Ihre Anwendung bleibt während langwieriger Evaluierungen reaktionsfähig
AsyncClient vs Client¶
Das SDK bietet zwei Client-Klassen mit identischen APIs:
Client- Synchrone Operationen (blockierendes I/O)AsyncClient- Asynchrone Operationen (nicht-blockierendes I/O)
Alle öffentlichen Methoden haben die gleiche Signatur; der einzige Unterschied ist, dass AsyncClient-Methoden mit await verwendet werden müssen:
from elluminate import Client, AsyncClient
# Synchron
client = Client()
template = client.create_prompt_template(name="Test", template="...")
# Asynchron (gleiche Signatur, nur await hinzufügen)
async with AsyncClient() as client:
template = await client.create_prompt_template(name="Test", template="...")
Grundlegende Verwendung¶
Context Manager (Empfohlen)¶
Verwenden Sie immer den Async-Context-Manager, um eine ordnungsgemäße Ressourcenbereinigung sicherzustellen:
import asyncio
from elluminate import AsyncClient
async def main():
async with AsyncClient() as client:
# Ressourcen erstellen
template = await client.create_prompt_template(
name="Meine Vorlage",
template="Erkläre {{concept}} in einfachen Worten.",
)
# Experiment ausführen
experiment = await client.run_experiment(
name="Mein Experiment",
prompt_template=template,
collection=collection,
)
print(f"Abgeschlossen: {experiment.name}")
# Async-Funktion ausführen
asyncio.run(main())
Manuelle Ressourcenverwaltung¶
Falls Sie manuelle Kontrolle über den Client-Lebenszyklus benötigen:
from elluminate import AsyncClient
async def main():
client = AsyncClient()
try:
template = await client.create_prompt_template(...)
finally:
await client.close() # Wichtig: Ressourcen bereinigen
asyncio.run(main())
Vollständiges Beispiel¶
Dieses Beispiel demonstriert den vollständigen Async-Workflow:
- Verwenden Sie den Async-Context-Manager für ordnungsgemäße Ressourcenverwaltung
- Alle Client-Methoden sind asynchron - verwenden Sie
await - Rich-Model-Methoden (auf Schema-Objekten) verwenden das
a-Präfix:agenerate_criteria(),aadd_many() - Greifen Sie auf Ergebnisse auf die gleiche Weise wie bei synchronem Code zu
Gleichzeitige Ausführung¶
Die wahre Stärke von AsyncClient kommt von der gleichzeitigen Ausführung mehrerer Operationen mit asyncio.gather():
- Gemeinsame Ressourcen einrichten (Template, Kriterien, Collection)
- Mehrere Experimente gleichzeitig mit
asyncio.gather()ausführen - deutlich schneller als sequentielle Ausführung - Ergebnisse über alle Experimente vergleichen
Leistungsvergleich¶
Für 3 Experimente mit jeweils 5 Testfällen:
- Sequenziell (sync): ~45 Sekunden (3 Experimente × 15 Sekunden)
- Gleichzeitig (async): ~15 Sekunden (alle 3 laufen parallel)
3x schneller mit gleichzeitiger Ausführung!
Async-Methoden-Referenz¶
AsyncClient Öffentliche Methoden¶
AsyncClient öffentliche Methoden verwenden KEIN a-Präfix:
async with AsyncClient() as client:
# Prompt-Templates
await client.create_prompt_template(...)
await client.get_prompt_template(...)
await client.get_or_create_prompt_template(...)
await client.list_prompt_templates()
await client.delete_prompt_template(...)
# Collections
await client.create_collection(...)
await client.get_collection(...)
await client.get_or_create_collection(...)
await client.list_collections()
await client.delete_collection(...)
# Experimente
await client.create_experiment(...)
await client.run_experiment(...) # Erstellt und führt aus
await client.get_experiment(...)
await client.list_experiments()
await client.delete_experiment(...)
# Criterion Sets
await client.create_criterion_set(...)
await client.get_criterion_set(...)
await client.get_or_create_criterion_set(...)
await client.list_criterion_sets()
# LLM Configs
await client.create_llm_config(...)
await client.get_llm_config(...)
await client.get_or_create_llm_config(...)
await client.list_llm_configs()
# Und mehr...
Rich-Model Async-Methoden¶
Rich-Model-Methoden (auf Schema-Objekten) verwenden DAS a-Präfix:
# PromptTemplate
template = await client.get_prompt_template(name="...")
criteria = await template.agenerate_criteria()
criteria, generated = await template.aget_or_generate_criteria()
all_criteria = await template.alist_criteria()
new_template = await template.anew_version(template="...")
# TemplateVariablesCollection
collection = await client.get_collection(name="...")
await collection.aadd_many(variables=[...])
await collection.aclear()
await collection.agenerate_variables(prompt_template)
# CriterionSet
criterion_set = await client.get_criterion_set(name="...")
await criterion_set.aadd_criterion(criterion="...")
await criterion_set.aadd_criteria(criteria=[...])
await criterion_set.aclear()
await criterion_set.alink_template(template)
await criterion_set.aunlink_template(template)
# Experiment
experiment = await client.get_experiment(name="...")
await experiment.arun()
await experiment.afetch_responses()
await experiment.aadd_responses(responses=[...], template_variables=[...])
await experiment.arate_responses()
new_exp = await experiment.aclone(name="...")
Häufige Muster¶
Muster 1: Gleichzeitige A/B-Tests¶
Mehrere Prompt-Varianten gleichzeitig ausführen:
async def ab_test():
async with AsyncClient() as client:
# Gemeinsame Ressourcen einrichten
criterion_set = await client.create_criterion_set(name="Qualität")
await criterion_set.aadd_criteria([
"Ist die Antwort hilfreich?",
"Ist sie genau?",
])
collection = await client.create_collection(name="Testfälle")
await collection.aadd_many(variables=[
{"topic": "KI"},
{"topic": "ML"},
])
# Templates erstellen
template_a = await client.create_prompt_template(
name="Stil A",
template="Erkläre {{topic}} kurz.",
)
template_b = await client.create_prompt_template(
name="Stil B",
template="Erkläre {{topic}} mit Beispielen.",
)
# Beide Experimente gleichzeitig ausführen
exp_a, exp_b = await asyncio.gather(
client.run_experiment(
name="Test A",
prompt_template=template_a,
collection=collection,
criterion_set=criterion_set,
),
client.run_experiment(
name="Test B",
prompt_template=template_b,
collection=collection,
criterion_set=criterion_set,
),
)
# Ergebnisse vergleichen
print(f"A: {exp_a.results.mean_all_ratings.yes:.1%}")
print(f"B: {exp_b.results.mean_all_ratings.yes:.1%}")
asyncio.run(ab_test())
Muster 2: Batch-Verarbeitung von Testfällen¶
Mehrere Testfall-Collections parallel verarbeiten:
async def batch_process():
async with AsyncClient() as client:
template = await client.get_prompt_template(name="Meine Vorlage")
criterion_set = await client.get_criterion_set(name="Meine Kriterien")
# Mehrere Collections abrufen
collections = await asyncio.gather(
client.get_collection(name="Collection 1"),
client.get_collection(name="Collection 2"),
client.get_collection(name="Collection 3"),
)
# Experimente auf allen Collections gleichzeitig ausführen
experiments = await asyncio.gather(*[
client.run_experiment(
name=f"Experiment {i+1}",
prompt_template=template,
collection=coll,
criterion_set=criterion_set,
)
for i, coll in enumerate(collections)
])
return experiments
results = asyncio.run(batch_process())
Muster 3: FastAPI-Integration¶
AsyncClient in einem FastAPI-Endpoint verwenden:
from fastapi import FastAPI
from elluminate import AsyncClient
app = FastAPI()
# Eine einzelne AsyncClient-Instanz für die Anwendung erstellen
async_client = AsyncClient()
@app.on_event("startup")
async def startup():
global async_client
async_client = AsyncClient()
@app.on_event("shutdown")
async def shutdown():
await async_client.close()
@app.post("/evaluate")
async def evaluate_prompt(prompt: str, test_case: dict):
# Template abrufen oder erstellen
template, _ = await async_client.get_or_create_prompt_template(
name="API Template",
template=prompt,
)
# Kriterien generieren
criteria, _ = await template.aget_or_generate_criteria()
criterion_set = await async_client.get_criterion_set(name=template.name)
# Collection mit einzelnem Testfall erstellen
collection = await async_client.create_collection(name=f"Test {timestamp}")
await collection.aadd_many(variables=[test_case])
# Experiment ausführen
experiment = await async_client.run_experiment(
name=f"Eval {timestamp}",
prompt_template=template,
collection=collection,
criterion_set=criterion_set,
)
return {
"experiment_id": experiment.id,
"pass_rate": experiment.results.mean_all_ratings.yes if experiment.results else 0,
}
Jupyter Notebook Unterstützung¶
AsyncClient funktioniert in Jupyter Notebooks mit nest_asyncio:
# Im Notebook installieren
!pip install elluminate nest-asyncio
# Verschachtelte Event-Loops aktivieren
import nest_asyncio
nest_asyncio.apply()
# Jetzt können Sie await in Zellen verwenden
from elluminate import AsyncClient
async with AsyncClient() as client:
template = await client.create_prompt_template(...)
experiment = await client.run_experiment(...)
Performance-Tipps¶
1. asyncio.gather() für unabhängige Operationen verwenden¶
Wenn Operationen nicht voneinander abhängen, führen Sie sie gleichzeitig aus:
# GUT: Gleichzeitige Ausführung
template, collection, criterion_set = await asyncio.gather(
client.get_prompt_template(name="..."),
client.get_collection(name="..."),
client.get_criterion_set(name="..."),
)
# SCHLECHT: Sequenzielle Ausführung (3x langsamer)
template = await client.get_prompt_template(name="...")
collection = await client.get_collection(name="...")
criterion_set = await client.get_criterion_set(name="...")
2. Gleichzeitigkeit für ressourcenintensive Operationen begrenzen¶
Semaphoren verwenden, um gleichzeitige teure Operationen zu begrenzen:
import asyncio
async def run_with_limit(semaphore, client, name, template, collection):
async with semaphore:
return await client.run_experiment(
name=name,
prompt_template=template,
collection=collection,
)
async def main():
async with AsyncClient() as client:
# Auf 5 gleichzeitige Experimente begrenzen
semaphore = asyncio.Semaphore(5)
tasks = [
run_with_limit(semaphore, client, f"Exp {i}", template, collection)
for i in range(20)
]
experiments = await asyncio.gather(*tasks)
asyncio.run(main())
3. Client-Instanzen wiederverwenden¶
Erstellen Sie einen AsyncClient pro Anwendung, nicht pro Anfrage:
# GUT: Einzelne Client-Instanz
class App:
def __init__(self):
self.client = AsyncClient()
async def process(self):
return await self.client.run_experiment(...)
async def cleanup(self):
await self.client.close()
# SCHLECHT: Neuer Client pro Operation (Verbindungs-Overhead)
async def process():
async with AsyncClient() as client: # Erstellt neue Verbindung
return await client.run_experiment(...)
Migration von Sync zu Async¶
Die Konvertierung von synchronem zu asynchronem Code ist einfach:
Vorher (Synchron)¶
from elluminate import Client
client = Client()
template = client.create_prompt_template(name="...", template="...")
collection = client.create_collection(name="...")
collection.add_many(variables=[...])
experiment = client.run_experiment(
name="...",
prompt_template=template,
collection=collection,
)
Nachher (Asynchron)¶
from elluminate import AsyncClient
import asyncio
async def main():
async with AsyncClient() as client:
template = await client.create_prompt_template(name="...", template="...")
collection = await client.create_collection(name="...")
await collection.aadd_many(variables=[...]) # Hinweis: aadd_many mit 'a'-Präfix
experiment = await client.run_experiment(
name="...",
prompt_template=template,
collection=collection,
)
asyncio.run(main())
Wichtige Änderungen:
1. AsyncClient statt Client importieren
2. Code in async def main() verpacken
3. async with AsyncClient() Context Manager verwenden
4. await vor allen Client-Methoden hinzufügen
5. Rich-Model-Methoden erhalten a-Präfix: add_many → aadd_many
6. Mit asyncio.run(main()) ausführen
Fehlerbehandlung¶
Fehlerbehandlung funktioniert genauso wie bei synchronem Code:
from elluminate.exceptions import ConflictError, NotFoundError
async def handle_errors():
async with AsyncClient() as client:
try:
template = await client.create_prompt_template(name="Existierend", ...)
except ConflictError:
template = await client.get_prompt_template(name="Existierend")
try:
experiment = await client.get_experiment(name="NichtExistent")
except NotFoundError:
print("Experiment nicht gefunden")
Echtzeit-Streaming¶
AsyncClient unterstützt Server-Sent Events (SSE) Streaming für Echtzeit-Fortschrittsaktualisierungen während langwieriger Operationen. Dies ist besonders nützlich für:
- Experiment-Ausführung - Beobachten Sie die Generierung und Bewertung von Antworten in Echtzeit
- Batch-Operationen - Verfolgen Sie den Status von Batch-Bewertungsoperationen
- Bessere UX - Zeigen Sie Live-Fortschritt statt blockierender Spinner
- Früherkennung von Fehlern - Sehen Sie Fehler sofort, nicht erst nach einem Timeout
Streaming-Experiment-Ausführung¶
Streamen Sie Echtzeit-Fortschritt während der Experiment-Ausführung mit stream_experiment():
from elluminate import AsyncClient
from elluminate.streaming import TaskStatus
async with AsyncClient() as client:
async for event in client.stream_experiment(
name="Mein Experiment",
prompt_template=template,
collection=collection,
criteria=["Ist es genau?", "Ist es hilfreich?"],
polling_interval=0.5, # Alle 0,5 Sekunden abfragen
):
if event.status == TaskStatus.STARTED:
# Live-Fortschritt anzeigen
if event.progress:
percent = event.progress.percent_complete
print(f"Fortschritt: {percent:.1f}%")
print(f"Generiert: {event.progress.responses_generated}/{event.progress.total_responses}")
print(f"Bewertet: {event.progress.responses_rated}/{event.progress.total_responses}")
# Inkrementelle Logs anzeigen
if event.logs_delta:
print(f"Log: {event.logs_delta}")
elif event.status == TaskStatus.SUCCESS:
print("✅ Abgeschlossen!")
experiment = event.result # Finales Experiment mit Antworten
elif event.is_failure:
print(f"❌ Fehlgeschlagen: {event.error_msg}")
break
Hauptfunktionen:
- Echtzeit-Fortschritt: Sehen Sie generierte und bewertete Antworten sowie Fertigstellungsgrad
- Inkrementelle Logs: Erhalten Sie Log-Nachrichten während sie auftreten
- Frühzeitiger Abbruch: Stoppen Sie sofort bei Fehlern
- Endergebnis: Greifen Sie bei SUCCESS auf das abgeschlossene Experiment zu
Terminal-Zustände¶
Streaming-Operationen können in verschiedenen Terminal-Zuständen enden. Behandeln Sie immer alle möglichen Ausgänge:
| Status | Bedeutung | Wann es auftritt | Benutzeraktion |
|---|---|---|---|
SUCCESS |
Erfolgreich abgeschlossen | Alle Antworten generiert und bewertet | Ergebnisse aus event.result verarbeiten |
FAILURE |
Operation fehlgeschlagen | LLM-Fehler, Validierungsfehler, Systemfehler | event.error_msg prüfen, Problem beheben und erneut versuchen |
TIMEOUT |
Zeitlimit überschritten | Operation läuft länger als 10 Stunden | Kleinere Batches erwägen oder Support kontaktieren |
REVOKED |
Task abgebrochen | Manuelles Abbrechen oder System-Herunterfahren | Bei Bedarf erneut ausführen |
REJECTED |
Task vor Start abgelehnt | Queue voll oder ungültige Konfiguration | Eingaben prüfen und erneut versuchen |
Beispiel für die Behandlung aller Terminal-Zustände:
async for event in client.stream_experiment(...):
if event.status == TaskStatus.STARTED:
# Fortschrittsaktualisierungen behandeln
pass
elif event.status == TaskStatus.SUCCESS:
print("✅ Abgeschlossen!")
experiment = event.result
break
elif event.status == TaskStatus.FAILURE:
print(f"❌ Fehlgeschlagen: {event.error_msg}")
break
elif event.status == TaskStatus.TIMEOUT:
print(f"⏱️ Timeout nach 10 Stunden: {event.error_msg}")
break
elif event.status in {TaskStatus.REVOKED, TaskStatus.REJECTED}:
print(f"⚠️ Operation {event.status.lower()}: {event.error_msg}")
break
Prüfen Sie die is_complete Eigenschaft
Alle Terminal-Zustände haben event.is_complete == True. Verwenden Sie dies, um zu erkennen, wann das Streaming endet:
Streaming-Batch-Bewertung¶
Streamen Sie Statusaktualisierungen für Batch-Bewertungsoperationen mit stream_batch_rate():
async with AsyncClient() as client:
# Antworten zum Bewerten abrufen
responses = list(experiment.responses())
async for event in client.stream_batch_rate(
prompt_responses=responses,
rating_mode=RatingMode.DETAILED,
):
if event.status == TaskStatus.STARTED:
print("Bewertung läuft...")
elif event.status == TaskStatus.SUCCESS:
ratings = event.result # List[List[Rating]]
print(f"{len(ratings)} Antworten bewertet")
elif event.is_failure:
print(f"Fehlgeschlagen: {event.error_msg}")
break
Batch-Bewertungs-Streaming-Verhalten
Verfügbar: Alle Terminal-Zustände (SUCCESS, FAILURE, TIMEOUT, REVOKED, REJECTED) mit error_msg falls zutreffend.
Nicht verfügbar: Fortschrittsmetriken (event.progress ist immer None). Das Backend verfolgt nicht, wie viele Antworten während Batch-Operationen bewertet wurden.
Warum: Batch-Bewertung ist typischerweise schnell (1-2 Sekunden pro Antwort), daher wurde Fortschrittsverfolgung als unnötig erachtet. Dies kann sich in zukünftigen Versionen ändern.
Vollständiges Streaming-Beispiel¶
Siehe ein vollständiges Beispiel mit Fortschrittsbalken und Fehlerbehandlung:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 | |
Streaming vs. Blockierend¶
| Funktion | run_experiment() |
stream_experiment() |
|---|---|---|
| Fortschrittssichtbarkeit | ❌ Keine | ✅ Echtzeit |
| Früherkennung von Fehlern | ❌ Nach Timeout | ✅ Sofort |
| UX für lange Operationen | ❌ Lade-Spinner | ✅ Fortschrittsbalken |
| API-Aufrufe | ✅ Einzeln | ✅ SSE-Stream |
| Anwendungsfall | Schnelle Experimente | Langwierig, viele Testfälle |
Wann Streaming verwenden:
- Experimente mit vielen Testfällen (>10)
- Langsame Modelle (dauert >10 Sekunden)
- Benutzer-orientierte Anwendungen (benötigen Fortschritts-UI)
- Debugging (möchten Logs in Echtzeit sehen)
Wann Blockierend verwenden:
- Schnelle Experimente (<10 Testfälle)
- Hintergrundverarbeitung (kein Fortschritt benötigt)
- Einfache Skripte
Best Practices¶
- Immer Context Manager verwenden: Stellt ordnungsgemäße Bereinigung der Ressourcen sicher
- asyncio.gather() für Gleichzeitigkeit verwenden: Nicht in einer Schleife awaiten
- Gleichzeitige Operationen begrenzen: Semaphoren für teure Operationen verwenden
- Client-Instanzen wiederverwenden: Ein Client pro Anwendung, nicht pro Anfrage
- Ausnahmen ordnungsgemäß behandeln: Async-Ausnahmen funktionieren genauso wie Sync
- Streaming für lange Operationen verwenden: Bessere UX mit Echtzeit-Fortschritt
Nächste Schritte¶
- Erfahren Sie mehr über Batch Operations für effiziente Verarbeitung
- Entdecken Sie Experiments für Evaluierungs-Workflows
- Sehen Sie sich Collections für die Verwaltung von Testfällen an