10 Useful Python Transforms for your Streaming Data Pipeline

Learn how to transform your data with a few lines of Python code.

By Christiane Scherch (Data engineer, news.de), Stefan Sprenger

Python is the lingua franca of the data community. From interactive tools like Jupyter notebooks, through great libraries such as Pandas and NumPy, to data processing at scale with PySpark or PyFlink: if you have processed data with code before, it’s very likely that you used Python.

DataCater offers a declarative interface to streaming data pipelines. It allows users to declare how data shall be transformed, instead of requiring them to implement data pipelines with a fixed, code-based control flow.

DataCater has been built by data engineers for data teams. We know that many use cases have special requirements for data transformation. To this end, DataCater allows users to extend declarative data pipelines with lightweight Python transforms.

This article provides a brief introduction to Python transforms in DataCater and describes 10 Python transforms that we find most useful in the context of streaming data pipelines.

Python transforms in DataCater

In DataCater, Python transforms take two parameters and are applied to a single attribute of the data set:

def transform(value, row):
  return value

The first parameter, value, provides the value of the attribute that the function is applied to. The second parameter, row, is a Python dictionary and allows the function to access all attributes of the row (or event) by their name.
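
For illustration, a transform could also combine other attributes of the same row into the new value; the attribute names first_name and last_name below are hypothetical:

def transform(value, row):
  # build the new value from two other attributes of the row
  return row["first_name"] + " " + row["last_name"]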

10 Useful Python Transforms

The following sections introduce ten Python transforms that we frequently use in our work.

Perform API calls

When streaming data from sources to sinks, many use cases require the data to be enriched with information from a third-party API on the fly. Let’s assume a data source providing price points in USD. We could be interested in adding price points in EUR, too. The following Python transform uses the Python module requests to retrieve the current USD/EUR exchange rate from an API:

import requests

def transform(value, row):
  # fetch the current USD/EUR exchange rate from a free currency API
  exchange_rate = requests\
    .get("https://cdn.jsdelivr.net/gh/fawazahmed0/currency-api@1/latest/currencies/usd/eur.json")\
    .json()["eur"]
  return int(row["price_usd"] * exchange_rate)

By default, this approach will perform an API request for each individual event processed by a DataCater pipeline. If you want to cache API results across events, please have a look at the following section.

Cache results of API calls

In some cases, it might not be necessary to perform API requests for each individual event. For instance, we don’t expect exchange rates to change at a very high frequency. Given that each API request increases the latency of the streaming data pipeline, we might decide to (temporarily) cache the results of API calls. The Python module requests-cache builds on the requests module and allows caching API responses. We might extend the Python transform from the previous section to cache the exchange rate for one day:

import requests_cache
from datetime import timedelta

# cache API responses for one day
session = requests_cache.CachedSession('demo', expire_after=timedelta(days=1))

def transform(value, row):
  exchange_rate = session\
    .get("https://cdn.jsdelivr.net/gh/fawazahmed0/currency-api@1/latest/currencies/usd/eur.json")\
    .json()["eur"]
  return int(row["price_usd"] * exchange_rate)

Flatten nested JSON structures

When consuming data from APIs, we often deal with (deeply) nested JSON structures. The Python standard module json allows us to parse JSON structures, such as objects or arrays, and extract only the fields we need, reducing their complexity.

Let’s assume that the attribute address is a JSON object (DataCater stores JSON objects and arrays using the data type string), which holds the keys street, zip_code, and city. The following Python transform loads the JSON object from the attribute and gets the city:

import json

def transform(value, row):
  address = json.loads(value)

  return address["city"]
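
The same approach works for JSON arrays. As a sketch, assuming a hypothetical attribute holding a JSON array of tags, we could extract the first element:

import json

def transform(value, row):
  tags = json.loads(value)

  # return the first tag, or None if the array is empty
  return tags[0] if tags else None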

Translate text values with DeepL

Many APIs provide text data in English. Oftentimes, we would like to use these texts in other languages, such as German, too. To this end, we can use the free plan of the DeepL API together with the Python module requests that we introduced above.

import requests

def transform(value, row):
  response = requests.post(
    url="https://api-free.deepl.com/v2/translate",
    data={
      "target_lang": "DE",  # replace DE with the code of the target language
      "auth_key": "fill_in_auth_key",
      "text": row["text_to_translate"]}).json()

  # extract the translated text from the API response
  return response["translations"][0]["text"]

For more extensive translation requests, a DeepL Pro account with the corresponding API key is worthwhile.

Detect the language of texts

In the last section, we assumed that we know the language of a text value. That is not always the case. If we need to identify the language of a text value before processing it further, we might get help from the Python module langdetect:

from langdetect import detect

def transform(value, row):
  # returns "en" for English, etc.
  return detect(row["description"])
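
Note that langdetect raises an exception if it cannot detect any language features, for instance for empty strings. A minimal sketch that falls back to None in such cases:

from langdetect import detect
from langdetect.lang_detect_exception import LangDetectException

def transform(value, row):
  try:
    # returns "en" for English, etc.
    return detect(row["description"])
  except LangDetectException:
    # e.g., the description is empty
    return None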

Encode images in base64

Base64 is a method for encoding 8-bit binary data into a string consisting only of readable, codepage-independent ASCII characters. In DataCater, we can use the base64 encoding to transfer, for instance, images in attributes of type String:

import base64
import requests

def transform(value, row):
  image = requests.get(row["image_url"]).content
  return str(base64.b64encode(image))

Please note that base64 encodings might get quite large and exceed the maximum size of an event in DataCater (3MB).
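
If an encoded image risks exceeding that limit, the transform could guard against it. A minimal sketch, assuming for illustration that we drop oversized images by returning None (the check is approximate, since other attributes also count towards the event size):

import base64
import requests

# maximum size of an event in DataCater
MAX_SIZE_BYTES = 3 * 1024 * 1024

def transform(value, row):
  image = requests.get(row["image_url"]).content
  encoded = base64.b64encode(image).decode("ascii")

  # drop images whose base64 encoding alone would exceed the maximum event size
  if len(encoded) > MAX_SIZE_BYTES:
    return None

  return encoded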

Work with geo coordinates

GeoJSON is a popular format for managing geographic data in JSON. Let’s assume we consume an API endpoint, which returns a list of shapes in the GeoJSON format. We can use the Python module shapely to, for instance, find the shape that contains a specific 2-d coordinate. The following Python transform returns the name of the corresponding shape:

import requests
from shapely.geometry import shape, Point

def transform(value, row):
  # shapes in GeoJSON format
  # endpoint_with_geojsondata is a placeholder for the URL of the API endpoint
  regions = requests.get(endpoint_with_geojsondata).json()
  # coordinates of Berlin, Germany
  point = Point(13.405, 52.52)
  for feature in regions["features"]:
    polygon = shape(feature["geometry"])
    if polygon.contains(point):
      return feature["properties"]["name"]
  return None
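
Since the list of shapes rarely changes, we might combine this transform with the caching approach introduced above. A sketch using requests-cache, with the same placeholder endpoint:

import requests_cache
from datetime import timedelta
from shapely.geometry import shape, Point

# cache the GeoJSON shapes for one day instead of fetching them for each event
session = requests_cache.CachedSession('shapes', expire_after=timedelta(days=1))

def transform(value, row):
  regions = session.get(endpoint_with_geojsondata).json()
  point = Point(13.405, 52.52)
  for feature in regions["features"]:
    polygon = shape(feature["geometry"])
    if polygon.contains(point):
      return feature["properties"]["name"]
  return None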

Test text values with regular expressions

Python offers the standard module re for working with regular expressions. We might use them to test the structure of text values. For instance, we might validate email addresses.

The following Python transform checks whether the email address stored in the attribute email is valid:

import re

email_regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'

def transform(value, row):
  # returns True if the email address matches the pattern, False otherwise
  if re.fullmatch(email_regex, row["email"]):
    return True
  else:
    return False
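
Since re.fullmatch returns a match object or None, the transform above could equivalently be written as return bool(re.fullmatch(email_regex, row["email"])).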

Mask IP addresses

Oftentimes, we work with sensitive data, like IP addresses. We can use Python transforms to mask such data, leaving enough information for downstream analysis but removing all the sensitive information that conflicts with data privacy laws.

The following Python transform masks the last part of an IPv4 address, i.e., it replaces the last two blocks with XXX but leaves the first two blocks untouched. We might still be able to identify the country of the IP address but not the exact geolocation.

import re

def transform(value, row):
  # replace the last two blocks of the IPv4 address with XXX,
  # e.g., "192.168.1.42" becomes "192.168.XXX.XXX"
  return re.sub(
    r"(?:\.[0-9]{1,3}){2}$",
    ".XXX.XXX",
    value)

Combine multiple attributes into a single JSON object

DataCater allows users to store JSON objects in attributes of type String. Sometimes, we might be interested in denormalizing data by merging multiple attributes into one. Let’s assume a data set holding address information, like street, city, etc. We could use a Python transform to merge all of this information into one attribute:

import json

def transform(value, row):
  return json.dumps({
    "name":     row["name"],
    "street":   row["street"],
    "zip_code": row["zip_code"],
    "city":     row["city"]
  })

Summary

In this article, we presented 10 Python transforms that we frequently use to transform data in streaming data pipelines. Whether you’re dealing with complex JSON structures, need to detect the language of texts, or want to mask sensitive information: Python transforms are often a powerful way to perform complicated transformations in a few lines of code.

We are very interested in your experience. Did we miss a Python transform that you frequently apply? Let us know! :)