The UDF you have written is essentially correct; you only need to change the code where you actually use it. The easiest way to do this is to call `.map` on the DataFrame's underlying RDD:
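```python
# Keep the UDF you have written as a normal Python function
def create_label(x):
    # If the dictionary has 20 or fewer items, return the keys of all the items.
    # Otherwise keep only the keys whose value is strictly greater than the
    # 21st-largest value (i.e. at most the top 20 keys).
    # Note: the check must be > 20, because val_sort[20] raises an IndexError
    # when the dict has exactly 20 items.
    if len(x) > 20:
        val_sort = sorted(x.values(), reverse=True)
        cutoff = {k: v for k, v in x.items() if v > val_sort[20]}
        return list(cutoff.keys())  # a list, not a dict_keys view, for toDF
    else:
        return list(x.keys())
```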
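To check the function's behaviour without starting Spark, here is a minimal sketch that calls the same top-20 logic on plain dictionaries. The sample dictionaries are made up for illustration only; they also show the exactly-20 case and what happens when values are tied at the cutoff.

```python
def create_label(x):
    """Return all keys if the dict has at most 20 items; otherwise return the
    keys whose values are strictly greater than the 21st-largest value."""
    if len(x) > 20:
        val_sort = sorted(x.values(), reverse=True)
        return [k for k, v in x.items() if v > val_sort[20]]
    return list(x.keys())

# Hypothetical product-count dictionaries, used only for illustration.
small = {"p1": 3, "p2": 7, "p3": 1}
exactly_20 = {f"p{i}": i for i in range(20)}
large = {f"p{i}": i for i in range(30)}   # values 0..29, cutoff value is 9
all_tied = {f"p{i}": 1 for i in range(25)}

print(create_label(small))             # ['p1', 'p2', 'p3']
print(len(create_label(exactly_20)))   # 20, no IndexError
print(len(create_label(large)))        # 20 (keys p10..p29)
print(create_label(all_tied))          # [] because nothing beats the tied cutoff
```

Because the comparison is strict (`v > val_sort[20]`), ties at the 21st value can make the result shorter than 20 keys, and in the fully tied case it is empty. If exactly 20 keys are wanted regardless of ties, sorting the items by value and slicing the first 20 would be one alternative.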
The part that you need to change is:
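```python
label_df_col = ['query', 'prod_count_dict']
# Apply the plain Python function to each Row, then turn the RDD back into a DataFrame
label_df = label_count_df.rdd.map(lambda x: (x.query, create_label(x.prod_count_dict))).toDF(label_df_col)
label_df.show()
```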
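What the `map` step does to each record can be sketched in plain Python. Here a `namedtuple` called `FakeRow` (a hypothetical stand-in for `pyspark.sql.Row`, assumed sufficient because the lambda only uses attribute access such as `x.query`) replaces the real rows, and the data is invented for illustration.

```python
from collections import namedtuple

def create_label(x):
    # Top-20 keys by value, or all keys when the dict has 20 or fewer items
    if len(x) > 20:
        val_sort = sorted(x.values(), reverse=True)
        return [k for k, v in x.items() if v > val_sort[20]]
    return list(x.keys())

# Hypothetical stand-in for pyspark.sql.Row; only attribute access is needed.
FakeRow = namedtuple("FakeRow", ["query", "prod_count_dict"])

rows = [
    FakeRow("shoes", {"a": 5, "b": 2}),
    FakeRow("hats", {f"h{i}": i for i in range(25)}),
]

# Same lambda as passed to rdd.map: keep the query, replace the dict by its labels
mapped = list(map(lambda x: (x.query, create_label(x.prod_count_dict)), rows))

for query, labels in mapped:
    print(query, len(labels))   # prints "shoes 2" then "hats 20"
```

Each `(query, labels)` tuple becomes one row once `toDF(label_df_col)` is applied. Returning a plain list rather than a `dict_keys` view is the safer choice for Spark's schema inference, which can map a list to an array column.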
This should work.