Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

amazon emr - How to run PySpark Structured Streaming Application on AWS EMR?

How can I run a PySpark Structured Streaming application on EMR?

I am running it on EMR with the command `spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 dwh.py`, but the job gets stuck. The same code works on my local machine.
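It can help to make the submit command explicit instead of relying on settings inside the script. The sketch below is plain Python with no AWS calls: it builds the `spark-submit` argument list and, optionally, an EMR step definition. Assumptions: the script has been uploaded to a hypothetical S3 path, the cluster runs Spark on YARN (the EMR default), and the connector version passed in matches the cluster's Spark and Scala versions. The helper names (`kafka_package`, `spark_submit_args`, `emr_step`) are illustrative only. Note that in `cluster` deploy mode, output from the `console` sink goes to the driver container's YARN logs rather than to the submitting terminal.

```python
from typing import Dict, List, Optional


def kafka_package(spark_version: str, scala_version: str = "2.12") -> str:
    """Maven coordinate of the Structured Streaming Kafka connector.

    The Scala suffix and version should match the Spark build on the cluster.
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def spark_submit_args(
    script: str,
    spark_version: str,
    deploy_mode: str = "client",
    extra_conf: Optional[Dict[str, str]] = None,
) -> List[str]:
    """Build a spark-submit argument list that runs the script on YARN."""
    args = [
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", deploy_mode,
        "--packages", kafka_package(spark_version),
    ]
    for key, value in (extra_conf or {}).items():
        args += ["--conf", f"{key}={value}"]
    # All spark-submit options must come before the application file.
    args.append(script)
    return args


def emr_step(name: str, args: List[str]) -> Dict:
    """Step definition in the shape accepted by boto3's EMR add_job_flow_steps.

    command-runner.jar runs the given command on the cluster's master node.
    """
    return {
        "Name": name,
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {"Jar": "command-runner.jar", "Args": args},
    }


if __name__ == "__main__":
    # Hypothetical S3 location for dwh.py.
    submit = spark_submit_args(
        "s3://my-bucket/jobs/dwh.py",
        "3.0.1",
        extra_conf={"spark.executor.memory": "4g"},
    )
    print(" ".join(submit))
    print(emr_step("dwh streaming job", submit))
```

The step dictionary could then be submitted with `boto3.client("emr").add_job_flow_steps(JobFlowId=..., Steps=[step])`, using your own cluster ID; running the joined command directly on the master node works as well.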

I am reading from a Kafka topic and writing the stream to the console. I also tried writing the stream to a Kafka topic, with no luck.
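A Kafka source that never produces output can be a network problem rather than a Spark one. For example, if `kafka` in `kafka:9092` is a name that only resolves on the local machine (such as a Docker Compose service name, which is an assumption here), it will not resolve on the EMR nodes. The minimal standard-library sketch below checks whether each bootstrap server accepts a TCP connection from the host it runs on; the function names are hypothetical. A successful check is necessary but not sufficient, because Kafka clients subsequently connect to the addresses the brokers advertise (`advertised.listeners`), and those must be reachable too.

```python
import socket
from typing import List, Tuple


def parse_bootstrap_servers(servers: str) -> List[Tuple[str, int]]:
    """Split a bootstrap string such as "host1:9092,host2:9092" into (host, port) pairs."""
    pairs = []
    for entry in servers.split(","):
        host, _, port = entry.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds.

    DNS failures, refused connections and timeouts all raise OSError subclasses,
    so each of them is reported as False.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_bootstrap_servers(servers: str, timeout: float = 3.0) -> List[Tuple[str, bool]]:
    """Check every broker in a bootstrap string; returns ("host:port", reachable) pairs."""
    return [
        (f"{host}:{port}", can_connect(host, port, timeout))
        for host, port in parse_bootstrap_servers(servers)
    ]
```

Run on the EMR master node, `print(check_bootstrap_servers("kafka:9092"))` reporting `False` would point at the broker address rather than at the Spark job itself.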

Here is the snippet:

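```python
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    # DStream (legacy Spark Streaming) context; the Structured Streaming
    # query below does not use ssc.
    conf = SparkConf().setAppName("UserOrders RDD kafka")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 2)
    ssc.checkpoint("Checkpoint")

    spark = (
        SparkSession.builder
        .appName("kafka")
        .config("spark.eventLog.enabled", "false")
        # Per the Spark configuration docs, in client mode spark.driver.memory must be
        # set before the driver JVM starts (e.g. spark-submit --driver-memory).
        .config("spark.driver.memory", "4g")
        .config("spark.executor.memory", "4g")
        # Requests local mode with 2 threads; cluster jobs normally take the master
        # from spark-submit (YARN on EMR).
        .master("local[2]")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("ERROR")

    # Kafka source: "kafka:9092" must resolve and be reachable from the nodes running the job.
    df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("startingOffsets", "latest")
        .option("subscribe", "users")
        .load()
    )

    castdf = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    castdf.printSchema()

    # aggr_users (an aggregation built from castdf) is not shown in the original snippet.
    query = (
        aggr_users.writeStream
        .trigger(processingTime="5 seconds")
        .outputMode("update")
        .format("console")
        .start()
    )
    query.awaitTermination()

    # result = (
    #     aggr_users.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
    #     .writeStream
    #     .format("kafka")
    #     .outputMode("append")
    #     .option("kafka.bootstrap.servers", "kafka:9092")
    #     .option("topic", "user_aggregate_demo")
    #     .option("checkpointLocation", "/tmp/vaquarkhan/checkpoint")
    #     .start()
    # )
    # result.awaitTermination()
```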
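To run the same script locally and on EMR without editing it, the broker address, topic and checkpoint path could be taken from the command line instead of being hard-coded. The sketch below uses only `argparse`; the flag names and the default checkpoint path are hypothetical, while the option keys in the returned dictionary are the Kafka source options already used in the snippet above.

```python
import argparse
from typing import Dict, List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse job settings; argv=None reads sys.argv (arguments after the script in spark-submit)."""
    parser = argparse.ArgumentParser(description="Kafka -> console Structured Streaming job")
    parser.add_argument("--bootstrap-servers", default="kafka:9092",
                        help="Kafka brokers reachable from the cluster nodes")
    parser.add_argument("--topic", default="users")
    parser.add_argument("--starting-offsets", default="latest")
    parser.add_argument("--checkpoint", default="/tmp/checkpoint",
                        help="streaming checkpoint directory (e.g. an HDFS or S3 path on EMR)")
    return parser.parse_args(argv)


def kafka_source_options(args: argparse.Namespace) -> Dict[str, str]:
    """Kafka source options matching the original readStream call."""
    return {
        "kafka.bootstrap.servers": args.bootstrap_servers,
        "subscribe": args.topic,
        "startingOffsets": args.starting_offsets,
    }
```

Inside the job, the dictionary could be passed with `spark.readStream.format("kafka").options(**kafka_source_options(args)).load()`, and the checkpoint path with `.option("checkpointLocation", args.checkpoint)` on the writer. On the cluster the values would then be supplied after the script name, for example `spark-submit ... dwh.py --bootstrap-servers <broker-host>:9092`.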
Question from: https://stackoverflow.com/questions/65644968/how-to-run-pyspark-structured-streaming-application-on-aws-emr

Fight the dragon too long and you become the dragon; gaze into the abyss too long and the abyss gazes back…

1 Answer

Waiting for answers


...