# Error Handling

The SDK maps every failure to a typed exception so you can handle errors precisely.
## Exception hierarchy

```text
FerroError                      # Base — catch-all for all SDK errors
├── FerroAPIError               # Any non-2xx HTTP response
│   ├── FerroAuthError          # 401 Unauthorized
│   ├── FerroRateLimitError     # 429 Too Many Requests
│   ├── FerroNotFoundError      # 404 Not Found
│   └── FerroServerError        # 5xx Server Error
├── FerroConnectionError        # Network / timeout (retried automatically)
└── FerroStreamError            # SSE parse failure during streaming
```
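Because the exceptions form a subclass tree, `except` clauses must list subclasses before their bases, or the base clause will swallow everything. A standalone sketch of why ordering matters, using local stub classes that mirror the hierarchy (not the real SDK imports):

```python
# Stub classes mirroring the SDK hierarchy, so this sketch runs standalone.
class FerroError(Exception): pass
class FerroAPIError(FerroError): pass
class FerroRateLimitError(FerroAPIError): pass

def classify(exc: Exception) -> str:
    """except clauses are tried top to bottom: subclasses must come first."""
    try:
        raise exc
    except FerroRateLimitError:
        return "rate-limited"   # matched before the broader FerroAPIError clause
    except FerroAPIError:
        return "api-error"
    except FerroError:
        return "sdk-error"

print(classify(FerroRateLimitError()))  # "rate-limited" — most specific handler wins
print(classify(FerroAPIError()))        # "api-error"
```

If `except FerroError` were listed first, every SDK error would hit that clause and the specific handlers would be dead code.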
## Basic error handling

```python
from ferrolabsai import FerroClient
from ferrolabsai import (
    FerroError,
    FerroAPIError,
    FerroAuthError,
    FerroRateLimitError,
    FerroNotFoundError,
    FerroServerError,
    FerroConnectionError,
    FerroStreamError,
)

client = FerroClient()

try:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
    )
except FerroAuthError:
    print("Invalid or missing API key")
except FerroRateLimitError:
    print("Rate limit exceeded — back off and retry")
except FerroNotFoundError:
    print("Model or endpoint not found")
except FerroServerError as e:
    print(f"Server error ({e.status_code}): {e.message}")
except FerroConnectionError:
    print("Network error — retries exhausted")
except FerroStreamError:
    print("Failed to parse streaming response")
except FerroAPIError as e:
    print(f"API error {e.status_code}: {e.message}")
except FerroError as e:
    print(f"Unexpected SDK error: {e}")
```
## FerroAPIError fields

Every FerroAPIError carries structured context:

| Field | Type | Description |
|---|---|---|
| status_code | int | HTTP status code |
| message | str | Human-readable error message |
| code | str or None | Machine-readable error code from the gateway |
| request_id | str or None | Gateway request ID for debugging |

```python
except FerroAPIError as e:
    print(f"Status: {e.status_code}")
    print(f"Message: {e.message}")
    print(f"Code: {e.code}")
    print(f"Request ID: {e.request_id}")
```
## Automatic retries

Connection errors (httpx.ConnectError) and timeouts (httpx.TimeoutException) are retried automatically.

| Setting | Default | Description |
|---|---|---|
| max_retries | 2 | Number of retry attempts |
| Retried errors | ConnectError, TimeoutException | Network-level failures only |
| NOT retried | 4xx, 5xx HTTP errors | API errors propagate immediately |

```python
# Increase retries for unreliable networks
client = FerroClient(max_retries=5, timeout=60.0)
```

:::warning
HTTP errors (4xx, 5xx) are never retried. The gateway already handles provider-level retries via its own retry config — the SDK trusts the gateway's response as final.
:::
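Since the SDK never retries a 429, callers that want to ride out rate limits must retry at the application level. A minimal sketch of exponential backoff with jitter; `RateLimitError` is a local stand-in for `FerroRateLimitError` so the snippet runs standalone, and `with_backoff` is a hypothetical helper, not part of the SDK:

```python
import random
import time

class RateLimitError(Exception):
    """Stand-in for FerroRateLimitError so this sketch runs standalone."""

def with_backoff(fn, max_attempts=4, base_delay=0.5, sleep=time.sleep):
    """Retry fn on RateLimitError with exponential backoff plus jitter."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise  # out of attempts — let the caller handle it
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            sleep(delay)

# Usage: a call that is rate-limited twice, then succeeds.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RateLimitError()
    return "ok"

result = with_backoff(flaky, sleep=lambda d: None)  # "ok" after two retries
```

In real code the body of `fn` would be the `client.chat.completions.create(...)` call and the caught exception would be `FerroRateLimitError`.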
## Streaming error handling

Streaming errors surface as FerroStreamError when an SSE chunk cannot be parsed:

```python
try:
    for chunk in client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True,
    ):
        print(chunk.choices[0].delta.content or "", end="")
except FerroStreamError:
    print("\nStream interrupted — partial response received")
except FerroConnectionError:
    print("\nConnection lost during streaming")
```
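The handler above only reports the interruption; to actually keep the partial response, accumulate chunks into a buffer before the exception propagates. A standalone sketch, with a local `StreamError` standing in for `FerroStreamError` and a generator simulating the chunk iterator:

```python
class StreamError(Exception):
    """Stand-in for FerroStreamError so this sketch runs standalone."""

def collect_stream(chunks):
    """Accumulate streamed text; on a parse failure, keep what arrived so far."""
    parts = []
    try:
        for text in chunks:
            parts.append(text)
    except StreamError:
        pass  # swallow the failure but preserve the partial response
    return "".join(parts)

def broken_stream():
    """Simulates a stream that fails mid-response."""
    yield "Hel"
    yield "lo"
    raise StreamError("bad SSE chunk")

partial = collect_stream(broken_stream())
print(partial)  # "Hello" — the text received before the failure
```

In real code the iterator would be the `stream=True` response and each appended piece would be `chunk.choices[0].delta.content or ""`.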
## Production patterns

### Graceful degradation

```python
def get_completion(prompt: str) -> str:
    try:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
        )
        return response.content
    except FerroRateLimitError:
        return "Service is busy. Please try again shortly."
    except FerroServerError:
        return "Service temporarily unavailable."
    except FerroConnectionError:
        return "Cannot reach the AI gateway."
```
### Logging with trace IDs

```python
import logging

logger = logging.getLogger(__name__)

try:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
    )
    logger.info("OK", extra={"trace_id": response.trace_id, "provider": response.provider})
except FerroAPIError as e:
    logger.error("API error", extra={"status": e.status_code, "request_id": e.request_id})
    raise
```
## Next steps

- API Reference — Full method signatures and response types
- Async Usage — AsyncFerroClient for async/await workflows
- Troubleshooting — Common gateway issues and fixes