Here simulated input and if the min and max is filled in correctly, then why the need for the indicator TMIN, TMAX? Indeed no need for an accumulator.
rdd = sc.parallelize([ ('s1','tmin',-3), ('s1','tmax', 5), ('s2','tmin',0), ('s2','tmax', 7), ('s0','tmax',14), ('s0','tmin', 3) ])
rddcollect = rdd.collect()
#print(rddcollect)
rdd2 = rdd.map(lambda x: (x[0], x[2]))
#rdd2collect = rdd2.collect()
#print(rdd2collect)
rdd3 = rdd2.groupByKey().sortByKey()
rdd4 = rdd3.map(lambda k_v: ( k_v[0], (sorted(k_v[1]))) )
rdd4.collect()
returns:
Out[27]: [('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [0, 7])]
ALTERNATE ANSWER
- after clarification
- assuming that min and max values make sense
- with my own data
- there are other solutions BTW
Here goes:
include = ['tmin','tmax']
rdd0 = sc.parallelize([ ('s1','tmin',-3), ('s1','tmax', 5), ('s2','tmin',0), ('s2','tmin',-12), ('s2','tmax', 7), ('s2','tmax', 17), ('s2','tother', 17), ('s0','tmax',14), ('s0','tmin', 3) ])
rdd1 = rdd0.filter(lambda x: any(e in x for e in include) )
rdd2 = rdd1.map(lambda x: ( (x[0],x[1]), x[2]))
rdd3 = rdd2.groupByKey().sortByKey()
rdd4Min = rdd3.filter(lambda k_v: k_v[0][1] == 'tmin').map(lambda k_v: ( k_v[0][0], min( k_v[1] ) ))
rdd4Max = rdd3.filter(lambda k_v: k_v[0][1] == 'tmax').map(lambda k_v: ( k_v[0][0], max( k_v[1] ) ))
rdd5=rdd4Min.union(rdd4Max)
rdd6 = rdd5.groupByKey().sortByKey()
res = rdd6.map(lambda k_v: ( k_v[0], (sorted(k_v[1]))))
rescollect = res.collect()
print(rescollect)
returns:
[('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [-12, 17])]
CLICK HERE to find out more related problems solutions.