Building a Source Connector: The Hard Way
This tutorial walks you through building a simple Airbyte source without using any helpers to demonstrate the following concepts in action:
- The Airbyte Specification and the interface implemented by a source connector
- The AirbyteCatalog
- Packaging your connector
- Testing your connector
This tutorial is meant for those interested in learning how the Airbyte Specification works in detail, not for creating production connectors. If you're building a real source, you should start with using the Connector Builder, or the Connector Development Kit.
Requirements
To run this tutorial, you'll need:
- Docker, Python, and Java with the versions listed in the tech stack section.
- The
requests
Python package installed viapip install requests
(orpip3
ifpip
is linked to a Python2 installation on your system)
Our connector: a stock ticker API
The connector will output the daily price of a stock since a given date. We'll leverage Polygon.io API for this.
We'll use Python to implement the connector, but you could build an Airbyte connector in any language.
Here's the outline of what we'll do to build the connector:
- Use the Airbyte connector template to bootstrap the connector package
- Implement the methods required by the Airbyte Specification for our connector:
spec
: declares the user-provided credentials or configuration needed to run the connectorcheck
: tests if the connector can connect with the underlying data source with the user-provided configurationdiscover
: declares the different streams of data that this connector can outputread
: reads data from the underlying data source (The stock ticker API)
- Package the connector in a Docker image
- Test the connector using Airbyte's Connector Acceptance Test Suite
- Use the connector to create a new Connection and run a sync in Airbyte UI
Part 2 of this article covers:
- Support incremental sync
- Add custom integration tests
Let's get started!
1. Bootstrap the connector package
Start the process from the Airbyte repository root:
$ pwd
/Users/sherifnada/code/airbyte
Airbyte provides a code generator which bootstraps the scaffolding for our connector. Let's use it by running:
$ cd airbyte-integrations/connector-templates/generator
$ ./generate.sh
Select the Generic Source
template and call the connector stock-ticker-api
.
This tutorial uses the bare-bones Generic Source
template to illustrate how all the pieces
of a connector work together. For real connectors, the generator provides Python
and
Python HTTP API
source templates, they use Airbyte CDK.
$ cd ../../connectors/source-stock-ticker-api
$ ls
Dockerfile README.md acceptance-test-config.yml metadata.yaml
2. Implement the connector in line with the Airbyte Specification
In the connector package directory, create a single Python file source.py
that will hold our
implementation:
touch source.py
Implement the spec operation
The spec
operation is described in the
Airbyte Protocol. It's a
way for the connector to tell Airbyte what user inputs it needs in order to connecto to the source
(the stock ticker API in our case). Airbyte expects the command to output a connector specification
in AirbyteMessage
format.
To contact the stock ticker API, we need two things:
- Which stock ticker we're interested in
- The API key to use when contacting the API (you can obtain a free API token from Polygon.io free plan)
For reference, the API docs we'll be using can be found here.
Let's create a JSONSchema file spec.json
encoding these two
requirements:
{
"documentationUrl": "https://polygon.io/docs/stocks/get_v2_aggs_ticker__stocksticker__range__multiplier___timespan___from___to",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["stock_ticker", "api_key"],
"properties": {
"stock_ticker": {
"type": "string",
"title": "Stock Ticker",
"description": "The stock ticker to track",
"examples": ["AAPL", "TSLA", "AMZN"]
},
"api_key": {
"title": "API Key",
"type": "string",
"description": "The Polygon.io Stocks API key to use to hit the API.",
"airbyte_secret": true
}
}
}
}
documentationUrl
is the URL that will appear in the UI for the user to gain more info about this connector. Typically this points todocs.airbyte.com/integrations/sources/source-<connector_name>
but to keep things simple we won't show adding documentationtitle
is the "human readable" title displayed in the UI. Without this field, The Stock Ticker field will have the titlestock_ticker
in the UIdescription
will be shown in the Airbyte UI under each field to help the user understand itairbyte_secret
used by Airbyte to determine if the field should be displayed as a password (e.g:********
) in the UI and not readable from the API
$ ls -1
Dockerfile
README.md
acceptance-test-config.yml
source.py
metadata.yaml
spec.json
Now, let's edit source.py
to detect if the program was invoked with the spec
argument and if so,
output the connector specification:
# source.py
import argparse # helps parse commandline arguments
import json
import sys
import os
from datetime import datetime
def read_json(filepath):
with open(filepath, "r") as f:
return json.loads(f.read())
def log(message):
log_json = {"type": "LOG", "log": message}
print(json.dumps(log_json))
def log_error(error_message):
current_time_in_ms = int(datetime.now().timestamp()) * 1000
log_json = {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}
print(json.dumps(log_json))
def spec():
# Read the file named spec.json from the module directory as a JSON file
current_script_directory = os.path.dirname(os.path.realpath(__file__))
spec_path = os.path.join(current_script_directory, "spec.json")
specification = read_json(spec_path)
# form an Airbyte Message containing the spec and print it to stdout
airbyte_message = {"type": "SPEC", "spec": specification}
# json.dumps converts the JSON (Python dict) to a string
print(json.dumps(airbyte_message))
def run(args):
parent_parser = argparse.ArgumentParser(add_help=False)
main_parser = argparse.ArgumentParser()
subparsers = main_parser.add_subparsers(title="commands", dest="command")
# Accept the spec command
subparsers.add_parser("spec", help="outputs the json configuration specification", parents=[parent_parser])
parsed_args = main_parser.parse_args(args)
command = parsed_args.command
if command == "spec":
spec()
else:
# If we don't recognize the command log the problem and exit with an error code greater than 0 to indicate the process
# had a failure
log_error("Invalid command. Allowable commands: [spec]")
sys.exit(1)
# A zero exit code means the process successfully completed
sys.exit(0)
def main():
arguments = sys.argv[1:]
run(arguments)
if __name__ == "__main__":
main()
Some notes on the above code:
- As described in the
specification,
Airbyte connectors are CLIs which communicate via stdout, so the output of the command is simply
a JSON string formatted according to the Airbyte Specification. So to "return" a value we use
print
to output the return value to stdout. - All Airbyte commands can output log messages that take the form
{"type":"LOG", "log":"message"}
, so we create a helper methodlog(message)
to allow logging. - All Airbyte commands can output error messages that take the form
{"type":"TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}}
, so we create a helper methodlog_error(message)
to allow error messages.
Now if we run python source.py spec
we should see the specification printed out:
python source.py spec
{"type": "SPEC", "spec": {"documentationUrl": "https://polygon.io/docs/stocks/get_v2_aggs_ticker__stocksticker__range__multiplier___timespan___from___to", "connectionSpecification": {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "required": ["stock_ticker", "api_key"], "properties": {"stock_ticker": {"type": "string", "title": "Stock Ticker", "description": "The stock ticker to track", "examples": ["AAPL", "TSLA", "AMZN"]}, "api_key": {"type": "string", "description": "The Polygon.io Stocks API key to use to hit the API.", "airbyte_secret": true}}}}}
Implementing check connection
The second command to implement is the
check operation
check --config <config_name>
, which tells the user whether a config file they gave us is correct.
In our case, "correct" means they input a valid stock ticker and a correct API key like we declare
via the spec
operation.
To achieve this, we'll:
- Create valid and invalid configuration files to test the success and failure cases with our
connector. We'll place config files in the
secrets/
directory which is gitignored everywhere in the Airbyte monorepo by default to avoid accidentally checking in API keys. - Add a
check
method which calls the Polygon.io API to verify if the provided token & stock ticker are correct and output the correct airbyte message. - Extend the argument parser to recognize the
check --config <config>
command and call thecheck
method when thecheck
command is invoked.
Let's first add the configuration files:
$ mkdir secrets
$ echo '{"api_key": "put_your_key_here", "stock_ticker": "TSLA"}' > secrets/valid_config.json
$ echo '{"api_key": "not_a_real_key", "stock_ticker": "TSLA"}' > secrets/invalid_config.json
Make sure to add your actual API key instead of the placeholder value <put_your_key_here>
when
following the tutorial.
Then we'll add the check
method:
import requests
from datetime import date
from datetime import datetime
from datetime import timedelta
def _call_api(ticker, token):
today = date.today()
to_day = today.strftime("%Y-%m-%d")
from_day = (today - timedelta(days=7)).strftime("%Y-%m-%d")
return requests.get(f"https://api.polygon.io/v2/aggs/ticker/{ticker}/range/1/day/{from_day}/{to_day}?sort=asc&limit=120&apiKey={token}")
def check(config):
# Validate input configuration by attempting to get the daily closing prices of the input stock ticker
response = _call_api(ticker=config["stock_ticker"], token=config["api_key"])
if response.status_code == 200:
result = {"status": "SUCCEEDED"}
elif response.status_code == 403:
# HTTP code 403 means authorization failed so the API key is incorrect
result = {"status": "FAILED", "message": "API Key is incorrect."}
else:
result = {"status": "FAILED", "message": "Input configuration is incorrect. Please verify the input stock ticker and API key."}
output_message = {"type": "CONNECTION_STATUS", "connectionStatus": result}
print(json.dumps(output_message))
In Airbyte, the contract for input files is that they will be available in the current working directory if they are not provided as an absolute path. This method helps us achieve that:
def get_input_file_path(path):
if os.path.isabs(path):
return path
else:
return os.path.join(os.getcwd(), path)
We'll then add the check
command support to run
:
def run(args):
parent_parser = argparse.ArgumentParser(add_help=False)
main_parser = argparse.ArgumentParser()
subparsers = main_parser.add_subparsers(title="commands", dest="command")
# Accept the spec command
subparsers.add_parser("spec", help="outputs the json configuration specification", parents=[parent_parser])
# Accept the check command
check_parser = subparsers.add_parser("check", help="checks the config used to connect", parents=[parent_parser])
required_check_parser = check_parser.add_argument_group("required named arguments")
required_check_parser.add_argument("--config", type=str, required=True, help="path to the json configuration file")
parsed_args = main_parser.parse_args(args)
command = parsed_args.command
if command == "spec":
spec()
elif command == "check":
config_file_path = get_input_file_path(parsed_args.config)
config = read_json(config_file_path)
check(config)
else:
# If we don't recognize the command log the problem and exit with an error code greater than 0 to indicate the process
# had a failure
log_error("Invalid command. Allowable commands: [spec, check]")
sys.exit(1)
# A zero exit code means the process successfully completed
sys.exit(0)
Let's test our new method:
$ python source.py check --config secrets/valid_config.json
{'type': 'CONNECTION_STATUS', 'connectionStatus': {'status': 'SUCCEEDED'}}
$ python source.py check --config secrets/invalid_config.json
{'type': 'CONNECTION_STATUS', 'connectionStatus': {'status': 'FAILED', 'message': 'API Key is incorrect.'}}
Our connector is able to detect valid and invalid configs correctly. Two methods down, two more to go!
Implementing Discover
The discover
command outputs a Catalog, a struct that declares the Streams and Fields (Airbyte's
equivalents of tables and columns) output by the connector. It also includes metadata around which
features a connector supports (e.g. which sync modes). In other words it describes what data is
available in the source. If you'd like to read a bit more about this concept check out our
Beginner's Guide to the Airbyte Catalog
or for a more detailed treatment read the
Airbyte Specification.
The stock ticker connector outputs records belonging to exactly one Stream (table). Each record
contains three Fields (columns): date
, price
, and stock_ticker
, corresponding to the price
of a stock on a given day.
To implement discover
, we'll:
- Add a method
discover
insource.py
which outputs the Catalog. To better understand what a catalog is, check out our Beginner's Guide to the AirbyteCatalog - Extend the arguments parser to use detect the
discover --config <config_path>
command and call thediscover
method
Let's implement discover
by adding the following in source.py
:
def discover():
catalog = {
"streams": [{
"name": "stock_prices",
"supported_sync_modes": ["full_refresh"],
"json_schema": {
"properties": {
"date": {
"type": "string"
},
"price": {
"type": "number"
},
"stock_ticker": {
"type": "string"
}
}
}
}]
}
airbyte_message = {"type": "CATALOG", "catalog": catalog}
print(json.dumps(airbyte_message))
Note that we describe the schema of the output stream using JSONSchema.
Then we'll extend the arguments parser by adding the following blocks to the run
method:
# Accept the discover command
discover_parser = subparsers.add_parser("discover", help="outputs a catalog describing the source's schema", parents=[parent_parser])
required_discover_parser = discover_parser.add_argument_group("required named arguments")
required_discover_parser.add_argument("--config", type=str, required=True, help="path to the json configuration file")
and
elif command == "discover":
discover()
We need to update our list of available commands:
log("Invalid command. Allowable commands: [spec, check, discover]")
You may be wondering why config
is a required input to discover
if it's not used. This
is done for consistency: the Airbyte Specification requires --config
as an input to discover
because many sources require it (e.g: to discover the tables available in a Postgres database, you
must supply a password). So instead of guessing whether the flag is required depending on the
connector, we always assume it is required, and the connector can choose whether to use it.
The full run method is now below:
def run(args):
parent_parser = argparse.ArgumentParser(add_help=False)
main_parser = argparse.ArgumentParser()
subparsers = main_parser.add_subparsers(title="commands", dest="command")
# Accept the spec command
subparsers.add_parser("spec", help="outputs the json configuration specification", parents=[parent_parser])
# Accept the check command
check_parser = subparsers.add_parser("check", help="checks the config used to connect", parents=[parent_parser])
required_check_parser = check_parser.add_argument_group("required named arguments")
required_check_parser.add_argument("--config", type=str, required=True, help="path to the json configuration file")
# Accept the discover command
discover_parser = subparsers.add_parser("discover", help="outputs a catalog describing the source's schema", parents=[parent_parser])
required_discover_parser = discover_parser.add_argument_group("required named arguments")
required_discover_parser.add_argument("--config", type=str, required=True, help="path to the json configuration file")
parsed_args = main_parser.parse_args(args)
command = parsed_args.command
if command == "spec":
spec()
elif command == "check":
config_file_path = get_input_file_path(parsed_args.config)
config = read_json(config_file_path)
check(config)
elif command == "discover":
discover()
else:
# If we don't recognize the command log the problem and exit with an error code greater than 0 to indicate the process
# had a failure
log_error("Invalid command. Allowable commands: [spec, check, discover]")
sys.exit(1)
# A zero exit code means the process successfully completed
sys.exit(0)
Let's test our new command:
$ python source.py discover --config secrets/valid_config.json
{"type": "CATALOG", "catalog": {"streams": [{"name": "stock_prices", "supported_sync_modes": ["full_refresh"], "json_schema": {"properties": {"date": {"type": "string"}, "price": {"type": "number"}, "stock_ticker": {"type": "string"}}}}]}}
With that, we're done implementing the discover
command.
Implementing the read operation
We've done a lot so far, but a connector ultimately exists to read data! This is where the
read
command comes in.
The format of the command is:
python source.py read --config <config_file_path> --catalog <configured_catalog.json> [--state <state_file_path>]
Each of these are described in the Airbyte Specification in detail, but we'll give a quick description of the two options we haven't seen so far:
--catalog
points to a Configured Catalog. The Configured Catalog contains the contents for the Catalog (remember the Catalog we output from discover?). It also contains some configuration information that describes how the data will by replicated. For example, we hadsupported_sync_modes
in the Catalog. In the Configured Catalog, we select which of thesupported_sync_modes
we want to use by specifying thesync_mode
field. (This is the most complicated concept when working Airbyte, so if it is still not making sense that's okay for now. If you're just dying to understand how the Configured Catalog works checkout the Beginner's Guide to the Airbyte Catalog).--state
points to a state file. The state file is only relevant when some Streams are synced with the sync modeincremental
, so we'll cover the state file in more detail in the incremental section below.
Our connector only supports one Stream, stock_prices
, so we'd expect the input catalog to contain
that stream configured to sync in full refresh. Since our connector doesn't support incremental sync
yet, we'll ignore the state option for now.
To read data in our connector, we'll:
- Create a configured catalog which tells our connector that we want to sync the
stock_prices
stream - Implement a method
read
insource.py
. For now we'll always read the last 7 days of a stock price's data - Extend the arguments parser to recognize the
read
command and its arguments
First, let's create a configured catalog fullrefresh_configured_catalog.json
to use as test input
for the read operation:
{
"streams": [
{
"stream": {
"name": "stock_prices",
"supported_sync_modes": [
"full_refresh"
],
"json_schema": {
"properties": {
"date": {
"type": "string"
},
"price": {
"type": "number"
},
"stock_ticker": {
"type": "string"
}
}
}
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
Then we'll define the read
method in source.py
:
def log_error(error_message):
current_time_in_ms = int(datetime.now().timestamp()) * 1000
log_json = {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}
print(json.dumps(log_json))
def read(config, catalog):
# Assert required configuration was provided
if "api_key" not in config or "stock_ticker" not in config:
log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
sys.exit(1)
# Find the stock_prices stream if it is present in the input catalog
stock_prices_stream = None
for configured_stream in catalog["streams"]:
if configured_stream["stream"]["name"] == "stock_prices":
stock_prices_stream = configured_stream
if stock_prices_stream is None:
log_error("No stream selected.")
return
# We only support full_refresh at the moment, so verify the user didn't ask for another sync mode
if stock_prices_stream["sync_mode"] != "full_refresh":
log_error("This connector only supports full refresh syncs! (for now)")
sys.exit(1)
# If we've made it this far, all the configuration is good and we can pull the last 7 days of market data
response = _call_api(ticker=config["stock_ticker"], token = config["api_key"])
if response.status_code != 200:
# In a real scenario we'd handle this error better :)
log_error("Failure occurred when calling Polygon.io API")
sys.exit(1)
else:
# Stock prices are returned sorted by date in ascending order
# We want to output them one by one as AirbyteMessages
results = response.json()["results"]
for result in results:
data = {"date": date.fromtimestamp(result["t"]/1000).isoformat(), "stock_ticker": config["stock_ticker"], "price": result["c"]}
record = {"stream": "stock_prices", "data": data, "emitted_at": int(datetime.now().timestamp()) * 1000}
output_message = {"type": "RECORD", "record": record}
print(json.dumps(output_message))
After doing some input validation, the code above calls the API to obtain daily prices for the input stock ticker, then outputs the prices. As always, our output is formatted according to the Airbyte Specification. Let's update our args parser with the following blocks:
# Accept the read command
read_parser = subparsers.add_parser("read", help="reads the source and outputs messages to STDOUT", parents=[parent_parser])
read_parser.add_argument("--state", type=str, required=False, help="path to the json-encoded state file")
required_read_parser = read_parser.add_argument_group("required named arguments")
required_read_parser.add_argument("--config", type=str, required=True, help="path to the json configuration file")
required_read_parser.add_argument(
"--catalog", type=str, required=True, help="path to the catalog used to determine which data to read"
)
and:
elif command == "read":
config = read_json(get_input_file_path(parsed_args.config))
configured_catalog = read_json(get_input_file_path(parsed_args.catalog))
read(config, configured_catalog)
and:
log_error("Invalid command. Allowable commands: [spec, check, discover, read]")
this yields the following run
method:
def run(args):
parent_parser = argparse.ArgumentParser(add_help=False)
main_parser = argparse.ArgumentParser()
subparsers = main_parser.add_subparsers(title="commands", dest="command")
# Accept the spec command
subparsers.add_parser("spec", help="outputs the json configuration specification", parents=[parent_parser])
# Accept the check command
check_parser = subparsers.add_parser("check", help="checks the config used to connect", parents=[parent_parser])
required_check_parser = check_parser.add_argument_group("required named arguments")
required_check_parser.add_argument("--config", type=str, required=True, help="path to the json configuration file")
# Accept the discover command
discover_parser = subparsers.add_parser("discover", help="outputs a catalog describing the source's schema", parents=[parent_parser])
required_discover_parser = discover_parser.add_argument_group("required named arguments")
required_discover_parser.add_argument("--config", type=str, required=True, help="path to the json configuration file")
# Accept the read command
read_parser = subparsers.add_parser("read", help="reads the source and outputs messages to STDOUT", parents=[parent_parser])
read_parser.add_argument("--state", type=str, required=False, help="path to the json-encoded state file")
required_read_parser = read_parser.add_argument_group("required named arguments")
required_read_parser.add_argument("--config", type=str, required=True, help="path to the json configuration file")
required_read_parser.add_argument(
"--catalog", type=str, required=True, help="path to the catalog used to determine which data to read"
)
parsed_args = main_parser.parse_args(args)
command = parsed_args.command
if command == "spec":
spec()
elif command == "check":
config_file_path = get_input_file_path(parsed_args.config)
config = read_json(config_file_path)
check(config)
elif command == "discover":
discover()
elif command == "read":
config = read_json(get_input_file_path(parsed_args.config))
configured_catalog = read_json(get_input_file_path(parsed_args.catalog))
read(config, configured_catalog)
else:
# If we don't recognize the command log the problem and exit with an error code greater than 0 to indicate the process
# had a failure
log_error("Invalid command. Allowable commands: [spec, check, discover, read]")
sys.exit(1)
# A zero exit code means the process successfully completed
sys.exit(0)
Let's test out our new command:
$ python source.py read --config secrets/valid_config.json --catalog fullrefresh_configured_catalog.json
{'type': 'RECORD', 'record': {'stream': 'stock_prices', 'data': {'date': '2020-12-15', 'stock_ticker': 'TSLA', 'price': 633.25}, 'emitted_at': 1608626365000}}
{'type': 'RECORD', 'record': {'stream': 'stock_prices', 'data': {'date': '2020-12-16', 'stock_ticker': 'TSLA', 'price': 622.77}, 'emitted_at': 1608626365000}}
{'type': 'RECORD', 'record': {'stream': 'stock_prices', 'data': {'date': '2020-12-17', 'stock_ticker': 'TSLA', 'price': 655.9}, 'emitted_at': 1608626365000}}
{'type': 'RECORD', 'record': {'stream': 'stock_prices', 'data': {'date': '2020-12-18', 'stock_ticker': 'TSLA', 'price': 695}, 'emitted_at': 1608626365000}}
{'type': 'RECORD', 'record': {'stream': 'stock_prices', 'data': {'date': '2020-12-21', 'stock_ticker': 'TSLA', 'price': 649.86}, 'emitted_at': 1608626365000}}
With this method, we now have a fully functioning connector! Let's pat ourselves on the back for getting there.
For reference, the full source.py
file now looks like this:
import argparse
import json
import sys
import os
import requests
from datetime import date
from datetime import datetime
from datetime import timedelta
def read(config, catalog):
# Assert required configuration was provided
if "api_key" not in config or "stock_ticker" not in config:
log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
sys.exit(1)
# Find the stock_prices stream if it is present in the input catalog
stock_prices_stream = None
for configured_stream in catalog["streams"]:
if configured_stream["stream"]["name"] == "stock_prices":
stock_prices_stream = configured_stream
if stock_prices_stream is None:
log_error("No streams selected")
return
# We only support full_refresh at the moment, so verify the user didn't ask for another sync mode
if stock_prices_stream["sync_mode"] != "full_refresh":
log_error("This connector only supports full refresh syncs! (for now)")
sys.exit(1)
# If we've made it this far, all the configuration is good and we can pull the last 7 days of market data
response = _call_api(ticker=config["stock_ticker"], token = config["api_key"])
if response.status_code != 200:
# In a real scenario we'd handle this error better :)
log_error("Failure occurred when calling Polygon.io API")
sys.exit(1)
else:
# Stock prices are returned sorted by date in ascending order
# We want to output them one by one as AirbyteMessages
results = response.json()["results"]
for result in results:
data = {"date": date.fromtimestamp(result["t"]/1000).isoformat(), "stock_ticker": config["stock_ticker"], "price": result["c"]}
record = {"stream": "stock_prices", "data": data, "emitted_at": int(datetime.now().timestamp()) * 1000}
output_message = {"type": "RECORD", "record": record}
print(json.dumps(output_message))
def read_json(filepath):
with open(filepath, "r") as f:
return json.loads(f.read())
def _call_api(ticker, token):
today = date.today()
to_day = today.strftime("%Y-%m-%d")
from_day = (today - timedelta(days=7)).strftime("%Y-%m-%d")
return requests.get(f"https://api.polygon.io/v2/aggs/ticker/{ticker}/range/1/day/{from_day}/{to_day}?sort=asc&limit=120&apiKey={token}")
def check(config):
# Assert required configuration was provided
if "api_key" not in config or "stock_ticker" not in config:
log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
sys.exit(1)
else:
# Validate input configuration by attempting to get the daily closing prices of the input stock ticker
response = _call_api(ticker=config["stock_ticker"], token=config["api_key"])
if response.status_code == 200:
result = {"status": "SUCCEEDED"}
elif response.status_code == 403:
# HTTP code 403 means authorization failed so the API key is incorrect
result = {"status": "FAILED", "message": "API Key is incorrect."}
else:
# Consider any other code a "generic" failure and tell the user to make sure their config is correct.
result = {"status": "FAILED", "message": "Input configuration is incorrect. Please verify the input stock ticker and API key."}
# Format the result of the check operation according to the Airbyte Specification
output_message = {"type": "CONNECTION_STATUS", "connectionStatus": result}
print(json.dumps(output_message))
def log(message):
log_json = {"type": "LOG", "log": message}
print(json.dumps(log_json))
def log_error(error_message):
current_time_in_ms = int(datetime.now().timestamp()) * 1000
log_json = {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}
print(json.dumps(log_json))
def discover():
catalog = {
"streams": [{
"name": "stock_prices",
"supported_sync_modes": ["full_refresh"],
"json_schema": {
"properties": {
"date": {
"type": "string"
},
"price": {
"type": "number"
},
"stock_ticker": {
"type": "string"
}
}
}
}]
}
airbyte_message = {"type": "CATALOG", "catalog": catalog}
print(json.dumps(airbyte_message))
def get_input_file_path(path):
if os.path.isabs(path):
return path
else:
return os.path.join(os.getcwd(), path)
def spec():
# Read the file named spec.json from the module directory as a JSON file
current_script_directory = os.path.dirname(os.path.realpath(__file__))
spec_path = os.path.join(current_script_directory, "spec.json")
specification = read_json(spec_path)
# form an Airbyte Message containing the spec and print it to stdout
airbyte_message = {"type": "SPEC", "spec": specification}
# json.dumps converts the JSON (Python dict) to a string
print(json.dumps(airbyte_message))
def run(args):
parent_parser = argparse.ArgumentParser(add_help=False)
main_parser = argparse.ArgumentParser()
subparsers = main_parser.add_subparsers(title="commands", dest="command")
# Accept the spec command
subparsers.add_parser("spec", help="outputs the json configuration specification", parents=[parent_parser])
# Accept the check command
check_parser = subparsers.add_parser("check", help="checks the config used to connect", parents=[parent_parser])
required_check_parser = check_parser.add_argument_group("required named arguments")
required_check_parser.add_argument("--config", type=str, required=True, help="path to the json configuration file")
# Accept the discover command
discover_parser = subparsers.add_parser("discover", help="outputs a catalog describing the source's schema", parents=[parent_parser])
required_discover_parser = discover_parser.add_argument_group("required named arguments")
required_discover_parser.add_argument("--config", type=str, required=True, help="path to the json configuration file")
# Accept the read command
read_parser = subparsers.add_parser("read", help="reads the source and outputs messages to STDOUT", parents=[parent_parser])
read_parser.add_argument("--state", type=str, required=False, help="path to the json-encoded state file")
required_read_parser = read_parser.add_argument_group("required named arguments")
required_read_parser.add_argument("--config", type=str, required=True, help="path to the json configuration file")
required_read_parser.add_argument(
"--catalog", type=str, required=True, help="path to the catalog used to determine which data to read"
)
parsed_args = main_parser.parse_args(args)
command = parsed_args.command
if command == "spec":
spec()
elif command == "check":
config_file_path = get_input_file_path(parsed_args.config)
config = read_json(config_file_path)
check(config)
elif command == "discover":
discover()
elif command == "read":
config = read_json(get_input_file_path(parsed_args.config))
configured_catalog = read_json(get_input_file_path(parsed_args.catalog))
read(config, configured_catalog)
else:
# If we don't recognize the command log the problem and exit with an error code greater than 0 to indicate the process
# had a failure
log_error("Invalid command. Allowable commands: [spec, check, discover, read]")
sys.exit(1)
# A zero exit code means the process successfully completed
sys.exit(0)
def main():
arguments = sys.argv[1:]
run(arguments)
if __name__ == "__main__":
main()
A full connector in about 200 lines of code. Not bad! We're now ready to package & test our connector then use it in the Airbyte UI.
3. Package the connector in a Docker image
Our connector is very lightweight, so the Dockerfile needed to run it is very light as well. Edit
the Dockerfile
as follows:
FROM python:3.9-slim
# We change to a directory unique to us
WORKDIR /airbyte/integration_code
# Install any needed Python dependencies
RUN pip install requests
# Copy source files
COPY source.py .
COPY spec.json .
# When this container is invoked, append the input argemnts to `python source.py`
ENTRYPOINT ["python", "/airbyte/integration_code/source.py"]
# Airbyte's build system uses these labels to know what to name and tag the docker images produced by this Dockerfile.
LABEL io.airbyte.name=airbyte/source-stock-ticker-api
LABEL io.airbyte.version=0.1.0
# In order to launch a source on Kubernetes in a pod, we need to be able to wrap the entrypoint.
# The source connector must specify its entrypoint in the AIRBYTE_ENTRYPOINT variable.
ENV AIRBYTE_ENTRYPOINT='python /airbyte/integration_code/source.py'
Once we save the Dockerfile
, we can build the image by running:
docker build . -t airbyte/source-stock-ticker-api:dev
To run any of our commands, we'll need to mount all the inputs into the Docker container first, then
refer to their mounted paths when invoking the connector. This allows the connector to access your
secrets without having to build them into the container. For example, we'd run check
or read
as
follows:
$ docker run airbyte/source-stock-ticker-api:dev spec
{"type": "SPEC", "spec": {"documentationUrl": "https://polygon.io/docs/stocks/get_v2_aggs_ticker__stocksticker__range__multiplier___timespan___from___to", "connectionSpecification": {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "required": ["stock_ticker", "api_key"], "properties": {"stock_ticker": {"type": "string", "title": "Stock Ticker", "description": "The stock ticker to track", "examples": ["AAPL", "TSLA", "AMZN"]}, "api_key": {"type": "string", "description": "The Polygon.io Stocks API key to use to hit the API.", "airbyte_secret": true}}}}}
$ docker run -v $(pwd)/secrets/valid_config.json:/data/config.json airbyte/source-stock-ticker-api:dev check --config /data/config.json
{'type': 'CONNECTION_STATUS', 'connectionStatus': {'status': 'SUCCEEDED'}}
$ docker run -v $(pwd)/secrets/valid_config.json:/data/config.json airbyte/source-stock-ticker-api:dev discover --config /data/config.json
{"type": "CATALOG", "catalog": {"streams": [{"name": "stock_prices", "supported_sync_modes": ["full_refresh"], "json_schema": {"properties": {"date": {"type": "string"}, "price": {"type": "number"}, "stock_ticker": {"type": "string"}}}}]}}
$ docker run -v $(pwd)/secrets/valid_config.json:/data/config.json -v $(pwd)/fullrefresh_configured_catalog.json:/data/fullrefresh_configured_catalog.json airbyte/source-stock-ticker-api:dev read --config /data/config.json --catalog /data/fullrefresh_configured_catalog.json
{'type': 'RECORD', 'record': {'stream': 'stock_prices', 'data': {'date': '2020-12-15', 'stock_ticker': 'TSLA', 'price': 633.25}, 'emitted_at': 1608628424000}}
{'type': 'RECORD', 'record': {'stream': 'stock_prices', 'data': {'date': '2020-12-16', 'stock_ticker': 'TSLA', 'price': 622.77}, 'emitted_at': 1608628424000}}
{'type': 'RECORD', 'record': {'stream': 'stock_prices', 'data': {'date': '2020-12-17', 'stock_ticker': 'TSLA', 'price': 655.9}, 'emitted_at': 1608628424000}}
{'type': 'RECORD', 'record': {'stream': 'stock_prices', 'data': {'date': '2020-12-18', 'stock_ticker': 'TSLA', 'price': 695}, 'emitted_at': 1608628424000}}
{'type': 'RECORD', 'record': {'stream': 'stock_prices', 'data': {'date': '2020-12-21', 'stock_ticker': 'TSLA', 'price': 649.86}, 'emitted_at': 1608628424000}}
4. Test the connector
The minimum requirement for testing your connector is to pass the Connector Acceptance Test suite. The connector acceptence test is a blackbox test suite containing a number of tests that validate your connector behaves as intended by the Airbyte Specification. You're encouraged to add custom test cases for your connector where it makes sense to do so e.g: to test edge cases that are not covered by the standard suite. But at the very least, your connector must pass Airbyte's acceptance test suite.
The code generator makes a minimal acceptance test configuration. Let's modify it as follows to
setup tests for each operation with valid and invalid credentials. Edit
acceptance-test-config.yaml
to look as follows:
# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-stock-ticker-api:dev
acceptance_tests:
basic_read:
tests:
- config_path: secrets/valid_config.json
configured_catalog_path: fullrefresh_configured_catalog.json
empty_streams: []
connection:
tests:
- config_path: secrets/valid_config.json
status: succeed
- config_path: secrets/invalid_config.json
status: failed
discovery:
tests:
- config_path: secrets/valid_config.json
full_refresh:
tests:
- config_path: secrets/valid_config.json
configured_catalog_path: fullrefresh_configured_catalog.json
spec:
tests:
- config_path: secrets/valid_config.json
spec_path: spec.json
# incremental: # TODO uncomment this once you implement incremental sync in part 2 of the tutorial
# tests:
# - config_path: "secrets/config.json"
# configured_catalog_path: "integration_tests/configured_catalog.json"
# future_state_path: "integration_tests/abnormal_state.json"
To run the test suite, we'll use
airbyte-ci
.
You can build and install airbyte-ci
locally from Airbyte repository root by running make
.
Assuming you have it already:
airbyte-ci connectors --name=<name-of-your-connector></name-of-your-connector> --use-remote-secrets=false test
airbyte-ci
will build and then test your connector, and provide a report on the test results.
That's it! We've created a fully functioning connector. Now let's get to the exciting part: using it from the Airbyte UI.
Use the connector in the Airbyte UI
Let's recap what we've achieved so far:
- Implemented a connector
- Packaged it in a Docker image
- Ran Connector Acceptance Tests for the connector with
airbyte-ci
To use it from the Airbyte UI, we need to:
- Publish our connector's Docker image somewhere accessible by Airbyte Core (Airbyte's server, scheduler, workers, and webapp infrastructure)
- Add the connector via the Airbyte UI and setup a connection from our new connector to a local CSV file for illustration purposes
- Run a sync and inspect the output
1. Publish the Docker image
Since we're running this tutorial locally, Airbyte will have access to any Docker images available
to your local docker
daemon. So all we need to do is build & tag our connector. For real
production connectors to be available on Airbyte Cloud, you'd need to publish them on DockerHub.
Airbyte's build system builds and tags your connector's image correctly by default as part of the
connector's standard build
process. From the Airbyte repo root, run:
airbyte-ci connectors --name source-stock-ticker-api build
This is the equivalent of running docker build . -t airbyte/source-stock-ticker-api:dev
from the
connector root, where the tag airbyte/source-stock-ticker-api
is extracted from the label
LABEL io.airbyte.name
inside your Dockerfile
.
Verify the image was built by running:
$ docker images | head
REPOSITORY TAG IMAGE ID CREATED SIZE
airbyte/source-stock-ticker-api dev 9494ea93b7d0 16 seconds ago 121MB
<none> <none> 8fe5b49f9ae5 3 hours ago 121MB
<none> <none> 4cb00a551b3c 3 hours ago 121MB
<none> <none> 1caf57c72afd 3 hours ago 121MB
airbyte/source-stock-ticker-api
was built and tagged with the dev
tag. Now let's head to the
last step.
2. Add the connector via the Airbyte UI
If the Airbyte server isn't already running, start it by running from the Airbyte repository root:
docker compose up
When Airbyte server is done starting up, it prints the following banner in the log output (it can take 10-20 seconds for the server to start):
airbyte-server | 2022-03-11 18:38:33 INFO i.a.s.ServerApp(start):121 -
airbyte-server | ___ _ __ __
airbyte-server | / | (_)____/ /_ __ __/ /____
airbyte-server | / /| | / / ___/ __ \/ / / / __/ _ \
airbyte-server | / ___ |/ / / / /_/ / /_/ / /_/ __/
airbyte-server | /_/ |_/_/_/ /_.___/\__, /\__/\___/
airbyte-server | /____/
airbyte-server | --------------------------------------
airbyte-server | Now ready at http://localhost:8000/
airbyte-server | --------------------------------------
airbyte-server | Version: dev
airbyte-server |
After you see the above banner printed out in the terminal window where you are running
docker compose up
, visit http://localhost:8000 in your browser and log in
with the default credentials: username airbyte
and password password
.
Setup a connection to sync data in Airbyte
-
In the UI, click the "Settings" button in the left side bar. Then on the Settings page, select Sources.
-
Then on the Settings/Sources page, click "+ New Connector" button at the top right.
-
On the modal that pops up, enter the following information then click "Add"
-
After you click "Add", the modal will close and you will be back at the Settings page. Now click "Sources" in the navigation bar on the left:
-
You will be redirected to Sources page, which, if you have not set up any connections, will be empty. On the Sources page click "+ new source" in the top right corner:
-
A new modal will prompt you for details of the new source. Type "Stock Ticker" in the Name field. Then, find your connector in the Source type dropdown. We have lots of connectors already, so it might be easier to find your connector by typing part of its name.
-
After you select your connector in the Source type dropdown, the modal will show two more fields: API Key and Stock Ticker. Remember that
spec.json
file you created at the very beginning of this tutorial? These fields should correspond to theproperties
section of that file. Copy-paste your Polygon.io API key and a stock ticker into these fields and then click "Set up source" button at the bottom right of the modal. -
Once you click "Set up source", Airbyte will spin up your connector and run "check" method to verify the configuration. You will see a progress bar briefly and if the configuration is valid, you will see a success message, the modal will close and you will see your connector on the updated Sources page.
-
Next step is to add a destination. On the same page, click "add destination" and then click "+ add a new destination":
-
"New destination" wizard will show up. Type a name (e.g. "Local JSON") into the Name field and select "Local JSON" in Destination type drop-down. After you select the destination type, type
/local/tutorial_json
into Destination path field. When we run syncs, we'll find the output on our local filesystem in/tmp/airbyte_local/tutorial_json
. -
Click "Set up destination" at the lower right of the form.
-
After that Airbyte will test the destination and prompt you to configure the connection between Stock Ticker source and Local JSON destination. Select "Mirror source structure" in the Destination Namespace, check the checkbox next to the stock_prices stream, and click "Set up connection" button at the bottom of the form.
-
Ta-da! Your connection is now configured to sync once a day. Airbyte will run the first sync job as soon as your connection is saved. Navigate to "Connections" in the side bar and wait for the first sync to succeed:
Let's verify the output. From your shell, run:
$ cat /tmp/airbyte_local/tutorial_json/_airbyte_raw_stock_prices.jsonl
{"_airbyte_ab_id":"7383c6c1-783a-4a8a-a39c-3890ab562495","_airbyte_emitted_at":1647026803000,"_airbyte_data":{"date":"2022-03-04","stock_ticker":"TSLA","price":838.29}}
{"_airbyte_ab_id":"cf7dc8d9-1ece-4a40-a7d6-35cae54b94e5","_airbyte_emitted_at":1647026803000,"_airbyte_data":{"date":"2022-03-07","stock_ticker":"TSLA","price":804.58}}
{"_airbyte_ab_id":"da7da131-41d2-4ba7-bba1-1a0a5329a30a","_airbyte_emitted_at":1647026803000,"_airbyte_data":{"date":"2022-03-08","stock_ticker":"TSLA","price":824.4}}
{"_airbyte_ab_id":"20df0d78-5a5e-437b-95d8-aa57cf19fce1","_airbyte_emitted_at":1647026803000,"_airbyte_data":{"date":"2022-03-09","stock_ticker":"TSLA","price":858.97}}
{"_airbyte_ab_id":"0b7a8d33-4500-4a6d-9d74-11716bd22f01","_airbyte_emitted_at":1647026803000,"_airbyte_data":{"date":"2022-03-10","stock_ticker":"TSLA","price":838.3}}
Congratulations! We've successfully written a fully functioning Airbyte connector. You're an Airbyte contributor now ;)
- Follow the next tutorial to implement incremental sync.
- Implement another connector using the Low-code CDK, Connector Builder, or Connector Development Kit
- We welcome low-code configuration based connector contributions! If you make a connector in the connector builder and want to share it with everyone using Airbyte, pull requests are welcome!