PostgreSQL

The source connector for PostgreSQL can extract data change events from a PostgreSQL database system in real-time and stream them to the DataCater platform.

The PostgreSQL source connector is available in two variants:

  • The Logical replication-based connector is based on Debezium and uses the logical replication feature of PostgreSQL for extracting INSERTs, UPDATEs, and DELETEs.
  • The JDBC-based connector runs queries against the PostgreSQL database to extract change events. It can extract INSERTs and UPDATEs.

Requirements

Logical replication is available since PostgreSQL version 9.4.

PostgreSQL implements logical replication on its own without requiring the installation of plugins since version 10. Prior PostgreSQL versions require the installation of a plugin, either decoderbufs or wal2json, to support logical replication.


Preparing PostgreSQL (Logical Replication)

Setup user

Create a new PostgreSQL user with the required permissions:

psql> CREATE ROLE [username] SUPERUSER LOGIN PASSWORD '[password]';

Superusers of PostgreSQL have the required permissions assigned by default.

Enable Logical Replication

Add the following lines to the config file of your PostgreSQL installation, typically called postgresql.conf, to enable Logical Replication:

wal_level = logical
max_wal_senders = 1
# set max_replication_slots to the estimated number of pipelines consuming this data source
max_replication_slots = 5
Allow connections from DataCater

Allow the DataCater platform to connect to your PostgreSQL installation by adding an entry to the PostgreSQL configuration file pg_hba.conf.


Preparing PostgreSQL (JDBC)

Setup user

Create a new PostgreSQL user with read-only permissions.

Allow connections from DataCater

Allow the DataCater platform to connect to your PostgreSQL installation by adding an entry to the PostgreSQL configuration file pg_hba.conf.


Configuration

This source connector supports the following configuration options:

General > Hostname or IP

The hostname or IP address of the machine where the PostgreSQL database system is running.

General > Port

The port under which PostgreSQL is available (default: 5432).

General > SSL

Whether to use SSL when connecting to the PostgreSQL database or not.

General > Username

The username used for authenticating with PostgreSQL.

General > Password

The password of the user used for authenticating with PostgreSQL.

General > Database name

The name of the database that should be used as a source.

General > Schema name

The name of the database schema that should be used as a source (default: public).

General > Table name

The name of the database table that should be used as a source. You may retrieve the list of tables available in the given PostgreSQL database by clicking on Fetch table names.

General > Automatically detect primary key column

Whether to automatically profile the primary key column using the PostgreSQL system tables pg_attribute and pg_index or not.

General > Primary key column

Only available if the automated detection of the primary key column is disabled. Name of the attribute that uniquely identifies records. DataCater uses the primary key attribute to detect new records. Please make sure that the column does not hold NULL values.

Change Data Capture > Connector variant

You may choose between the Logical Replication-based and JDBC-based connector variant.

Change Data Capture > Logical replication plugin

Only available for the Logical Replication-based connector.

The logical replication plugin to be used to consume change events from PostgreSQL (default: pgoutput). </p>

If you are using PostgreSQL 10 or newer, please use pgoutput.

If you are using PostgreSQL 9.4 or newer, please use decoderbufs, wal2json, or wal2json Streaming.

If you are using Amazon RDS for PostgreSQL, please use wal2json for Amazon RDS or wal2json Streaming for Amazon RDS.

Change Data Capture > Change Data Capture mode

Only available for the JDBC-based connector.

You may choose one of the following modes for change data capture:

  • BULK: Fetch all data at each sync.
  • INCREMENTING: Use the primary key column to fetch data that have been inserted since the last sync. This mode does only extract INSERTs, but skips UPDATEs and DELETEs.
  • TIMESTAMP/INCREMENTING: Use the primary key column and the timestamp column, specified using the Timestamp column configuration options, to fetch data that have been inserted or updated since the last sync. This mode does only extract INSERTs and UPDATEs, but skips DELETEs.
Change Data Capture > Timestamp column

Only available for the JDBC-based connector.

DataCater can use a timestamp column, which stores the time of the most recent update of a record, to detect record updates. Specifying the timestamp column is required when using TIMESTAMP/INCREMENTING as Change Data Capture mode.

Change Data Capture > Sync interval

Only available for the JDBC-based connector.

The interval in seconds between synchronizations of the PostgreSQL table and DataCater (default: 60).

Advanced > SELECT statement for initial snapshot

Only available for the Logical replication-based connector. Overwrites the SELECT statement used for the initial snapshot (default: SELECT * FROM schema_name.table_name;).

Advanced > Comma-separated list of columns to exclude

List of columns that shall not be considered for the data extraction (default: empty).


Data Types

The following table shows the mapping between PostgreSQL data types and the data types used by DataCater.

PostgreSQL data type DataCater data type
BIGINT Long
BIGSERIAL Long
BIT Boolean
BIT VARYING String
BOOLEAN Boolean
BOX String
BYTEA String
CHAR String
CHARACTER VARYING String
CHARACTER String
CIDR String
CIRCLE String
CITEXT String
DATE Date
DATERANGE String
DECIMAL Double
DOUBLE PRECISION Double
ENUM String
INET String
INT4RANGE String
INT8RANGE String
INTEGER Int
INTERVAL String
JSON String
JSONB String
LINE String
LSEG String
LTREE String
MACADDR String
MONEY Double
NUMERIC Double
NUMRANGE String
PATH String
POINT String
POLYGON String
REAL Float
SMALLINT Int
SMALLSERIAL Int
SERIAL Int
TEXT String
TIME Time
TIMETZ Time
TIME WITH TIME ZONE Time
TIMESTAMP Timestamp
TIMESTAMP WITHOUT TIME ZONE Timestamp
TIMESTAMPTZ Timestamp
TIMESTAMP WITH TIME ZONE Timestamp
TSQUERY String
TSRANGE String
TSTZRANGE String
TSVECTOR String
TXID_SNAPSHOT String
UUID String
VARCHAR String
XML String