DataCater introduces support for declarative data pipelines

Learn how you can declare streaming data pipelines in YAML.

...
...
By Hakan Lofcali, Stefan Sprenger

Historically, Hadoop MapReduce (MR) has been one of the first technologies for the scalable, distributed, and fault-tolerant processing of data. MR jobs are implemented with a map procedure and a reduce function. Over time it became clear that he rigid structure of MR jobs enforces a specific linear data flow on data pipelines that does not feel very native for many processing needs.

The release of Apache Spark marked the start of the second generation in data processing frameworks: Spark jobs can be implemented with Scala’s Collections API using functions, such as filter(), map(), flatMap(), or count(). While Spark and other data processing frameworks adopting the Collections API, like Apache Flink or Apache Kafka Streams, made the implementation of scalable data pipelines more feasible, their approach was still completely imperative, forcing developers to take care of performance issues in their code by themselves and leaving almost no room for automated optimizations.

In recent years, we saw a shift to declarative data pipelines, removing the burden from developers to worry about optimizations and learn every detail of the underlying processing framework to master it. SQL became a popular choice for implementing data pipelines in cloud data warehouses. In parallel to the rise of SQL-powered data pipelines, cloud-native computing emerged and further evangelized the concept of declarative resource definitions, typically implemented in YAML.

What are declarative data pipelines?

Traditionally, data pipelines are implemented with imperative code using programming languages, such as Python. Instead of fixing the flow of data through imperative commands, declarative data pipelines solely describe the logic of the data processing. Runtimes (or interpreters) take care of turning the declaration into executable code, leaving much more room for automated optimizations and parallelization.

What are the benefits of YAML-based declarative data pipelines?

Besides leaving much more room for optimizations to the data pipeline runtime, YAML-based declarative data pipelines offer multiple advantages:

  • Documentation: By describing the transformation logic instead of implementing it in program code, declarative data pipelines offer documentation for free. Declarative data pipelines can be read by most people.

  • Faster development iterations: Declarative data pipelines allow developers to iterate much faster on data pipelines and implement transformation logic in much fewer lines of code.

  • The cloud-native language: YAML is the default language for defining resources in cloud-native computing. By using it for data pipelines, developers can rely on a consistent tech stack.

  • DataOps / GitOps: Declarative data pipelines allow to apply principles from GitOps to data. They lay the foundation for the testing and deployment of data pipelines via CI/CD pipelines.

  • Automation: Compared to imperative data pipelines, declarative data pipelines are much easier to automate and generate by other software.

How does DataCater implement declarative data pipelines?

Introducing declarative descriptions for data pipelines brings a couple of challenges with it. Data pipelines by their very nature are stateful applications. Their sole purpose is to change external systems.

This contrasts with the general idea of declarative languages, especially the concept of referential transparency as offered by many functional languages, such as Haskell or Scala. Hence, the question to be answered is, what state can be declared in DataCater’s YAML.

The following figure describes how DataCater interprets declarative data pipelines and derives immutable container images, which can be deployed on Kubernetes. DataCater makes use of Apache Kafka and Apache Kafka Connect for event streaming and event sourcing, respectively. In this context, data pipelines are Apache Kafka Streams applications that are built, containerized, and executed on Kubernetes.

How DataCater interprets declarative data pipelines.
DataCater interprets YAML-based declarations of data pipelines and compiles them to Apache Kafka Streams applications, which are packaged as immutable container images and deployed on Kubernetes.

With the above information, we can easily answer the question: What state can be declared in DataCater’s YAML? The declared state equals the content of the pipeline container.

To be more precise, pipelines consist of a source schema, a list of filters, and a list of transformation steps. An example and further explanation follows:

Source schema

The source schema (sourceSchema) defines the expected input schema of the pipeline and helps users to build deterministic data pipelines. A source schema is defined as a list of attribute mappings. Each attribute mapping consists of an attributeName, a referable identifier, and a dataType.

sourceSchema:
- attributeName: "log_level"
  dataType: "string"
- attributeName: "message"
  dataType: "string"

Filters

Filters can be used to restrict the data pipeline to a subset of the source data, if needed. A filter is declared as a mapping of the attributeName (as defined in the sourceSchema), a filter function (here equal), and optionally a filterConfig.

filters:
- attributeName: "log_level"
  filter: "equal"
  filterConfig:
    value: "error"

Transformation steps

Transformation steps are applied in sequential order to the data while streaming them from the data source to the data sink. Each transformation step can apply one transformation function to each attribute. A transformation step is defined by a name, which identifies the step, and a list of the following mappings:

  • attributeName: reference to the attribute as named in the sourceSchema
  • transformation: name of the transformation function
  • transformationConfig (optional): configuration of the transformation function
  • filter (optional): filter to restrict the application of the transformation to specific values
  • filterConfig (optional): configuration of the filter function
transformationSteps:
- name: "Mask IP addresses in message"
  transformations:
  - attributeName: "message"
    transformation: "user-defined-transformation"
    transformationConfig:
      code: |-
        import re

        def transform(value, row):
          return re.sub(\
            "(\.(?:[0-9]{1,3}\.){2}[0-9]{1,3})",\
            r".XXX.XXX.XXX",\
            value)

Bringing it all together

DataCater heavily relies on Kubernetes and borrows a lot of concepts from cloud-native technologies. For instance, our YAML format has been influenced by Kubernetes’ Custom Resource Definition. We conclude this article with a complete YAML representation of a pipeline:

apiVersion: "datacater.io/v1"
kind: "Pipeline"
metadata:
  id: "1"
  name: "Sync error logs with masked IP addresses"
spec:
  sourceSchema:
  - attributeName: "log_level"
    dataType: "string"
  - attributeName: "message"
    dataType: "string"
  filters:
  - attributeName: "log_level"
    filter: "equal"
    filterConfig:
      value: "error"
  transformationSteps:
  - name: "Mask IP addresses in message"
    transformations:
    - attributeName: "message"
      transformation: "user-defined-transformation"
      transformationConfig:
        code: |-
          import re

          def transform(value, row):
            return re.sub(\
              "(\.(?:[0-9]{1,3}\.){2}[0-9]{1,3})",\
              r".XXX.XXX.XXX",\
              value)

The above pipeline, named Sync error logs with masked IP adresses, results in a Kafka Streams application deployment on Kubernetes. The pipeline syncs error log messages from a data source to a sink and utilizes a Python function to mask IP addresses on the way.

Summary

Declarative data pipelines are an important next step in the evolution of ETL tooling. In this blog post, we showed how DataCater pipelines can be represented in YAML. We highlighted benefits of representing data pipelines declaratively, which include, but are not limited to, precise documentation, faster development iterations, and support for automation.