Learn how you can declare streaming data pipelines in YAML.
Historically, Hadoop MapReduce (MR) was one of the first technologies for the scalable, distributed, and fault-tolerant processing of data. MR jobs are implemented with a map procedure and a reduce function. Over time, it became clear that the rigid structure of MR jobs imposes a specific linear data flow on data pipelines that does not feel natural for many processing needs.
The release of Apache Spark marked the start of the second generation of data processing frameworks: Spark jobs can be implemented with Scala’s Collections API using functions such as filter(), map(), flatMap(), or count(). While Spark and other data processing frameworks that adopted the Collections API, such as Apache Flink or Apache Kafka Streams, made the implementation of scalable data pipelines much more approachable, their approach remained entirely imperative, forcing developers to take care of performance issues in their code themselves and leaving almost no room for automated optimizations.
In recent years, we have seen a shift to declarative data pipelines, relieving developers of the burden of worrying about optimizations and of learning every detail of the underlying processing framework. SQL became a popular choice for implementing data pipelines in cloud data warehouses. In parallel to the rise of SQL-powered data pipelines, cloud-native computing emerged and further evangelized the concept of declarative resource definitions, typically implemented in YAML.
Traditionally, data pipelines are implemented with imperative code using programming languages, such as Python. Instead of fixing the flow of data through imperative commands, declarative data pipelines solely describe the logic of the data processing. Runtimes (or interpreters) take care of turning the declaration into executable code, leaving much more room for automated optimizations and parallelization.
Besides leaving much more room for optimizations to the data pipeline runtime, YAML-based declarative data pipelines offer multiple advantages:
Documentation: By describing the transformation logic instead of implementing it in program code, declarative data pipelines offer documentation for free: the declaration itself can be read by most people, not only developers.
Faster development iterations: Declarative data pipelines allow developers to iterate much faster on data pipelines and implement transformation logic in much fewer lines of code.
The cloud-native language: YAML is the default language for defining resources in cloud-native computing. By using it for data pipelines, developers can rely on a consistent tech stack.
DataOps / GitOps: Declarative data pipelines make it possible to apply GitOps principles to data. They lay the foundation for testing and deploying data pipelines via CI/CD pipelines (see the sketch after this list).
Automation: Compared to imperative data pipelines, declarative data pipelines are much easier to automate and generate by other software.
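To make the GitOps point concrete, here is a minimal, hypothetical CI workflow in GitHub Actions syntax. The file name pipeline.yaml, the validation step, and the use of kubectl are assumptions for illustration only, not DataCater's prescribed deployment path:

name: deploy-pipeline
on:
  push:
    branches: ["main"]
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # Hypothetical sanity check: the pipeline declaration must at least parse as YAML
      - name: Validate pipeline definition
        run: |
          pip install pyyaml
          python -c "import yaml; yaml.safe_load(open('pipeline.yaml'))"
      # Assumes cluster credentials are configured for the job; how DataCater
      # pipelines are actually rolled out may differ
      - name: Deploy pipeline
        run: kubectl apply -f pipeline.yaml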
Introducing declarative descriptions for data pipelines brings a couple of challenges with it. Data pipelines by their very nature are stateful applications. Their sole purpose is to change external systems.
This contrasts with the general idea of declarative languages, especially the concept of referential transparency offered by many functional languages, such as Haskell or Scala. Hence, the question to be answered is: What state can be declared in DataCater’s YAML?
The following figure describes how DataCater interprets declarative data pipelines and derives immutable container images, which can be deployed on Kubernetes. DataCater makes use of Apache Kafka and Apache Kafka Connect for event streaming and event sourcing, respectively. In this context, data pipelines are Apache Kafka Streams applications that are built, containerized, and executed on Kubernetes.
With the above information, we can easily answer the question: What state can be declared in DataCater’s YAML? The declared state equals the content of the pipeline container.
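For illustration only, a containerized pipeline could be deployed with a standard Kubernetes Deployment along the lines of the following sketch; the image name, labels, and environment variables are hypothetical, and the manifests DataCater actually generates may differ:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: pipeline-1                    # hypothetical name derived from the pipeline id
  labels:
    app: datacater-pipeline
spec:
  replicas: 1
  selector:
    matchLabels:
      app: datacater-pipeline
      pipeline: "1"
  template:
    metadata:
      labels:
        app: datacater-pipeline
        pipeline: "1"
    spec:
      containers:
        - name: pipeline
          # hypothetical immutable image built from the declarative pipeline definition
          image: registry.example.com/pipelines/pipeline-1:latest
          env:
            - name: KAFKA_BOOTSTRAP_SERVERS   # hypothetical configuration
              value: "kafka:9092"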
To be more precise, pipelines consist of a list of filters and a list of transformation steps. Examples and further explanations follow:
Filters can be used to restrict the data pipeline to a subset of the source data, if needed. A filter is declared as a mapping of the attributeName, a filter function (here: equal), and optionally a filterConfig.
filters:
  - attributeName: "log_level"
    filter: "equal"
    filterConfig:
      value: "error"
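Since filters is a list, more than one filter can be declared. The following sketch adds a second filter on a hypothetical service attribute; we assume here that all declared filters must match for a record to pass, which is the most common semantics but not spelled out above:

filters:
  - attributeName: "log_level"
    filter: "equal"
    filterConfig:
      value: "error"
  - attributeName: "service"          # hypothetical attribute
    filter: "equal"
    filterConfig:
      value: "checkout"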
Transformation steps are applied in sequential order to the data while streaming them from the data source to the data sink. Each transformation step can apply one transformation function to each attribute.
A transformation step is defined by a name, which identifies the step, and a list of the following mappings:
attributeName: reference to the attribute
transformation: name of the transformation function
transformationConfig (optional): configuration of the transformation function
filter (optional): filter to restrict the application of the transformation to specific values
filterConfig (optional): configuration of the filter function

transformationSteps:
  - name: "Mask IP addresses in message"
    transformations:
      - attributeName: "message"
        transformation: "user-defined-transformation"
        transformationConfig:
          code: |-
            import re

            def transform(value, row):
              return re.sub(
                r"(\.(?:[0-9]{1,3}\.){2}[0-9]{1,3})",
                ".XXX.XXX.XXX",
                value)
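The optional per-attribute filter and filterConfig can restrict a transformation to specific values. The following sketch normalizes the log_level attribute only where its value equals error; we assume here that the per-attribute filter accepts the same functions and configuration as pipeline-level filters and is evaluated against the attribute's own value:

transformationSteps:
  - name: "Normalize error log level"
    transformations:
      - attributeName: "log_level"
        transformation: "user-defined-transformation"
        # Assumption: the transformation is applied only to records whose
        # log_level value equals "error"
        filter: "equal"
        filterConfig:
          value: "error"
        transformationConfig:
          code: |-
            def transform(value, row):
              return value.upper()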
DataCater heavily relies on Kubernetes and borrows a lot of concepts from cloud-native technologies. For instance, our YAML format has been influenced by Kubernetes’ Custom Resource Definition. We conclude this article with a complete YAML representation of a pipeline:
apiVersion: "datacater.io/v1"
kind: "Pipeline"
metadata:
  id: "1"
  name: "Sync error logs with masked IP addresses"
spec:
  filters:
    - attributeName: "log_level"
      filter: "equal"
      filterConfig:
        value: "error"
  transformationSteps:
    - name: "Mask IP addresses in message"
      transformations:
        - attributeName: "message"
          transformation: "user-defined-transformation"
          transformationConfig:
            code: |-
              import re

              def transform(value, row):
                return re.sub(
                  r"(\.(?:[0-9]{1,3}\.){2}[0-9]{1,3})",
                  ".XXX.XXX.XXX",
                  value)
The above pipeline, named Sync error logs with masked IP addresses, results in a Kafka Streams application deployed on Kubernetes. The pipeline syncs error log messages from a data source to a data sink and uses a Python function to mask IP addresses on the way.
Declarative data pipelines are an important next step in the evolution of ETL tooling. In this blog post, we showed how DataCater pipelines can be represented in YAML. We highlighted benefits of representing data pipelines declaratively, which include, but are not limited to, precise documentation, faster development iterations, and support for automation.