We have an Apache Spark v2.4.4 cluster with three nodes (4 cores each) in standalone mode. We need to decompress and process a list of files. If we use the following code (with an RDD):
SparkSession ss = SparkSession
        .builder()
        .appName("parallelization")
        .master("spark://xxx.xxx.xxx.205:7077")
        .getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(ss.sparkContext());

List<String> filePaths = Arrays.asList("file_path_1", "file_path_2", "file_path_3", ..., "file_path_15");
JavaRDD<String> filesRDD = jsc.parallelize(filePaths);
filesRDD.foreach(path -> decompress_file(path));
then each 'decompress_file' task runs sequentially, and only on the master node. If we instead use the following code (with DataFrames), only two of the three nodes run their 'decompress_file' calls in parallel:
JavaRDD<Row> rowRDD = jsc.parallelize(filePaths).map(
        (String path) -> RowFactory.create(path)
);
StructType schema = DataTypes.createStructType(
        new StructField[] {
                DataTypes.createStructField("Path", DataTypes.StringType, false)
        }
);
Dataset<Row> listDF = ss.createDataFrame(rowRDD, schema);
listDF.foreach(row -> decompress_file(row.getString(0)));
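For context, 'decompress_file' is plain single-machine I/O with no Spark dependencies. Its real body is omitted here; the following is only a minimal sketch of its shape, assuming each path points to a gzip file on a filesystem visible to every node:

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

// Minimal placeholder for decompress_file (the actual implementation is not
// shown here): stream a .gz file from a path reachable by the worker and
// write the decompressed bytes next to it.
static void decompress_file(String path) throws IOException {
    try (GZIPInputStream in = new GZIPInputStream(new FileInputStream(path));
         FileOutputStream out = new FileOutputStream(path.replace(".gz", ""))) {
        byte[] buffer = new byte[8192];
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
        }
    }
}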
The entire batch is submitted in client mode.
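To see where each task actually lands, a simple diagnostic we can add (our own instrumentation, not part of the job itself) is to log the hostname of the executing JVM inside the lambda; the lines then show up in each executor's stdout log in the Spark UI:

import java.net.InetAddress;

// Diagnostic sketch: print which machine handles each element, to check
// whether the 15 tasks are spread across the workers or pinned to one node.
filesRDD.foreach(path -> {
    String host = InetAddress.getLocalHost().getHostName();
    System.out.println(host + " handles " + path);
    decompress_file(path);
});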
How can we keep all three nodes busy with the 'decompress_file' tasks, and why is the behavior so different between RDDs and DataFrames?
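For what it's worth, one knob we have looked at (without being sure it is the right one) is the explicit numSlices argument of parallelize, which would hand the scheduler one task per file instead of a few default partitions:

// Hedged attempt: request as many partitions as there are paths, so that
// all 15 elements become independent tasks the scheduler can distribute.
JavaRDD<String> filesRDD = jsc.parallelize(filePaths, filePaths.size());
filesRDD.foreach(path -> decompress_file(path));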
question from:
https://stackoverflow.com/questions/65951819/parallel-vs-sequential-foreach-execution-in-apache-spark-standalone-cluster