This can be done with window functions and higher-order functions; it should be more efficient than a UDF.
from pyspark.sql import Window
from pyspark.sql.functions import col, collect_list, expr, size

(df.withColumn("value", col("value").cast("int"))
   # collect every value in the same "name" partition into an array
   .withColumn("values", collect_list("value").over(Window.partitionBy("name")))
   # keep only the array elements within +/- 3 of the current row's value
   .withColumn("in_range", expr("filter(values, v -> abs(v - value) <= 3)"))
   .withColumn("n_vals_in_rng", size(col("in_range")))
   # sum the in-range values, then divide by their count (0D keeps the sum as a double)
   .withColumn("avg_val_in_rng",
               expr("aggregate(in_range, 0D, (acc, v) -> acc + v, acc -> acc / n_vals_in_rng)"))
   .select("name", "value", "n_vals_in_rng", "avg_val_in_rng")
   .show())
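If you want to try it end to end, a minimal input could look like the sketch below. The ("name", "value") schema and the string-typed values are assumptions inferred from the cast in the snippet above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical sample data matching the assumed ("name", "value") schema
df = spark.createDataFrame(
    [("a", "1"), ("a", "2"), ("a", "7"), ("b", "3"), ("b", "5")],
    ["name", "value"],
)

With this data, the row ("a", 1) sees the partition values [1, 2, 7]; only 1 and 2 fall within +/- 3, so n_vals_in_rng is 2 and avg_val_in_rng is 1.5.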
You can read more about the filter and aggregate functions here: https://spark.apache.org/docs/latest/api/sql/