
Module airbyte_agent_sdk.workspace

Workspace — top-level entry point for hosted-mode workspace operations.

Classes

Workspace(*, client_id: str | None = None, client_secret: str | None = None, workspace_name: str | None = None, organization_id: str | None = None) : Top-level entry point for Airbyte hosted-mode workspace operations.

Provides workspace-level methods: list/create/delete connectors, get a connector executor, and workflow/automation CRUD. Use Workspace when you want to operate against a whole workspace (many connectors, workflows, automations); use connect() when you already know which connector you want to execute.

Example:

import asyncio
from airbyte_agent_sdk import Workspace

async def main():
    async with Workspace(
        client_id="your_client_id",
        client_secret="your_client_secret",
        workspace_name="my-workspace",
    ) as ws:
        connectors = await ws.list_connectors()
        print(len(connectors))

asyncio.run(main())

Args:

  • client_id: Airbyte OAuth client ID (or set AIRBYTE_CLIENT_ID).
  • client_secret: Airbyte OAuth client secret (or set AIRBYTE_CLIENT_SECRET).
  • workspace_name: Workspace name for scoping operations. Defaults to "default".
  • organization_id: Optional org ID for multi-org routing.

Raises: ValueError: If client_id/client_secret are not supplied and no AIRBYTE_CLIENT_ID/AIRBYTE_CLIENT_SECRET env vars are set.
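The credential rules above can be checked before a Workspace is constructed. The following is a minimal sketch, not the SDK's own code: resolve_credentials is a hypothetical helper, and giving explicit arguments precedence over environment variables is an assumption that the reference text does not state.

```python
from __future__ import annotations

import os


def resolve_credentials(
    client_id: str | None = None,
    client_secret: str | None = None,
) -> tuple[str, str]:
    """Hypothetical pre-flight check mirroring the documented fallback."""
    # Assumption: explicit arguments win over environment variables.
    resolved_id = client_id or os.environ.get("AIRBYTE_CLIENT_ID")
    resolved_secret = client_secret or os.environ.get("AIRBYTE_CLIENT_SECRET")
    if not resolved_id or not resolved_secret:
        # Same exception type the constructor is documented to raise.
        raise ValueError(
            "Pass client_id and client_secret, or set "
            "AIRBYTE_CLIENT_ID and AIRBYTE_CLIENT_SECRET."
        )
    return resolved_id, resolved_secret
```

Running a check like this at startup surfaces a missing credential as a configuration error before any network call is attempted.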

Methods

close(self) : Close the cloud client.

create_automation(self, workflow_id: str, *, trigger_type: str = 'schedule', enabled: bool = True, cron_expression: str | None = None, timezone: str | None = None, completion_webhook_url: str | None = None) ‑> airbyte_agent_sdk.executor.models.AutomationInfo : Create an automation on a workflow.
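As an illustration of the keyword arguments, a daily schedule might be attached as sketched below. schedule_daily is a hypothetical helper, not part of the SDK; the sketch assumes cron_expression accepts standard five-field cron syntax and timezone accepts a name such as "UTC", neither of which is confirmed by this reference.

```python
from __future__ import annotations


async def schedule_daily(ws, workflow_id: str, *, hour: int = 6, tz: str = "UTC"):
    """Hypothetical helper: run an existing workflow once a day."""
    if not 0 <= hour <= 23:
        raise ValueError("hour must be between 0 and 23")
    # Assumption: five-field cron ("minute hour day month weekday").
    return await ws.create_automation(
        workflow_id,
        trigger_type="schedule",
        enabled=True,
        cron_expression=f"0 {hour} * * *",
        timezone=tz,
    )
```

Here ws is any open Workspace; the return value is whatever create_automation returns (an AutomationInfo per the signature above).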

create_connector(self, *, definition_id: str, credentials: dict[str, Any] | None = None, name: str | None = None, replication_config: dict[str, Any] | None = None, source_template_id: str | None = None) ‑> str : Create a new connector and return its connector ID.

create_workflow(self, name: str, *, tasks: list[dict[str, Any]] | None = None) ‑> airbyte_agent_sdk.executor.models.WorkflowInfo : Create a workflow in this workspace.

delete_automation(self, workflow_id: str, automation_id: str) ‑> None : Delete an automation.

delete_connector(self, connector_id: str) ‑> None : Delete a connector.

delete_workflow(self, workflow_id: str) ‑> None : Delete a workflow.

get_automation(self, workflow_id: str, automation_id: str) ‑> airbyte_agent_sdk.executor.models.AutomationInfo : Get a single automation.

get_connector(self, *, connector_id: str | None = None, name: str | None = None) ‑> airbyte_agent_sdk.executor.hosted_executor.HostedExecutor : Get a HostedExecutor for a specific connector.

Provide exactly one of connector_id or name:

  • connector_id: Direct lookup, no API call needed.
  • name: Resolves a connector slug (e.g. "stripe") to the single instance of that type in this workspace. Raises ValueError if zero or more than one instance is found.

Creates an independent HostedExecutor with its own AirbyteCloudClient. The caller is responsible for closing the executor when done.

Example:

stripe = await ws.get_connector(name="stripe")
try:
    result = await stripe.execute(...)
finally:
    await stripe.close()
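Because the caller owns the executor, the close step can be wrapped in an async context manager so it is never forgotten. This is a minimal sketch; open_connector is a hypothetical helper, not an SDK function, and it relies only on get_connector and close as documented above.

```python
from __future__ import annotations

from contextlib import asynccontextmanager


@asynccontextmanager
async def open_connector(
    ws, *, connector_id: str | None = None, name: str | None = None
):
    """Hypothetical wrapper: yield an executor and always close it."""
    # Mirror the documented rule: exactly one of connector_id or name.
    if (connector_id is None) == (name is None):
        raise ValueError("Provide exactly one of connector_id or name")
    if connector_id is not None:
        executor = await ws.get_connector(connector_id=connector_id)
    else:
        executor = await ws.get_connector(name=name)
    try:
        yield executor
    finally:
        # Runs on normal exit and when the body raises.
        await executor.close()
```

With this in place, `async with open_connector(ws, name="stripe") as stripe:` closes the executor even if the body raises.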

get_workflow(self, workflow_id: str) ‑> airbyte_agent_sdk.executor.models.WorkflowInfo : Get a single workflow by ID.

list_automations(self, workflow_id: str | None = None) ‑> list[airbyte_agent_sdk.executor.models.AutomationInfo] : List automations.

When workflow_id is a non-None string, returns only the automations for that single workflow (the workflow-scoped path).

When workflow_id is omitted or passed explicitly as None, returns all automations across every workflow in the workspace via a bounded-concurrency fan-out over list_workflows(). Omission and explicit None are treated identically.

Concurrent per-workflow HTTP requests are bounded by _AUTOMATION_FANOUT_CONCURRENCY. The first per-workflow error propagates to the caller immediately; sibling in-flight requests continue to completion in the background (their results are discarded). Workflows deleted mid-flight surface as an error rather than being silently filtered.

Result ordering: automations are grouped per workflow in the order returned by list_workflows(); ordering within a workflow matches the backend response.
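The fan-out semantics described above can be illustrated with plain asyncio. This is a sketch of the general pattern, not the SDK's implementation: fan_out, fetch, and limit are hypothetical names, and limit merely stands in for _AUTOMATION_FANOUT_CONCURRENCY, whose value is not given here.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable, Sequence


async def fan_out(
    keys: Sequence[str],
    fetch: Callable[[str], Awaitable[list[Any]]],
    limit: int = 4,
) -> list[Any]:
    """Call fetch(key) for every key with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def bounded(key: str) -> list[Any]:
        async with semaphore:
            return await fetch(key)

    # gather() returns results in input order. With the default
    # return_exceptions=False, the first exception is raised to the
    # caller right away and the remaining tasks are not cancelled.
    groups = await asyncio.gather(*(bounded(key) for key in keys))
    # Flatten while keeping per-key grouping and within-group order.
    return [item for group in groups for item in group]


if __name__ == "__main__":
    async def fake_fetch(key: str) -> list[str]:
        # Stand-in for one per-workflow HTTP request.
        await asyncio.sleep(0.01)
        return [f"{key}/automation"]

    print(asyncio.run(fan_out(["wf-1", "wf-2", "wf-3"], fake_fetch, limit=2)))
```

The semaphore caps concurrency, while gather supplies both the ordering guarantee and the fail-fast propagation; a caller that needs partial results would have to fetch per workflow and handle errors individually.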

list_connectors(self) ‑> list[airbyte_agent_sdk.executor.models.ConnectorInfo] : List connector instances in this workspace.

list_workflows(self) ‑> list[airbyte_agent_sdk.executor.models.WorkflowInfo] : List workflows in this workspace.

update_automation(self, workflow_id: str, automation_id: str, *, enabled: bool | None = None, trigger_type: str | None = None, cron_expression: str | None = None, timezone: str | None = None, completion_webhook_url: str | None = None) ‑> airbyte_agent_sdk.executor.models.AutomationInfo : Update an automation.

update_workflow(self, workflow_id: str, *, name: str | None = None) ‑> airbyte_agent_sdk.executor.models.WorkflowInfo : Update a workflow.
