Getting started with Kafka Connector for Snowflake

Sudhendu
8 min readSep 5, 2021

Objective

The aim of this short tutorial is to get the reader up and running with Snowflake Connector for Kafka. The Snowflake documentation does a good job of explaining the concept/working of this connector. This article can be used in conjunction with the documentation.

Steps

All these steps are detailed and assume you have little to no background on how to set up either Kafka or Snowflake (to be fair, no real setup is required on Snowflake, just sign-up, and some configurations). Wherever I think there are better reference articles available, I will point you to move there, complete the steps and resume here.

Below is the agenda. The focus is to have a quick win and have a working flow ready. You can skip any section/s as per your setup.

  1. Setting up Snowflake Account
  2. Setting up Kafka (over Ubuntu WSL2)
    Installation.
    Running Kafka
    Verifying Kafka Setup
    Install and configure the Kafka Connect cluster
  3. Setting up Snowflake Connect for Kafka
    Downloading the required artifacts
    Creating the public/private key pair for authentication.
    Configuring the connector.
    Configuring the Snowflake Database Objects.
    Starting the Kafka Snowflake Connector
    Sending messages to Snowflake via the Kafka Connector
    Verifying the Kafka Message flow into the Snowflake table.
  4. Possible setup issues and remedies

Setting up Snowflake Account

You will need an account on Snowflake. The trial account works. You can sign-up for one here https://signup.snowflake.com/ (no credit card is required, but always keep a tap on how much Snowflake credit you are spending).

Setting up Kafka

Setting up Apache Kafka is straightforward. I am using a Windows machine and have installed Windows Subsystem for Linux WSL2

You can follow the article below on Confluent.io on how to set up Kafka on WSL2. The steps are detailed and easy to understand. Below I will summarize the high-level steps and commands for Kafka setup.

Kafka installation:

Steps to install Kafka (once you are ready with WSL2):

Please verify the Kafka Connect vs Kafka version compatibility before proceeding with the installation.

From Snowflake Documentation
sudo apt-get update && sudo apt-get upgrade -y#get your java
sudo apt install openjdk-8-jdk -y
wget https://ftp.wayne.edu/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
#NOTE: this is a older version of Kafka, please see the contability matrix above for version of Kafka vs Snowflake Kafka Connect
tar -xzf kafka_2.13-2.6.0.tgz
cd kafka_2.13-2.6.0

Running Kafka:

Follow these steps to run Kafka:

start zookeeper (in the latest version of Apache Kafka, you don't need Zookeeper);
start Kafka;
create a Topic;
create a producer.

Please note you have to do all these steps in the Ubuntu Linux that you have just installed on the WSL2.

bin/zookeeper-server-start.sh config/zookeeper.properties#Once Zookeeper has started, you can run the below command in new Ubuntu console
bin/kafka-server-start.sh config/server.properties

Tip: Download the Windows Terminal. Best way to work with multiple command windows.

Verifying Kafka

You can verify from the logs if your Kafka instance is up and running. If there are any errors, you might want to debug and fix that before moving forward.

To test, we will use the command-line producer and consumer to test our Kafka instance and see if we are able to send and receive messages on the Topic we have created.

#Create a Kafka Topic called TEST_KAFKAbin/kafka-topics.sh --create --topic TEST_KAFKA --bootstrap-server localhost:9092#Create a Kafka Producer for the Topic TEST_KAFKAbin/kafka-console-producer --broker-list localhost:9092 --topic TEST_KAFKA#Open a new Ubuntu window and start a Kafka consumer for the Topic TEST_KAFKAbin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --topic TEST_KAFKA

Setting up Snowflake Connector for Kafka

Snowflake provides very good documentation on how to set up Kafka Connector for Snowflake. I managed to complete the entire setup by just following the documentation, which means it is good enough :)

For this tutorial, we are just going to create a working example with the least possible customization. You can refer to the documentation of details on individual parameters, steps, etc.

  1. Downloading the required artifacts

You can either use a confluent version of Kafka or the Open-Source Software (OSS) Apache Kafka package. For this setup, I am using the OSS package.

Download the below (3 jars):

Kafka Snowflake connector (check for version): https://mvnrepository.com/artifact/com.snowflake

BouncyCastle Crypotgrahpy library:
https://mvnrepository.com/artifact/org.bouncycastle/bc-fips/1.0.1
https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-fips/1.0.3

Place all the downloaded jars in the /lib/ folder of your Kafka setup.

2. Creating the public/private key pair for authentication

To authenticate, the Snowflake connector only accepts key pair authentication (instead of basic authentication).

You can easily generate the public-private key pair using OpenSSL. The public key is assigned to the Snowflake user defined in the configuration file.

#Create a private key in  some temporary location. Remember the password that you type. We are going to use this value in our config file below.openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out C:\tmp\cert\new_rsa_key_v1.p8#Create a public key using the above private key. We are going to update our Snowflake user to have this public key.openssl rsa -in C:\tmp\cert\new_rsa_key_v1.p8 -pubout -out C:\tmp\cert\new_rsa_key_v1.pub

Expected Output:

3. Configuring the connector.

For our example, we are going to configure the Kafka Connector in Standalone mode (in contrast to distributed mode. More info here).

a) Create a file name SF_connect.properties and save it in your Kafka /config/ folder.

b) Edit the file and copy-paste the following configuration parameter.

Note: The following properties are specific to your instance and hence need to be updated in accordance with your setup.

#your Snowflake instance ID. Please see the first part of the url.
snowflake.url.name=acb1234.snowflakecomputing.com:443
#your account username
snowflake.user.name=thisismyusername
#privatekey you generated in last step
snowflake.private.key= #place-your-private-key-here-.-no-linefeed-
everything-should-be-in-one-line
#passphrase you used to generate the private key
snowflake.private.key.passphrase= #the-password-you-entered-while-creating-private-key

4. Configuring the Snowflake Database Objects.

Database objects with access details

There are a couple of steps you need to do to ensure the right database objects are available along with the appropriate access for Kafka Connect to work.

You can use the following database script as-is* and execute it on your Snowflake Worksheet.

*Please note the comments (!!!!!!) where you need to make changes.

5. Starting the Kafka Snowflake Connector

The next step is to start the connector. This is probably where you will face most of the issues. But hopefully, you can debug and resolve as the error logs are very explanatory. I have also listed the errors that I encountered and the fix that I did in the last section.

Command to start Kafka Standalone connect for Snowflake:

<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties#Example:
bin/connect-standalone.sh /home/sudh/kafka_2.13-2.6.0/config/connect-standalone.properties /home/sudh/kafka_2.13-2.6.0/config/SF_connect.properties

On successful startup of the connector, you should see something like the below. Ensure that there is no ERROR log in the startup by scrolling through the logs.

6. Sending messages to Snowflake via the Kafka Connector

Almost there!

Made it this far ha! Amazing! We can now start sending messages on the Kafka Topic and if everything is set up right, we should be seeing our messages landing up in the Kafka table we have created (one Topic is linked to one table).

Important stuff to know, every Snowflake table loaded by the Kafka connector has a schema consisting of at least two VARIANT columns:

  • RECORD_CONTENT. This contains the Kafka message.
  • RECORD_METADATA. This contains metadata about the message, for example, the Topic from which the message was read.

Let us start by creating a new Topic (the same name we gave in our connect config properties file (SF_connect.properties) above). Once the Topic is created, we can start the producer on the Topic.

Below are the commands for the creation and starting producer (use as is)

#Create the Topic
bin/kafka-topics.sh --create --topic snowflake_in_topic --bootstrap-server localhost:9092
#Start the producer
bin/kafka-console-producer --broker-list localhost:9092 --topic snowflake_in_topic

At the producer prompt, you can send a sample JSON data given below (or anything you have):

{“order_id”: {“int”: 1212}, “customer_id”: {“string”: abcd}, “order_ts”: {“int”: 333333}, “order_total_usd”: {“double”: 3.8900000000000001}, “item”: {“string”: “Wainwright”}}

At this point, go and check your Kafka Connector console window, it provides some logs like below:

[SF_KAFKA_CONNECTOR] init pipe: SNOWFLAKE_KAFKA_CONNECTOR_mykafkaconnectsnowflake_PIPE_KAFKA_TABLE_IN_0 (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1:79)
[2021-09-05 20:02:11,563] INFO
[SF_KAFKA_CONNECTOR] Using existing table KAFKA_TABLE_IN. (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1:79)
[2021-09-05 20:02:12,590] INFO
[SF_KAFKA_CONNECTOR] list stage SNOWFLAKE_KAFKA_CONNECTOR_mykafkaconnectsnowflake_STAGE_KAFKA_TABLE_IN retrieved 0 file names (com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1:79)
[2021-09-05 20:02:12,590] INFO
[SF_KAFKA_CONNECTOR] Using existing stage SNOWFLAKE_KAFKA_CONNECTOR_mykafkaconnectsnowflake_STAGE_KAFKA_TABLE_IN. (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1:79)
[2021-09-05 20:02:14,342] INFO
[SF_KAFKA_CONNECTOR] list stage SNOWFLAKE_KAFKA_CONNECTOR_mykafkaconnectsnowflake_STAGE_KAFKA_TABLE_IN retrieved 0 file names (com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1:79)
[2021-09-05 20:02:14,344] INFO
[SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_mykafkaconnectsnowflake_PIPE_KAFKA_TABLE_IN_0, recovered from existing pipe (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1:79)
[2021-09-05 20:02:14,345] INFO
[SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_mykafkaconnectsnowflake_PIPE_KAFKA_TABLE_IN_0: cleaner started (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1:79)
[2021-09-05 20:02:14,346] INFO
[SF_KAFKA_CONNECTOR] pipe SNOWFLAKE_KAFKA_CONNECTOR_mykafkaconnectsnowflake_PIPE_KAFKA_TABLE_IN_0: flusher started (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1:79)

7. Verifying the Kafka Message flow into the Snowflake table.

Login to your Snowflake Console. The URL should look something like the below:

abcd1234.snowflakecomputing.com/console#/internal/worksheet

Navigate to your Database->Schema->Table. Right-click and select Query table. You should see the one you entered.

you did it yeah!

If you made it this far, amazing! The Snowflake documentation makes it a lot easier!

Possible issues you might face and fixes

  1. Error message: unable to decode base64 string: invalid characters encountered in base64 data

Version mismatch between Kafka Connect vs Kafka: I had an older version of Kafka on my laptop (1+ years old) whereas I downloaded the latest version of Kafka Connect. There was some issue with the version (some java deserialization issue which).

2. Error message: Invalid JWT token

You have an issue with your key pair generated. Ensure you have followed step (Creating the public/private key pair for authentication) and also added the correct entry in the database for rsa_key_pub

3. Error message: Cannot perform CREATE TABLE. This session does not have a current database. Call ‘USE DATABASE’, or use a qualified name.

You have to ensure you have executed the following statement (change the default role)

ALTER USER YOURUSERNAME set DEFAULT_ROLE = KAFKA_CONNECT_ROLE;

Credit and Ref:

Header Image

Gifs

If you have any questions, please comment or head over to Snowflake Community

--

--

Sudhendu
Sudhendu

Written by Sudhendu

Mostly running, hiking. Snowflake Data Superhero. Quora Top Writer २०१४. Work @ kipi.bi. Views are my own।

Responses (3)