Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share

apache spark - Read file path from Kafka topic and then read file and write to DeltaLake in Structured Streaming

I have a use case where the S3 file paths of JSON records arrive as messages on a Kafka topic. I have to process the data using Spark Structured Streaming.

The design I have in mind is as follows:

  1. Read the messages containing the data paths from Kafka with Spark Structured Streaming.
  2. Collect the message records on the driver (the messages are small).
  3. Create a DataFrame from each data location.
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.StringType
import spark.implicits._ // for the $"..." column syntax

kafkaDf.select($"value".cast(StringType).as("data_path"))
  .writeStream.foreachBatch((batchDf: DataFrame, batchId: Long) => {
    // rough code
    // collect to driver
    val records = batchDf.collect()
    // create dataframe and process
    records.foreach((rec: Row) => {
      println(s"records:###################### ${rec.toString()}")
      val path = rec.getAs[String]("data_path")
      val dfToProcess = spark.read.json(path)
      // ....
    })
  }).start()

I would like to know whether this approach is fine, in particular whether there is any problem with creating the DataFrame after calling collect. If there is a better approach, please let me know.


1 Answer


Your idea works perfectly fine.

Collecting the DataFrame to the driver is actually necessary here. The SparkSession can only be used on the driver, so you cannot create a new distributed dataset from code that runs on the executors (for example inside batchDf.foreach). Without the collect you will end up with a NullPointerException.

I have slightly rewritten your code skeleton and also implemented the part that writes your DataFrame into a Delta table (based on your other question). In addition, I am using a Dataset[String] instead of a DataFrame (Dataset[Row]), which makes life a bit easier.
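A small sketch of why the Dataset[String] is easier to handle on the driver: with a DataFrame every collected element is a generic Row that has to be unpacked by column name, while a Dataset[String] collects straight into an array of paths. The object and helper names are hypothetical; the column name data_path matches the alias used in the consumer code.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row}

object PathExtraction {
  // Untyped DataFrame: each collected element is a Row, so the path
  // must be extracted by column name and type.
  def pathsFromRows(batchDf: DataFrame): Array[String] =
    batchDf.collect().map((rec: Row) => rec.getAs[String]("data_path"))

  // Typed Dataset[String] (e.g. after .as[String]): collect() already
  // returns the paths as plain Strings.
  def pathsFromDataset(batchDs: Dataset[String]): Array[String] =
    batchDs.collect()
}
```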

This works fine with Spark 3.0.1 and delta-core 0.7.0. As an example, my test file looks like this:

{"a":"foo1","b":"bar1"}
{"a":"foo2","b":"bar2"}
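The answer does not show how the file location was published to Kafka. A minimal sketch, assuming the plain kafka-clients producer API and the same broker address (localhost:9092) and topic (test) as the consumer code; PathPublisher, pathRecord and publishPath are hypothetical names. The message value is just the path string, which is what CAST(value AS STRING) on the consumer side expects.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object PathPublisher {
  // Build the record: no key, the file location as the string value.
  def pathRecord(topic: String, path: String): ProducerRecord[String, String] =
    new ProducerRecord[String, String](topic, path)

  // Send one path and block until the broker acknowledges it.
  def publishPath(bootstrapServers: String, topic: String, path: String): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)
    try producer.send(pathRecord(topic, path)).get()
    finally producer.close()
  }
}
```

For example, PathPublisher.publishPath("localhost:9092", "test", path) with path set to wherever your test file actually lives.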

I sent the location of that file to a Kafka topic called "test" and used the following code to parse the file and write its columns (based on a given schema) into a Delta table:

  import org.apache.spark.sql.{Dataset, SparkSession}
  import org.apache.spark.sql.types.{StringType, StructType}

  object KafkaConsumer {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .appName("KafkaConsumer")
        .master("local[*]")
        .getOrCreate()

      // schema of the JSON records inside the referenced files
      val jsonSchema = new StructType()
        .add("a", StringType)
        .add("b", StringType)

      val deltaPath = "file:///tmp/spark/delta/test"

      import spark.implicits._
      // each Kafka message value is the location of one JSON file
      val kafkaDf = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "test")
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", "false")
        .load()
        .selectExpr("CAST(value AS STRING) as data_path")
        .as[String]

      kafkaDf.writeStream.foreachBatch((batchDf: Dataset[String], batchId: Long) => {
        // collect to driver (the foreachBatch function itself runs on the driver)
        val records = batchDf.collect()

        // create dataframe based on file location and process and write to Delta Lake
        records.foreach((path: String) => {
          val dfToProcess = spark.read.schema(jsonSchema).json(path)
          dfToProcess.show(false) // replace this line with your custom processing logic
          // append, so that later files do not fail because the table already exists
          dfToProcess.write.format("delta").mode("append").save(deltaPath)
        })
      }).start()

      spark.streams.awaitAnyTermination()
    }
  }
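If a single micro-batch can carry several paths, one possible variation (a suggestion, not what the code above does) is to pass all collected paths to one spark.read.json call, which accepts several paths, and append once per batch. That means one read job and one Delta commit per micro-batch instead of one per file. A sketch, assuming the same schema and Delta location as above; BatchProcessing, readBatch and processBatch are hypothetical names:

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.types.StructType

object BatchProcessing {
  // Read every file of one micro-batch with a single reader call.
  // Returns None for an empty micro-batch so no empty write is issued.
  def readBatch(spark: SparkSession, schema: StructType, paths: Seq[String]): Option[DataFrame] =
    if (paths.isEmpty) None
    else Some(spark.read.schema(schema).json(paths: _*))

  // foreachBatch body: collect the (small) path messages on the driver,
  // read them together and append the result to the Delta table.
  def processBatch(spark: SparkSession, schema: StructType, deltaPath: String)(
      batchDs: Dataset[String], batchId: Long): Unit = {
    val paths = batchDs.collect().toSeq
    readBatch(spark, schema, paths).foreach { dfToProcess =>
      // custom processing logic would go here
      dfToProcess.write.format("delta").mode("append").save(deltaPath)
    }
  }
}
```

It could be wired in with kafkaDf.writeStream.foreachBatch(BatchProcessing.processBatch(spark, jsonSchema, deltaPath) _).start().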

The output of the show call is as expected:

+----+----+
|a   |b   |
+----+----+
|foo1|bar1|
|foo2|bar2|
+----+----+

and the data has been written as a Delta table into the location specified by deltaPath:

/tmp/spark/delta/test$ ll
total 20
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ./
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ../
drwxrwxr-x 2 x x 4096 Jan 20 13:37 _delta_log/
-rw-r--r-- 1 x x  595 Jan 20 13:37 part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet
-rw-r--r-- 1 x x   16 Jan 20 13:37 .part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet.crc

