Kafka Connections
This document walks through installing Kafka and then connecting to the ZTF alert stream via two methods:
Console Consumer: Command-line consumer (installed with Confluent Platform) that prints alert content to stdout; useful for testing the connection.
Kafka Connectors: Plugins that listen to a stream and route the messages to another service. Our consumer is a Kafka -> Pub/Sub connector that simply passes the bytes through (no data decoding or conversions).
Pre-configured instance
The example code that follows creates a Compute Engine (CE) instance called kafka-consumer-test and then installs and configures both methods to listen to the ZTF stream. (ZTF auth files are required but not provided here.) There is an existing CE instance, kafka-consumer, that has been set up following this example. You(*) can log into it and test or use the methods described here to connect to ZTF without having to install or configure anything (see the “Run” sections below). You could also take advantage of the installed software and auth files but create and configure your own working directory. It is not a production instance. The commands to access kafka-consumer are:
# first start the instance
gcloud compute instances start kafka-consumer --zone us-central1-a
# then log in
gcloud beta compute ssh kafka-consumer --zone us-central1-a
# make sure you STOP the instance when you are done
# so that we don't pay for it to run
gcloud compute instances stop kafka-consumer --zone us-central1-a
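Because the stop step is easy to forget, the start / log in / stop sequence can be wrapped in a small shell function. This is a sketch, not an existing PGB tool: the function name ztf_session is hypothetical, and it assumes gcloud is installed and authenticated for the project. The stop command runs from an EXIT trap inside a subshell, so it fires when the SSH session ends even if ssh exits with an error.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of the PGB repo): start a VM, log in,
# and stop the VM again as soon as the SSH session ends.
ztf_session() {
    local instance="${1:-kafka-consumer}"
    local zone="${2:-us-central1-a}"

    gcloud compute instances start "${instance}" --zone "${zone}" || return 1

    # Run the login in a subshell so its EXIT trap fires when the session
    # ends, whether ssh succeeds or fails.
    (
        trap 'gcloud compute instances stop "${instance}" --zone "${zone}"' EXIT
        gcloud compute ssh "${instance}" --zone "${zone}"
    )
}

# usage:
# ztf_session kafka-consumer us-central1-a
```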
If you need permissions to access it or use sudo, you should be able to grant them in the IAM section of the GCP Console. I can help if you get stuck.
(*) Assuming “you” are a PGB member with access to the GCP project.
Install Kafka (Confluent Platform) manually
Confluent Platform is a collection of tools (including Kafka) to run and manage data streams. In some sense, installing the full platform is overkill (listening to a stream requires fewer tools than producing a stream). However, it’s worth it:
This is a (the?) standard way to install Kafka, so it becomes easier to follow online examples/tutorials and to troubleshoot with ZTF folks;
The tasks we need to accomplish (testing and running connections) run smoothly using Confluent Platform (the same cannot be said of other methods we have tried); and
It’s easy to imagine needing some of the other components in the package down the road.
Instruction Links:
See the file broker/consumer/vm_install.sh in the code repository for a quick list of the commands required for steps 2 and 3. (This file is used to set up the production instance ztf-consumer.)
(Optional) Create a Compute Engine VM instance (Debian 10):
# configs
instancename=kafka-consumer-test
machinetype=e2-standard-2
zone=us-central1-a
# create the instance
gcloud compute instances create ${instancename} \
--zone=${zone} \
--machine-type=${machinetype} \
--scopes=cloud-platform \
--metadata=google-logging-enabled=true \
--tags=ztfport # firewall rule, opens port used by Kafka/ZTF
# log in
gcloud compute ssh ${instancename} --zone=${zone}
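Once logged in, it can be worth checking that the VM can open a TCP connection to the ZTF broker before installing anything. The sketch below uses Bash's built-in /dev/tcp redirection and the coreutils timeout command, so no Kafka tools are needed; the function name check_port is hypothetical. A successful connection only shows that the port is reachable, not that Kerberos authentication will work.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed if a TCP connection to host:port can be
# opened within a few seconds. Relies on Bash's /dev/tcp pseudo-device.
check_port() {
    local host="$1" port="$2" secs="${3:-5}"
    if timeout "${secs}" bash -c "exec 3<>/dev/tcp/${host}/${port}" 2>/dev/null; then
        echo "reachable: ${host}:${port}"
    else
        echo "NOT reachable: ${host}:${port}" >&2
        return 1
    fi
}

# usage on the VM (9094 is the ZTF broker port used throughout this page):
# check_port public2.alerts.ztf.uw.edu 9094
```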
Install Java and the Java Development Kit (JDK).
Debian 10 instructions are at the link above.
From that page you can select different versions or distributions.
I used the “Default” OpenJDK option.
Be sure to set the JAVA_HOME environment variable; instructions are at the bottom of the page.
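If you would rather derive JAVA_HOME than copy a path by hand, one common approach is to follow the java symlink chain back to the JDK directory. A minimal sketch, assuming GNU readlink -f (as on Debian); the function name java_home_from is hypothetical:

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the JDK directory for a given java binary by
# resolving all symlinks (e.g., /usr/bin/java -> /etc/alternatives/java ->
# <jdk>/bin/java) and stripping the trailing bin/java. Needs GNU readlink -f.
java_home_from() {
    local java_bin
    java_bin="$(readlink -f "$1")" || return 1
    [ -x "${java_bin}" ] || { echo "not an executable: $1" >&2; return 1; }
    dirname "$(dirname "${java_bin}")"
}

# usage on the VM, e.g. in ~/.bashrc:
# export JAVA_HOME="$(java_home_from "$(command -v java)")"
```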
Install the Confluent Platform. This installs Kafka + additional tools.
Follow the instructions in the “Get the Software” section of the Confluent Platform link above.
See the links on the left-hand side of that page for RHEL, CentOS, or Docker installs.
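After the install, a quick way to confirm that the commands used later on this page are on your PATH is a loop over command -v. The function name check_tools is hypothetical, and the tool list in the usage comment simply collects the names used on this page:

```shell
#!/usr/bin/env bash
# Hypothetical helper: report which of the given commands are not on PATH.
# Returns non-zero if any are missing.
check_tools() {
    local missing=0 tool
    for tool in "$@"; do
        if ! command -v "${tool}" >/dev/null 2>&1; then
            echo "missing: ${tool}" >&2
            missing=1
        fi
    done
    return "${missing}"
}

# usage after installing Java and Confluent Platform:
# check_tools java kafka-console-consumer connect-standalone confluent-hub
```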
Console Consumer
kafka-console-consumer (named kafka-console-consumer.sh in the Apache Kafka distribution) is a command-line utility that creates a consumer and prints the messages to the terminal. It is useful for testing the connection.
Configure for ZTF access
The following instructions are pieced together from:
information from Christopher Phillips (by phone/email).
Find out where Kafka is installed. On the VM set up through the Marketplace, it is in /opt/kafka. On the VM with a manual install of Confluent Platform, components are scattered around a bit; look in:
/etc/kafka (example properties and config files)
/bin (e.g., for kafka-console-consumer and confluent-hub)
The following assumes we are on the VM with Confluent Platform.
Create a working directory. In the following I use /home/ztf_consumer.
Connecting to ZTF requires two authorization files (not provided here):
krb5.conf, which should be at /etc/krb5.conf
pitt-reader.user.keytab. I store this in the directory /home/ztf_consumer; we need the path for config below.
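A quick check that the two auth files are where the configs on this page expect them can save a confusing Kerberos error later. A sketch with a hypothetical function name (check_ztf_auth); the default paths are the ones used on this page:

```shell
#!/usr/bin/env bash
# Hypothetical helper: check that the two (not provided) ZTF auth files
# exist and are readable at the paths the configs on this page use.
check_ztf_auth() {
    local workdir="${1:-/home/ztf_consumer}"
    local krb5="${2:-/etc/krb5.conf}"
    local status=0 f
    for f in "${krb5}" "${workdir}/pitt-reader.user.keytab"; do
        if [ ! -r "${f}" ]; then
            echo "missing or unreadable: ${f}" >&2
            status=1
        fi
    done
    return "${status}"
}

# usage:
# mkdir -p /home/ztf_consumer && check_ztf_auth /home/ztf_consumer
```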
Create kafka_client_jaas.conf in your working directory containing the following (change the keyTab path if needed):
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKeyTab=true
debug=true
serviceName="kafka"
keyTab="/home/ztf_consumer/pitt-reader.user.keytab"
principal="pitt-reader@KAFKA.SECURE"
useTicketCache=false;
};
Make sure there are no extra spaces at the ends of the lines, else the connection will not succeed.
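Because stray trailing spaces break the connection, one option is to generate the file from a heredoc and then check it with grep. A minimal sketch: the helper names write_jaas and check_trailing_ws are hypothetical, and the file contents are copied from the block above with the keytab path as a parameter.

```shell
#!/usr/bin/env bash
# Hypothetical helpers: write kafka_client_jaas.conf (same contents as above)
# into a working directory and reject files that have trailing whitespace.

# Print offending lines (with line numbers) and fail if any line ends in whitespace.
check_trailing_ws() {
    [ -r "$1" ] || { echo "cannot read $1" >&2; return 2; }
    if grep -nE '[[:space:]]+$' "$1"; then
        echo "trailing whitespace in $1 (lines listed above)" >&2
        return 1
    fi
}

write_jaas() {
    local workdir="${1:-/home/ztf_consumer}"
    local keytab="${2:-${workdir}/pitt-reader.user.keytab}"
    local out="${workdir}/kafka_client_jaas.conf"

    # Unquoted EOF so ${keytab} is expanded; no other $ appears in the body.
    cat > "${out}" <<EOF
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKeyTab=true
debug=true
serviceName="kafka"
keyTab="${keytab}"
principal="pitt-reader@KAFKA.SECURE"
useTicketCache=false;
};
EOF

    check_trailing_ws "${out}" || return 1
    echo "${out}"
}

# usage:
# write_jaas /home/ztf_consumer
```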
Set an environment variable so Java can find the file we just created:
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/ztf_consumer/kafka_client_jaas.conf"
Set up the Kafka config file consumer.properties. Sample config files are provided with the installation in /opt/kafka/config/ (Marketplace VM) or /etc/kafka/ (manual install VM). Create a consumer.properties file in your working directory that contains the following:
bootstrap.servers=public2.alerts.ztf.uw.edu:9094
group.id=group
session.timeout.ms=6000
enable.auto.commit=False
sasl.kerberos.kinit.cmd='kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal}'
sasl.kerberos.service.name=kafka
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
auto.offset.reset=earliest
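If the connection fails, one thing to rule out is a missing or commented-out key. The sketch below (hypothetical helper require_props) checks that each listed key is set in a simple key=value properties file; it skips # and ! comment lines but does not handle line continuations or the key:value form that Java properties files also allow.

```shell
#!/usr/bin/env bash
# Hypothetical helper: fail if any of the given keys is not set in a simple
# key=value properties file. Comment lines (# or !) do not count.
require_props() {
    local file="$1" key status=0
    shift
    [ -r "${file}" ] || { echo "cannot read ${file}" >&2; return 2; }
    for key in "$@"; do
        # awk exits 0 if some non-comment line has exactly this key before "="
        if ! awk -v k="${key}" '
            /^[ \t]*[#!]/ { next }
            {
                split($0, kv, "=")
                name = kv[1]
                gsub(/^[ \t]+|[ \t]+$/, "", name)
                if (name == k) found = 1
            }
            END { exit !found }' "${file}"; then
            echo "missing: ${key}" >&2
            status=1
        fi
    done
    return "${status}"
}

# usage:
# require_props /home/ztf_consumer/consumer.properties \
#     bootstrap.servers security.protocol sasl.mechanism sasl.kerberos.service.name
```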
Run the Kafka Console Consumer
The following assumes we are using the manual install VM.
# make sure the KAFKA_OPTS env variable is set
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/ztf_consumer/kafka_client_jaas.conf"
# Set the topic and run the console consumer
topicday=20210105 # yyyymmdd, must be within 7 days of present
cd /bin
./kafka-console-consumer \
--bootstrap-server public2.alerts.ztf.uw.edu:9094 \
--topic ztf_${topicday}_programid1 \
--consumer.config /home/ztf_consumer/consumer.properties
# final argument should point to the consumer.properties file created above
After a few moments, if the connection is successful, you will see encoded alerts printing to stdout. Use Control-C to stop consuming.
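Since the topic date must be within the last 7 days, it can help to compute the topic name rather than type it. A sketch, assuming the date in the topic name is the UTC date and that GNU date is available (as on Debian); the helper name ztf_topic is hypothetical. The usage comment adds --max-messages 1, a standard kafka-console-consumer option, so that a connection test exits after one alert.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build the ZTF topic name for N days ago (default 0 = today).
# Assumptions: the topic date is the UTC date, and GNU date is available.
ztf_topic() {
    local days="${1:-0}" topicday
    case "${days}" in
        ''|*[!0-9]*) echo "days must be a non-negative integer" >&2; return 1 ;;
    esac
    topicday="$(date -u -d "${days} days ago" +%Y%m%d)" || return 1
    echo "ztf_${topicday}_programid1"
}

# usage (from /bin, with KAFKA_OPTS set as above):
# ./kafka-console-consumer \
#     --bootstrap-server public2.alerts.ztf.uw.edu:9094 \
#     --topic "$(ztf_topic 1)" \
#     --consumer.config /home/ztf_consumer/consumer.properties \
#     --max-messages 1
```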
Kafka Connectors
Kafka connectors run a Kafka consumer and route the messages to another service.
General Configuration and ZTF Authentication
The following uses instructions at:
Create a directory to store the connectors (plugins):
sudo mkdir -p /usr/local/share/kafka/plugins
To use connectors, the worker .properties file (passed to connect-standalone when starting the connector) must include the following:
plugin.path=/usr/local/share/kafka/plugins
Create a working directory. In the following I use /home/ztf_consumer.
Two authorization files are required:
krb5.conf, which should be at /etc/krb5.conf
pitt-reader.user.keytab. I store this in the directory /home/ztf_consumer; we need the path for config below.
Pub/Sub Connector
We use a Kafka -> Pub/Sub connector (kafka-connector) that is maintained by the Pub/Sub developers. There is another connector managed by Confluent (here), but it only supports Pub/Sub as a source (i.e., Pub/Sub -> Kafka); we need a Pub/Sub sink.
We pass the alert bytes straight through to Pub/Sub without decoding or converting them.
Install and Configure
The following instructions were pieced together from:
Installation:
the copy_tool.py file provided with the connector (see the repo)
Configuration:
Worker configuration:
Connector configuration:
Example config files, which you can find at:
/etc/kafka/connect-standalone.properties
/etc/kafka/connect-distributed.properties
cps-sink-connector.properties (link)
The connector can be configured to run in “standalone” or “distributed” mode. Distributed mode is recommended for production environments, partly due to its fault tolerance. I initially tried distributed mode, but: a) I got confused about where to put the connector configs, and b) I’m not totally clear on what the distributed-specific worker options are and what they do. The following uses standalone mode (we should probably switch to distributed at some point):
Install
# navigate to the directory created above to store connectors
cd /usr/local/share/kafka/plugins
# download the .jar file
CONNECTOR_RELEASE=v0.5-alpha
sudo wget https://github.com/GoogleCloudPlatform/pubsub/releases/download/${CONNECTOR_RELEASE}/pubsub-kafka-connector.jar
# now the plugin is installed
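To confirm the download produced a real jar (and not, say, an empty file or a saved error page), you can check that it is a non-empty zip archive: jar files are zip files, and zip files start with the bytes PK. A sketch with a hypothetical helper name (check_jar):

```shell
#!/usr/bin/env bash
# Hypothetical helper: sanity-check that a downloaded connector .jar is a
# non-empty zip archive (jar files are zip files, which start with "PK").
check_jar() {
    local jar="$1"
    if [ ! -s "${jar}" ]; then
        echo "missing or empty: ${jar}" >&2
        return 1
    fi
    if [ "$(head -c 2 "${jar}")" != "PK" ]; then
        echo "not a jar/zip archive: ${jar}" >&2
        return 1
    fi
    echo "ok: ${jar}"
}

# usage:
# check_jar /usr/local/share/kafka/plugins/pubsub-kafka-connector.jar
```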
Configure
Worker configuration
# navigate to the working directory created when configuring Kafka for ZTF
cd /home/ztf_consumer
Create a file called psconnect-worker.properties containing the following:
plugin.path=/usr/local/share/kafka/plugins
# ByteArrayConverter provides a “pass-through” option that does no conversion
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
# offset.flush.interval.ms=10000
# workers need to use SASL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKeyTab=true \
serviceName="kafka" \
keyTab="/home/ztf_consumer/pitt-reader.user.keytab" \
principal="pitt-reader@KAFKA.SECURE" \
useTicketCache=false;
# connecting to ZTF
bootstrap.servers=public2.alerts.ztf.uw.edu:9094
# group.id=group
# session.timeout.ms=6000
# enable.auto.commit=False
# sasl.kerberos.kinit.cmd='kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal}'
consumer.auto.offset.reset=earliest
consumer.sasl.mechanism=GSSAPI
consumer.sasl.kerberos.service.name=kafka
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKeyTab=true \
serviceName="kafka" \
keyTab="/home/ztf_consumer/pitt-reader.user.keytab" \
principal="pitt-reader@KAFKA.SECURE" \
useTicketCache=false;
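The sasl.jaas.config values above span several lines using trailing backslashes. In a Java properties file, a backslash followed by trailing whitespace escapes that whitespace instead of continuing the line, so the "no extra spaces" caution from the JAAS file applies here as well. A hypothetical check (check_continuations):

```shell
#!/usr/bin/env bash
# Hypothetical helper: flag lines in a .properties file where a trailing
# backslash is followed by whitespace; Java's properties loader then treats
# the backslash as escaping the space instead of continuing the line.
check_continuations() {
    local file="$1"
    [ -r "${file}" ] || { echo "cannot read ${file}" >&2; return 2; }
    if grep -nE '\\[[:space:]]+$' "${file}"; then
        echo "whitespace after line-continuation backslash in ${file}" >&2
        return 1
    fi
}

# usage:
# check_continuations /home/ztf_consumer/psconnect-worker.properties
```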
Connector configuration
Create a file in your working directory called ps-connector.properties containing the following:
name=ps-sink-connector-ztf
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
# set the ZTF Kafka topic
topics=ztf_20210107_programid1
# set our Pub/Sub topic and configs
cps.topic=ztf_alert_data-kafka_consumer
cps.project=ardent-cycling-243415
# include Kafka topic, partition, offset, timestamp as msg attributes
metadata.publish=true
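Because the ZTF topic changes nightly, the topics= line will need regular updates. A sketch of an in-place edit (hypothetical helper set_topic) that also rejects names containing characters Kafka does not allow in topic names (only ASCII letters, digits, '.', '_' and '-' are legal):

```shell
#!/usr/bin/env bash
# Hypothetical helper: replace the topics= line of a connector .properties
# file in place. Writing back with cat (rather than mv) keeps the original
# file's ownership and permissions.
set_topic() {
    local file="$1" topic="$2" tmp
    case "${topic}" in
        ''|*[!A-Za-z0-9._-]*)
            echo "invalid Kafka topic name: '${topic}'" >&2
            return 1 ;;
    esac
    grep -q '^topics=' "${file}" || { echo "no topics= line in ${file}" >&2; return 1; }
    tmp="$(mktemp)" || return 1
    sed "s|^topics=.*|topics=${topic}|" "${file}" > "${tmp}" && cat "${tmp}" > "${file}"
    local status=$?
    rm -f "${tmp}"
    return "${status}"
}

# usage:
# set_topic /home/ztf_consumer/ps-connector.properties ztf_20210107_programid1
```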
Run the Pub/Sub Connector
cd /bin
# if you want to leave it running and disconnect your terminal from the VM:
screen
# if needed, change the Kafka/ZTF topic (must be within 7 days of present)
# or other configs in the .properties files called below
./connect-standalone \
/home/ztf_consumer/psconnect-worker.properties \
/home/ztf_consumer/ps-connector.properties
This will start up a Kafka consumer and route the messages to Pub/Sub. After a few minutes, if it is working correctly, you will see log messages similar to
INFO WorkerSinkTask{id=ps-sink-connector-ztf-0} Committing offsets asynchronously using sequence number 3
and messages streaming into the Pub/Sub topic ztf_alert_data-kafka_consumer.
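When the connector runs inside screen, it can help to also save its output to a file and look there for the offset-commit messages described above. A sketch: the log path /tmp/ps-connector.log and the helper name commits_seen are assumptions, and the task id defaults to the one in the example log line.

```shell
#!/usr/bin/env bash
# Hypothetical helper: count the "Committing offsets" lines for one sink task
# in a saved connect-standalone log; succeed only if there is at least one.
commits_seen() {
    local logfile="$1" task="${2:-ps-sink-connector-ztf-0}" n
    [ -r "${logfile}" ] || { echo "cannot read ${logfile}" >&2; return 2; }
    # grep -c prints 0 (and exits 1) when nothing matches; || true keeps that
    # from aborting scripts that use set -e
    n="$(grep -cF "WorkerSinkTask{id=${task}} Committing offsets" "${logfile}" || true)"
    echo "${n} offset commits for ${task}"
    [ "${n}" -gt 0 ]
}

# usage, inside screen (2>&1 captures the log output whichever stream it uses):
# ./connect-standalone /home/ztf_consumer/psconnect-worker.properties \
#     /home/ztf_consumer/ps-connector.properties 2>&1 | tee /tmp/ps-connector.log
# then, from another terminal:
# commits_seen /tmp/ps-connector.log
```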
BigQuery Connector
A Kafka -> BigQuery sink connector exists and is free (some connectors require a Confluent Enterprise License), but I haven’t actually tried it.
One question that I haven’t been able to find the answer to is this: If we run two Kafka connectors, does that create two separate connections to ZTF, or do both connectors use the same incoming stream? We could just install this and try it; I just haven’t done it yet. I’m guessing it would be bad form (cost more money on both ends) to pull in two connections every night.