Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
715 views
in Technique[技术] by (71.8m points)

apache-spark - 使用结构化流聚合流静态内部联接的输出(Aggregation on output of Stream-Static Inner Joins using Structured Streaming)

This problem pertains to Spark 2.4.4.

(此问题与Spark 2.4.4有关。)

I am doing a Stream-static inner join and getting the result as :-

(我正在做一个流静态内部联接,并得到的结果是:-)

val orderDetailsJoined = orderItemsDF.join(ordersDF, Seq("CustomerID"), joinType = "inner")

+----------+-------+------+---------+--------+--------+------------+-----------------------+---------------------+---------------+-----------------------+
|CustomerID|OrderID|ItemID|ProductID|Quantity|Subtotal|ProductPrice|OrderItemsTimestamp    |OrderDate            |Status         |OrdersTimestamp        |
+----------+-------+------+---------+--------+--------+------------+-----------------------+---------------------+---------------+-----------------------+
|2         |33865  |84536 |957      |1       |299.98  |299.98      |2019-11-30 18:29:17.893|2014-02-18 00:00:00.0|COMPLETE       |2019-11-30 18:29:19.331|
|2         |33865  |84537 |1073     |1       |199.99  |199.99      |2019-11-30 18:29:17.893|2014-02-18 00:00:00.0|COMPLETE       |2019-11-30 18:29:19.331|
|2         |33865  |84538 |502      |1       |50.0    |50.0        |2019-11-30 18:29:17.893|2014-02-18 00:00:00.0|COMPLETE       |2019-11-30 18:29:19.331|

Where "orderItemsDF" is streaming DataFrame and "ordersDF" is a static DF.

(其中“ orderItemsDF”正在流式传输 DataFrame,而“ ordersDF”是静态 DF。)

Now, I am trying to group the result by "CustomerID" and "OrderID", like this :

(现在,我试图按“ CustomerID”和“ OrderID”对结果进行分组,如下所示:)

val aggResult = orderDetailsJoined.withWatermark("OrdersTimestamp", "2 minutes").
      groupBy(window($"OrdersTimestamp", "1 minute"), $"CustomerID", $"OrderID").
      agg(sum("Subtotal")).
      select(col("CustomerID"), col("OrderID"), col("sum(Subtotal)").alias("Total Amount"))

But this gives me blank output when I try to see the results as :

(但是当我尝试查看结果时,这给了我空白的输出:)

val res = aggResult.writeStream.outputMode("append").format("console").trigger(Trigger.ProcessingTime("20 seconds")).option("truncate", "false").start()
res.awaitTermination()

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-------+------------+
|CustomerID|OrderID|Total Amount|
+----------+-------+------------+
+----------+-------+------------+

If I do,

(如果我做,)

res.explain(true)

It says: No physical plan. Waiting for data.

(它说: No physical plan. Waiting for data.) No physical plan. Waiting for data.

Please help!!!

(请帮忙!!!)

  ask by Sarfaraz Hussain translate from so

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)
等待大神答复

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...