Lessons Learned Using Spark Structured Streaming
Recently, I had to migrate a Spark application from Spark Streaming to Spark Structured Streaming. During this process, there were some parts I couldn't find documentation or examples for. I decided to write about those parts, hoping someone (probably future me) will find them useful.
Reading from Kafka
Spark has a good guide for integration with Kafka. However, some parts were not easy to grasp.
Deserializing records from Kafka was one of them. When using Spark Structured Streaming to read from Kafka, the developer has to handle deserialization of the records. Out of the box, the key and value are read as Array[Byte], which can be cast to String. If your records are in any other format, you have to perform the deserialization yourself in DataFrame operations.
Here is the code to create a streaming DataFrame from Kafka:
val topic: String = ???
val bootStrapServers: String = ???
val extraParams: Map[String, String] = ??? // any additional Kafka source options
val sparkSession: SparkSession = ???

val dataFrame: DataFrame = sparkSession.readStream
  .format("kafka")
  .option("subscribe", topic)
  .option("kafka.bootstrap.servers", bootStrapServers)
  .option("startingOffsets", "earliest")
  .options(extraParams)
  .load()
The schema of the DataFrame created from the streaming source is:
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
As you can see from the schema, the key and value are Array[Byte]. To get them into our desired types, we have to deserialize them.
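If your keys and values are plain UTF-8 strings, no custom deserializer is needed; a cast in selectExpr is enough. Here is a minimal sketch reusing the dataFrame from above:
import sparkSession.implicits._

// cast the binary key/value columns to strings; only valid when the payload is plain text
val stringRecords = dataFrame
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]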
For records in other formats, I created a generic method that can be used to deserialize keys of type K and values of type V from a DataFrame.
import org.apache.kafka.common.serialization.Deserializer
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try

def kafkaReader[K: TypeTag: Encoder, V: TypeTag: Encoder](
  topic: String,
  bootStrapServers: String,
  sparkSession: SparkSession,
  keyDeserializer: Deserializer[K],
  valueDeserializer: Deserializer[V]
): Dataset[(K, V)] = {
  import sparkSession.implicits._

  sparkSession.readStream
    .format("kafka")
    .option("subscribe", topic)
    .option("kafka.bootstrap.servers", bootStrapServers)
    .load()
    .selectExpr("key", "value") // selecting only key & value
    .as[(Array[Byte], Array[Byte])]
    .flatMap { case (key, value) =>
      // records that fail to deserialize are silently dropped
      for {
        deserializedKey   <- Try(keyDeserializer.deserialize(topic, key)).toOption
        deserializedValue <- Try(valueDeserializer.deserialize(topic, value)).toOption
      } yield (deserializedKey, deserializedValue)
    }
}
Note that the deserializers have to be serializable, otherwise the Spark executors will fail with an org.apache.spark.SparkException: Task not serializable error. An easy way to achieve this is to extend the deserializer with Serializable. For example:
val keyDeserializer: StringDeserializer = new StringDeserializer with Serializable
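With that in place (and a matching valueDeserializer), reading a topic of String keys and values might look like this sketch, reusing the topic, bootStrapServers and sparkSession placeholders from above:
import org.apache.kafka.common.serialization.StringDeserializer
import sparkSession.implicits._ // provides the Encoder[String] required by the context bounds

val valueDeserializer: StringDeserializer = new StringDeserializer with Serializable

val records = kafkaReader[String, String](
  topic,
  bootStrapServers,
  sparkSession,
  keyDeserializer,
  valueDeserializer
)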
Writing data to Kafka
Writing data to Kafka in Spark Structured Streaming is quite similar to reading from Kafka. The developer has to serialize the data to either Array[Byte] or String before writing.
Here is a generic function to stream a Dataset of Tuple2[K, V] to Kafka:
import org.apache.kafka.common.serialization.Serializer
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.StreamingQuery

def writeToKafka[K, V](
  topic: String,
  bootStrapServers: String,
  stream: Dataset[(K, V)],
  keySerializer: Serializer[K],
  valueSerializer: Serializer[V]
): StreamingQuery = {
  import stream.sparkSession.implicits._

  stream
    .map { case (key, value) =>
      (keySerializer.serialize(topic, key), valueSerializer.serialize(topic, value))
    }
    .selectExpr("_1 as key", "_2 as value") // the Kafka sink expects key and value columns
    .writeStream
    .format("kafka")
    .outputMode("append")
    .option("kafka.bootstrap.servers", bootStrapServers)
    .option("topic", topic)
    // the Kafka sink also needs a checkpoint location, either via
    // .option("checkpointLocation", ...) or the spark.sql.streaming.checkpointLocation conf
    .start()
}
Also, a few things to note:
- The serializers also have to be serializable. Again, an easy option is extending Serializable.
- The Dataset/DataFrame to be written to Kafka needs to have key and value columns, which are mapped to the key and value of the Kafka ProducerRecord respectively; the selectExpr above takes care of this.
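Putting it together, writing the records Dataset from the reader above back to another topic might look like this sketch (outputTopic is a hypothetical placeholder):
import org.apache.kafka.common.serialization.StringSerializer

val keySerializer = new StringSerializer with Serializable
val valueSerializer = new StringSerializer with Serializable

val query = writeToKafka[String, String](
  outputTopic,      // hypothetical destination topic
  bootStrapServers,
  records,          // the Dataset[(String, String)] from kafkaReader above
  keySerializer,
  valueSerializer
)
query.awaitTermination()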
Writing to Cassandra
Writing a dataset stream to Cassandra is quite easy. However, a few things to note:
- The org.apache.spark.sql.cassandra format does not support writing streams, only batch writes, so the write has to happen inside foreachBatch.
- The Spark Cassandra connector [quotes column names](https://stackoverflow.com/a/56819915). Therefore, if you are writing a Dataset with camel-case column names, you will need to rename those columns to lower-case; otherwise you will get errors like java.util.NoSuchElementException: Columns not found in table.
Here is an example function writing a dataset stream to Cassandra:
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.streaming.DataStreamWriter

def cassandraSinkWriter[T](
  keyspace: String,
  table: String,
  stream: Dataset[T]
): DataStreamWriter[T] =
  stream.writeStream
    .foreachBatch { (batchDF: Dataset[T], _: scala.Long) =>
      batchDF
        .toDF(batchDF.columns.map(_.toLowerCase): _*) // transforming column names to lowercase
        .write
        .cassandraFormat(table, keyspace)
        .save()
    }
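Note that the function returns a DataStreamWriter, so the query still has to be started. A hypothetical call could look like this (keyspace, table, stream and checkpointPath are placeholders):
val query = cassandraSinkWriter(keyspace, table, stream)
  .option("checkpointLocation", checkpointPath) // placeholder path for the streaming checkpoint
  .start()

query.awaitTermination()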
I hope you find these snippets useful. Please comment if you have questions, suggestions or need more clarification.