Alternate or better approach to aggregateByKey in pyspark RDD

Here is a simulated input. If the min and max values are filled in correctly, why is the TMIN/TMAX indicator needed at all? There is indeed no need for an accumulator.

rdd = sc.parallelize([('s1', 'tmin', -3), ('s1', 'tmax', 5), ('s2', 'tmin', 0),
                      ('s2', 'tmax', 7), ('s0', 'tmax', 14), ('s0', 'tmin', 3)])
rddcollect = rdd.collect()
#print(rddcollect)

# Drop the tmin/tmax indicator, keeping (station, value) pairs.
rdd2 = rdd.map(lambda x: (x[0], x[2]))
#rdd2collect = rdd2.collect()
#print(rdd2collect)

# Group all values per station and sort each group so the list reads [min, max].
rdd3 = rdd2.groupByKey().sortByKey()
rdd4 = rdd3.map(lambda k_v: (k_v[0], sorted(k_v[1])))
rdd4.collect()

returns:

Out[27]: [('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [0, 7])]
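
Since the question is about aggregateByKey, here is a minimal sketch (reusing rdd2 from above; zero and minmax are just illustrative names) that computes the per-station (min, max) pair in one pass with aggregateByKey, instead of shuffling the full value lists the way groupByKey does:

zero = (float('inf'), float('-inf'))
minmax = rdd2.aggregateByKey(
    zero,
    lambda acc, v: (min(acc[0], v), max(acc[1], v)),  # fold one value into the per-partition accumulator
    lambda a, b: (min(a[0], b[0]), max(a[1], b[1]))   # merge accumulators across partitions
).sortByKey()
minmax.collect()

which should return the same extremes, as (min, max) tuples rather than sorted lists:

[('s0', (3, 14)), ('s1', (-3, 5)), ('s2', (0, 7))]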

ALTERNATE ANSWER

  • after clarification
  • assuming that the min and max values make sense
  • with my own data
  • there are other solutions as well (one is sketched after the output below)

Here goes:

include = ['tmin', 'tmax']

rdd0 = sc.parallelize([('s1', 'tmin', -3), ('s1', 'tmax', 5), ('s2', 'tmin', 0),
                       ('s2', 'tmin', -12), ('s2', 'tmax', 7), ('s2', 'tmax', 17),
                       ('s2', 'tother', 17), ('s0', 'tmax', 14), ('s0', 'tmin', 3)])

# Keep only the tmin/tmax records.
rdd1 = rdd0.filter(lambda x: any(e in x for e in include))

# Key by (station, indicator) so tmin and tmax can be aggregated separately.
rdd2 = rdd1.map(lambda x: ((x[0], x[1]), x[2]))
rdd3 = rdd2.groupByKey().sortByKey()

# Per station: the minimum of the tmin values and the maximum of the tmax values.
rdd4Min = rdd3.filter(lambda k_v: k_v[0][1] == 'tmin').map(lambda k_v: (k_v[0][0], min(k_v[1])))
rdd4Max = rdd3.filter(lambda k_v: k_v[0][1] == 'tmax').map(lambda k_v: (k_v[0][0], max(k_v[1])))

# Recombine per station and sort each pair so it reads [min, max].
rdd5 = rdd4Min.union(rdd4Max)
rdd6 = rdd5.groupByKey().sortByKey()
res = rdd6.map(lambda k_v: (k_v[0], sorted(k_v[1])))
rescollect = res.collect()
print(rescollect)

returns:

[('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [-12, 17])]
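
One of those other solutions, sketched here under the same assumptions (it reuses rdd1, the filtered RDD from the block above; INF, pairs and resAlt are just illustrative names): encode each reading as a (min candidate, max candidate) pair so the tmin/tmax indicator is still respected, then fold the pairs per station with a single reduceByKey instead of the filter/union/groupByKey round trip:

INF = float('inf')
# tmin readings can only lower the minimum, tmax readings can only raise the maximum.
pairs = rdd1.map(lambda x: (x[0], (x[2], -INF) if x[1] == 'tmin' else (INF, x[2])))
resAlt = pairs.reduceByKey(lambda a, b: (min(a[0], b[0]), max(a[1], b[1]))).sortByKey()
print(resAlt.collect())

which should print the same result, again as (min, max) tuples:

[('s0', (3, 14)), ('s1', (-3, 5)), ('s2', (-12, 17))]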
