Python CDK Speedrun: Creating a Source

CDK Speedrun (HTTP API Source Creation Any% Route)

This is a blazing fast guide to building an HTTP source connector. Think of it as the TL;DR version of this tutorial.
If you are a visual learner and want to see a video version of this guide going over each part in detail, check it out below.

Dependencies

  1. Python >= 3.7
  2. Docker
  3. NodeJS

Generate the Template

$ cd airbyte-integrations/connector-templates/generator # start from repo root
$ ./generate.sh
Select the Python HTTP API Source and name it python-http-example.

Create Dev Environment

cd ../../connectors/source-python-http-example
python -m venv .venv # Create a virtual environment in the .venv directory
source .venv/bin/activate
pip install -r requirements.txt

Define Connector Inputs

cd source_python_http_example
We're working with the PokeAPI, so we need to define our input schema to reflect that. Open the spec.json file here and replace it with:
{
  "documentationUrl": "https://docs.airbyte.io/integrations/sources/pokeapi",
  "connectionSpecification": {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Pokeapi Spec",
    "type": "object",
    "required": ["pokemon_name"],
    "additionalProperties": false,
    "properties": {
      "pokemon_name": {
        "type": "string",
        "description": "Pokemon requested from the API.",
        "pattern": "^[a-z0-9_\\-]+$",
        "examples": ["ditto", "luxray", "snorlax"]
      }
    }
  }
}
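Airbyte validates the config against this JSON Schema for you, so you don't need to write this yourself. Purely to make the three constraints concrete (required, no extra properties, lowercase pattern), here is a stdlib-only sketch. validate_config is a hypothetical helper, not part of the CDK, and it only mirrors this one spec.

```python
import re
from typing import Any, List, Mapping

# Copied by hand from spec.json: "required", "additionalProperties": false, and "pattern".
REQUIRED_KEYS = {"pokemon_name"}
ALLOWED_KEYS = {"pokemon_name"}
POKEMON_NAME_PATTERN = re.compile(r"^[a-z0-9_\-]+$")


def validate_config(config: Mapping[str, Any]) -> List[str]:
    """Return a list of problems with `config`; an empty list means it passes."""
    errors = []
    keys = set(config)
    for key in sorted(REQUIRED_KEYS - keys):
        errors.append(f"missing required property: {key}")
    for key in sorted(keys - ALLOWED_KEYS):
        errors.append(f"unexpected property: {key}")
    name = config.get("pokemon_name")
    if name is not None:
        if not isinstance(name, str):
            errors.append("pokemon_name must be a string")
        # fullmatch so a trailing newline is rejected, like an ECMAScript-style regex would.
        elif not POKEMON_NAME_PATTERN.fullmatch(name):
            errors.append(f"pokemon_name {name!r} does not match {POKEMON_NAME_PATTERN.pattern}")
    return errors
```

For example, validate_config({"pokemon_name": "pikachu"}) returns an empty list, while an uppercase name or an extra key such as an API key produces one error each.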
Our input schema has a single input, pokemon_name, and it is required. Normally, input schemas also contain information such as API keys and client secrets that need to be passed down to all endpoints or streams.
Ok, let's write a function that checks the inputs we just defined. Nuke the contents of source.py and replace them with the code below. As a crucial time skip, this block already imports everything we will need later. Also note that your AbstractSource class name must be a camel-cased version of the name you gave in the generation phase. In our case, this is SourcePythonHttpExample.
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple

import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream

from . import pokemon_list


class SourcePythonHttpExample(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        # Reject any Pokemon name that is not in our known list.
        input_pokemon = config["pokemon_name"]
        if input_pokemon not in pokemon_list.POKEMON_LIST:
            return False, f"Input Pokemon {input_pokemon} is invalid. Please check your spelling and input a valid Pokemon."
        else:
            return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        # Pokemon is defined later in this file; the name is resolved when streams() runs.
        return [Pokemon(pokemon_name=config["pokemon_name"])]
Create a new file called pokemon_list.py at the same level. This will handle input validation for us so that we don't input invalid Pokemon. Let's start with a very limited list - any Pokemon not included in this list will get rejected.
"""
pokemon_list.py includes the list of accepted pokemon names for config validation in source.py.
"""

POKEMON_LIST = [
    "bulbasaur",
    "charizard",
    "wartortle",
    "pikachu",
    "crobat",
]
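check_connection reports its result as a tuple: (True, None) on success, or (False, message) on failure. If you want to unit test that logic without the CDK installed, one option is to pull it into a plain function. The sketch below is hypothetical (check_pokemon_name and KNOWN_POKEMON are illustrative names, not CDK APIs) and swaps the list for a frozenset, which keeps membership checks constant-time as the list grows.

```python
from typing import Any, Mapping, Optional, Tuple

# Hypothetical stand-in for pokemon_list.POKEMON_LIST, stored as a frozenset.
KNOWN_POKEMON = frozenset({"bulbasaur", "charizard", "wartortle", "pikachu", "crobat"})


def check_pokemon_name(config: Mapping[str, Any]) -> Tuple[bool, Optional[str]]:
    """Same contract as check_connection: (True, None) or (False, reason)."""
    name = config.get("pokemon_name")
    # Non-string values (or a missing key) can never be a known Pokemon.
    if not isinstance(name, str) or name not in KNOWN_POKEMON:
        return False, f"Input Pokemon {name} is invalid. Please check your spelling and input a valid Pokemon."
    return True, None
```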
Test it.
cd ..
mkdir sample_files
echo '{"pokemon_name": "pikachu"}' > sample_files/config.json
echo '{"pokemon_name": "chikapu"}' > sample_files/invalid_config.json
python main.py check --config sample_files/config.json
python main.py check --config sample_files/invalid_config.json
Expected output:
> python main.py check --config sample_files/config.json
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}

> python main.py check --config sample_files/invalid_config.json
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "FAILED", "message": "'Input Pokemon chikapu is invalid. Please check your spelling and input a valid Pokemon.'"}}
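If you script these checks (in CI, say), you probably want the status rather than the raw line. A minimal sketch, assuming each protocol message is printed as one JSON object per line as in the output above; parse_connection_status is a hypothetical helper.

```python
import json
from typing import Optional, Tuple


def parse_connection_status(output: str) -> Optional[Tuple[str, Optional[str]]]:
    """Return (status, message) from the first CONNECTION_STATUS message in `output`."""
    for line in output.splitlines():
        line = line.strip()
        if not line.startswith("{"):
            continue  # skip shell prompts and other non-JSON noise
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue
        if message.get("type") == "CONNECTION_STATUS":
            status = message["connectionStatus"]
            return status["status"], status.get("message")
    return None  # no status message found
```

You could feed it the stdout of subprocess.run(["python", "main.py", "check", "--config", "sample_files/config.json"], capture_output=True, text=True).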

Define your Stream

In your source.py file, add this Pokemon class. This stream represents an endpoint you want to hit, which in our case is the single Pokemon endpoint.
class Pokemon(HttpStream):
    url_base = "https://pokeapi.co/api/v2/"

    # This stream has no primary key, so set it to None.
    primary_key = None

    def __init__(self, pokemon_name: str, **kwargs):
        super().__init__(**kwargs)
        self.pokemon_name = pokemon_name

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # We only ever request a single Pokemon, so return None to indicate there are no more pages.
        return None

    def path(self, **kwargs) -> str:
        return ""  # TODO

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        return []  # TODO
Now download this file. Name it pokemon.json and place it in source_python_http_example/schemas.
This file defines the output schema for the Pokemon stream; every stream you implement needs a schema like it. This is usually the most time-consuming part of connector development, because it requires describing the endpoint's output exactly. It matters because Airbyte needs clear expectations for what each stream will output. Note that the stream name must be consistent between the JSON schema file and the HttpStream class: pokemon.json and Pokemon respectively in this case. Learn more about schema creation here.
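By default, the CDK derives a stream's name from its class name in snake_case and looks for schemas/<name>.json, which is why Pokemon pairs with pokemon.json. The helpers below re-implement that naming rule with a regex as an illustration only. Both function names are hypothetical, and the CDK's own conversion may handle runs of capitals (e.g. HTTPStream) differently.

```python
import re


def stream_name_from_class(class_name: str) -> str:
    """Insert "_" before every capital except the first, then lowercase."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", class_name).lower()


def schema_path_for(class_name: str) -> str:
    # Assumed layout: schemas/<stream name>.json inside the connector package.
    return f"schemas/{stream_name_from_class(class_name)}.json"
```

So a hypothetical PokemonSpecies stream class would look for schemas/pokemon_species.json.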
Test your discover function. You should receive a fairly large JSON object in return.
python main.py discover --config sample_files/config.json
Note that discover builds the Pokemon stream through the source's streams() method, which passes the pokemon_name config value into the stream's __init__ function.

Reading Data from the Source

Update your Pokemon class to implement the required functions as follows:
class Pokemon(HttpStream):
    url_base = "https://pokeapi.co/api/v2/"

    # This stream has no primary key, so set it to None.
    primary_key = None

    def __init__(self, pokemon_name: str, **kwargs):
        super().__init__(**kwargs)
        # Here's where we store the value passed down from the source's config.
        self.pokemon_name = pokemon_name

    def path(self, **kwargs) -> str:
        pokemon_name = self.pokemon_name
        # This defines the path to the endpoint that we want to hit.
        return f"pokemon/{pokemon_name}"

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        # PokeAPI identifies the Pokemon by the URL path, so this query param is not strictly
        # required; it is kept here to show where query params go.
        return {"pokemon_name": self.pokemon_name}

    def parse_response(
        self,
        response: requests.Response,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping]:
        # The response is a simple JSON whose schema matches our stream's schema exactly,
        # so we just return a list containing the response.
        return [response.json()]

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # While the PokeAPI does offer pagination, we will only ever retrieve one Pokemon with this implementation,
        # so we just return None to indicate that there will never be any more pages in the response.
        return None
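To see which URL this stream will hit, you can combine url_base, path(), and request_params() by hand. This sketch assumes the CDK joins url_base and path the way urllib.parse.urljoin does and that the params end up URL-encoded in the query string; build_request_url is a hypothetical helper.

```python
from typing import Any, Mapping
from urllib.parse import urlencode, urljoin


def build_request_url(url_base: str, path: str, params: Mapping[str, Any]) -> str:
    """Join base + path, then append params as an encoded query string."""
    url = urljoin(url_base, path)
    return f"{url}?{urlencode(params)}" if params else url


pokemon_name = "pikachu"
url = build_request_url(
    "https://pokeapi.co/api/v2/",    # Pokemon.url_base
    f"pokemon/{pokemon_name}",       # Pokemon.path()
    {"pokemon_name": pokemon_name},  # Pokemon.request_params()
)
print(url)  # https://pokeapi.co/api/v2/pokemon/pikachu?pokemon_name=pikachu

# Without the trailing slash, urljoin replaces "v2" instead of appending to it.
print(urljoin("https://pokeapi.co/api/v2", "pokemon/pikachu"))  # https://pokeapi.co/api/pokemon/pikachu
```

The second print is why url_base ends in a slash: urljoin treats the last segment of a slash-less base as a "file" and swaps it out.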
We now need a catalog that defines all of our streams. We only have one stream: Pokemon. Download that file here and place it in sample_files as configured_catalog.json. This is where we tell Airbyte which streams/endpoints of the connector to sync and which sync mode to run each one in. Learn more about the AirbyteCatalog here and learn more about sync modes here.
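If the download isn't handy, a catalog with the same overall shape can be written by hand. This is a minimal sketch assuming the Airbyte protocol's configured-catalog fields (stream, sync_mode, destination_sync_mode); json_schema is left empty as a placeholder, which not every tool may accept, so prefer the downloaded file. CONFIGURED_CATALOG and write_catalog are hypothetical names.

```python
import json

# Minimal configured catalog for the single "pokemon" stream (shape assumed from the
# Airbyte protocol). json_schema is an empty placeholder, not the real Pokemon schema.
CONFIGURED_CATALOG = {
    "streams": [
        {
            "stream": {
                "name": "pokemon",  # must match the stream name derived from the Pokemon class
                "json_schema": {},
                "supported_sync_modes": ["full_refresh"],
            },
            "sync_mode": "full_refresh",
            "destination_sync_mode": "overwrite",
        }
    ]
}


def write_catalog(path: str) -> None:
    """Write the catalog as pretty-printed JSON to `path`."""
    with open(path, "w") as f:
        json.dump(CONFIGURED_CATALOG, f, indent=2)
```

Usage: write_catalog("sample_files/configured_catalog.json").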
Let's read some data.
python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json
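Each line of read output is a protocol message, and the data lives in the ones with "type": "RECORD". A minimal sketch for pulling out the Pokemon payloads, assuming records carry the stream name under record.stream and the payload under record.data as in the Airbyte protocol; iter_records is a hypothetical helper.

```python
import json
from typing import Any, Dict, Iterator


def iter_records(output: str, stream: str = "pokemon") -> Iterator[Dict[str, Any]]:
    """Yield record.data for each RECORD message belonging to `stream`."""
    for line in output.splitlines():
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON lines
        if isinstance(message, dict) and message.get("type") == "RECORD":
            record = message["record"]
            if record.get("stream") == stream:
                yield record["data"]
```

LOG and other message types are skipped, so you can pass the read command's full stdout straight in.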
If all goes well, containerize it so you can use it in the UI:
docker build . -t airbyte/source-python-http-example:dev
You're done. Stop the clock :)