# Alternate or better approach to aggregateByKey in pyspark RDD

Here is simulated input. If the min and max are filled in correctly, why the need for the indicator TMIN, TMAX at all? Indeed, no accumulator is needed.

``````
# simulated input: (station, indicator, temperature) triples
rdd = sc.parallelize([('s1', 'tmin', -3), ('s1', 'tmax', 5),
                      ('s2', 'tmin', 0), ('s2', 'tmax', 7),
                      ('s0', 'tmax', 14), ('s0', 'tmin', 3)])

# drop the indicator, keeping (station, temperature) pairs
rdd2 = rdd.map(lambda x: (x[0], x[2]))

# group all temperatures per station, then sort each group so the
# result reads as [min, max]
rdd3 = rdd2.groupByKey().sortByKey()
rdd4 = rdd3.map(lambda k_v: (k_v[0], sorted(k_v[1])))
rdd4.collect()
``````

returns:

``````
Out[27]: [('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [0, 7])]
``````
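The `groupByKey` above materializes every temperature per station, but only the extremes are needed. The same result could be folded with `aggregateByKey`; below is a plain-Python sketch of the `seqOp`/`combOp` pair that would be passed to it (run locally here so it can be checked without a `SparkContext` — the `(inf, -inf)` zero value and the local `acc` loop are my own illustration, not part of the answer's pipeline).

``````python
zero = (float('inf'), float('-inf'))  # (running min, running max)

def seq_op(acc, value):
    # fold one temperature into the (min, max) accumulator
    return (min(acc[0], value), max(acc[1], value))

def comb_op(a, b):
    # merge two partial (min, max) accumulators from different partitions
    return (min(a[0], b[0]), max(a[1], b[1]))

# simulate the per-key fold locally on the sample (station, temp) pairs;
# on an RDD this would be rdd2.aggregateByKey(zero, seq_op, comb_op)
data = [('s1', -3), ('s1', 5), ('s2', 0), ('s2', 7), ('s0', 14), ('s0', 3)]
acc = {}
for k, v in data:
    acc[k] = seq_op(acc.get(k, zero), v)

print(sorted(acc.items()))
# [('s0', (3, 14)), ('s1', (-3, 5)), ('s2', (0, 7))]
``````

Unlike `groupByKey`, this keeps only a two-element accumulator per key rather than the full value list.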

• after clarification,
• assuming that the min and max values make sense,
• tested with my own data,
• noting there are other solutions, BTW.

Here goes:

``````
# keep only tmin/tmax rows; note the duplicate readings and the extra
# 'tother' indicator that must be filtered out
include = ['tmin', 'tmax']

rdd0 = sc.parallelize([('s1', 'tmin', -3), ('s1', 'tmax', 5),
                       ('s2', 'tmin', 0), ('s2', 'tmin', -12),
                       ('s2', 'tmax', 7), ('s2', 'tmax', 17),
                       ('s2', 'tother', 17),
                       ('s0', 'tmax', 14), ('s0', 'tmin', 3)])
rdd1 = rdd0.filter(lambda x: any(e in x for e in include))

# key by (station, indicator) and group the temperatures
rdd2 = rdd1.map(lambda x: ((x[0], x[1]), x[2]))
rdd3 = rdd2.groupByKey().sortByKey()

# take the min of the tmin groups and the max of the tmax groups
rdd4Min = rdd3.filter(lambda k_v: k_v[0][1] == 'tmin').map(lambda k_v: (k_v[0][0], min(k_v[1])))
rdd4Max = rdd3.filter(lambda k_v: k_v[0][1] == 'tmax').map(lambda k_v: (k_v[0][0], max(k_v[1])))

# recombine per station and sort so each value list reads [min, max]
rdd5 = rdd4Min.union(rdd4Max)
rdd6 = rdd5.groupByKey().sortByKey()
res = rdd6.map(lambda k_v: (k_v[0], sorted(k_v[1])))
rescollect = res.collect()
print(rescollect)
``````

returns:

``````
[('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [-12, 17])]
``````
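For checking the expected output without a cluster, the pipeline above can be mirrored in plain Python. Keying by `(station, indicator)` and keeping a running extreme is also what `reduceByKey(min)` / `reduceByKey(max)` would do on those pairs, avoiding the `groupByKey` value lists entirely; the dict-based loop below is my own local sketch of that idea, not code from the answer.

``````python
include = ['tmin', 'tmax']
rows = [('s1', 'tmin', -3), ('s1', 'tmax', 5), ('s2', 'tmin', 0),
        ('s2', 'tmin', -12), ('s2', 'tmax', 7), ('s2', 'tmax', 17),
        ('s2', 'tother', 17), ('s0', 'tmax', 14), ('s0', 'tmin', 3)]

extremes = {}  # (station, indicator) -> running extreme
for station, ind, temp in rows:
    if ind not in include:      # same effect as the rdd0.filter(...) step
        continue
    op = min if ind == 'tmin' else max
    key = (station, ind)
    extremes[key] = op(extremes.get(key, temp), temp)

# regroup per station, as the union + groupByKey steps do
result = {}
for (station, ind), temp in extremes.items():
    result.setdefault(station, []).append(temp)

print(sorted((k, sorted(v)) for k, v in result.items()))
# [('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [-12, 17])]
``````

The output matches the RDD pipeline, including the duplicate `s2` readings being reduced to a single min and max and the `tother` row being dropped.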