How can I convert this SQL query into PySpark code?

I would do it a bit differently: rather than translating your SQL line by line, I would apply your business rules directly with window functions:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("id").orderBy("date")

df.withColumn(
    "rnk", F.row_number().over(w)
).withColumn(
    # previous row's (phone_number, email) packed into a struct
    "old", F.lag(F.struct(F.col("phone_number"), F.col("email"))).over(w)
).where(
    (F.col("rnk") == 1)  # always keep the first row of each id
    | (F.col("phone_number") != F.col("old.phone_number"))
    | (F.col("email") != F.col("old.email"))
).show()

+---+------------+-----------+----------+---+--------------------+
| id|phone_number|      email|      date|rnk|                 old|
+---+------------+-----------+----------+---+--------------------+
|  1|       12345|[email protected]|2020-01-01|  1|                null|
|  1|       23456|[email protected]|2020-01-03|  3|[12345, [email protected]]|
|  1|       34567|[email protected]|2020-01-04|  4|[23456, [email protected]]|
|  1|       12345|[email protected]|2020-01-05|  5|[34567, [email protected]]|
|  1|       45678|[email protected]|2020-01-06|  6|[12345, [email protected]]|
|  3|       78901|[email protected]|2020-01-01|  1|                null|
|  2|       56789|[email protected]|2020-01-01|  1|                null|
|  2|       56789|[email protected]|2020-01-03|  3|[56789, [email protected]]|
|  2|       67890|[email protected]|2020-01-04|  4|[56789, [email protected]]|
+---+------------+-----------+----------+---+--------------------+

NB: you can replace the test on rnk with a test on F.col("old").isNull(), in which case you do not need to compute rnk at all.
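
For instance, a minimal sketch of that simplified variant, reusing the same df and window w as above:

# The first row of each id is the only one whose lagged struct is null,
# so the isNull() test replaces the rnk == 1 test.
df.withColumn(
    "old", F.lag(F.struct(F.col("phone_number"), F.col("email"))).over(w)
).where(
    F.col("old").isNull()
    | (F.col("phone_number") != F.col("old.phone_number"))
    | (F.col("email") != F.col("old.email"))
).drop("old").show()  # drop the helper column once filtering is done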
