Record processing
Connectors built with the connector builder always make HTTP requests, receive the responses and emit records. Besides making the right requests, it's important to properly hand over the records to the system:
- Extract the records (record selection)
- Do optional post-processing (transformations)
- Provide record meta data to the system to inform downstream processes (primary key and declared schema)
Record Selection
When doing HTTP requests, the connector expects the records to be part of the response JSON body. The "Record Selector" component of the stream can be used to configure how records should be extracted from the response body.
The Record Selector component contains a few different levers to configure this extraction:
- Field Path
- Record Filter
- Cast Record Fields to Schema Types
These will be explained below.
Field Path
The Field Path feature lets you define a path into the fields of the response to point to the part of the response which should be treated as the record(s).
Below are a few different examples of what this can look like depending on the API.
Top-level key pointing to array
Very often, the response body contains an array of records along with some supplementary information (for example meta data for pagination).
For example the "Most popular" NY Times API returns the following response body:
{
  "status": "OK",
  "copyright": "Copyright (c) 2023 The New York Times Company. All Rights Reserved.",
  "num_results": 20,
  "results": [
    {
      "uri": "nyt://article/c15e5227-ed68-54d9-9e5b-acf5a451ec37",
      "url": "https://www.nytimes.com/2023/04/16/us/science-of-reading-literacy-parents.html",
      "id": 100000008811231,
      "asset_id": 100000008811231,
      "source": "New York Times",
      // ...
    },
    // ...
  ],
  // ...
}
In this case, setting the Field Path to results selects the array with the actual records; everything else is discarded.
Nested array
In some cases the array of actual records is nested multiple levels deep in the response, like for the "Archive" NY Times API:
{
  "copyright": "Copyright (c) 2020 The New York Times Company. All Rights Reserved.",
  "response": {
    "docs": [
      {
        "abstract": "From the Treaty of Versailles to Prohibition, the events of that year shaped America, and the world, for a century to come. ",
        "web_url": "https://www.nytimes.com/2018/12/31/opinion/1919-america.html",
        "snippet": "From the Treaty of Versailles to Prohibition, the events of that year shaped America, and the world, for a century to come. ",
        // ...
      },
      // ...
    ]
  }
}
In this case, setting the Field Path to response, docs selects the nested array.
Root array
In some cases, the response body itself is an array of records, like in the CoinAPI API:
[
  {
    "symbol_id": "BITSTAMP_SPOT_BTC_USD",
    "time_exchange": "2013-09-28T22:40:50.0000000Z",
    "time_coinapi": "2017-03-18T22:42:21.3763342Z",
    // ...
  },
  {
    "symbol_id": "BITSTAMP_SPOT_BTC_USD",
    "time_exchange": "2013-09-28T22:40:50.0000000Z",
    "time_coinapi": "2017-03-18T22:42:21.3763342Z",
    // ...
  }
  // ...
]
In this case, the Field Path can be omitted and the whole response becomes the list of records.
Single object
Sometimes, there is only one record returned per request from the API. In this case, the field path can also point to an object instead of an array which will be handled as the only record, like in the case of the Exchange Rates API:
{
  "success": true,
  "historical": true,
  "date": "2013-12-24",
  "timestamp": 1387929599,
  "base": "GBP",
  "rates": {
    "USD": 1.636492,
    "EUR": 1.196476,
    "CAD": 1.739516
  }
}
In this case, setting the Field Path to rates will yield a single record which contains all the exchange rates in a single object.
Fields nested in arrays
In some cases, records are located in multiple branches of the response object (for example within each item of an array):
{
"data": [
{
"record": {
"id": "1"
}
},
{
"record": {
"id": "2"
}
}
]
}
A Field Path with a placeholder * selects all children at the current position in the path, so in this case setting the Field Path to data, *, record will return the following records:
[
{
"id": "1"
},
{
"id": "2"
}
]
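The Field Path cases above (top-level key, nested array, root array, single object and the * placeholder) can be sketched in a few lines of Python. This is only an illustration of the selection logic as described here, not the connector builder's actual implementation; the function name select_records is hypothetical.

```python
from typing import Any, Iterator, List


def select_records(body: Any, field_path: List[str]) -> List[Any]:
    """Return the records found at `field_path`; "*" selects every child."""

    def walk(node: Any, path: List[str]) -> Iterator[Any]:
        if not path:
            yield node
            return
        key, rest = path[0], path[1:]
        if key == "*":
            # Fan out over list items or object values at this position.
            if isinstance(node, dict):
                children = list(node.values())
            elif isinstance(node, list):
                children = node
            else:
                children = []
            for child in children:
                yield from walk(child, rest)
        elif isinstance(node, dict) and key in node:
            yield from walk(node[key], rest)
        # Any other case (missing key, scalar value) yields nothing.

    records: List[Any] = []
    for match in walk(body, field_path):
        if isinstance(match, list):
            records.extend(match)  # an array contributes one record per item
        else:
            records.append(match)  # a single object is treated as one record
    return records


body = {"data": [{"record": {"id": "1"}}, {"record": {"id": "2"}}]}
print(select_records(body, ["data", "*", "record"]))
print(select_records([{"symbol_id": "A"}], []))  # empty path: root array
```

An empty path corresponds to omitting the Field Path, in which case the whole response is used as the list of records.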
Record Filter
In some cases, certain records should be excluded from the final output of the connector, which can be accomplished through the Record Filter feature within the Record Selector component.
For example, say your API response looks like this:
[
{
"id": 1,
"status": "pending"
},
{
"id": 2,
"status": "active"
},
{
"id": 3,
"status": "expired"
}
]
and you only want to sync records for which the status is not expired.
You can accomplish this by setting the Record Filter to {{ record.status != 'expired' }}.
Any records for which this expression evaluates to true will be emitted by the connector, and any for which it evaluates to false will be excluded from the output.
Note that the Record Filter value must be an interpolated string with the filtering condition placed inside double curly braces {{ }}.
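To make the filter semantics concrete, here is a minimal Python sketch that mimics the behavior described above. The lambda stands in for the interpolated expression; the helper name filter_records is hypothetical and this is not how the connector builder evaluates the template internally.

```python
from typing import Any, Callable, Dict, Iterable, List

Record = Dict[str, Any]


def filter_records(records: Iterable[Record], condition: Callable[[Record], bool]) -> List[Record]:
    """Keep only the records for which `condition` evaluates to true."""
    return [record for record in records if condition(record)]


records = [
    {"id": 1, "status": "pending"},
    {"id": 2, "status": "active"},
    {"id": 3, "status": "expired"},
]

# Python equivalent of the expression record.status != 'expired'
kept = filter_records(records, lambda record: record["status"] != "expired")
print(kept)  # the records with id 1 and id 2
```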
Cast Record Fields to Schema Types
Sometimes the type of a field in the record is not the desired type. If the existing field type can be simply cast to the desired type, this can be solved by setting the stream's declared schema to the desired type and enabling Cast Record Fields to Schema Types.
For example, say the API response looks like this:
[
{
"street": "Kulas Light",
"city": "Gwenborough",
"geo": {
"lat": "-37.3159",
"lng": "81.1496"
}
},
{
"street": "Victor Plains",
"city": "Wisokyburgh",
"geo": {
"lat": "-43.9509",
"lng": "-34.4618"
}
}
]
Notice that the lat and lng values are strings even though they are all numeric. If you would rather have these fields contain raw number values in your output records, you can do the following:
- In the Declared Schema tab, disable Automatically import detected schema
- Change the type of the lat and lng fields from string to number
- Enable Cast Record Fields to Schema Types in the Record Selector component
This will cause those fields in the output records to be cast to the type declared in the schema, so the output records will now look like this:
[
{
"street": "Kulas Light",
"city": "Gwenborough",
"geo": {
"lat": -37.3159,
"lng": 81.1496
}
},
{
"street": "Victor Plains",
"city": "Wisokyburgh",
"geo": {
"lat": -43.9509,
"lng": -34.4618
}
}
]
Note that this casting is performed on a best-effort basis; if you tried to set the city field's type to number in the schema, for example, it would remain unchanged because those string values cannot be cast to numbers.
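The best-effort behavior can be illustrated with a small Python sketch. It assumes a simplified rule set (only string-to-number casts and nested objects are handled); the function name cast_to_schema is hypothetical and the real feature may support more types and conversions.

```python
from typing import Any, Dict


def cast_to_schema(value: Any, schema: Dict[str, Any]) -> Any:
    """Best-effort cast of `value` to the type declared in `schema`."""
    declared = schema.get("type")
    if declared == "object" and isinstance(value, dict):
        properties = schema.get("properties", {})
        return {
            key: cast_to_schema(item, properties[key]) if key in properties else item
            for key, item in value.items()
        }
    if declared == "number" and isinstance(value, str):
        try:
            return float(value)
        except ValueError:
            return value  # cannot be cast: leave the value unchanged
    return value


schema = {
    "type": "object",
    "properties": {
        "city": {"type": "number"},  # deliberately wrong, to show the fallback
        "geo": {
            "type": "object",
            "properties": {"lat": {"type": "number"}, "lng": {"type": "number"}},
        },
    },
}
record = {"street": "Kulas Light", "city": "Gwenborough", "geo": {"lat": "-37.3159", "lng": "81.1496"}}
print(cast_to_schema(record, schema))
```

The lat and lng strings become numbers, while city stays a string because it cannot be parsed as a number.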
Transformations
It is recommended not to change records during the extraction process the connector is performing, but instead to load them into the downstream warehouse unchanged and perform necessary transformations there in order to stay flexible in what data is required. However, there are some reasons that require modifying the fields of records before they are sent to the warehouse:
- Remove personally identifiable information (PII) to ensure compliance with local legislation
- Pseudonymise sensitive fields
- Remove large fields that don't contain interesting information and significantly increase load on the system
The "transformations" feature can be used for these purposes.
Removing fields
To remove a field from a record, add a new transformation of type "remove" in the "Transformations" section and enter the field path. For example, in the case of the EmailOctopus API, the campaigns records also include the HTML content of the mailing, which takes up a lot of space:
{
"data": [
{
"id": "00000000-0000-0000-0000-000000000000",
"status": "SENT",
"name": "Foo",
"subject": "Bar",
"from": {
"name": "John Doe",
"email_address": "[email protected]"
},
"content": {
"html": "<html>lots of text here...<html>",
"plain_text": "Lots of plain text here...."
},
"created_at": "2023-04-13T15:28:37+00:00",
"sent_at": "2023-04-14T15:28:37+00:00"
}
]
}
Setting the "Path" of the remove-transformation to content removes these fields from the records:
{
"id": "00000000-0000-0000-0000-000000000000",
"status": "SENT",
"from": {
"name": "John Doe",
"email_address": "[email protected]"
},
"name": "Foo",
"subject": "Bar",
"created_at": "2023-04-13T15:28:37+00:00",
"sent_at": "2023-04-14T15:28:37+00:00"
}
As with the record selector's Field Path, properties of deeply nested objects can also be removed by specifying the path of properties that leads to the target field.
Removing fields that match a glob pattern
Imagine that a property should be removed from the data regardless of the level at which it appears. This can be achieved by adding a ** to the "Path" - for example "**, name" will remove all "name" fields anywhere in the record:
{
"id": "00000000-0000-0000-0000-000000000000",
"status": "SENT",
"from": {
"email_address": "[email protected]"
},
"subject": "Bar",
"created_at": "2023-04-13T15:28:37+00:00",
"sent_at": "2023-04-14T15:28:37+00:00"
}
The * character can also be used as a placeholder to filter for all fields that start with a certain prefix - the "Path" s* will remove all fields from the top level that start with the character s:
{
"id": "00000000-0000-0000-0000-000000000000",
"from": {
"email_address": "[email protected]"
},
"created_at": "2023-04-13T15:28:37+00:00"
}
}
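The three kinds of remove paths shown above (a plain path, a ** prefix and a * wildcard) can be approximated with the following Python sketch. It is an assumption-laden illustration, not the connector's implementation: the function name remove_fields is hypothetical, glob matching is delegated to Python's fnmatch module, lists are left untouched, and the email address is a placeholder value.

```python
from fnmatch import fnmatchcase
from typing import Any, List


def remove_fields(node: Any, path: List[str]) -> Any:
    """Return a copy of `node` without the fields matched by `path`."""
    if not isinstance(node, dict) or not path:
        return node
    head, rest = path[0], path[1:]
    if head == "**":
        # Zero levels: apply the remaining path at this level ...
        pruned = remove_fields(node, rest)
        # ... then keep searching one level deeper with the same "**" path.
        return {key: remove_fields(value, path) for key, value in pruned.items()}
    result = {}
    for key, value in node.items():
        if not fnmatchcase(key, head):
            result[key] = value  # segment does not match: keep as-is
        elif rest:
            result[key] = remove_fields(value, rest)  # descend towards the target
        # else: the last segment matched, so the field is dropped
    return result


record = {
    "id": "00000000-0000-0000-0000-000000000000",
    "status": "SENT",
    "name": "Foo",
    "subject": "Bar",
    "from": {"name": "John Doe", "email_address": "sender@example.com"},  # placeholder address
    "content": {"html": "<html>...</html>", "plain_text": "..."},
}

print(remove_fields(record, ["content"]))     # drops the nested content object
print(remove_fields(record, ["**", "name"]))  # drops "name" at every level
print(remove_fields(record, ["s*"]))          # drops top-level fields starting with "s"
```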
Adding fields
Adding fields can be used to apply a hashing function to an existing field to pseudonymize it. To do this, add a new transformation of type "add" in the "Transformations" section and enter the field path and the new value. For example, in the case of the EmailOctopus API, the campaigns records include the name of the sender:
{
"data": [
{
"id": "00000000-0000-0000-0000-000000000000",
"status": "SENT",
"name": "Foo",
"subject": "Bar",
"from": {
"name": "John Doe",
"email_address": "[email protected]"
},
"created_at": "2023-04-13T15:28:37+00:00",
"sent_at": "2023-04-14T15:28:37+00:00"
}
]
}
To apply a hash function to it, set the "Path" to "from, name" to select the name property nested in the from object and set the value to {{ record['from']['name'] | hash('md5') }}. This hashes the name in the record:
{
"id": "00000000-0000-0000-0000-000000000000",
"status": "SENT",
"name": "Foo",
"subject": "Bar",
"from": {
"name": "4c2a904bafba06591225113ad17b5cec",
"email_address": "[email protected]"
},
"created_at": "2023-04-13T15:28:37+00:00",
"sent_at": "2023-04-14T15:28:37+00:00"
}
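As a rough illustration of what this "add" transformation does, the following Python sketch overwrites a nested field with its MD5 hex digest. It assumes the hash('md5') filter produces the hex digest of the UTF-8 encoded value; the function name pseudonymize is hypothetical.

```python
import copy
import hashlib
from typing import Any, Dict, List


def pseudonymize(record: Dict[str, Any], path: List[str]) -> Dict[str, Any]:
    """Return a copy of `record` with the field at `path` replaced by its MD5 hex digest."""
    result = copy.deepcopy(record)
    target = result
    for key in path[:-1]:
        target = target[key]  # walk down to the parent of the target field
    value = str(target[path[-1]])
    target[path[-1]] = hashlib.md5(value.encode("utf-8")).hexdigest()
    return result


record = {"id": "00000000-0000-0000-0000-000000000000", "from": {"name": "John Doe"}}
hashed = pseudonymize(record, ["from", "name"])
print(hashed["from"]["name"])  # a 32-character hexadecimal string
```

Note that hashing short, guessable values such as names only pseudonymizes them; the same input always produces the same digest.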
Another common use case of the "add" transformation is the enriching of records with their parent resource - check out the partitioning documentation for more details.
It's not recommended to use this feature to do projections (like concatenating firstname and lastname into a single "name" field) - in most cases it's beneficial to leave these tasks to a later stage in the data pipeline.
Meta data
Besides bringing the records in the right shape, it's important to communicate some pieces of meta data about records to the downstream system so they can be handled properly.
Primary key
The "Primary key" field specifies how to uniquely identify a record. This is important for downstream de-duplication of records (e.g. by the incremental sync - Append + Deduped sync mode).
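To show why a unique key matters, here is a minimal Python sketch of key-based de-duplication. It is not how Airbyte destinations implement the Append + Deduped mode; the function name dedupe and the rule that the most recently emitted record wins are assumptions made for illustration.

```python
from typing import Any, Dict, Iterable, List, Sequence, Tuple

Record = Dict[str, Any]


def dedupe(records: Iterable[Record], primary_key: Sequence[str]) -> List[Record]:
    """Keep one record per primary key value; later records replace earlier ones."""
    latest: Dict[Tuple[Any, ...], Record] = {}
    for record in records:
        key = tuple(record[field] for field in primary_key)
        latest[key] = record
    return list(latest.values())


synced = [
    {"id": 1, "status": "pending"},
    {"id": 2, "status": "active"},
    {"id": 1, "status": "active"},  # updated version of the first record
]
print(dedupe(synced, ["id"]))
```

If the key were not unique per record, distinct records would collapse into one, which is why the choice of primary key needs care.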
In a lot of cases, like for the EmailOctopus example from above, there is a dedicated id field that can be used for this purpose. It's important that the value of the id field is guaranteed to only occur once for a single record.
In some cases there is no such field but a combination of multiple fields is guaranteed to be unique. For example, the shipping zone locations of the WooCommerce API do not have an id, but each combination of the code and type fields is guaranteed to be unique:
[
{
"code": "BR",
"type": "country"
},
{
"code": "DE",
"type": "country"
}
]
In this case, the "Primary key" can be set to "code, type" to allow automatic downstream deduplication of records based on the value of these two fields.
Declared schema
Similar to the "Primary key", the "Declared schema" defines how the records will be shaped via a JSON Schema definition. It defines which fields and nested fields occur in the records, whether they are always available or sometimes missing and which types they are.
This information is used by the Airbyte system for different purposes:
- Column selection when configuring a connection - in Airbyte Cloud, the declared schema allows the user to pick which columns/fields are passed to the destination to dynamically reduce the amount of synced data
- Recreating the data structure with the right columns in the destination - this allows a warehouse destination to create a SQL table with columns matching the fields of the records
- Detecting schema changes - if the schema of a stream changes for an existing connection, this situation can be handled gracefully by Airbyte instead of causing errors in the destination
When doing test reads, the connector builder analyzes the test records and shows the derived schema in the "Detected schema" tab. By default, new streams are configured to automatically import the detected schema into the declared schema on every test read.
This behavior can be toggled off by disabling the Automatically import detected schema switch, in which case the declared schema can be manually edited in the UI and it will no longer be automatically updated when triggering test reads.
For example the following test records:
[
{
"id": "40205efe-5f94-11ed-aa11-7d1ac831a909",
"status": "SENT",
"subject": "Hello from Integration Test",
"created_at": "2022-11-08T18:36:25+00:00",
"sent_at": "2022-11-08T18:36:55+00:00"
},
{
"id": "91546616-5ef0-11ed-90c7-fbeacb2ee1eb",
"status": "SENT",
"subject": "Hello my first campaign",
"created_at": "2022-11-07T23:04:44+00:00",
"sent_at": "2022-11-08T12:48:27+00:00"
}
]
result in the following schema:
{
"$schema": "http://json-schema.org/schema#",
"properties": {
"created_at": {
"type": "string"
},
"id": {
"type": "string"
},
"sent_at": {
"type": "string"
},
"status": {
"type": "string"
},
"subject": {
"type": "string"
}
},
"type": "object"
}
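The idea behind schema detection can be sketched in Python as follows. This is a deliberately simplified illustration, not the connector builder's detector: it only looks at top-level fields, treats all numbers alike, and the function names infer_type and detect_schema are hypothetical.

```python
from typing import Any, Dict, Iterable, Set


def infer_type(value: Any) -> str:
    """Map a Python value to a JSON Schema type name."""
    if value is None:
        return "null"
    if isinstance(value, bool):  # check bool before numbers: bool is an int subclass
        return "boolean"
    if isinstance(value, (int, float)):
        return "number"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        return "array"
    return "object"


def detect_schema(records: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Collect the types seen for each top-level field across all records."""
    seen: Dict[str, Set[str]] = {}
    for record in records:
        for field, value in record.items():
            seen.setdefault(field, set()).add(infer_type(value))
    properties = {}
    for field in sorted(seen):
        types = sorted(seen[field])
        properties[field] = {"type": types[0] if len(types) == 1 else types}
    return {"$schema": "http://json-schema.org/schema#", "properties": properties, "type": "object"}


test_records = [
    {"id": "40205efe-5f94-11ed-aa11-7d1ac831a909", "status": "SENT"},
    {"id": "91546616-5ef0-11ed-90c7-fbeacb2ee1eb", "status": "SENT"},
]
print(detect_schema(test_records))
```

Because the result only reflects the values seen in the test records, a field that happens to be null or missing in every test record will not get its real type.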
A stricter schema is generally preferable, but the detected schema is a good default to rely on. See the documentation about supported data types for JSON schema structures that will be picked up by the Airbyte system.
If Automatically import detected schema is disabled, and the declared schema deviates from the detected schema, the "Detected schema" tab in the testing panel highlights the differences. It's important to note that differences are not necessarily a problem that needs to be fixed - in some cases the currently loaded set of records in the testing panel doesn't feature all possible cases so the detected schema is too strict. However, if the declared schema is incompatible with the detected schema based on the test records, it's very likely there will be errors when running syncs.
In the case of the example above, there are two differences between the detected and declared schema. The first difference, for the name field, is not problematic:
"name": {
- "type": [
- "string",
- "null"
- ]
+ "type": "string"
},
The declared schema allows the null value for the name while the detected schema only encountered strings. If it's possible the name is set to null, the declared schema is configured correctly.
The second difference will likely cause problems:
"subject": {
- "type": "number"
+ "type": "string"
}
The subject field was detected as string, but is configured to be a number in the declared schema. As the API returned string subjects during testing, it's likely this will also happen during syncs, which would render the declared schema inaccurate. Depending on the situation, this can be fixed in multiple ways:
- If the API changed and subject is always a string now, the declared schema should be updated to reflect this:
"subject": { "type": "string" }
- If the API sometimes returns subject as a number or a string depending on the record, the declared schema should be updated to allow both data types:
"subject": { "type": ["string","number"] }
A common situation is that certain record fields do not have any values in the test read data, so they are set to null. In the detected schema, these fields are of type "null", which is most likely not correct for all cases. In these situations, the declared schema should be manually corrected.
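The distinction between harmless and problematic differences can be expressed as a subset check: a field is a likely problem when a detected type is not allowed by the declared schema. The Python sketch below illustrates that idea under simplifying assumptions (top-level fields only, fields missing from the declared schema are ignored); the function names are hypothetical and this is not the check the testing panel performs.

```python
from typing import Any, Dict, List, Set, Union


def as_type_set(schema_type: Union[str, List[str]]) -> Set[str]:
    """Normalize a JSON Schema "type" value (string or list) to a set."""
    return {schema_type} if isinstance(schema_type, str) else set(schema_type)


def incompatible_fields(declared: Dict[str, Any], detected: Dict[str, Any]) -> List[str]:
    """List fields whose detected types are not all allowed by the declared schema."""
    problems = []
    declared_props = declared.get("properties", {})
    for field, detected_prop in detected.get("properties", {}).items():
        declared_prop = declared_props.get(field)
        if declared_prop is None:
            continue  # undeclared fields are ignored in this sketch
        if not as_type_set(detected_prop["type"]) <= as_type_set(declared_prop["type"]):
            problems.append(field)
    return problems


declared = {"properties": {"name": {"type": ["string", "null"]}, "subject": {"type": "number"}}}
detected = {"properties": {"name": {"type": "string"}, "subject": {"type": "string"}}}
print(incompatible_fields(declared, detected))  # only "subject" is reported
```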