Error Handling

The SDK maps every failure to a typed exception so you can handle errors precisely.

Exception hierarchy

FerroError                        # Base class; catch-all for all SDK errors
├── FerroAPIError                 # Any non-2xx HTTP response
│   ├── FerroAuthError            # 401 Unauthorized
│   ├── FerroRateLimitError       # 429 Too Many Requests
│   ├── FerroNotFoundError        # 404 Not Found
│   └── FerroServerError          # 5xx Server Error
├── FerroConnectionError          # Network / timeout (retried automatically)
└── FerroStreamError              # SSE parse failure during streaming
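
Because the exceptions form a tree, a handler for a parent class also catches its children, so specific handlers need to come before general ones. The sketch below uses stand-in classes with the same names to show this. It is not the SDK's source, and the `classify` helper is hypothetical.

```python
# Stand-in classes that mirror part of the tree above. Illustrative only:
# the real classes ship with the SDK and may carry extra fields.
class FerroError(Exception): ...
class FerroAPIError(FerroError): ...
class FerroAuthError(FerroAPIError): ...
class FerroConnectionError(FerroError): ...


def classify(exc: Exception) -> str:
    """Return the label of the first handler that matches, most specific first."""
    try:
        raise exc
    except FerroAuthError:
        return "auth"
    except FerroAPIError:  # would also catch FerroAuthError if listed first
        return "api"
    except FerroConnectionError:
        return "connection"
    except FerroError:
        return "sdk"
```

If `except FerroAPIError` were listed before `except FerroAuthError`, the auth handler would never run.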

Basic error handling

from ferrolabsai import FerroClient
from ferrolabsai import (
    FerroError,
    FerroAPIError,
    FerroAuthError,
    FerroRateLimitError,
    FerroNotFoundError,
    FerroServerError,
    FerroConnectionError,
    FerroStreamError,
)

client = FerroClient()

# Handlers are ordered from most specific to most general.
try:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
    )
except FerroAuthError:
    print("Invalid or missing API key")
except FerroRateLimitError:
    print("Rate limit exceeded: back off and retry")
except FerroNotFoundError:
    print("Model or endpoint not found")
except FerroServerError as e:
    print(f"Server error ({e.status_code}): {e.message}")
except FerroConnectionError:
    print("Network error: retries exhausted")
except FerroStreamError:
    print("Failed to parse streaming response")
except FerroAPIError as e:
    print(f"API error {e.status_code}: {e.message}")
except FerroError as e:
    print(f"Unexpected SDK error: {e}")
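
The rate-limit handler above says to back off and retry, which the SDK does not do for HTTP errors. A minimal sketch of one way to do that in application code is exponential backoff with jitter. The `call_with_backoff` helper and its parameters are hypothetical and not part of the SDK.

```python
import random
import time
from typing import Callable, Type, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fn: Callable[[], T],
    retry_on: Type[Exception],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on `retry_on` with exponential backoff and jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the caller see the error
            # Delays grow as base_delay * 2**attempt, plus up to 10% jitter.
            delay = base_delay * (2 ** attempt)
            sleep(delay + random.uniform(0, delay * 0.1))
    raise AssertionError("unreachable")
```

With the SDK, this might be called as `call_with_backoff(lambda: client.chat.completions.create(...), retry_on=FerroRateLimitError)`.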

FerroAPIError fields

Every FerroAPIError carries structured context:

Field        Type          Description
status_code  int           HTTP status code
message      str           Human-readable error message
code         str or None   Machine-readable error code from the gateway
request_id   str or None   Gateway request ID for debugging

try:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
    )
except FerroAPIError as e:
    print(f"Status: {e.status_code}")
    print(f"Message: {e.message}")
    print(f"Code: {e.code}")
    print(f"Request ID: {e.request_id}")
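
Since `code` and `request_id` may be `None`, it can help to gather the fields into a dict and drop the empty ones before attaching them to a bug report or metric. A minimal sketch follows. The `error_context` helper is hypothetical. It reads only the four fields documented above, using `getattr` so it tolerates exceptions that lack them.

```python
from typing import Any, Dict


def error_context(exc: Exception) -> Dict[str, Any]:
    """Collect the documented FerroAPIError fields, skipping missing or None values."""
    fields = ("status_code", "message", "code", "request_id")
    context = {name: getattr(exc, name, None) for name in fields}
    return {name: value for name, value in context.items() if value is not None}
```

One caveat when logging: the standard `logging` module rejects a `message` key in `extra=`, so rename that key before passing the dict there.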

Automatic retries

Connection errors (httpx.ConnectError) and timeouts (httpx.TimeoutException) are retried automatically.

Setting          Default                         Description
max_retries      2                               Number of retry attempts
Retried errors   ConnectError, TimeoutException  Network-level failures only
NOT retried      4xx, 5xx HTTP errors            API errors propagate immediately

# Increase retries for unreliable networks
client = FerroClient(max_retries=5, timeout=60.0)

Warning: HTTP errors (4xx, 5xx) are never retried. The gateway already handles provider-level retries through its own retry config, so the SDK treats the gateway's response as final.
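
The policy described here can be sketched as a small loop. This illustrates the documented behavior and is not the SDK's implementation. `NetworkError`, `HTTPStatusError`, and `send_with_retries` are hypothetical stand-ins, and the sketch assumes `max_retries` counts retries after the first attempt.

```python
class NetworkError(Exception):
    """Stand-in for httpx.ConnectError / httpx.TimeoutException."""


class HTTPStatusError(Exception):
    """Stand-in for a non-2xx response from the gateway."""

    def __init__(self, status_code: int) -> None:
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


def send_with_retries(send, max_retries: int = 2):
    """Call send(); retry only network-level failures, at most max_retries times."""
    attempt = 0
    while True:
        try:
            return send()
        except NetworkError:
            if attempt >= max_retries:
                raise  # retries exhausted
            attempt += 1
        # HTTPStatusError is deliberately not caught, so it propagates
        # to the caller the first time it occurs.
```

Under that assumption, the default `max_retries=2` means up to three attempts in total for a network failure and exactly one for an HTTP error.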

Streaming error handling

Streaming errors surface as FerroStreamError when an SSE chunk cannot be parsed:

try:
    for chunk in client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True,
    ):
        print(chunk.choices[0].delta.content or "", end="")
except FerroStreamError:
    print("\nStream interrupted: partial response received")
except FerroConnectionError:
    print("\nConnection lost during streaming")
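
When a stream fails partway, the text received so far is often still useful. A minimal sketch of keeping it is to accumulate pieces as they arrive and report whether the stream finished. The `collect_stream` helper is hypothetical. It takes any iterable of strings and the exception types to treat as an interruption.

```python
from typing import Iterable, List, Tuple, Type


def collect_stream(
    pieces: Iterable[str],
    stream_errors: Tuple[Type[Exception], ...],
) -> Tuple[str, bool]:
    """Join streamed text pieces; return (text, completed)."""
    parts: List[str] = []
    try:
        for piece in pieces:
            parts.append(piece)
    except stream_errors:
        # Keep whatever arrived before the failure.
        return "".join(parts), False
    return "".join(parts), True
```

With the SDK, `pieces` could be a generator such as `(chunk.choices[0].delta.content or "" for chunk in stream)` and `stream_errors` could be `(FerroStreamError, FerroConnectionError)`.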

Production patterns

Graceful degradation

def get_completion(prompt: str) -> str:
    try:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
        )
        return response.content
    except FerroRateLimitError:
        return "Service is busy. Please try again shortly."
    except FerroServerError:
        return "Service temporarily unavailable."
    except FerroConnectionError:
        return "Cannot reach the AI gateway."

Logging with trace IDs

import logging

logger = logging.getLogger(__name__)

try:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
    )
    logger.info("OK", extra={"trace_id": response.trace_id, "provider": response.provider})
except FerroAPIError as e:
    logger.error("API error", extra={"status": e.status_code, "request_id": e.request_id})
    raise

Next steps