Can I compute per-row aggregations over rows that satisfy a condition using PySpark?

This can be done with window functions and higher-order functions, and it should be more efficient than a UDF. The idea is to collect all values in the partition into an array per row, filter that array down to the values that satisfy the condition, and then aggregate the filtered array.

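For context, a minimal setup sketch for the kind of input DataFrame the snippet below assumes: a "name" column and a string "value" column. The sample rows here are hypothetical, included only to make the example runnable.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical sample data: string values grouped by name
df = spark.createDataFrame(
    [("a", "1"), ("a", "2"), ("a", "7"), ("b", "3"), ("b", "10")],
    ["name", "value"],
)
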
df.withColumn("value", col("value").cast("int")) \
    .withColumn("values", collect_list("value").over(Window.partitionBy("name"))) \
    .withColumn("in_range", expr("filter(values, v -> abs(v - value) <= 3)")) \
    .withColumn("n_vals_in_rng", size(col("in_range"))) \
    .withColumn("avg_val_in_rng",
                expr("aggregate(in_range, 0, (acc, value) -> value + acc, acc -> acc / n_vals_in_rng)")) \
    .select("name", "value", "n_vals_in_rng", "avg_val_in_rng") \
    .show()

You can read more about the filter and aggregate higher-order functions in the Spark SQL built-in function reference: https://spark.apache.org/docs/latest/api/sql/
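Since the condition here is a simple numeric range, the same per-row aggregation can also be expressed with a range-based window frame alone, without array functions. This is a minimal sketch of that alternative, assuming "value" is numeric after the cast:

from pyspark.sql import Window
from pyspark.sql.functions import avg, col, count

# rows in the same "name" partition whose value lies within +/-3 of the current row's value
w = Window.partitionBy("name").orderBy("value").rangeBetween(-3, 3)

(df.withColumn("value", col("value").cast("int"))
   .withColumn("n_vals_in_rng", count("value").over(w))
   .withColumn("avg_val_in_rng", avg("value").over(w))
   .select("name", "value", "n_vals_in_rng", "avg_val_in_rng")
   .show())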
