Querying change data capture events with cloud data warehouses

Learn how to build consistent snapshots of CDC events that were captured from transactional database systems.

...
By Michael Aboagye

Introduction

Data is essential to every organization interested in making business decisions on real-time data. There is, however, a challenge at stake when managing change data capture (CDC) events. With CDC, every INSERT, UPDATE, or DELETE is captured from the data source. For this, we need to ensure that we have a consistent snapshot of the data source. In this article, we discuss how to manage and query CDC events by building an exemplary pipeline that streams CDC events from a PostgreSQL database to the cloud data warehouse, Google Cloud BigQuery.

What is CDC?

In data engineering, CDC means change data capture. Every insert, update, or delete in a data source is recorded, captured, and then transferred to a downstream data sink, like a cloud data warehouse (DWH).

Technically, cloud data warehouses, like Google Cloud BigQuery or Clickhouse, seem to favor CDC events because of the following:

  • Unlike other data systems where primary keys are used to prevent duplicates from occuring in a table, cloud DWHs do not support primary keys. This makes it possible to query transactions over a period of time.
  • In most cloud DWHs, tables are append-only which makes data immutable. For instance, Google Cloud BigQuery does not overwrite existing rows when inserting new data but appends new rows to existing tables. So there is nothing like a deleted or expired row.
  • One can use most cloud DWHs to store CDC events as changelogs, which makes it possible to take notes of all updates within a specific period of time.

These features make it possible to get a consistent view of the data source. The following defines a number of ways to implement CDC systems.

Benefits of CDC for cloud data warehousing

Below are some of benefits to expect or look forward to when you decide to implement CDC based systems

  • Change data capture eliminates the need for bulk-uploading data into your data warehouse.
  • Log-based CDC prevents performance impacts on data source systems since there is no usage of the query layer for extracting data. Instead, log-based CDC consists of reading change events from the replication log of the database system.
  • CDC makes it possible to synchronize data systems located in different geographical areas, in real-time.

Streaming CDC events from PostgreSQL to BigQuery

In the first step, we use DataCater to build a data pipeline that streams CDC events from a PostgreSQL database to BigQuery. In the second step, we show how to query and build a consistent snapshot of the CDC events stored in BigQuery.

Set up PostgreSQL as the data source

DataCater offers a number of data systems to set up a data source. For this project, we decided to use PostgreSQL since it is a very popular open-source choice for managing business data. This demo should work with any managed PostgreSQL service. The only requirements are that it offers PostgreSQL version 10 or newer and that it allows accessing the logical replication (provided by the pgoutput plugin). Once you have created a free trial with DataCater and signed into your account, click on the Data Sources link at the top of your home dashboard. Then click on the Create Data Source button and select PostgreSQL:

Create new data source

Next, assign preferred values to the following parameters below:

Configure the PostgreSQL data source

  • Hostname: This parameter defines the hostname or IP Address of your database server.
  • Port: This parameter defines the port number of your database server. By default, PostgreSQL uses port number 5432.
  • SSL: This parameter defines whether to use SSL or not for connecting to your PostgreSQL instance.
  • Username: This parameter specifies the username (or role) to use for connecting to your PostgreSQL database server. Make sure that the user has all required permissions for extracting data from the replication log, especially the REPLICATION permission. Check this documentation for detailed information on how to inherit privileges or add users to defined roles.
  • Password: This parameter defines the password of the user.
  • Database Name: This parameter defines the name of the PostgreSQL database of interest.
  • Schema Name: This parameter defines the name of the PostgreSQL schema of interest. By the default, PostgreSQL uses the schema public.
  • Table Name: This parameter defines the name of the PostgreSQL table of interest.

Afterwards, test the connection to the Postgresql database server by clicking on the Test connection button. You can then click on the Create data source button to finish setting up PostgreSQL as the data source.

Set up Google Cloud BigQuery as the data sink

You need a Google Cloud platform account to set up BigQuery as the data sink. In addition, ensure that you have a BigQuery dataset already available

Click on the Data Sinks link at the top and then choose Create Data Sink. Next, select Google Cloud BigQuery.

Configure the BigQuery data sink

You need to specify preferred values for the following parameters:

  • Google Cloud credentials: This parameter defines the credentials of the GCP service account in a JSON format.
  • Project Name: This parameter defines the name of the Google Cloud project. It gets automatically filled when you enter the Google Cloud credentials.
  • Dataset Name: This parameter specifies the name of the BigQuery dataset.
  • Table Name: This parameter defines the name of the BigQuery table.
  • Table Partitioning: This parameter defines the method to partition tables.
  • Column Partitioning: If using a column for the partitioning, this parameter defines the name of the to-be-used column.

Create pipeline for streaming CDC events

Now it’s time to connect the data source and the data sink together via a pipeline. Navigate to Pipelines and click on the Create pipeline button to create a pipeline for streaming the CDC Events.

Then select your preferred data source as shown via the screenshot below:

Create a pipeline

Once the pipeline designer is loaded, navigate to Data sink and choose the BigQuery sink we created.

Next, it is time to deploy the pipeline. Navigate to Deploy and click on the Create deployment button. Once the deployment has been built, you can activate the pipeline by starting the deployment.

Create a deployment and activate the pipeline

You can confirm that the CDC pipeline has been deployed by viewing its logs.

Managing and querying CDC events in BigQuery

There are a number of ways to manage CDC events in a cloud data warehouse, like Google Cloud BigQuery. This section discusses how to manage CDC events in BigQuery using the consistent immediate approach, originally brought up by the folks at WePay, which makes it possible to get a consistent view of the data from the data source.

Let’s assume that our pipeline streamed CDC events to the BigQuery table users in the dataset pg_cdc and that this table features the columns id, resembling the primary key or identifier for each row in the PostgreSQL database table, and updated_at, resembling the time of the most recent change for each row.

We could build the following view on top of the table users which returns the most recent change event for each row:

CREATE VIEW
  pg_cdc.users_snapshot
AS (
  SELECT
    * EXCEPT(row_num)
  FROM
    (
      SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) row_num
      FROM
        `pg_cdc.users`
    )
);

When working with the data in BigQuery, you could directly query this view. For instance, the following query would return the most recent version of the row with the id 42:

SELECT * FROM pg_cdc.users_snapshot WHERE id = 42;

Of course, you could still access the full changelog. For instance, the following query would return all change events for the same row:

SELECT * FROM pg_cdc.users WHERE id = 42;

Pros:

  • This approach works perfectly for analytics teams interested in visualizing recent trends to stakeholders.

  • This approach allows the use of the same queries for the CDC events in the cloud data warehouse and for the source data in the transactional database system.

Cons:

  • This approach creates a view over the table with CDC events. So instead of managing one relation, you end up managing another relations/views. You could mitigate this downside by managing the views with a tool, like dbt or Dataform.

Summary

This article showed how to stream CDC events from PostgreSQL, a transactional database system, to Google Cloud BigQuery, a cloud data warehouse.

By default, you end up with a BigQuery table that contains one row for each change applied to a row in PostgreSQL. We built a view on top of this raw table that creates a consistent snapshot of the CDC events, allowing data consumers to work with a clean and consistent snapshot of the data, without having to implement a deduplication of the CDC events themselves.

Get started with DataCater, for free

The real-time ETL platform for data and dev teams. Benefit from the power of event streaming without having to handle its complexity.

Start free