Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

How to utilize Redis while Processing a DataFrame in Spark Scala

I am new to Spark and I am trying to understand how to utilize Redis while processing a DataFrame.

I need to maintain a mutable, distributed state that all executors read from and write to while looping through a DataFrame, and I would like to use Redis as that mutable distributed state.

What is the best way to accomplish this? Is it possible to access the SparkSession inside the loop in order to read from and write to Redis?

Example of what I would like to do:

df.foreachPartition(partition => {
  partition.foreach(row => {
    val currentCount = spark.read.format("org.apache.spark.sql.redis")
      .schema(StructType(Array(
        StructField("id", LongType),
        StructField("count", LongType)
      )))
      .option("keys.pattern", s"id:${row.getAs[Long]("id")}")
      .load()

    if (currentCount.select("count").head().getLong(0) < row.getAs[Long]("maxCount")) {
      // check if the row is eligible
      // update currentCount
      currentCount.write.format("org.apache.spark.sql.redis").save()
    }
  })
})
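Note that the SparkSession is only available on the driver, so the per-row logic above (read the current count for an id, compare it to maxCount, conditionally update) would normally run inside foreachPartition using a plain Redis client such as Jedis, opened once per partition. As a minimal sketch of just that check-and-increment logic, here is a stand-in that uses an in-memory map instead of Redis; the `SharedState` object, the map, and `tryIncrement` are all illustrative names, not part of spark-redis:

```scala
import scala.collection.concurrent.TrieMap

// In-memory stand-in for the Redis hash "id -> count" (illustrative only;
// a real job would hold a Jedis connection per partition instead).
object SharedState {
  private val counts = TrieMap.empty[Long, Long]

  // Read the current count for `id` and increment it only while it is
  // still below `maxCount`. Returns true if the row was accepted.
  def tryIncrement(id: Long, maxCount: Long): Boolean = synchronized {
    val current = counts.getOrElse(id, 0L)
    if (current < maxCount) {
      counts.update(id, current + 1L)
      true
    } else false
  }
}

// Rows as (id, maxCount) pairs, mirroring the DataFrame columns above.
val rows = Seq((1L, 2L), (1L, 2L), (1L, 2L), (2L, 1L))
val accepted = rows.map { case (id, max) => SharedState.tryIncrement(id, max) }
// accepted == Seq(true, true, false, true): id 1 fits twice, id 2 once
```

With real Redis the same pattern would map onto HGET/HINCRBY, and the check-and-set would need a MULTI/EXEC transaction or a Lua script to stay atomic across executors.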
question from: https://stackoverflow.com/questions/65890277/how-to-utilize-redis-while-processing-a-dataframe-in-spark-scala


1 Answer

Waiting for answers

