I am not able to find the correct way to write a Spark Dataset to S3 from Java. What additional configuration do I need to add? Do I have to set the AWS credentials in my code, or will Spark pick them up from my local ~/.aws/ profile?
Please guide.
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class sparkSqlMysql {

    private static final org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger.getLogger(sparkSqlMysql.class);

    private static final SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Spark2JdbcDs")
            .getOrCreate();

    public static void main(String[] args) {
        // JDBC connection properties
        final Properties connectionProperties = new Properties();
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "password");
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");

        final String dbTable = "(select * from Fielding) t";
        final String dbTable1 = "(select * from Salaries) m";
        final String dbTable2 = "(select * from Pitching) n";

        // Load each MySQL query result as a Dataset
        Dataset<Row> jdbcDF2 = sparkSession.read().jdbc("jdbc:mysql://localhost:3306/lahman2016", dbTable,
                connectionProperties);
        Dataset<Row> jdbcDF3 = sparkSession.read().jdbc("jdbc:mysql://localhost:3306/lahman2016", dbTable1,
                connectionProperties);
        Dataset<Row> jdbcDF4 = sparkSession.read().jdbc("jdbc:mysql://localhost:3306/lahman2016", dbTable2,
                connectionProperties);

        jdbcDF2.createOrReplaceTempView("Fielding");
        jdbcDF3.createOrReplaceTempView("Salaries");
        jdbcDF4.createOrReplaceTempView("Pitching");

        // Average salary per year for fielders and for pitchers
        Dataset<Row> sqlDF = sparkSession.sql(
                "select Salaries.yearID, avg(Salaries.salary) as Fielding from Salaries inner join Fielding ON Salaries.yearID = Fielding.yearID AND Salaries.playerID = Fielding.playerID group by Salaries.yearID limit 5");
        Dataset<Row> sqlDF1 = sparkSession.sql(
                "select Salaries.yearID, avg(Salaries.salary) as Pitching from Salaries inner join Pitching ON Salaries.yearID = Pitching.yearID AND Salaries.playerID = Pitching.playerID group by Salaries.yearID limit 5");

        // sqlDF.show();
        // sqlDF1.show();

        sqlDF.createOrReplaceTempView("avg_fielding");
        sqlDF1.createOrReplaceTempView("avg_pitching");

        // Join the two yearly averages into a single result
        Dataset<Row> final_query_1_output = sparkSession.sql(
                "select avg_fielding.yearID, avg_fielding.Fielding, avg_pitching.Pitching from avg_fielding inner join avg_pitching ON avg_pitching.yearID = avg_fielding.yearID");

        final_query_1_output.show();
    }
}
The output of final_query_1_output.show() is:
+------+------------------+------------------+
|yearID| Fielding| Pitching|
+------+------------------+------------------+
| 1990| 507978.625320787| 485947.2487437186|
| 2003|2216200.9609838845|2133800.1867612293|
| 2007|2633213.0126475547|2617533.3393665156|
| 2015|3996199.5729421354| 3955581.121535181|
| 2006| 2565803.492487479| 2534756.866972477|
+------+------------------+------------------+
I want to write this dataset to S3. How can I do that? This is what I tried:
final_query_1_output.write().mode("overwrite").save("s3n://druids3migration/data.csv");
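From what I have read so far, I believe something along these lines is needed, but I am not sure it is right. This is only a sketch: the key placeholders and output path are hypothetical, and it assumes the hadoop-aws jar plus a matching aws-java-sdk bundle are on the classpath so that the s3a connector is available.

        // Either set the keys explicitly on the session's Hadoop configuration...
        sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.access.key", "YOUR_ACCESS_KEY");
        sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.secret.key", "YOUR_SECRET_KEY");

        // ...or let the AWS SDK resolve credentials itself (environment variables,
        // the local ~/.aws/credentials profile, an EC2 instance role, ...)
        sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.aws.credentials.provider",
                "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");

        // save() without a format writes Parquet regardless of the .csv suffix;
        // the csv writer produces actual CSV (as a directory of part files).
        final_query_1_output.write()
                .mode("overwrite")
                .option("header", "true")
                .csv("s3a://druids3migration/output");

Is this the right approach, and does the s3n scheme in my attempt also need to change to s3a?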
question from:
https://stackoverflow.com/questions/65680602/write-the-final-dataset-output-for-spark-java-into-s3