A Beginner’s Guide to User-Defined Functions in ksqlDB

Learn everything you need to know to get started with developing and using UDFs in ksqlDB.

By Michael Aboagye

ksqlDB is the SQL layer on top of Kafka Streams, which allows users to process data in motion declaratively. It makes data streaming more accessible to developers who do not have experience with Java or Scala (as required by Kafka Streams). ksqlDB provides a set of pre-built functions for aggregating, joining, filtering, and transforming data. For instance, you can use the following queries to register the topic users as a table and select all users that are older than 30 years:

CREATE TABLE users (id INT PRIMARY KEY, name STRING, age INT, password STRING)
    WITH (kafka_topic = 'users', value_format = 'JSON', partitions = 1);

SELECT * FROM users WHERE age > 30;

If you have worked with data before, especially in a production environment, you might know that most use cases come with edge cases and custom requirements that can hardly be implemented with pre-built functions. To this end, ksqlDB provides user-defined functions (UDFs), a means of extending ksqlDB’s core capabilities so it can handle a wider range of use cases. In ksqlDB, UDFs must be implemented in Java and packaged as JARs. Once they have been added to the class path of the ksqlDB instance, they are available to users and can be used in queries.

ksqlDB supports different types of UDFs:

  • Scalar functions take a single row as input and produce a single row as output. They resemble the map function from functional programming.
  • Tabular functions take a single row as input and produce an arbitrary number of rows as output. They resemble the flatMap function from functional programming (see the sketch after this list).
  • Aggregate functions consume one row at a time and maintain state across rows. They are most helpful when you need to compute a result over multiple rows, e.g., the average of a numeric value.
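
For illustration, here is a minimal sketch of a tabular function (the class and function names are made up for this example): it splits a string into words and returns one output row per word, mirroring flatMap.

package com.example;

import io.confluent.ksql.function.udtf.Udtf;
import io.confluent.ksql.function.udtf.UdtfDescription;
import java.util.Arrays;
import java.util.List;

@UdtfDescription(
  name = "SPLIT_WORDS",
  description = "Splits a string into words and returns one row per word."
)
public class SplitWordsUdtf {
    @Udtf
    public List<String> splitWords(final String input) {
        // Every element of the returned list becomes its own output row.
        return Arrays.asList(input.split("\\s+"));
    }
}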

In this article, we explore the capabilities of UDFs in ksqlDB, show how to implement them, and discuss their advantages and disadvantages compared to other solutions from the data streaming space.

As an example use case, we will have a look at how to extend ksqlDB with a HASH function implemented as a scalar UDF.

Use cases for UDFs in ksqlDB

While ksqlDB provides a lot of functionality for common tasks, like casting data types or finding the maximum value over a time window, its built-in capabilities are limited. UDFs fill this gap: they are useful whenever a requirement cannot be implemented with the built-in functions of ksqlDB.

Example use cases for UDFs in ksqlDB are:

  • Calculating a custom score function with a proprietary formula
  • Implementing custom data masking rules (see the sketch after this list)
  • Advanced text processing
  • Interaction with third-party systems
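
To make the data masking use case more concrete, a custom masking rule can be as small as a single scalar function. The following sketch (with made-up names) keeps the first two characters of a value and masks the rest:

package com.example;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;

@UdfDescription(
  name = "MASK_TAIL",
  description = "Masks all but the first two characters of a string."
)
public class MaskTailUdf {
    @Udf
    public String maskTail(@UdfParameter final String input) {
        if (input == null || input.length() <= 2) {
            return input;
        }
        // Keep a short prefix readable and replace the rest with asterisks.
        final StringBuilder masked = new StringBuilder(input.substring(0, 2));
        for (int i = 2; i < input.length(); i++) {
            masked.append('*');
        }
        return masked.toString();
    }
}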

Setting up Java for the development of a ksqlDB UDF

In the first step, we need to set up a new Java project. In this article, we use Gradle as the build tool; Maven is a reasonable alternative if you prefer a different tool.

Let us get started by specifying our dependencies in the file build.gradle:

plugins {
    id "java"
    id "com.github.johnrengelman.shadow" version "6.0.0"
}

sourceCompatibility = "1.8"
targetCompatibility = "1.8"
version = "0.0.1"

repositories {
    mavenCentral()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

dependencies {
    implementation "io.confluent.ksql:ksqldb-udf:7.3.1"
    implementation "org.apache.kafka:kafka_2.13:3.3.1"
    implementation "org.apache.kafka:connect-api:3.3.1"
}

compileJava {
    // Preserve method parameter names in the bytecode so ksqlDB can
    // pick them up for the UDF's metadata.
    options.compilerArgs << "-parameters"
}

shadowJar {
    archiveBaseName = "hash-udf"
    archiveClassifier = ""
    destinationDirectory = file("extensions")
}

Next, we need to create the file src/main/java/com/example/HashUdf.java in our Java project and fill it with the following code:

package com.example;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

@UdfDescription(
  name = "HASH",
  description = "A function for applying hash algorithms to a string value.",
  author = "Michael",
  version = "0.0.1"
)
public class HashUdf {
    @Udf
    public String hash(
        @UdfParameter final String input,
        @UdfParameter final String algorithm) throws NoSuchAlgorithmException {
        MessageDigest messageDigest = MessageDigest.getInstance(algorithm);
        // Hash the input using an explicit charset to avoid platform-dependent results.
        messageDigest.update(input.getBytes(StandardCharsets.UTF_8));
        // Encode the raw digest bytes as Base64; constructing a String directly
        // from the digest would produce invalid characters.
        return Base64.getEncoder().encodeToString(messageDigest.digest());
    }
}
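
ksqlDB also supports overloading UDFs: every method of the class annotated with @Udf is registered as a variation of the same function. If you wanted, for example, a one-argument variant that defaults to SHA-256, you could add the following method to the class above (this variant is our own addition and not required for the rest of the article; DESCRIBE FUNCTION would then list it as an additional variation):

@Udf
public String hash(@UdfParameter final String input) throws NoSuchAlgorithmException {
    // Delegate to the two-argument variation with a default algorithm.
    return hash(input, "SHA-256");
}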

After implementing the UDF, we need to package it as a JAR that can be loaded into a ksqlDB instance. To this end, execute the following Gradle task on your command line:

$ gradle shadowJar

The Gradle task shadowJar creates an uber-JAR that contains your Java project including all of its dependencies. You should now see the file hash-udf-0.0.1.jar in the folder extensions.

Loading the UDF into your ksqlDB instance

You can use Docker Compose to spin up a ksqlDB instance for testing your UDF. Please populate the file docker-compose.yml with the following content:

version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.28.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - schema-registry
    volumes:
      - ./extensions:/etc/ksqldb/ext
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksqldb"
      KSQL_KSQL_EXTENSION_DIR: "/etc/ksqldb/ext/"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksqldb/log4j.properties"
      KSQL_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.28.2
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    environment:
      KSQL_CONFIG_DIR: "/etc/ksqldb"
    volumes:
      - ./src:/opt/app/src
      - ./test:/opt/app/test

Note that we mount the local folder extensions into the container ksqldb-server to make our UDF available to ksqlDB.

Execute the command docker compose up -d to start all services.

Using UDFs in ksqlDB

Once the JAR of the UDF is on the class path of the ksqlDB instance, you can start using it.

Open a CLI for ksqlDB with the following command:

$ docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

When you enter DESCRIBE FUNCTION HASH;, you should see that your UDF has been correctly loaded into ksqlDB:

ksql> DESCRIBE FUNCTION HASH;

Name        : HASH
Author      : Michael
Version     : 0.0.1
Overview    : A function for applying hash algorithms to a string value.
Type        : SCALAR
Jar         : /etc/ksqldb/ext/hash-udf-0.0.1.jar
Variations  :

	Variation   : HASH(input VARCHAR, algorithm VARCHAR)
	Returns     : VARCHAR

Assuming you have already created a stream called users, you can execute the following command to hash the field password:

ksql> SELECT HASH(password, 'SHA-256') AS password_hashed FROM users EMIT CHANGES;

Please visit ksqlDB’s documentation to learn more about UDFs in ksqlDB and see how you can build tabular and aggregate functions.
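
To give you a rough idea of what an aggregate function looks like, here is a minimal, illustrative sketch of a UDAF that sums BIGINT values (the class and function names are made up for this example):

package com.example;

import io.confluent.ksql.function.udaf.Udaf;
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;

@UdafDescription(
  name = "MY_SUM",
  description = "Sums BIGINT values."
)
public final class MySumUdaf {
    @UdafFactory(description = "Sums BIGINT values.")
    public static Udaf<Long, Long, Long> createSum() {
        return new Udaf<Long, Long, Long>() {
            @Override
            public Long initialize() {
                // The starting state of the aggregation.
                return 0L;
            }

            @Override
            public Long aggregate(final Long value, final Long aggregate) {
                // Called once per input row to update the state.
                return value == null ? aggregate : aggregate + value;
            }

            @Override
            public Long merge(final Long aggOne, final Long aggTwo) {
                // Combines two partial aggregations.
                return aggOne + aggTwo;
            }

            @Override
            public Long map(final Long aggregate) {
                // Maps the internal state to the final output value.
                return aggregate;
            }
        };
    }
}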

Downsides of UDFs in ksqlDB

UDFs must be developed in Java

As of the time of writing this blog post, UDFs can only be written in Java. However, most data practitioners know SQL and Python rather than Java. So when users need a UDF in ksqlDB, they probably have to consult a Java developer.

UDFs must be deployed as JARs

A JAR is an archive that bundles compiled .class files together with other resources, such as configuration files or media. ksqlDB reads JARs only at startup time. That means that after each deployment of a UDF, the entire ksqlDB instance needs to be restarted, impacting its uptime and availability. This is problematic for production environments, where streaming pipelines are expected to operate continuously, 24/7.

UDFs are not available on Confluent Cloud

At the moment, UDFs are not available on Confluent Cloud. If you want to make use of UDFs in ksqlDB, you need to manage ksqlDB on your own.

💡 DataCater provides support for Python-based user-defined functions, which can be instantly applied without restarting any service. Try them out for free with our cloud offering.