How to develop a window operation for event-time aggregation?

You can solve this problem without writing a custom aggregation. Derive demand and supply columns with when/otherwise, then compute the ratio with groupBy and agg over an event-time window. For example:

//streaming df
+-------------------+--------+-----+
|datetime           |zone    |event|
+-------------------+--------+-----+
|2019-11-06 11:32:21|Union Sq|PICK |
|2019-11-06 11:32:22|SoHo    |DROP |
|2019-11-06 11:32:23|Union Sq|DROP |
+-------------------+--------+-----+

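import org.apache.spark.sql.functions.{col, lit, when}

// demand: 1 per pickup; supply: 1 per pickup or drop-off; any other event counts as 0
val enrichedStreamingDf = streamingDf
  .withColumn("demand",
    when(col("event") === "PICK", lit(1))
      .when(col("event") === "DROP", lit(0))
      .otherwise(lit(0)))
  .withColumn("supply",
    when(col("event") === "PICK", lit(1))
      .when(col("event") === "DROP", lit(1))
      .otherwise(lit(0)))
  // keep only the zone of interest
  .filter(col("zone") === "Union Sq")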

//enriched streaming df
+-------------------+--------+-----+------+------+
|datetime           |zone    |event|demand|supply|
+-------------------+--------+-----+------+------+
|2019-11-06 11:32:21|Union Sq|PICK |1     |1     |
|2019-11-06 11:32:23|Union Sq|DROP |0     |1     |
+-------------------+--------+-----+------+------+

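// assumes a SparkSession named spark is in scope (needed for the $"..." syntax)
import spark.implicits._
import org.apache.spark.sql.functions.{sum, window}
val ratioDf = enrichedStreamingDf
  // accept events up to 10 minutes behind the latest event time seen; older window state can be dropped
  .withWatermark("datetime", "10 minutes")
  .groupBy(
    // 10-minute windows starting every 5 minutes, so consecutive windows overlap
    window($"datetime", "10 minutes", "5 minutes"),
    $"zone"
  )
  // a window with no PICK events has demand 0: the division yields null (or an error if ANSI mode is enabled)
  .agg((sum($"supply") / sum($"demand")).as("sup_dem_ratio"))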

//supply to demand ratio streaming df
+------------------------------------------+--------+-------------+
|window                                    |zone    |sup_dem_ratio|
+------------------------------------------+--------+-------------+
|[2019-11-06 11:25:00, 2019-11-06 11:35:00]|Union Sq|2.0          |
|[2019-11-06 11:30:00, 2019-11-06 11:40:00]|Union Sq|2.0          |
+------------------------------------------+--------+-------------+
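Each event shows up in two rows because the windows overlap: with a 10-minute duration and a 5-minute slide, every timestamp falls into two windows. The plain-Scala model below (no Spark needed) reproduces that assignment and the per-window ratio for the sample rows. It is only a sketch of the semantics of window(), not Spark's implementation. SlidingWindowModel, hms, windowsFor and ratioPerWindow are hypothetical names. The model also assumes times in seconds since midnight, [start, end) windows, and window starts aligned to multiples of the slide (no startTime offset).

```scala
// Plain-Scala model of sliding-window assignment (no Spark required).
// Assumptions: times are seconds since midnight, windows are [start, end)
// and their starts are aligned to multiples of the slide (no startTime offset).
object SlidingWindowModel {
  final case class Window(start: Long, end: Long)

  // Convenience for writing clock times as seconds since midnight.
  def hms(h: Int, m: Int, s: Int): Long = h * 3600L + m * 60L + s

  // Every window of length durationSec (advancing by slideSec) that contains t.
  def windowsFor(t: Long, durationSec: Long, slideSec: Long): List[Window] = {
    // Latest aligned window start at or before t.
    val lastStart = t - java.lang.Math.floorMod(t, slideSec)
    // Step back one slide at a time while the window still covers t.
    Iterator.iterate(lastStart)(_ - slideSec)
      .takeWhile(start => start + durationSec > t)
      .map(start => Window(start, start + durationSec))
      .toList
      .reverse
  }

  // sum(supply) / sum(demand) per window, like the agg(...) above.
  // Events are (time, demand, supply); zero total demand gives None.
  def ratioPerWindow(events: Seq[(Long, Int, Int)],
                     durationSec: Long,
                     slideSec: Long): Map[Window, Option[Double]] = {
    val assigned = for {
      (t, demand, supply) <- events
      w <- windowsFor(t, durationSec, slideSec)
    } yield (w, demand, supply)
    assigned.groupBy(_._1).map { case (w, rows) =>
      val totalDemand = rows.map(_._2).sum
      val totalSupply = rows.map(_._3).sum
      w -> (if (totalDemand == 0) None else Some(totalSupply.toDouble / totalDemand))
    }
  }
}
```

With the two Union Sq sample rows, both windows (11:25 to 11:35 and 11:30 to 11:40) sum to supply 2 and demand 1, so each gets a ratio of 2.0, as in the output table. A window containing only DROP events has zero demand, which this sketch reports as None.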
  

In this example, I have used a window duration of “10 minutes” with a slide of “5 minutes”; for your case, you can use “1 hour” as the duration.
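The duration you choose also determines how many windows each event feeds. With a sliding window, an event lands in duration / slide windows (when the slide divides the duration evenly), so more overlap means more output rows and more state to keep. The helper below is a hypothetical sketch for checking this. The slide values used with “1 hour” are assumptions, since only the duration is given above.

```scala
import scala.concurrent.duration._

// Hypothetical helper (not part of Spark): number of overlapping sliding
// windows that a single event is assigned to.
object WindowFanOut {
  def windowsPerEvent(duration: FiniteDuration, slide: FiniteDuration): Long = {
    require(slide > Duration.Zero && slide <= duration,
      "slide must be positive and no longer than the window duration")
    // This sketch only handles slides that divide the duration evenly.
    require(duration.toMillis % slide.toMillis == 0,
      "slide must divide the window duration evenly")
    duration.toMillis / slide.toMillis
  }
}
```

For example, 10 minutes sliding by 5 minutes gives 2, which is why each sample event appears in two output rows. A 1-hour window sliding by 5 minutes gives 12. A tumbling 1-hour window, where the slide equals the duration or window($"datetime", "1 hour") is called without a slide, gives 1.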
