# Python SDK Reference

Complete API reference for the ferrolabsai Python SDK.

## FerroClient

The synchronous client for interacting with the Ferro AI Gateway. AsyncFerroClient has an identical interface but returns coroutines.

```python
from ferrolabsai import FerroClient

client = FerroClient(
    api_key="sk-ferro-...",
    base_url="https://gateway.example.com",
)
```

### Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| api_key | str | None | API key. Falls back to the FERRO_API_KEY, then OPENAI_API_KEY environment variables |
| base_url | str | "http://localhost:8080" | Gateway URL. Falls back to the FERRO_BASE_URL environment variable |
| timeout | float | 120.0 | HTTP timeout in seconds |
| max_retries | int | 2 | Retry count for connection errors and timeouts |
| default_headers | dict | None | Extra headers merged into every request |
| http_client | httpx.Client | None | Bring your own httpx client |
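If you prefer to configure credentials through the environment and reuse an existing connection pool, a minimal sketch (it assumes FERRO_API_KEY and FERRO_BASE_URL are set; the X-Team header is purely illustrative):

```python
import httpx
from ferrolabsai import FerroClient

# Reuse your own httpx connection pool instead of the SDK's default.
http_client = httpx.Client(timeout=30.0)

client = FerroClient(
    # api_key and base_url are omitted: the SDK falls back to
    # FERRO_API_KEY / FERRO_BASE_URL as documented above.
    max_retries=3,
    default_headers={"X-Team": "search-backend"},  # hypothetical header
    http_client=http_client,
)
```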
## client.chat.completions.create()

Create a chat completion. Supports both blocking and streaming modes.

```python
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
)
print(response.content)
```

### Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| model | str | Yes | Model name (e.g., "gpt-4o", "claude-3-5-sonnet-20241022") |
| messages | list[dict] | Yes | Message dicts with "role" and "content" keys |
| stream | bool | No | If True, returns Iterator[ChatCompletionChunk] |
| temperature | float | No | Sampling temperature (0.0 to 2.0) |
| max_tokens | int | No | Maximum tokens in the response |
| top_p | float | No | Nucleus sampling parameter |
| frequency_penalty | float | No | Frequency penalty (-2.0 to 2.0) |
| presence_penalty | float | No | Presence penalty (-2.0 to 2.0) |
| stop | str \| list[str] | No | Stop sequences |
| tools | list[dict] | No | Tool/function definitions |
| tool_choice | Any | No | Tool selection strategy |
| user | str | No | End-user identifier |
| template_id | str | No | Ferro-specific: server-side prompt template ID |
| template_variables | dict | No | Ferro-specific: variables substituted into the template |
| route_tag | str | No | Ferro-specific: override the routing strategy |
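The Ferro-specific parameters compose with the standard ones. A hedged sketch of using a server-side template (the template ID, its variables, and the route tag are placeholders for whatever is configured on your gateway):

```python
# Sketch: render a server-side prompt template alongside the user message.
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "My order arrived damaged."}],
    template_id="support-reply",                       # hypothetical template ID
    template_variables={"customer_name": "Dana"},      # variables the template expects
    route_tag="low-latency",                           # hypothetical routing override
)
print(response.content)
```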
### Returns

- Non-streaming: ChatCompletion
- Streaming: Iterator[ChatCompletionChunk]

### Streaming Example

```python
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain quantum computing"}],
    stream=True,
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
```
## Response Types

### ChatCompletion

```python
@dataclass
class ChatCompletion:
    id: str
    object: str
    created: int
    model: str
    choices: list[Choice]
    usage: Usage | None
    trace_id: str | None    # Ferro-specific
    provider: str | None    # Ferro-specific
    latency_ms: int | None  # Ferro-specific

    @property
    def content(self) -> str | None:
        """Shortcut to choices[0].message.content"""
```

### Choice

```python
@dataclass
class Choice:
    index: int
    message: ChatMessage
    finish_reason: str | None
```

### ChatMessage

```python
@dataclass
class ChatMessage:
    role: str
    content: str | None
    tool_calls: list[dict] | None
```

### Usage

```python
@dataclass
class Usage:
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    cost_usd: float | None  # Ferro-specific
    cache_hit: bool | None  # Ferro-specific
    provider: str | None    # Ferro-specific
```

### ChatCompletionChunk

```python
@dataclass
class ChatCompletionChunk:
    id: str
    object: str
    created: int
    model: str
    choices: list[StreamChoice]
```

### StreamChoice

```python
@dataclass
class StreamChoice:
    index: int
    delta: StreamDelta
    finish_reason: str | None
```

### StreamDelta

```python
@dataclass
class StreamDelta:
    role: str | None
    content: str | None
    tool_calls: list[dict] | None
```
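The streaming types compose as follows. A short sketch that accumulates StreamDelta.content from each chunk into the full response text, using only the fields documented above:

```python
# Sketch: reassemble the full assistant message from streamed deltas.
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain quantum computing"}],
    stream=True,
)

parts: list[str] = []
finish_reason = None
for chunk in stream:
    choice = chunk.choices[0]
    if choice.delta.content:
        parts.append(choice.delta.content)
    if choice.finish_reason:
        finish_reason = choice.finish_reason

print("".join(parts))
print(f"\nfinish_reason={finish_reason}")
```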
## client.embeddings.create()

Generate embeddings for text input.

```python
response = client.embeddings.create(
    model="text-embedding-3-small",
    input="The food was delicious",
)
print(response.data[0].embedding[:5])
```

### Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| model | str | Yes | Embedding model name |
| input | str \| list[str] | Yes | Text to embed (single string or batch) |
| encoding_format | str | No | Output encoding format (e.g., "float", "base64") |
| dimensions | int | No | Desired embedding dimensions (model-dependent) |
| user | str | No | End-user identifier |

### Returns

EmbeddingResponse

```python
@dataclass
class EmbeddingResponse:
    object: str
    data: list[EmbeddingData]
    model: str
    usage: Usage | None


@dataclass
class EmbeddingData:
    object: str
    embedding: list[float]
    index: int
```
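Batch input returns one EmbeddingData per string. A minimal sketch that embeds a small batch and compares two texts with cosine similarity, using only the fields documented above:

```python
import math

# Sketch: embed a batch and compute cosine similarity between result vectors.
texts = ["The food was delicious", "The meal tasted great", "The train was late"]
response = client.embeddings.create(model="text-embedding-3-small", input=texts)

# Each item carries an index; sort defensively before pairing with the inputs.
vectors = [d.embedding for d in sorted(response.data, key=lambda d: d.index)]

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

print(cosine(vectors[0], vectors[1]))  # similar sentences
print(cosine(vectors[0], vectors[2]))  # unrelated sentences
```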
## client.images.generate()

Generate images from a text prompt.

```python
response = client.images.generate(
    model="dall-e-3",
    prompt="A sunset over a mountain lake",
    size="1024x1024",
)
print(response.data[0].url)
```

### Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| model | str | Yes | Image generation model name |
| prompt | str | Yes | Text description of the desired image |
| n | int | No | Number of images to generate (1 to 10) |
| size | str | No | Image size (e.g., "1024x1024", "512x512") |
| quality | str | No | Image quality (e.g., "standard", "hd") |
| response_format | str | No | Response format ("url" or "b64_json") |
| style | str | No | Image style (e.g., "natural", "vivid") |
| user | str | No | End-user identifier |

### Returns

ImageResponse

```python
@dataclass
class ImageResponse:
    created: int
    data: list[ImageData]


@dataclass
class ImageData:
    url: str | None
    b64_json: str | None
    revised_prompt: str | None
```
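When you request base64 output instead of URLs, b64_json is populated and url is None. A minimal sketch for saving the result to disk (the file name and PNG extension are assumptions; check what format your chosen model returns):

```python
import base64

# Sketch: request base64-encoded image data and write it locally.
response = client.images.generate(
    model="dall-e-3",
    prompt="A sunset over a mountain lake",
    size="1024x1024",
    response_format="b64_json",
)

image = response.data[0]
if image.b64_json:
    with open("sunset.png", "wb") as f:
        f.write(base64.b64decode(image.b64_json))
if image.revised_prompt:
    print("Provider rewrote the prompt to:", image.revised_prompt)
```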
## client.models

Query available models on the gateway.

### models.list()

```python
models = client.models.list(provider="openai", capability="chat")
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| provider | str | No | Filter by provider name |
| capability | str | No | Filter by capability (e.g., "chat", "embeddings") |

Returns: list[ModelInfo]

### models.retrieve()

```python
model = client.models.retrieve("gpt-4o")
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| model_id | str | Yes | Model identifier |

Returns: ModelInfo

### models.search()

```python
results = client.models.search("claude")
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| query | str | Yes | Search query string |

Returns: list[ModelInfo]

### ModelInfo

```python
@dataclass
class ModelInfo:
    id: str
    object: str
    provider: str
    context_window: int
    max_output_tokens: int
    input_cost_per_token: float
    output_cost_per_token: float
    capabilities: list[str]
    status: str
```
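The per-token cost fields make simple model selection easy. A sketch that picks the cheapest chat-capable model with a large context window and estimates a per-request cost (the 100k threshold and token counts are illustrative):

```python
# Sketch: choose the cheapest chat model with at least a 100k-token context window.
chat_models = client.models.list(capability="chat")

candidates = [m for m in chat_models if m.context_window >= 100_000]
if candidates:
    cheapest = min(candidates, key=lambda m: m.input_cost_per_token)
    print(f"{cheapest.id} ({cheapest.provider})")

    # Rough cost estimate for a 2,000-token prompt and a 500-token reply.
    estimate = 2_000 * cheapest.input_cost_per_token + 500 * cheapest.output_cost_per_token
    print(f"~${estimate:.4f} per request")
```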
## client.admin

Administrative API for managing keys, configuration, logs, providers, and plugins. Admin endpoints require an API key with admin-level scopes.

### admin.keys

Manage API keys for the gateway.

#### admin.keys.list()

```python
keys = client.admin.keys.list()
```

Returns: list[APIKey]

#### admin.keys.retrieve()

```python
key = client.admin.keys.retrieve("key_abc123")
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| key_id | str | Yes | API key identifier |

Returns: APIKey

#### admin.keys.create()

```python
new_key = client.admin.keys.create(
    name="production-backend",
    scopes=["chat", "embeddings"],
    expires_at="2025-12-31T23:59:59Z",
)
print(new_key.key)  # Only returned on creation
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| name | str | Yes | Human-readable key name |
| scopes | list[str] | No | Permission scopes |
| expires_at | str | No | ISO 8601 expiration timestamp |

Returns: CreatedAPIKey (includes the plaintext key field)
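Because the plaintext key is only returned at creation time, persist it immediately. A minimal sketch (store_secret is a placeholder for your own secret manager, not part of the SDK):

```python
def store_secret(name: str, value: str) -> None:
    """Placeholder: wire this to your secret manager of choice."""
    ...

# Sketch: create a key and hand the plaintext secret to the secret store.
new_key = client.admin.keys.create(name="ci-runner", scopes=["chat"])
store_secret("ferro/ci-runner", new_key.key)  # not retrievable again later
```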
#### admin.keys.update()

```python
client.admin.keys.update("key_abc123", name="renamed-key", active=False)
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| key_id | str | Yes | API key identifier |
| name | str | No | Updated name |
| scopes | list[str] | No | Updated scopes |
| expires_at | str | No | Updated expiration |
| active | bool | No | Enable or disable the key |

Returns: APIKey

#### admin.keys.delete()

```python
client.admin.keys.delete("key_abc123")
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| key_id | str | Yes | API key identifier |

Returns: None

#### admin.keys.revoke()

```python
client.admin.keys.revoke("key_abc123")
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| key_id | str | Yes | API key identifier |

Returns: None

#### admin.keys.rotate()

```python
rotated = client.admin.keys.rotate("key_abc123")
print(rotated.key)  # New plaintext key
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| key_id | str | Yes | API key identifier |

Returns: CreatedAPIKey

#### admin.keys.usage()

```python
usage = client.admin.keys.usage(limit=10, sort="usage", active=True)
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| limit | int | No | Number of results (default 20) |
| offset | int | No | Pagination offset (default 0) |
| sort | str | No | Sort field (default "usage") |
| active | bool | No | Filter by active status |
| since | str | No | ISO 8601 start timestamp |

Returns: dict
### admin.config

Manage gateway configuration.

#### admin.config.get()

```python
config = client.admin.config.get()
```

Returns: GatewayConfig

#### admin.config.create()

```python
result = client.admin.config.create(config={"routing": {"strategy": "cost"}})
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| config | dict | Yes | Configuration object |

Returns: dict

#### admin.config.update()

```python
result = client.admin.config.update(config={"routing": {"strategy": "latency"}})
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| config | dict | Yes | Partial configuration update |

Returns: dict

#### admin.config.delete()

```python
result = client.admin.config.delete()
```

Returns: dict

#### admin.config.history()

```python
history = client.admin.config.history()
```

Returns: list[ConfigHistoryEntry]

#### admin.config.rollback()

```python
result = client.admin.config.rollback(version=3)
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| version | int | Yes | Configuration version to roll back to |

Returns: dict
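A common pattern is to apply a change and roll back if behavior regresses. A hedged sketch using only the calls above (the version number is illustrative; pick it from the entries history() returns for your gateway):

```python
# Sketch: apply a routing change, then roll back to a known-good version if needed.
client.admin.config.update(config={"routing": {"strategy": "latency"}})

history = client.admin.config.history()
print(f"{len(history)} config versions recorded")

# If the new strategy misbehaves, roll back to a version you trust.
client.admin.config.rollback(version=3)
```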
### admin.logs

Query request logs and statistics.

#### admin.logs.list()

```python
logs = client.admin.logs.list(limit=50, provider="openai", model="gpt-4o")
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| limit | int | No | Number of results (default 50) |
| offset | int | No | Pagination offset (default 0) |
| stage | str | No | Filter by pipeline stage |
| provider | str | No | Filter by provider name |
| model | str | No | Filter by model name |
| since | str | No | ISO 8601 start timestamp |

Returns: dict

#### admin.logs.stats()

```python
stats = client.admin.logs.stats(since="2025-01-01T00:00:00Z")
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| limit | int | No | Limit on aggregation buckets |
| since | str | No | ISO 8601 start timestamp |

Returns: dict

#### admin.logs.delete()

```python
result = client.admin.logs.delete(before="2024-01-01T00:00:00Z")
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| before | str | No | Delete logs before this ISO 8601 timestamp |
| stage | str | No | Delete only logs matching this stage |

Returns: dict

### admin.providers

#### admin.providers.list()

```python
providers = client.admin.providers.list()
```

Returns: list[dict]

### admin.plugins

#### admin.plugins.list()

```python
plugins = client.admin.plugins.list()
```

Returns: list[dict]
### Convenience Methods

#### admin.dashboard()

Returns a summary of gateway health, key usage, and recent activity.

```python
dashboard = client.admin.dashboard()
```

Returns: dict

#### admin.health()

Returns the gateway health status.

```python
health = client.admin.health()
```

Returns: dict
## AsyncFerroClient

AsyncFerroClient mirrors the FerroClient interface. All methods are async and return awaitables. Streaming returns AsyncIterator[ChatCompletionChunk].

```python
from ferrolabsai import AsyncFerroClient

client = AsyncFerroClient(api_key="sk-ferro-...")

response = await client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
)
print(response.content)
```

### Async Streaming

```python
stream = await client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain quantum computing"}],
    stream=True,
)

async for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
```
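The snippets above assume you are already inside a coroutine. A self-contained sketch that runs several requests concurrently with asyncio.gather (the prompts are illustrative):

```python
import asyncio
from ferrolabsai import AsyncFerroClient

async def main() -> None:
    client = AsyncFerroClient(api_key="sk-ferro-...")

    prompts = ["What is RAG?", "What is quantization?", "What is speculative decoding?"]

    # Fire all requests concurrently; the gateway handles provider routing.
    responses = await asyncio.gather(*[
        client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": p}],
        )
        for p in prompts
    ])

    for prompt, response in zip(prompts, responses):
        print(prompt, "->", (response.content or "")[:60])

asyncio.run(main())
```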
## Error Handling

The SDK raises typed exceptions for common failure modes.

| Exception | HTTP Status | Description |
|---|---|---|
| AuthenticationError | 401 | Invalid or missing API key |
| PermissionDeniedError | 403 | Insufficient scopes |
| NotFoundError | 404 | Resource not found |
| RateLimitError | 429 | Rate limit exceeded |
| APIError | 5xx | Server-side error |
| APIConnectionError | -- | Network connectivity failure |
| APITimeoutError | -- | Request exceeded timeout |

```python
from ferrolabsai import FerroClient, RateLimitError

client = FerroClient()

try:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
    )
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after}s")
```