
Python SDK Reference

Complete API reference for the ferrolabsai Python SDK.

FerroClient

The synchronous client for interacting with the Ferro AI Gateway. AsyncFerroClient has an identical interface but returns coroutines.

from ferrolabsai import FerroClient

client = FerroClient(
    api_key="sk-ferro-...",
    base_url="https://gateway.example.com",
)

Constructor Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| api_key | str | None | API key. Falls back to the FERRO_API_KEY, then OPENAI_API_KEY, environment variables |
| base_url | str | "http://localhost:8080" | Gateway URL. Falls back to the FERRO_BASE_URL environment variable |
| timeout | float | 120.0 | HTTP timeout in seconds |
| max_retries | int | 2 | Retry count for connection errors and timeouts |
| default_headers | dict | None | Extra headers merged into every request |
| http_client | httpx.Client | None | Bring your own httpx client |

client.chat.completions.create()

Create a chat completion. Supports both blocking and streaming modes.

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
)
print(response.content)

Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| model | str | Yes | Model name (e.g., "gpt-4o", "claude-3-5-sonnet-20241022") |
| messages | list[dict] | Yes | Message list with "role" and "content" |
| stream | bool | No | If True, returns Iterator[ChatCompletionChunk] |
| temperature | float | No | Sampling temperature (0.0 to 2.0) |
| max_tokens | int | No | Maximum tokens in response |
| top_p | float | No | Nucleus sampling parameter |
| frequency_penalty | float | No | Frequency penalty (-2.0 to 2.0) |
| presence_penalty | float | No | Presence penalty (-2.0 to 2.0) |
| stop | str \| list[str] | No | Stop sequences |
| tools | list[dict] | No | Tool/function definitions |
| tool_choice | Any | No | Tool selection strategy |
| user | str | No | End-user identifier |
| template_id | str | No | Ferro-specific: server-side prompt template ID |
| template_variables | dict | No | Ferro-specific: variables for the template |
| route_tag | str | No | Ferro-specific: override routing strategy |

Returns

  • Non-streaming: ChatCompletion
  • Streaming: Iterator[ChatCompletionChunk]

Streaming Example

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain quantum computing"}],
    stream=True,
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
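When the full reply is needed as well as live output, the deltas can be accumulated as they arrive. A minimal sketch, assuming only the chunk shape documented under Response Types (`collect_stream` is a hypothetical helper, not part of the SDK):

```python
def collect_stream(stream) -> str:
    """Accumulate non-empty delta content from a chunk iterator
    into the complete reply text."""
    parts = []
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            parts.append(delta)
    return "".join(parts)
```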

Response Types

ChatCompletion

@dataclass
class ChatCompletion:
    id: str
    object: str
    created: int
    model: str
    choices: list[Choice]
    usage: Usage | None
    trace_id: str | None    # Ferro-specific
    provider: str | None    # Ferro-specific
    latency_ms: int | None  # Ferro-specific

    @property
    def content(self) -> str | None:
        """Shortcut to choices[0].message.content"""

Choice

@dataclass
class Choice:
    index: int
    message: ChatMessage
    finish_reason: str | None

ChatMessage

@dataclass
class ChatMessage:
    role: str
    content: str | None
    tool_calls: list[dict] | None

Usage

@dataclass
class Usage:
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    cost_usd: float | None  # Ferro-specific
    cache_hit: bool | None  # Ferro-specific
    provider: str | None    # Ferro-specific

ChatCompletionChunk

@dataclass
class ChatCompletionChunk:
    id: str
    object: str
    created: int
    model: str
    choices: list[StreamChoice]

StreamChoice

@dataclass
class StreamChoice:
    index: int
    delta: StreamDelta
    finish_reason: str | None

StreamDelta

@dataclass
class StreamDelta:
    role: str | None
    content: str | None
    tool_calls: list[dict] | None

client.embeddings.create()

Generate embeddings for text input.

response = client.embeddings.create(
    model="text-embedding-3-small",
    input="The food was delicious",
)
print(response.data[0].embedding[:5])

Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| model | str | Yes | Embedding model name |
| input | str \| list[str] | Yes | Text to embed (single string or batch) |
| encoding_format | str | No | Output encoding format (e.g., "float", "base64") |
| dimensions | int | No | Desired embedding dimensions (model-dependent) |
| user | str | No | End-user identifier |

Returns

EmbeddingResponse

@dataclass
class EmbeddingResponse:
    object: str
    data: list[EmbeddingData]
    model: str
    usage: Usage | None

@dataclass
class EmbeddingData:
    object: str
    embedding: list[float]
    index: int
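Embedding vectors like the `embedding: list[float]` field above are typically compared with cosine similarity. This is generic stdlib math, not SDK functionality:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two equal-length vectors, in [-1, 1]."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)
```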

client.images.generate()

Generate images from a text prompt.

response = client.images.generate(
    model="dall-e-3",
    prompt="A sunset over a mountain lake",
    size="1024x1024",
)
print(response.data[0].url)

Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| model | str | Yes | Image generation model name |
| prompt | str | Yes | Text description of the desired image |
| n | int | No | Number of images to generate (1 to 10) |
| size | str | No | Image size (e.g., "1024x1024", "512x512") |
| quality | str | No | Image quality (e.g., "standard", "hd") |
| response_format | str | No | Response format ("url" or "b64_json") |
| style | str | No | Image style (e.g., "natural", "vivid") |
| user | str | No | End-user identifier |

Returns

ImageResponse

@dataclass
class ImageResponse:
    created: int
    data: list[ImageData]

@dataclass
class ImageData:
    url: str | None
    b64_json: str | None
    revised_prompt: str | None

client.models

Query available models on the gateway.

models.list()

models = client.models.list(provider="openai", capability="chat")

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| provider | str | No | Filter by provider name |
| capability | str | No | Filter by capability (e.g., "chat", "embeddings") |

Returns: list[ModelInfo]

models.retrieve()

model = client.models.retrieve("gpt-4o")

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| model_id | str | Yes | Model identifier |

Returns: ModelInfo

models.search()

results = client.models.search("claude")

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| query | str | Yes | Search query string |

Returns: list[ModelInfo]

ModelInfo

@dataclass
class ModelInfo:
    id: str
    object: str
    provider: str
    context_window: int
    max_output_tokens: int
    input_cost_per_token: float
    output_cost_per_token: float
    capabilities: list[str]
    status: str
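The per-token cost fields make rough price estimates straightforward. A sketch assuming the fields are USD per token, as their names suggest (`estimate_cost_usd` is a hypothetical helper, not an SDK method):

```python
def estimate_cost_usd(info, prompt_tokens: int, completion_tokens: int) -> float:
    """Estimate the cost of one request from a ModelInfo-shaped object."""
    return (prompt_tokens * info.input_cost_per_token
            + completion_tokens * info.output_cost_per_token)
```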

client.admin

Administrative API for managing keys, configuration, logs, providers, and plugins.

Caution: Admin endpoints require an API key with admin-level scopes.


admin.keys

Manage API keys for the gateway.

admin.keys.list()

keys = client.admin.keys.list()

Returns: list[APIKey]

admin.keys.retrieve()

key = client.admin.keys.retrieve("key_abc123")

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| key_id | str | Yes | API key identifier |

Returns: APIKey

admin.keys.create()

new_key = client.admin.keys.create(
    name="production-backend",
    scopes=["chat", "embeddings"],
    expires_at="2025-12-31T23:59:59Z",
)
print(new_key.key)  # Only returned on creation

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| name | str | Yes | Human-readable key name |
| scopes | list[str] | No | Permission scopes |
| expires_at | str | No | ISO 8601 expiration timestamp |

Returns: CreatedAPIKey (includes the plaintext key field)

admin.keys.update()

client.admin.keys.update("key_abc123", name="renamed-key", active=False)

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| key_id | str | Yes | API key identifier |
| name | str | No | Updated name |
| scopes | list[str] | No | Updated scopes |
| expires_at | str | No | Updated expiration |
| active | bool | No | Enable or disable the key |

Returns: APIKey

admin.keys.delete()

client.admin.keys.delete("key_abc123")

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| key_id | str | Yes | API key identifier |

Returns: None

admin.keys.revoke()

client.admin.keys.revoke("key_abc123")

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| key_id | str | Yes | API key identifier |

Returns: None

admin.keys.rotate()

rotated = client.admin.keys.rotate("key_abc123")
print(rotated.key)  # New plaintext key

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| key_id | str | Yes | API key identifier |

Returns: CreatedAPIKey

admin.keys.usage()

usage = client.admin.keys.usage(limit=10, sort="usage", active=True)

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| limit | int | No | Number of results (default 20) |
| offset | int | No | Pagination offset (default 0) |
| sort | str | No | Sort field (default "usage") |
| active | bool | No | Filter by active status |
| since | str | No | ISO 8601 start timestamp |

Returns: dict


admin.config

Manage gateway configuration.

admin.config.get()

config = client.admin.config.get()

Returns: GatewayConfig

admin.config.create()

result = client.admin.config.create(config={"routing": {"strategy": "cost"}})

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| config | dict | Yes | Configuration object |

Returns: dict

admin.config.update()

result = client.admin.config.update(config={"routing": {"strategy": "latency"}})

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| config | dict | Yes | Partial configuration update |

Returns: dict

admin.config.delete()

result = client.admin.config.delete()

Returns: dict

admin.config.history()

history = client.admin.config.history()

Returns: list[ConfigHistoryEntry]

admin.config.rollback()

result = client.admin.config.rollback(version=3)

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| version | int | Yes | Configuration version to roll back to |

Returns: dict


admin.logs

Query request logs and statistics.

admin.logs.list()

logs = client.admin.logs.list(limit=50, provider="openai", model="gpt-4o")

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| limit | int | No | Number of results (default 50) |
| offset | int | No | Pagination offset (default 0) |
| stage | str | No | Filter by pipeline stage |
| provider | str | No | Filter by provider name |
| model | str | No | Filter by model name |
| since | str | No | ISO 8601 start timestamp |

Returns: dict

admin.logs.stats()

stats = client.admin.logs.stats(since="2025-01-01T00:00:00Z")

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| limit | int | No | Limit on aggregation buckets |
| since | str | No | ISO 8601 start timestamp |

Returns: dict

admin.logs.delete()

result = client.admin.logs.delete(before="2024-01-01T00:00:00Z")

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| before | str | No | Delete logs before this ISO 8601 timestamp |
| stage | str | No | Delete only logs matching this stage |

Returns: dict


admin.providers

admin.providers.list()

providers = client.admin.providers.list()

Returns: list[dict]


admin.plugins

admin.plugins.list()

plugins = client.admin.plugins.list()

Returns: list[dict]


Convenience Methods

admin.dashboard()

Returns a summary of gateway health, key usage, and recent activity.

dashboard = client.admin.dashboard()

Returns: dict

admin.health()

Returns the gateway health status.

health = client.admin.health()

Returns: dict


AsyncFerroClient

AsyncFerroClient mirrors the FerroClient interface. Every method is a coroutine and must be awaited. Streaming returns AsyncIterator[ChatCompletionChunk].

from ferrolabsai import AsyncFerroClient

client = AsyncFerroClient(api_key="sk-ferro-...")

response = await client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
)
print(response.content)

Async Streaming

stream = await client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain quantum computing"}],
    stream=True,
)

async for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
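Because every method is a coroutine, independent requests can be fanned out with asyncio.gather. The sketch below uses a placeholder coroutine (`ask`) where a real `await client.chat.completions.create(...)` call would go:

```python
import asyncio

async def ask(prompt: str) -> str:
    # Placeholder for: await client.chat.completions.create(...)
    await asyncio.sleep(0)
    return f"reply to {prompt!r}"

async def ask_many(prompts: list[str]) -> list[str]:
    # gather() runs the coroutines concurrently and preserves input order
    return await asyncio.gather(*(ask(p) for p in prompts))

results = asyncio.run(ask_many(["alpha", "beta"]))
```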

Error Handling

The SDK raises typed exceptions for common failure modes.

| Exception | HTTP Status | Description |
| --- | --- | --- |
| AuthenticationError | 401 | Invalid or missing API key |
| PermissionDeniedError | 403 | Insufficient scopes |
| NotFoundError | 404 | Resource not found |
| RateLimitError | 429 | Rate limit exceeded |
| APIError | 5xx | Server-side error |
| APIConnectionError | — | Network connectivity failure |
| APITimeoutError | — | Request exceeded timeout |

from ferrolabsai import FerroClient, RateLimitError

client = FerroClient()

try:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
    )
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after}s")
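A common pattern is to retry rate-limited calls, honoring the retry_after hint shown above. The sketch below is generic: `with_retries`, `exc_type`, and `call` are stand-ins, with RateLimitError and the SDK call supplied by the caller:

```python
import time

def with_retries(call, exc_type, max_attempts: int = 3, default_wait: float = 1.0):
    """Invoke call(); on exc_type, sleep (using a retry_after attribute
    when present, else default_wait) and retry up to max_attempts times."""
    for attempt in range(max_attempts):
        try:
            return call()
        except exc_type as e:
            if attempt == max_attempts - 1:
                raise  # out of attempts: propagate the last error
            time.sleep(getattr(e, "retry_after", None) or default_wait)
```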