How to mask data in Redpanda with Python and DataCater

This tutorial walks you through developing custom data masking functions in Python and applying them to Redpanda topics.

...
By Stefan Sprenger

Data masking is a technique for removing sensitive information from data while keeping its original structure. One practical example is replacing the last block of an IPv4 address with 0, e.g., transforming 194.99.91.179 to 194.99.91.0. While we can no longer identify specific IP addresses, we keep enough information for the data to remain useful for purposes like analytics or testing. Data masking rules depend a lot on the data and the use case; it is very likely that you mask email addresses in a different way than names. So, while some SQL dialects offer pre-built functions for masking data, e.g., ksqlDB’s MASK(), we are typically better off implementing data masking in a more flexible language, such as Python.

This article walks you through building custom data masking functions in Python and applying them to data in Redpanda topics. We develop and operate the Python functions with DataCater. Our goal is to build a streaming data pipeline that streams data from one Redpanda topic to another one and applies the data masking function to the data on the way. As a practical example, we mask email addresses.

The approach transfers to various use cases. Consider, for instance, building a pre-production stage for your Redpanda/Kafka cluster. Using the presented approach, you can stream production data to your pre-production cluster and remove sensitive information on the way.

💡 We favor tutorials that are easily reproducible and do not require signing up for any service. In this article, we refrain from using any managed services but run DataCater's Open Core on a local Kubernetes cluster. We also provide scripts for ingesting test data into Redpanda.

Prerequisites and Installation

We used Minikube for developing this tutorial. If you use a different Kubernetes distribution, you might want to use the Helm charts of DataCater and Redpanda. Once you have installed Minikube, please create a new Kubernetes cluster as follows:

$ minikube start --memory 8192 --cpus 2

In the next step, we install DataCater and Redpanda on the Kubernetes cluster. To this end, we provide a ready-to-use Kubernetes manifest that you can apply and install into your default namespace with the following command:

$ kubectl apply -f https://raw.githubusercontent.com/DataCater/datacater/main/k8s-manifests/minikube-with-postgres-and-redpanda.yaml

If you prefer to use a different namespace or if you are not running Minikube, we recommend installing DataCater and Redpanda using Helm.

Let us follow the progress of the installation with $ kubectl get pods --watch. Once completed, you should see that all pods are ready and running:

$ kubectl get pods --watch
NAME                                                    READY   STATUS    RESTARTS        AGE
datacater-0                                             1/1     Running   0               4d13h
datacater-pg-0                                          1/1     Running   0               4d17h
python-runner-0                                         1/1     Running   0               159m
python-runner-1                                         1/1     Running   0               158m
python-runner-2                                         1/1     Running   0               158m
python-runner-3                                         1/1     Running   0               158m
python-runner-4                                         1/1     Running   0               159m
redpanda-0                                              1/1     Running   2 (6d23h ago)   12d
ui                                                      1/1     Running   0               4d13h

Next, please make sure that you can access both Redpanda and DataCater from outside of the Kubernetes cluster.

Accessing Redpanda from outside of Kubernetes

For accessing Redpanda from your host machine, please create a port forward:

$ kubectl port-forward redpanda-0 9093:9093

You also need to add the following line to your /etc/hosts file so that the address advertised by Redpanda (its ADVERTISED_LISTENER) resolves to your host machine:

127.0.0.1 redpanda-0.redpanda.default.svc.cluster.local.
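If you want to double-check the /etc/hosts entry before moving on, a small Python sketch like the following can help. It only uses the standard library; the function name resolves_to_loopback and its resolver parameter are made up for this illustration, and the hostname is copied verbatim from the line above.

```python
import socket

# The broker hostname used throughout this tutorial (note the trailing dot).
BROKER_HOST = "redpanda-0.redpanda.default.svc.cluster.local."


def resolves_to_loopback(host, resolver=socket.gethostbyname):
    """Return True if `host` resolves to 127.0.0.1, False otherwise.

    `resolver` is injectable so the check can be exercised without
    touching the real name resolution of the machine.
    """
    try:
        return resolver(host) == "127.0.0.1"
    except OSError:
        # socket.gaierror (unknown host) is a subclass of OSError.
        return False
```

Calling resolves_to_loopback(BROKER_HOST) on your host should return True once the entry is in place.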

Accessing DataCater from outside of Kubernetes

You can access DataCater running inside the Kubernetes cluster by creating a port forward to the UI pod:

$ kubectl port-forward ui 8080:8080

Please confirm that the port forward is working by either visiting http://localhost:8080 in your web browser and signing in with the username admin and password admin or creating an access token via the API:

$ curl -uadmin:admin -XPOST http://localhost:8080/api/v1/authentication
{"token_type":"Bearer","access_token":"[HERE_COMES_THE_ACCESS_TOKEN]","expires_in":604800}
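The same request can be issued from Python. The following is a minimal sketch using only the standard library; it assumes the endpoint, the admin:admin credentials, and the response shape shown above. The helper names (build_token_request, bearer_header, fetch_bearer_header) are hypothetical and not part of DataCater.

```python
import base64
import json
import urllib.request

API_URL = "http://localhost:8080/api/v1/authentication"


def build_token_request(username="admin", password="admin", url=API_URL):
    """Build the POST request with HTTP basic auth, like `curl -u ... -XPOST`."""
    credentials = base64.b64encode(f"{username}:{password}".encode()).decode()
    return urllib.request.Request(
        url,
        method="POST",
        headers={"Authorization": f"Basic {credentials}"},
    )


def bearer_header(response_body):
    """Turn the JSON response into an Authorization header for later calls."""
    payload = json.loads(response_body)
    return {"Authorization": f"{payload['token_type']} {payload['access_token']}"}


def fetch_bearer_header():
    """Perform the request; requires the port forward to the UI pod to be running."""
    with urllib.request.urlopen(build_token_request()) as response:
        return bearer_header(response.read().decode())
```

fetch_bearer_header() returns a dict that can be passed as headers to subsequent API requests.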

Loading test data into Redpanda

Let us prepare Redpanda by creating two topics. The first topic, users, stores user data in the JSON format. Each record has fields such as an IP address, a username, an email address, and a name. We use this topic as the source of our streaming data pipeline. The second topic, users_masked, stores the masked user data. We use it as the sink of our pipeline.

We can execute the following commands to create the topics:

$ kubectl exec -it redpanda-0 -- rpk topic create users --brokers='redpanda-0.redpanda.default.svc.cluster.local.:9093'
$ kubectl exec -it redpanda-0 -- rpk topic create users_masked --brokers='redpanda-0.redpanda.default.svc.cluster.local.:9093'

Next, we load test data into the users topic. To this end, you can use the following Python script. Before executing it (on your host), please make sure that you have installed the dependencies by running python3 -m pip install Faker kafka-python.

from kafka import KafkaProducer
from faker import Faker
import json

fake = Faker()

producer = KafkaProducer(
    bootstrap_servers = "localhost:9093",
    key_serializer    = lambda k: json.dumps(k).encode('ascii'),
    value_serializer  = lambda v: json.dumps(v).encode('ascii')
)

for i in range(10000):
  producer.send(
    "users",
    key = { "entry": i },
    value = {
      "ip":       fake.ipv4(),
      "username": fake.simple_profile()["username"],
      "email":    fake.ascii_company_email(),
      "name":     fake.name(),
      "message":  fake.text(max_nb_chars=1024)
    }
  )

# send() is asynchronous; wait until all buffered records are delivered
# before the script exits
producer.flush()

You should now see data arriving in Redpanda when running the following command:

$ kubectl exec -it redpanda-0 -- rpk topic consume users --brokers='redpanda-0.redpanda.default.svc.cluster.local.:9093'

Connecting DataCater with Redpanda

DataCater provides Streams as a building block for integrating with any Kafka API-compatible service, like Redpanda.

Please head over to http://localhost:8080, sign in with admin:admin unless already authenticated, and navigate to Streams. After clicking Create new Stream, you can provide the configuration of the Redpanda/Kafka topic:

Connect DataCater to your Redpanda cluster by creating a new stream.

Please provide the value users for the configuration option name and the value redpanda-0.redpanda.default.svc.cluster.local.:9093 for the configuration option bootstrap.servers. For this source stream, please also set the connection option auto.offset.reset to earliest. Finally, click the button Create stream.

After creating the first stream, please head back to the Create new Stream page. This time, please provide the value users_masked for the configuration option name and the value redpanda-0.redpanda.default.svc.cluster.local.:9093 for the configuration option bootstrap.servers. Again, click the button Create stream to persist the stream.

You should now see both streams when navigating to Streams:

Listing the available streams.

Using DataCater's API

Alternatively, you could also use DataCater’s API to create the streams.

In the first step, please retrieve a valid access token for performing API calls:

$ curl -uadmin:admin -XPOST http://localhost:8080/api/v1/authentication
{"token_type":"Bearer","access_token":"[ACCESS_TOKEN]","expires_in":604800}

In the next steps, perform the following API calls for creating the streams (please make sure to replace ACCESS_TOKEN with the one you retrieved in the previous step):

$ curl http://localhost:8080/api/v1/streams \
  -H'Authorization: Bearer ACCESS_TOKEN' \
  -XPOST \
  -H'Content-Type:application/json' \
  -d'{"name":"users","spec":{"kafka":{"topic":{"config":{}},"bootstrap.servers":"redpanda-0.redpanda.default.svc.cluster.local.:9093","auto.offset.reset":"earliest"},"kind":"KAFKA"}}'
$ curl http://localhost:8080/api/v1/streams \
  -H'Authorization: Bearer ACCESS_TOKEN' \
  -XPOST \
  -H'Content-Type:application/json' \
  -d'{"name":"users_masked","spec":{"kafka":{"topic":{"config":{}},"bootstrap.servers":"redpanda-0.redpanda.default.svc.cluster.local.:9093"},"kind":"KAFKA"}}'
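If you would rather not hand-write the JSON bodies, they can be generated. The sketch below rebuilds the two payloads from the curl calls above as Python dicts; stream_payload is a hypothetical helper name, and the payload structure is taken from those calls rather than from a formal API specification.

```python
import json

BOOTSTRAP_SERVERS = "redpanda-0.redpanda.default.svc.cluster.local.:9093"


def stream_payload(name, bootstrap_servers=BOOTSTRAP_SERVERS, auto_offset_reset=None):
    """Build the request body for POST /api/v1/streams."""
    kafka = {"topic": {"config": {}}, "bootstrap.servers": bootstrap_servers}
    if auto_offset_reset is not None:
        # Only the source stream sets this option in the tutorial.
        kafka["auto.offset.reset"] = auto_offset_reset
    return {"name": name, "spec": {"kafka": kafka, "kind": "KAFKA"}}


source = stream_payload("users", auto_offset_reset="earliest")
sink = stream_payload("users_masked")

print(json.dumps(source))
print(json.dumps(sink))
```

The printed strings can be passed to curl via -d or sent with any HTTP client together with the Authorization header.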

Developing the data masking function

In DataCater’s UI, navigate to Pipelines and click Create new pipeline.

Please provide a name for your pipeline, for instance, Mask users, choose the stream with the name users as Source stream, and select the stream with the name users_masked as Sink stream. Finally, click the button Create pipeline.

Create a new pipeline in DataCater.

After being redirected to the newly-created pipeline resource, please click the button Edit in Pipeline Designer:

Showing the pipeline you just created. Please click on Edit in Pipeline Designer.

DataCater’s Pipeline Designer extracts a sample from the underlying Redpanda/Kafka topic and lets you interactively develop a streaming data pipeline. While you apply filters and transforms, the Pipeline Designer previews their impact on the sample instantly.

DataCater's interactive pipeline designer samples your source stream.

In this example, we want to mask the field email.

Please click the button Add first step and choose Transform single fields:

Adding a new step to the pipeline.

Click on the button Apply transform or filter in the column of the email field. Next, choose User-defined transform in the sidebar:

Choose the user-defined transform from the list of available transforms.

A user-defined transform allows you to provide a custom Python function that takes two parameters: (1) The value of the field that you are applying the function to, and (2) the entire record as a Python dict. DataCater automatically fills the return value of the function into the field that you are processing with the transform.
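To get a feeling for this contract outside of DataCater, it can be approximated locally. The following sketch is not DataCater's implementation; apply_field_transform is a hypothetical helper that calls a transform with the field value and the whole record and writes the return value back into the field. The sample transform and record are made up for illustration.

```python
import copy


def apply_field_transform(record, field_name, transform):
    """Local approximation: call transform(value, record) and store the result."""
    result = copy.deepcopy(record)  # leave the input record untouched
    result[field_name] = transform(record[field_name], record)
    return result


def transform(field, record: dict):
    # Hypothetical example transform: normalise the value of a string field.
    return field.strip().lower()


sample = {"username": "  Alice ", "email": "alice@example.com"}
updated = apply_field_transform(sample, "username", transform)
print(updated)
```

Such a helper makes it easy to try a function on a few sample records before pasting it into the Pipeline Designer.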

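Please fill in the following Python code to mask the first part (before the @ character) of the email addresses. It keeps only the first and the last character of that part and replaces everything in between with ***: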

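def transform(field, record: dict):
  email = field
  # Position of the "@" that separates the local part from the domain
  at_loc = email.find("@")

  # Keep the first and the last character of the local part and
  # replace everything in between with "***"
  masked_email = email[0] + "***" + email[at_loc-1:]

  return masked_email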
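The function above assumes that every value is a string containing an @ with at least two characters in front of it, which holds for the generated test data. For real-world data, a more defensive variant might look like the following sketch. How to treat unexpected values (passing non-strings through, fully masking values without an @ or with a very short first part) is our assumption here and should be adapted to your requirements.

```python
def transform(field, record: dict):
    # Non-string values (e.g. None) are passed through unchanged.
    if not isinstance(field, str):
        return field

    local, at, domain = field.partition("@")
    if not at:
        # No "@" found: hide the whole value rather than guess its structure.
        return "***"
    if len(local) <= 2:
        # Too short to keep the first and last character without revealing it.
        return "***" + at + domain

    # Same output as the simple version for regular addresses.
    return local[0] + "***" + local[-1] + at + domain
```

For regular addresses both versions produce the same result, so either one can be used in the next step.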

After clicking the button Save & Run, you can see the preview in the grid:

Interactive preview of a Python transform in the Pipeline Designer.

Using DataCater's API

If you prefer to create the pipeline programmatically, you can perform the following API call (please make sure to replace the references to the streams with the UUIDs of your resources):

$ curl http://localhost:8080/api/v1/pipelines \
  -H'Authorization: Bearer ACCESS_TOKEN' \
  -XPOST \
  -H'Content-Type:application/json' \
  -d'{"name":"Mask users","metadata":{"stream-in":"06c7f086-ef1a-4a9b-a735-af8ad8223805","stream-out":"efb84d3b-45f7-4b5e-a377-55c68f56889e"},"spec":{"steps":[{"kind":"Field","fields":{"email":{"transform":{"key":"user-defined-transformation","config":{"code":"def transform(field, record: dict):\n  email = field\n  at_loc = email.find(\"@\")\n\n  masked_email = email[0] + \\\n    \"***\" + email[at_loc-1:]\n\n  return masked_email"}}}}}]}}'
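Escaping Python code by hand inside a JSON string is error-prone. As a sketch, the request body can instead be assembled in Python and serialized with json.dumps, which takes care of quotes and newlines. pipeline_payload is a hypothetical helper; the payload structure mirrors the curl call above, and the two stream identifiers are placeholders that you need to replace with the UUIDs of your streams.

```python
import json

TRANSFORM_CODE = '''def transform(field, record: dict):
  email = field
  at_loc = email.find("@")

  masked_email = email[0] + "***" + email[at_loc-1:]

  return masked_email'''


def pipeline_payload(name, stream_in, stream_out, field, code):
    """Build the request body for POST /api/v1/pipelines with one field transform."""
    transform = {"key": "user-defined-transformation", "config": {"code": code}}
    return {
        "name": name,
        "metadata": {"stream-in": stream_in, "stream-out": stream_out},
        "spec": {
            "steps": [{"kind": "Field", "fields": {field: {"transform": transform}}}]
        },
    }


# STREAM_IN_UUID and STREAM_OUT_UUID are placeholders, not real identifiers.
body = json.dumps(
    pipeline_payload("Mask users", "STREAM_IN_UUID", "STREAM_OUT_UUID", "email", TRANSFORM_CODE)
)
print(body)
```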

Deployment on Kubernetes

In the previous steps, we ingested test data into Redpanda, connected DataCater with Redpanda, and defined a custom data masking function in Python. Now, it is time to start streaming.

Please navigate to Deployments and click Create new deployment.

Please provide a name for your deployment, for instance, mask-users, and select the pipeline that you just created. Finally, click the button Create deployment.

Creating a deployment for your pipeline.

Within a few seconds, the deployment should start streaming the records from the Redpanda topic users to the topic users_masked and mask the field email on the way using the custom Python function that we built:

DataCater provides metrics and health information for your deployment, including API endpoints that you can access programmatically.

You can execute the following command to inspect the sink stream containing the masked data:

$ kubectl exec -it redpanda-0 -- rpk topic consume users_masked --brokers='redpanda-0.redpanda.default.svc.cluster.local.:9093'
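To spot-check the result, you could test whether the email field of a record follows the pattern produced by the transform from this tutorial. The sketch below assumes that you have the record value at hand as a JSON string, for example copied from the consume output; is_masked and the regular expression are illustrative helpers, not part of DataCater or rpk.

```python
import json
import re

# Pattern produced by the tutorial's transform: first character, "***",
# last character of the first part, then "@" and the domain.
MASKED_EMAIL = re.compile(r"^.\*{3}.@.+$")


def is_masked(record_value):
    """Return True if the record's email field matches the masked pattern."""
    record = json.loads(record_value)
    return bool(MASKED_EMAIL.match(record["email"]))


print(is_masked('{"email": "j***n@example.com"}'))
print(is_masked('{"email": "john@example.com"}'))
```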

Using DataCater's API

If you prefer to create the deployment programmatically, you can perform the following API call (please make sure to replace the reference to the pipeline with the UUID of your pipeline):

$ curl http://localhost:8080/api/v1/deployments \
  -H'Authorization: Bearer ACCESS_TOKEN' \
  -XPOST \
  -H'Content-Type:application/json' \
  -d'{"spec":{"pipeline":"f645d65b-4383-48d4-b3e6-1edbffeac9d4"},"name":"mask-users"}'

Summary

We built a streaming data pipeline that masks email addresses in real time. While this tutorial used Redpanda as the implementation of the Kafka protocol, the approach can be applied to any Kafka API-compatible broker. It is not limited to streaming data within the same Kafka cluster but can also be used for streaming data between different clusters, which can be very attractive if you, for instance, need to apply transforms when streaming from your production to a pre-production environment.

We used email addresses as an example. Thanks to Python’s flexibility, you can easily develop data masking functions for other data types as well. For instance, the following transform replaces the last block of an IPv4 address:

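import ipaddress

def transform(field, record: dict):
  ip_address = ipaddress.ip_address(field)

  # Zero the last block with a bit mask and return the result as a
  # string, so that the field keeps its original type
  return str(ipaddress.ip_address(int(ip_address) & 0xffffff00))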
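The test data only contains IPv4 addresses. If your data may also contain IPv6 addresses, one possible variant is to derive the network address with the standard library's ipaddress.ip_network instead of a fixed bit mask. The prefix lengths below (/24 for IPv4, /64 for IPv6) are assumptions chosen for illustration; pick the ones that fit your privacy requirements.

```python
import ipaddress


def transform(field, record: dict):
    address = ipaddress.ip_address(field)

    # Assumed prefix lengths: keep the first 24 bits of an IPv4 address
    # and the first 64 bits of an IPv6 address.
    prefix = 24 if address.version == 4 else 64

    # strict=False allows host bits to be set and zeroes them for us.
    network = ipaddress.ip_network(f"{address}/{prefix}", strict=False)
    return str(network.network_address)


print(transform("194.99.91.179", {}))
```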

Feel free to check out our code on GitHub - we appreciate your feedback!