Learn everything you need to know to get started with developing and using UDFs in ksqlDB.
ksqlDB is the SQL layer on top of Kafka Streams that allows users to process data in motion declaratively. It makes data streaming more accessible to developers who do not have experience with Java or Scala (as required by Kafka Streams). ksqlDB provides a set of pre-built functions for aggregating, joining, filtering, and transforming data. For instance, you can use the following queries to register a table over the topic users and select all users who are older than 30 years:
CREATE TABLE users (id INT PRIMARY KEY, name STRING, age INT, password STRING)
  WITH (kafka_topic = 'users', value_format = 'JSON', partitions = 1);

SELECT * FROM users WHERE age > 30;
If you have worked with data before, especially in a production environment, you know that most use cases come with edge cases and custom requirements that can hardly be implemented with pre-built functions alone. To this end, ksqlDB provides user-defined functions (UDFs). UDFs are a means of extending ksqlDB’s core capabilities and broadening the range of use cases it can handle. In ksqlDB, UDFs must be implemented in Java and packaged as JARs. Once a JAR has been added to the classpath of the ksqlDB instance, the functions it contains are available to users and can be invoked in queries.
ksqlDB supports three types of UDFs:

- Scalar functions (UDFs), which return one output value per input row
- Tabular functions (UDTFs), which return zero or more output rows per input row
- Aggregate functions (UDAFs), which combine multiple input rows into one output value
In this article, we explore the capabilities of UDFs in ksqlDB, show how to implement them, and discuss their advantages and disadvantages compared to other solutions in the data streaming space. As an example use case, we will look at how to extend ksqlDB with a HASH function implemented as a scalar UDF.
While ksqlDB provides a lot of functionality for common tasks, like casting data types or finding the maximum value over a time window, its built-in capabilities are limited. UDFs close this gap: they are useful whenever a requirement cannot be expressed with the functions that ship with ksqlDB.
Typical use cases for UDFs in ksqlDB include applying custom business logic, reusing existing Java libraries in queries, and masking or hashing sensitive fields, as in the example below.
In the first step, we need to set up a new Java project. In this article, we use Gradle as the build tool; if you prefer a different build tool, Maven is a reasonable alternative.
Let us get started by specifying our dependencies in the file build.gradle:
buildscript {
    repositories {
        jcenter()
    }
}

plugins {
    id "java"
    id "com.github.johnrengelman.shadow" version "6.0.0"
}

sourceCompatibility = "1.8"
targetCompatibility = "1.8"
version = "0.0.1"

repositories {
    mavenCentral()
    jcenter()
    maven {
        url "https://packages.confluent.io/maven"
    }
}

dependencies {
    implementation "io.confluent.ksql:ksqldb-udf:7.3.1"
    implementation "org.apache.kafka:kafka_2.13:3.3.1"
    implementation "org.apache.kafka:connect-api:3.3.1"
}

apply plugin: "com.github.johnrengelman.shadow"
apply plugin: "java"

compileJava {
    options.compilerArgs << "-parameters"
}

shadowJar {
    archiveBaseName = "hash-udf"
    archiveClassifier = ""
    destinationDirectory = file("extensions")
}
Next, we need to create the file src/main/java/com/example/HashUdf.java in our Java project and fill it with the following code:
package com.example;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

@UdfDescription(
    name = "HASH",
    description = "A function for applying hash algorithms to a string value.",
    author = "Michael",
    version = "0.0.1"
)
public class HashUdf {

    @Udf
    public String hash(
            @UdfParameter final String input,
            @UdfParameter final String algorithm) throws NoSuchAlgorithmException {
        // Look up the requested hash algorithm, e.g. "SHA-256" or "MD5".
        MessageDigest messageDigest = MessageDigest.getInstance(algorithm);
        messageDigest.update(input.getBytes(StandardCharsets.UTF_8));
        // Hex-encode the digest; constructing a String directly from the raw
        // digest bytes would yield unreadable (and lossy) output.
        StringBuilder result = new StringBuilder();
        for (byte b : messageDigest.digest()) {
            result.append(String.format("%02x", b));
        }
        return result.toString();
    }
}
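Because a scalar UDF is just an annotated Java method, you can unit-test it without a running ksqlDB instance. Below is a minimal sketch of such a test; it assumes you add a JUnit 5 dependency (e.g. testImplementation "org.junit.jupiter:junit-jupiter:5.9.1" together with test { useJUnitPlatform() }) to build.gradle, which is not part of the build file shown above:

package com.example;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.security.NoSuchAlgorithmException;
import org.junit.jupiter.api.Test;

class HashUdfTest {

    @Test
    void hashesInputWithSha256() throws NoSuchAlgorithmException {
        HashUdf udf = new HashUdf();
        // The well-known, hex-encoded SHA-256 digest of the string "secret".
        assertEquals(
            "2bb80d537b1da3e38bd30361aa855686bde0eacd7162fef6a25fe97bf527a25b",
            udf.hash("secret", "SHA-256"));
    }
}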
After implementing the UDF, we need to package it as a JAR that can be loaded into a ksqlDB instance. To this end, execute the following Gradle task on your command line:
$ gradle shadowJar
The Gradle task shadowJar creates an uber JAR that contains your Java project including all of its dependencies. You should now see the file hash-udf-0.0.1.jar in the folder extensions.
You can use Docker Compose to spin up a ksqlDB instance for testing your UDF. Please populate the file docker-compose.yml with the following content:
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.28.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - schema-registry
    volumes:
      - ./extensions:/etc/ksqldb/ext
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksqldb"
      KSQL_KSQL_EXTENSION_DIR: "/etc/ksqldb/ext/"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksqldb/log4j.properties"
      KSQL_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.28.2
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    environment:
      KSQL_CONFIG_DIR: "/etc/ksqldb"
    volumes:
      - ./src:/opt/app/src
      - ./test:/opt/app/test
Note that we mount the local folder extensions into the container ksqldb-server to make our UDF available to ksqlDB.
Execute the command docker compose up -d to start all services.
Now that the JAR of the UDF is on the classpath of the ksqlDB instance, you can start using the function.
Open a CLI for ksqlDB with the following command:
$ docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
When you enter DESCRIBE FUNCTION HASH;, you should see that your UDF has been correctly loaded into ksqlDB:
ksql> DESCRIBE FUNCTION HASH;

Name        : HASH
Author      : Michael
Version     : 0.0.1
Overview    : A function for applying hash algorithms to a string value.
Type        : SCALAR
Jar         : /etc/ksqldb/ext/hash-udf-0.0.1.jar
Variations  :

	Variation   : HASH(input VARCHAR, algorithm VARCHAR)
	Returns     : VARCHAR
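The Variations section lists every signature under which the function can be called. ksqlDB registers each @Udf-annotated method of the class as a separate variation, so you could, for instance, add a hypothetical one-argument overload to HashUdf that defaults to SHA-256 (illustrative, not part of the implementation above):

// Inside the HashUdf class: a second @Udf-annotated method registers
// an additional variation of the HASH function.
@Udf
public String hash(@UdfParameter final String input) throws NoSuchAlgorithmException {
    // Delegate to the two-argument variation with a fixed algorithm.
    return hash(input, "SHA-256");
}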
Assuming you have already created a stream called users, you can execute the following command to hash the field password:
ksql> SELECT HASH(password, 'SHA-256') AS password_hashed FROM users EMIT CHANGES;
Please visit ksqlDB’s documentation to learn more about UDFs and to see how you can build tabular and aggregate functions.
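To give you an idea of what a tabular function looks like, here is a minimal sketch of a hypothetical UDTF that splits a string into one output row per word; the class and function names are illustrative and not part of the project above:

package com.example;

import io.confluent.ksql.function.udtf.Udtf;
import io.confluent.ksql.function.udtf.UdtfDescription;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

@UdtfDescription(
    name = "SPLIT_WORDS",
    description = "Splits a string into one row per whitespace-separated word.",
    author = "Michael",
    version = "0.0.1"
)
public class SplitWordsUdtf {

    // A tabular function returns a collection; ksqlDB emits one output
    // row per element of the returned list.
    @Udtf
    public List<String> splitWords(final String input) {
        if (input == null) {
            return Collections.emptyList();
        }
        return Arrays.asList(input.trim().split("\\s+"));
    }
}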
As of the time of writing this blog post, UDFs can only be written in Java. However, most data practitioners know SQL and Python, so when users need a UDF in ksqlDB, they will probably have to consult a Java developer.
JARs are files that contain compressed versions of .class files along with resources such as audio files, image files, or directories. ksqlDB reads JARs only at startup. This means that after each deployment of a new or updated UDF, the entire ksqlDB instance needs to be restarted, impacting its uptime and availability. This is a serious drawback in production environments, where streaming pipelines are expected to operate continuously, 24/7.
At the moment, UDFs are not available on Confluent Cloud. If you want to make use of UDFs in ksqlDB, you need to manage a ksqlDB deployment on your own.