
Processing data in directories with specific date range using Spark Scala

I am trying to load incremental data from an HDFS folder using Spark Scala code. Suppose I have the following folders:

/hadoop/user/src/2021-01-22
/hadoop/user/src/2021-01-23
/hadoop/user/src/2021-01-24
/hadoop/user/src/2021-01-25
/hadoop/user/src/2021-01-26
/hadoop/user/src/2021-01-27
/hadoop/user/src/2021-01-28
/hadoop/user/src/2021-01-29

I pass the path /hadoop/user/src to the spark-submit command and then run the code below:

import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter

val Temp_path: String = args(1) // hadoop/user/src
// Yesterday's date in UTC (sysdate - 1)
val incre_path = ZonedDateTime.now(ZoneId.of("UTC")).minusDays(1)
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
val incre_path_day = formatter.format(incre_path)
// Build the full path, e.g. hadoop/user/src/2021-01-28
val new_path = Temp_path.concat("/")
val path = new_path.concat(incre_path_day)

So it processes the (sysdate - 1) folder: if today's date is 2021-01-29, it processes the data in the 2021-01-28 directory.

Is there any way to modify the code so that I can pass a path like hadoop/user/src/2021-01-22 and it will process the data up to 2021-01-28 (i.e. 2021-01-23, 2021-01-24, 2021-01-25, 2021-01-26, 2021-01-27 and 2021-01-28)?

Kindly suggest how I should modify my code.

question from: https://stackoverflow.com/questions/65949874/processing-data-in-directories-with-specific-date-range-using-spark-scala


1 Answer


You can use listStatus from the Hadoop FileSystem API to list all the folders under the base directory and filter on the date part of each folder name:

import org.apache.hadoop.fs.Path
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter

val inputPath = "hadoop/user/src/2021-01-22"

// Start date is the last segment of the input path; end date is yesterday (UTC)
val startDate = inputPath.substring(inputPath.lastIndexOf("/") + 1)
val endDate = DateTimeFormatter.ofPattern("yyyy-MM-dd")
  .format(ZonedDateTime.now(ZoneId.of("UTC")).minusDays(1))

val baseFolder = new Path(inputPath.substring(0, inputPath.lastIndexOf("/") + 1))

// Lexicographic comparison is safe here because the folder names use yyyy-MM-dd.
// Keep folders strictly after the start date, up to and including the end date.
val files = baseFolder.getFileSystem(sc.hadoopConfiguration).listStatus(baseFolder).map(_.getPath.toString)
val filteredFiles = files.filter { path =>
  val day = path.split("/").last
  day > startDate && day <= endDate
}

// finally load only the folders you want
val df = spark.read.csv(filteredFiles: _*)
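
With the sample folders above and a run date of 2021-01-29 (so endDate is 2021-01-28), filteredFiles contains the 2021-01-23 through 2021-01-28 directories, which is exactly the range requested in the question.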

You could also pass a PathFilter to listStatus to filter the paths while scanning the base folder.
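
For example, here is a minimal sketch of that variant, assuming the same startDate, endDate and baseFolder values computed above:

import org.apache.hadoop.fs.{Path, PathFilter}

// Accept only subfolders whose name falls inside the (startDate, endDate] range;
// the filter is applied while the base folder is being listed.
val dateRangeFilter = new PathFilter {
  override def accept(p: Path): Boolean = {
    val day = p.getName
    day > startDate && day <= endDate
  }
}

val filteredFiles = baseFolder.getFileSystem(sc.hadoopConfiguration)
  .listStatus(baseFolder, dateRangeFilter)
  .map(_.getPath.toString)

val df = spark.read.csv(filteredFiles: _*)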

