Skip to main content

Module airbyte_agent_sdk.executor.local_executor

Local executor for direct HTTP execution of connector operations.

Classes

LocalExecutor(config_path: str | None = None, model: ConnectorModel | None = None, secrets: dict[str, SecretStr] | None = None, auth_config: dict[str, SecretStr] | None = None, auth_scheme: str | None = None, enable_logging: bool = False, log_file: str | None = None, execution_context: str | None = None, max_connections: int = 100, max_keepalive_connections: int = 20, max_logs: int | None = 10000, config_values: dict[str, str] | None = None, on_token_refresh: TokenRefreshCallback = None, retry_config: RetryConfig | None = None) : Async executor for Entity×Action operations with direct HTTP execution.

This is the "local mode" executor that makes direct HTTP calls to external APIs. It performs local entity/action lookups, validation, and request building.

Implements ExecutorProtocol.

Initialize async executor.

Args: config_path: Path to connector.yaml. If neither config_path nor model is provided, an error will be raised. model: ConnectorModel object to execute. secrets: (Legacy) Auth parameters that bypass x-airbyte-auth-config mapping. Directly passed to auth strategies (e.g., {"username": "...", "password": "..."}). Cannot be used together with auth_config. auth_config: User-facing auth configuration following x-airbyte-auth-config spec. Will be transformed via auth_mapping to produce auth parameters. Cannot be used together with secrets. auth_scheme: (Multi-auth only) Explicit security scheme name to use. If None, SDK will auto-select based on provided credentials. Example: auth_scheme="githubOAuth" enable_logging: Enable request/response logging log_file: Path to log file (if enable_logging=True) execution_context: Execution context (mcp, direct, blessed, agent) max_connections: Maximum number of concurrent connections max_keepalive_connections: Maximum number of keepalive connections max_logs: Maximum number of logs to keep in memory before rotation. Set to None for unlimited (not recommended for production). Defaults to 10000. config_values: Optional dict of config values for server variable substitution (e.g., {"subdomain": "acme"} for URLs like https://{subdomain}.api.example.com). on_token_refresh: Optional callback function(new_tokens: dict) called when OAuth2 tokens are refreshed. Use this to persist updated tokens. Can be sync or async. Example: lambda tokens: save_to_db(tokens) retry_config: Optional retry configuration override. If provided, overrides the connector.yaml x-airbyte-retry-config. If None, uses connector.yaml config or SDK defaults.

Methods

check(self) ‑> airbyte_agent_sdk.executor.models.ExecutionResult : Perform a health check by running a lightweight operation.

Uses shared find_check_operation() to find the best operation, then executes it to verify connectivity and credentials.

Returns: ExecutionResult with data containing status, error, and checked operation details.

check_entities(self, entities: list[str]) ‑> airbyte_agent_sdk.executor.models.ExecutionResult : Perform health checks for specific entities by probing their list or get operations.

For each entity, looks up (entity_name, Action.LIST) in the operation index. If not found, falls back to (entity_name, Action.GET). Runs all probes concurrently and returns per-entity results.

Args: entities: List of entity names to check.

Returns: ExecutionResult with per-entity health check results.

close(self) : Close async HTTP client and logger.

execute(self, config_or_entity: ExecutionConfig | str, action: str | None = None, *, params: dict[str, Any] | None = None) ‑> airbyte_agent_sdk.executor.models.ExecutionResult : Execute connector operation using handler pattern.

Accepts either an :class:ExecutionConfig or positional (entity, action) strings with an optional params keyword argument.

Args: config_or_entity: ExecutionConfig object or entity name string action: Action string (required when entity is a string) params: Optional parameters dict (only with string form)

Returns: ExecutionResult with success/failure status and data

Example: config = ExecutionConfig( entity="customers", action="list", params={"limit": 10} ) result = await executor.execute(config) if result.success: print(result.data)

Shorthand form:

result = await executor.execute("customers", "list", params={"limit": 10})

execute_batch(self, operations: list[tuple[str, str | Action, dict[str, Any] | None]]) ‑> list[dict[str, typing.Any] | collections.abc.AsyncIterator[bytes]] : Execute multiple operations concurrently (supports all action types including download).

Args: operations: List of (entity, action, params) tuples

Returns: List of responses in the same order as operations. Standard operations return dict[str, Any]. Download operations return AsyncIterator[bytes].

Raises: ValueError: If any entity or action not found HTTPClientError: If any API request fails

Example: results = await executor.execute_batch([ ("Customer", "list", {"limit": 10}), ("Customer", "get", {"id": "cus_123"}), ("attachments", "download", {"id": "att_456"}), ])

ParamResolutionError(*args, **kwargs) : Raised when a path parameter cannot be resolved for entity probing.

Covers structural resolution failures (unresolvable param, self-reference, parent with no LIST op, parent returning no records, missing parent key, max recursion depth). _probe_entity converts these into UNHEALTHY results so the backend controller classifies them as INCONCLUSIVE. SKIPPED is reserved exclusively for "this entity has no list/get action at all".

Execution failures from probing a parent entity use ParentProbeError instead, so the child can inherit the parent's status_code for 401/403 -> FAILED classification.

Ancestors (in MRO)

  • builtins.Exception
  • builtins.BaseException

ParentProbeError(message: str, status_code: int | None = None) : Raised when a parent entity's LIST probe fails during param resolution.

Wraps the original exception's message with parent-entity context for debuggability, while preserving the parent's status_code so the child can be classified the same way the parent would be (401/403 -> FAILED, everything else -> INCONCLUSIVE at the backend controller layer).

Distinct from ParamResolutionError so the status_code survives -- both route through _probe_entity's UNHEALTHY branch, but only ParentProbeError carries the parent's HTTP status.

Ancestors (in MRO)

  • builtins.Exception
  • builtins.BaseException