Postgres

Features

Feature | Supported | Notes
Full Refresh Sync | Yes |
Incremental - Append Sync | Yes |
Replicating Views | Yes |
Replicate Incremental Deletes | Yes |
Logical Replication (WAL) | Yes |
SSL Support | Yes |
SSH Tunnel Connection | Yes |
Namespaces | Yes | Enabled by default.
Custom Types | Yes |
Arrays | Yes | Byte arrays are not supported yet.
Generating an RSA Private Key | No | Coming soon.
Schema Selection | Yes | The 'public' schema is set by default. Multiple schemas may be used at one time. If no schemas are set explicitly, all existing schemas are synced.

The Postgres source does not alter the schema present in your database. Depending on the destination connected to this source, however, the schema may be altered. See the destination's documentation for more details.

Getting Started (Airbyte Cloud)

On Airbyte Cloud, only TLS connections to your Postgres instance are supported. Other than that, you can proceed with the open-source instructions below.

Getting Started (Airbyte Open-Source)

Requirements

  1. Postgres v9.3.x or above
  2. Allow connections from Airbyte to your Postgres database (if they exist in separate VPCs)
  3. Create a dedicated read-only Airbyte user with access to all tables needed for replication

1. Make sure your database is accessible from the machine running Airbyte

This is dependent on your networking setup. The easiest way to verify if Airbyte is able to connect to your Postgres instance is via the check connection tool in the UI.

2. Create a dedicated read-only user with access to the relevant tables

This step is optional but highly recommended for better permission control and auditing. Alternatively, you can use Airbyte with an existing user in your database.

To create a dedicated database user, run the following commands against your database:

CREATE USER <username> PASSWORD 'your_password_here';

Then give it access to the relevant schema:

GRANT USAGE ON SCHEMA <schema_name> TO <username>;

Note that to replicate data from multiple Postgres schemas, you can re-run the command above to grant access to all the relevant schemas, but you'll need to set up multiple sources connecting to the same db on multiple schemas.

Next, grant the user read-only access to the relevant tables. The simplest way is to grant read access to all tables in the schema as follows:

GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO <username>;

-- Allow user to see tables created in the future
ALTER DEFAULT PRIVILEGES IN SCHEMA <schema_name> GRANT SELECT ON TABLES TO <username>;
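
To confirm that the grants took effect, a query along the following lines can be run by a superuser or the table owner. This is a minimal sketch: airbyte_user and public are placeholder names standing in for <username> and <schema_name>, and the query only covers tables, not views.

```sql
-- Assumption: 'airbyte_user' and 'public' are placeholders for your own
-- <username> and <schema_name>.
SELECT schemaname,
       tablename,
       has_table_privilege('airbyte_user',
                           format('%I.%I', schemaname, tablename),
                           'SELECT') AS can_select
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY tablename;
```

Any row where can_select is false points at a table the user cannot read yet.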

Syncing a subset of columns

Currently, there is no way to sync a subset of columns using the Postgres source connector.

  • When setting up a connection, you can only choose which tables to sync, but not columns.
  • If the user can only access a subset of columns, the connection check will pass. However, the data sync will fail with a permission denied exception.

The short-term workaround for partial table syncing is to create a view on the specific columns, and grant the user read access to that view:

CREATE VIEW <schema_name>.<view_name> AS SELECT <columns> FROM <table>;
GRANT SELECT ON TABLE <schema_name>.<view_name> TO <user_name>;

This issue is tracked in #9771.
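
As a concrete illustration of the view workaround, the sketch below exposes three columns of a table and hides the rest. The table public.customers, its columns, the view name, and the user airbyte_user are all hypothetical names chosen for the example.

```sql
-- Assumption: public.customers, its columns, and airbyte_user are
-- hypothetical names used only for illustration.
CREATE VIEW public.customers_limited AS
SELECT id, name, created_at
FROM public.customers;

-- The user reads the view instead of the underlying table.
GRANT SELECT ON TABLE public.customers_limited TO airbyte_user;
```

When setting up the connection, the view would then be selected for syncing in place of the original table.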

3. (Advanced) Configure Additional JDBC URL Parameters

This is an advanced configuration option. Use it with caution. If you need to customize the JDBC connection beyond the common options, you can specify additional JDBC URL parameters in the JDBC URL Params field. The value of this field should be a list of key-value pairs separated by the & symbol.

E.g. key1=value1&key2=value2&key3=value3

These parameters are appended to the JDBC URL that Airbyte uses to connect to your Postgres database.

Do not use any of the following keys in the JDBC URL Params field, because they will be overwritten by Airbyte:

  • currentSchema
  • user
  • password
  • ssl
  • sslmode

You can read about JDBC URL parameters supported by Postgres in Postgres JDBC Driver Documentation.

4. Optionally, set up CDC. Follow the guide below to do so.

5. That's it!

Your database user should now be ready for use with Airbyte.

Change Data Capture (CDC) / Logical Replication / WAL Replication

We use logical replication of the Postgres write-ahead log (WAL) to incrementally capture deletes using a replication plugin.

We use pgoutput as the default plugin, which is included in Postgres 10+. The wal2json plugin is also supported. Please read the section on replication plugins below for more information.

Please read the CDC docs for an overview of how Airbyte approaches CDC.

Should I use CDC for Postgres?

  • If you need a record of deletions and can accept the limitations posted below, you should use CDC for Postgres.
  • If your data set is small and you just want a snapshot of your table in the destination, consider using Full Refresh replication for your table instead of CDC.
  • If the limitations prevent you from using CDC and your goal is to maintain a snapshot of your table in the destination, consider using non-CDC incremental syncs and occasionally resetting the data and re-syncing.
  • If your table has a primary key but doesn't have a reasonable cursor field for incremental syncing (e.g. updated_at), CDC allows you to sync your table incrementally.

CDC Limitations

  • Make sure to read our CDC docs to see limitations that impact all databases using CDC replication.
  • CDC is only available for Postgres 10+.
  • Airbyte requires a replication slot configured only for its use. Only one source should be configured that uses this replication slot. Instructions on how to set up a replication slot can be found below.
  • Log-based replication only works for master instances of Postgres.
  • Using logical replication increases disk space used on the database server. The additional data is stored until it is consumed.
    • We recommend setting frequent syncs for CDC in order to ensure that this data doesn't fill up your disk space.
    • If you stop syncing a CDC-configured Postgres instance to Airbyte, you should delete the replication slot. Otherwise, it may fill up your disk space.
  • Our CDC implementation uses at least once delivery for all change records.
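
To keep an eye on the disk space concern described above, the amount of WAL held back by each slot can be inspected from the primary. The following is a sketch using standard Postgres 10+ catalog functions. The slot name airbyte_slot is the one used later in this guide and is an assumption about your setup.

```sql
-- Show how much WAL each logical replication slot is holding back (Postgres 10+).
SELECT slot_name,
       plugin,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
WHERE slot_type = 'logical';

-- Drop a slot that is no longer being synced.
-- Assumption: the slot is named airbyte_slot and is not currently active.
SELECT pg_drop_replication_slot('airbyte_slot');
```

Dropping the slot discards its position in the WAL, so a later CDC sync would need a new slot.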

Setting up CDC for Postgres

1. Enable logical replication

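Follow the guide below that matches your deployment to enable logical replication:

  • CDC on Bare Metal, VMs (EC2/GCE/etc), Docker, etc.
  • CDC on AWS Postgres RDS or Aurora
  • CDC on Azure Database for Postgres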

2. Add user-level permissions

We recommend using a user specifically for Airbyte's replication so you can minimize access. This Airbyte user needs to be granted the REPLICATION and LOGIN permissions. You can create a role with CREATE ROLE <name> REPLICATION LOGIN; and grant that role to the user. You still need to make sure the user can connect to the database, use the schema, and use SELECT on tables (the same permissions are required for non-CDC incremental syncs and all full refreshes).
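
Postgres does not pass role attributes such as REPLICATION and LOGIN to members through role inheritance, so on a self-managed server a more direct option is to set them on the Airbyte user itself. The sketch below assumes a user named airbyte_user, which is a placeholder. Managed services may expose this permission differently.

```sql
-- Assumption: airbyte_user is a placeholder for the dedicated Airbyte user.
ALTER ROLE airbyte_user WITH REPLICATION LOGIN;

-- Confirm that both attributes are set on the user.
SELECT rolname, rolreplication, rolcanlogin
FROM pg_roles
WHERE rolname = 'airbyte_user';
```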

3. Select replication plugin

We recommend using the pgoutput plugin, as it is the standard logical decoding plugin in Postgres. If the replicated table contains a lot of big JSON blobs and the table size exceeds 1 GB, we recommend using wal2json instead. Please note that wal2json may require additional installation on Bare Metal, VMs (EC2/GCE/etc), Docker, etc. For more information, read the wal2json documentation.

4. Create publications and replication identities for tables

For each table you want to replicate with CDC, you should add the replication identity (the method of distinguishing between rows) first. We recommend using ALTER TABLE tbl1 REPLICA IDENTITY DEFAULT; to use primary keys to distinguish between rows. After setting the replication identity, you will need to run CREATE PUBLICATION airbyte_publication FOR TABLE <tbl1, tbl2, tbl3>;. This publication name is customizable. Please refer to the Postgres docs if you need to add or remove tables from your publication in the future.

Please note that:

  • You must add the replication identity before creating the publication. Otherwise, ALTER/UPDATE/DELETE statements may fail if Postgres cannot determine how to uniquely identify rows.
  • The publication should include all the tables and only the tables that need to be synced. Otherwise, data from these tables may not be replicated correctly.

The UI currently allows selecting any tables for CDC. If a table is selected that is not part of the publication, it will not replicate even though it is selected. If a table is part of the publication but does not have a replication identity, that replication identity will be created automatically on the first run if the Airbyte user has the necessary permissions.
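
Put together, this step might look like the sketch below. The table names public.users, public.orders, and public.payments are hypothetical. The publication name airbyte_publication is the one used above.

```sql
-- Assumption: public.users, public.orders and public.payments are hypothetical tables.
-- Set the replication identity first, using the primary key.
ALTER TABLE public.users  REPLICA IDENTITY DEFAULT;
ALTER TABLE public.orders REPLICA IDENTITY DEFAULT;

-- Then create the publication for exactly the tables to be synced.
CREATE PUBLICATION airbyte_publication FOR TABLE public.users, public.orders;

-- Later changes to the publication.
ALTER PUBLICATION airbyte_publication ADD TABLE public.payments;
ALTER PUBLICATION airbyte_publication DROP TABLE public.orders;

-- Check which tables are currently published.
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'airbyte_publication';

-- Check the replica identity: d = default, n = nothing, f = full, i = index.
SELECT relname, relreplident
FROM pg_class
WHERE relname IN ('users', 'payments');
```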

5. Create replication slot

Next, you will need to create a replication slot. It's important to create the publication first (as in step 4) before creating the replication slot. Otherwise, you can run into exceptions if there is any update to the database between the creation of the two.

Here is the query used to create a replication slot called airbyte_slot:

SELECT pg_create_logical_replication_slot('airbyte_slot', 'pgoutput');

If you would like to use the wal2json plugin, change pgoutput to wal2json in the above query.

6. Start syncing

When configuring the source, select CDC and provide the replication slot and publication you just created. You should be ready to sync data with CDC!

CDC on Bare Metal, VMs (EC2/GCE/etc), Docker, etc.

Some settings must be configured in the postgresql.conf file for your database. You can find the location of this file using psql -U postgres -c 'SHOW config_file' with the correct psql credentials specified. Alternatively, a custom file can be specified when running postgres with the -c flag. For example, postgres -c config_file=/etc/postgresql/postgresql.conf runs Postgres with the config file at /etc/postgresql/postgresql.conf.

If you are syncing data from a server using the postgres Docker image, you will need to mount a file and change the command to run Postgres with the set config file. If you're just testing CDC behavior, you may want to use a modified version of a sample postgresql.conf.

  • wal_level is the type of coding used within the Postgres write-ahead log. This must be set to logical for Airbyte CDC.
  • max_wal_senders is the maximum number of processes used for handling WAL changes. This must be at least one.
  • max_replication_slots is the maximum number of replication slots that are allowed to stream WAL changes. This must be one if Airbyte will be the only service subscribing to WAL changes, or more if other services are also reading from the WAL.

Here is what these settings would look like in postgresql.conf:

wal_level = logical
max_wal_senders = 1
max_replication_slots = 1

After setting these values you will need to restart your instance.
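
Once the instance is back up, the effective values can be checked from any SQL session. This is a small sketch using the standard pg_settings view.

```sql
-- wal_level should report 'logical'; the other two should be at least 1.
SELECT name, setting
FROM pg_settings
WHERE name IN ('wal_level', 'max_wal_senders', 'max_replication_slots');
```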

Finally, follow the rest of the steps above.

CDC on AWS Postgres RDS or Aurora

  • Go to the Configuration tab for your DB cluster.
  • Find your cluster parameter group. You will either edit the parameters for this group or create a copy of this parameter group to edit. If you create a copy you will need to change your cluster's parameter group before restarting.
  • Within the parameter group page, search for rds.logical_replication. Select this row and click on the Edit parameters button. Set this value to 1.
  • Wait for a maintenance window to automatically restart the instance or restart it manually.
  • Finally, follow the rest of the steps above.

CDC on Azure Database for Postgres

Use the Azure CLI to enable logical replication and restart the server:

az postgres server configuration set --resource-group group --server-name server --name azure.replication_support --value logical
az postgres server restart --resource-group group --name server

Finally, follow the rest of the steps above.

Connection via SSH Tunnel

Airbyte can connect to a Postgres instance via an SSH Tunnel. You might want to do this because it is not possible (or against security policy) to connect to the database directly (e.g. it does not have a public IP address).

When using an SSH tunnel, you are configuring Airbyte to connect to an intermediate server (a.k.a. a bastion server) that does have direct access to the database. Airbyte connects to the bastion and then asks the bastion to connect directly to the server.

Using this feature requires additional configuration when creating the source. We will talk through what each piece of configuration means.

  1. Configure all fields for the source as you normally would, except SSH Tunnel Method.
  2. SSH Tunnel Method defaults to No Tunnel (meaning a direct connection). If you want to use an SSH Tunnel choose SSH Key Authentication or Password Authentication.
    1. Choose Key Authentication if you will be using an RSA private key as your secret for establishing the SSH Tunnel (see below for more information on generating this key).
    2. Choose Password Authentication if you will be using a password as your secret for establishing the SSH Tunnel.
  3. SSH Tunnel Jump Server Host refers to the intermediate (bastion) server that Airbyte will connect to. This should be a hostname or an IP Address.
  4. SSH Connection Port is the port on the bastion server with which to make the SSH connection. The default port for SSH connections is 22, so unless you have explicitly changed something, go with the default.
  5. SSH Login Username is the username that Airbyte should use when connecting to the bastion server. This is NOT the Postgres username.
  6. If you are using Password Authentication, then the password field should be set to the password of the user from the previous step. If you are using SSH Key Authentication, leave this blank. Again, this is not the Postgres password, but the password for the OS user that Airbyte is using to perform commands on the bastion.
  7. If you are using SSH Key Authentication, then SSH Private Key should be set to the RSA Private Key that you are using to create the SSH connection. This should be the full contents of the key file starting with -----BEGIN RSA PRIVATE KEY----- and ending with -----END RSA PRIVATE KEY-----.

Generating an RSA Private Key

The connector expects an RSA key in PEM format. To generate this key:

ssh-keygen -t rsa -m PEM -f myuser_rsa

This produces the private key in PEM format, and the public key remains in the standard format used by the authorized_keys file on your bastion host. The public key should be added on your bastion host to whichever user you want to use with Airbyte. The private key is provided via copy-and-paste to the Airbyte connector configuration screen, so that Airbyte can log in to the bastion.

Data type mapping

According to Postgres documentation, Postgres data types are mapped to the following data types when synchronizing data. You can check the test values examples here. If you can't find the data type you are looking for or have any problems feel free to add a new test!

Postgres Type | Resulting Type | Notes
bigint | number |
bigserial, serial8 | number |
bit | string | Fixed-length bit string (e.g. "0100").
bit varying, varbit | string | Variable-length bit string (e.g. "0100").
boolean, bool | boolean |
box | string |
bytea | string | Variable length binary string with hex output format prefixed with "\x" (e.g. "\x6b707a").
character, char | string |
character varying, varchar | string |
cidr | string |
circle | string |
date | string | Parsed as ISO8601 date time at midnight
double precision, float, float8 | number | Infinity, -Infinity, and NaN are not supported and converted to null. Issue: #8902.
hstore | string |
inet | string |
integer, int, int4 | number |
interval | string |
json | string |
jsonb | string |
line | string |
lseg | string |
macaddr | string |
macaddr8 | string |
money | number |
numeric, decimal | number | Infinity, -Infinity, and NaN are not supported and converted to null. Issue: #8902.
path | string |
pg_lsn | string |
point | string |
polygon | string |
real, float4 | number |
smallint, int2 | number |
smallserial, serial2 | number |
serial, serial4 | number |
text | string |
time | string | Parsed as a time string without a time-zone in the ISO-8601 calendar system.
timetz | string | Parsed as a time string with time-zone in the ISO-8601 calendar system.
timestamp | string | Parsed as a date-time string without a time-zone in the ISO-8601 calendar system.
timestamptz | string | Parsed as a date-time string with time-zone in the ISO-8601 calendar system.
tsquery | string |
tsvector | string |
uuid | string |
xml | string |
enum | string |
tsrange | string |
array | array | E.g. "[\"10001\",\"10002\",\"10003\",\"10004\"]".
composite type | string |
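
Because Infinity, -Infinity, and NaN are converted to null for floating-point columns, it can be useful to check in advance how many rows are affected. The sketch below assumes a hypothetical table public.measurements with a double precision column named reading. It relies on Postgres treating NaN values as equal to each other in comparisons.

```sql
-- Assumption: public.measurements and its double precision column "reading"
-- are hypothetical names used only for illustration.
SELECT count(*) AS rows_synced_as_null
FROM public.measurements
WHERE reading IN ('NaN'::float8, 'Infinity'::float8, '-Infinity'::float8);
```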

Troubleshooting

Sync data from Postgres hot standby server

When the connector is reading from a Postgres replica that is configured as a Hot Standby, any update from the primary server will terminate queries on the replica after a certain amount of time, 30 seconds by default. This default waiting time is usually not enough to sync any meaningful amount of data. See the Handling Query Conflicts section in the Postgres documentation for detailed explanations.

Here is the typical exception:

Caused by: org.postgresql.util.PSQLException: FATAL: terminating connection due to conflict with recovery
Detail: User query might have needed to see row versions that must be removed.
Hint: In a moment you should be able to reconnect to the database and repeat your command.

Possible solutions include:

  • [Recommended] Set hot_standby_feedback to true on the replica server. With this parameter, the replica reports its running queries to the primary, which prevents the primary from removing row versions that those queries still need. However, the downside is that dead rows are kept longer on the primary, which can increase table size.
  • [Recommended] Sync data when there is no update running in the primary server, or sync data from the primary server.
  • [Not Recommended] Increase max_standby_archive_delay and max_standby_streaming_delay to be larger than the amount of time needed to complete the data sync. However, it is usually hard to tell how much time it will take to sync all the data. This approach is not very practical.
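
To see which of these settings are currently in effect, the following sketch can be run on the server that Airbyte connects to. It only uses the standard pg_is_in_recovery() function and the pg_settings view.

```sql
-- Returns true when the server is a standby (replica), false on the primary.
SELECT pg_is_in_recovery();

-- Current values of the settings discussed above.
SELECT name, setting, unit
FROM pg_settings
WHERE name IN ('hot_standby_feedback',
               'max_standby_streaming_delay',
               'max_standby_archive_delay');
```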

One optimization on the Airbyte side is to break one large and long sync into multiple small ones. This improvement is tracked in https://github.com/airbytehq/airbyte/issues/13783.

Changelog

Version | Date | Pull Request | Subject
0.4.30 | 2022-06-30 | 14251 | Use more simple and comprehensive query to get selectable tables
0.4.29 | 2022-06-29 | 14265 | Upgrade postgresql JDBC version to 42.3.5
0.4.28 | 2022-06-23 | 14077 | Use the new state management
0.4.26 | 2022-06-17 | 13864 | Updated stacktrace format for any trace message errors
0.4.25 | 2022-06-15 | 13823 | Publish adaptive postgres source that enforces ssl on cloud + Debezium version upgrade to 1.9.2 from 1.4.2
0.4.24 | 2022-06-14 | 13549 | Fixed truncated precision if the value of microseconds or seconds is 0
0.4.23 | 2022-06-13 | 13655 | Fixed handling datetime cursors when upgrading from older versions of the connector
0.4.22 | 2022-06-09 | 13655 | Fixed bug with unsupported date-time datatypes during incremental sync
0.4.21 | 2022-06-06 | 13435 | Adjust JDBC fetch size based on max memory and max row size
0.4.20 | 2022-06-02 | 13367 | Added conversion hstore to json format
0.4.19 | 2022-05-25 | 13166 | Added timezone awareness and handle BC dates
0.4.18 | 2022-05-25 | 13083 | Add support for tsquery type
0.4.17 | 2022-05-19 | 13016 | CDC modify schema to allow null values
0.4.16 | 2022-05-14 | 12840 | Added custom JDBC parameters field
0.4.15 | 2022-05-13 | 12834 | Fix the bug that the connector returns empty catalog for Azure Postgres database
0.4.14 | 2022-05-08 | 12689 | Add table retrieval according to role-based SELECT privilege
0.4.13 | 2022-05-05 | 10230 | Explicitly set null value for field in json
0.4.12 | 2022-04-29 | 12480 | Query tables with adaptive fetch size to optimize JDBC memory consumption
0.4.11 | 2022-04-11 | 11729 | Bump mina-sshd from 2.7.0 to 2.8.0
0.4.10 | 2022-04-08 | 11798 | Fixed roles for fetching materialized view processing
0.4.8 | 2022-02-21 | 10242 | Fixed cursor for old connectors that use non-microsecond format. Now connectors work with both formats
0.4.7 | 2022-02-18 | 10242 | Updated timestamp transformation with microseconds
0.4.6 | 2022-02-14 | 10256 | (unpublished) Add -XX:+ExitOnOutOfMemoryError JVM option
0.4.5 | 2022-02-08 | 10173 | Improved discovering tables in case if user does not have permissions to any table
0.4.4 | 2022-01-26 | 9807 | Update connector fields title/description
0.4.3 | 2022-01-24 | 9554 | Allow handling of java sql date in CDC
0.4.2 | 2022-01-13 | 9360 | Added schema selection
0.4.1 | 2022-01-05 | 9116 | Added materialized views processing
0.4.0 | 2021-12-13 | 8726 | Support all Postgres types
0.3.17 | 2021-12-01 | 8371 | Fixed incorrect handling "\n" in ssh key
0.3.16 | 2021-11-28 | 7995 | Fixed money type with amount > 1000
0.3.15 | 2021-11-26 | 8066 | Fixed the case, when Views are not listed during schema discovery
0.3.14 | 2021-11-17 | 8010 | Added checking of privileges before table internal discovery
0.3.13 | 2021-10-26 | 7339 | Support or improve support for Interval, Money, Date, various geometric data types, inventory_items, and others
0.3.12 | 2021-09-30 | 6585 | Improved SSH Tunnel key generation steps
0.3.11 | 2021-09-02 | 5742 | Add SSH Tunnel support
0.3.9 | 2021-08-17 | 5304 | Fix CDC OOM issue
0.3.8 | 2021-08-13 | 4699 | Added json config validator
0.3.4 | 2021-06-09 | 3973 | Add AIRBYTE_ENTRYPOINT for Kubernetes support
0.3.3 | 2021-06-08 | 3960 | Add method field in specification parameters
0.3.2 | 2021-05-26 | 3179 | Remove isCDC logging
0.3.1 | 2021-04-21 | 2878 | Set defined cursor for CDC
0.3.0 | 2021-04-21 | 2990 | Support namespaces
0.2.7 | 2021-04-16 | 2923 | SSL spec as optional
0.2.6 | 2021-04-16 | 2757 | Support SSL connection
0.2.5 | 2021-04-12 | 2859 | CDC bugfix
0.2.4 | 2021-04-09 | 2548 | Support CDC
0.2.3 | 2021-03-28 | 2600 | Add NCHAR and NVCHAR support to DB and cursor type casting
0.2.2 | 2021-03-26 | 2460 | Destination supports destination sync mode
0.2.1 | 2021-03-18 | 2488 | Sources support primary keys
0.2.0 | 2021-03-09 | 2238 | Protocol allows future/unknown properties
0.1.13 | 2021-02-02 | 1887 | Migrate AbstractJdbcSource to use iterators
0.1.12 | 2021-01-25 | 1746 | Fix NPE in State Decorator
0.1.11 | 2021-01-25 | 1765 | Add field titles to specification
0.1.10 | 2021-01-19 | 1724 | Fix JdbcSource handling of tables with same names in different schemas
0.1.9 | 2021-01-14 | 1655 | Fix JdbcSource OOM
0.1.8 | 2021-01-13 | 1588 | Handle invalid numeric values in JDBC source
0.1.7 | 2021-01-08 | 1307 | Migrate Postgres and MySql to use new JdbcSource
0.1.6 | 2020-12-09 | 1172 | Support incremental sync
0.1.5 | 2020-11-30 | 1038 | Change JDBC sources to discover more than standard schemas
0.1.4 | 2020-11-30 | 1046 | Add connectors using an index YAML file