How to use Change Data Capture (CDC) with Elasticsearch

Learn how to extract changes from Elasticsearch in real time.

By Stefan Sprenger

Elasticsearch is a distributed search engine built on top of the open-source project Apache Lucene. It offers a powerful HTTP interface for indexing documents, searching indexes, and performing all other operations. Elasticsearch manages indexed documents as schemaless JSON objects.

Traditionally, most architectures deploy Elasticsearch as a full-text search index for a database system, often feeding changes from the database to Elasticsearch using a data pipeline. In recent years, however, new use cases have emerged in which Elasticsearch takes over the role of the primary data store.

Elasticsearch as a Data Source

Data architectures that deploy Elasticsearch as a data store often need to feed data from Elasticsearch to downstream applications.

The naive approach to extracting data from Elasticsearch is a recurring bulk load (a Match all query in Elasticsearch terminology). If the Elasticsearch index holds a large number of documents, the bulk load cannot be performed very often, because every run reads all documents again, whether or not they have changed. In this article, we explore how to make data extraction more intelligent.
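To make the naive approach concrete, the following minimal Python sketch builds the request body for such a bulk load. The function name build_bulk_load_body and the default page size are hypothetical choices for illustration; only the match_all query itself comes from Elasticsearch.

```python
import json


def build_bulk_load_body(page_size: int = 1000) -> dict:
    """Return a search body that matches every document in the index.

    The function name and default page size are illustrative assumptions.
    """
    # match_all applies no filter, so each run reads the whole index again,
    # including documents that have not changed since the previous run.
    return {"size": page_size, "query": {"match_all": {}}}


if __name__ == "__main__":
    # Print the JSON body that would be POSTed to the index's _search endpoint.
    print(json.dumps(build_bulk_load_body(100)))
```

Because the body carries no condition on when a document changed, the cost of each run grows with the size of the index rather than with the number of changes.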

Capturing Changes from Elasticsearch

Change data capture (CDC) is a recent approach to extracting change events from data stores, mainly applied to database systems. While database systems often offer a recovery log that records all operations performed on the database, Elasticsearch does not provide a comparable mechanism. However, we can still use queries to detect changes in an Elasticsearch index.

In this case, we assume that the Elasticsearch index features a date field, called updated_at in the following, which indicates when each document was most recently updated.

To capture change events, we would query Elasticsearch repeatedly, as in the case of the bulk load. However, we would extend the query as follows:

  • We sort documents by the field updated_at.
  • We ask for documents whose value in the field updated_at is greater than the one we most recently processed.

On our side, we would have to remember the timestamp of the most recently processed document between queries.
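One way to remember this position is a small checkpoint file. The sketch below is only an illustration under assumptions: the helper names load_checkpoint and save_checkpoint and the JSON file layout are hypothetical, and any durable store would serve the same purpose.

```python
import json
import os
import tempfile
from typing import Optional


def load_checkpoint(path: str) -> Optional[list]:
    """Return the stored sort values, or None if nothing was processed yet."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)["search_after"]
    except FileNotFoundError:
        return None


def save_checkpoint(path: str, sort_values: list) -> None:
    """Persist the sort values of the most recently processed document."""
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temporary file first and rename it afterwards, so that a
    # crash in the middle of a write cannot leave a truncated checkpoint.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump({"search_after": sort_values}, f)
    os.replace(tmp_path, path)
```

Saving the checkpoint only after a document has been handed to the downstream application means that a restart may deliver a document twice, but does not skip one.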

The Elasticsearch query could look as follows:

$ curl -X POST \
  -H "Content-Type: application/json" \
  -d '{"size":100, "sort":[{"updated_at":"asc"}], "search_after":[MOST_RECENTLY_PROCESSED_TIMESTAMP]}' \
  https://elastic:pw@deployment.cloud.es.io:9243/index/_search
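The same request body can also be assembled in code. The following Python sketch is an illustration, not a definitive implementation: the function names build_change_query and next_search_after are hypothetical. It omits search_after on the very first run, when no document has been processed yet, and it reads the next position from the sort array that Elasticsearch attaches to each hit of a sorted search.

```python
from typing import Optional


def build_change_query(search_after: Optional[list], size: int = 100) -> dict:
    """Build the body of the incremental query, sorted by updated_at."""
    body = {"size": size, "sort": [{"updated_at": "asc"}]}
    # On the first run there is no processed document yet, so the
    # search_after parameter is left out and reading starts at the oldest one.
    if search_after is not None:
        body["search_after"] = search_after
    return body


def next_search_after(response: dict, previous: Optional[list]) -> Optional[list]:
    """Return the sort values of the last hit, or the previous position
    when the response contains no hits."""
    hits = response["hits"]["hits"]
    return hits[-1]["sort"] if hits else previous
```

Note that sorting on updated_at alone can miss documents when several of them share exactly the same timestamp and a page ends between them; adding a second sort field with a unique value per document as a tiebreaker is a common remedy.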

When executing the query at a high frequency, we can extract change events from Elasticsearch in (near) real time.
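A polling loop along these lines could look as follows. This is a minimal sketch under stated assumptions: poll_changes and its parameters are hypothetical names, and the search argument stands for any function that sends a request body to the index's _search endpoint and returns the parsed JSON response, so the HTTP client is left open.

```python
import time
from typing import Callable, Iterator, Optional


def poll_changes(
    search: Callable[[dict], dict],
    search_after: Optional[list] = None,
    page_size: int = 100,
    interval_seconds: float = 1.0,
    max_polls: Optional[int] = None,
) -> Iterator[dict]:
    """Yield hits in updated_at order by querying the index repeatedly."""
    polls = 0
    while max_polls is None or polls < max_polls:
        body = {"size": page_size, "sort": [{"updated_at": "asc"}]}
        if search_after is not None:
            body["search_after"] = search_after
        hits = search(body)["hits"]["hits"]
        polls += 1
        for hit in hits:
            yield hit
            # Advance the position only after the consumer received the hit.
            search_after = hit["sort"]
        # A short page means we have caught up, so wait before asking again;
        # a full page means more changes may be pending, so query immediately.
        if len(hits) < page_size:
            time.sleep(interval_seconds)
```

A shorter interval_seconds lowers the delay between a change and its extraction at the cost of more queries against the cluster.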

Extracting Data from Elasticsearch with DataCater

Setting up change data capture for Elasticsearch on your own requires a lot of manual work, such as managing the timestamp of the most recently processed change event. Using DataCater’s source connector for REST APIs, you can configure and deploy CDC with Elasticsearch in a matter of minutes. Additionally, you can benefit from the large set of sink connectors and options for transforming change events while streaming them to downstream applications.

In a recent demo video, we show how to build a data pipeline in DataCater to stream change events from Elasticsearch to a JSON file in real time.


Did you like this article? Did we miss anything? Feel free to reach out to us. We would be more than happy to chat CDC, show you DataCater in action, or discuss any other request with you.
