Spark UDFs (user-defined functions) can be powerful tools if used properly. Often, though, a UDF can cause performance issues even when the code itself looks perfectly fine.

We recently ran into a performance issue caused by a PySpark UDF:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

def intersect(a, b):
    if not a or not b:
        return []
    return list(set(b) & set(a))

def intersect_udf(data_list):
    return udf(lambda l: intersect(l, data_list), ArrayType(IntegerType()))

The intent of the UDF is to compute the intersection of two lists. One list is predefined and passed in as a literal value; the other is the value of a specific column in a DataFrame.
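Stripped of the Spark wrapper, the helper behaves like this (a plain-Python sketch; the example inputs are made up, and results are sorted only because set intersection does not guarantee order):

```python
def intersect(a, b):
    # Return the common elements of two lists; falls back to an empty
    # list when either input is empty or None.
    if not a or not b:
        return []
    return list(set(b) & set(a))

print(sorted(intersect([1, 2, 3, 4], [3, 4, 5, 6])))  # -> [3, 4]
print(intersect([], [1, 2]))                          # -> []
```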

The code itself looks good and all unit tests passed. But when we ran this UDF on a relatively small DataFrame (about 10M records, 440MB total) on a cluster of five r4.2xlarge instances, it took about 6 hours to finish.

After some investigation, we found the issue: we explicitly cast both list parameters to sets inside the UDF, so the conversion is done for every record. While this is fast for small lists, our predefined list has over 9,000 items, so the repeated list-to-set conversion takes a significant amount of time.
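A quick micro-benchmark illustrates the cost (a standalone sketch outside Spark; the list size and per-record values are stand-ins, not taken from the actual job):

```python
import timeit

big_list = list(range(9000))   # stands in for the ~9,000-item predefined list
big_set = set(big_list)        # one-time conversion
row = [5, 42, 8999, 123]       # stands in for one record's column value

# Slow path: rebuild the big set on every call, as the original UDF did.
slow = timeit.timeit(lambda: list(set(row) & set(big_list)), number=2000)

# Fast path: reuse the pre-built set; only the small per-record list is converted.
fast = timeit.timeit(lambda: list(big_set & set(row)), number=2000)

print(f"rebuild set each call: {slow:.4f}s, reuse pre-built set: {fast:.4f}s")
```

Multiplied across 10M records, that per-record conversion dominates the runtime.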

The fix is easy:

def intersect_with_set(select_ispot_ids_set, ispot_ids_list):
    if not select_ispot_ids_set or not ispot_ids_list:
        return []
    return list(select_ispot_ids_set & set(ispot_ids_list))

def intersect_udf_with_set(data_list):
    return udf(lambda ispot_ids_list: intersect_with_set(data_list, ispot_ids_list), ArrayType(IntegerType()))

Now the UDF takes one set and one list as parameters instead of two lists. The caller converts the predefined list to a set once, before creating the UDF, so that conversion no longer happens per record.
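The mechanism is easiest to see without Spark: a closure captures the pre-built set once, and every per-record call reuses it. A minimal sketch (the `make_intersector` name and inputs are illustrative, not from the actual job):

```python
def intersect_with_set(select_set, ids_list):
    # select_set is already a set; only the small per-record list is converted.
    if not select_set or not ids_list:
        return []
    return list(select_set & set(ids_list))

def make_intersector(data_list):
    data_set = set(data_list)                  # conversion happens exactly once
    return lambda ids: intersect_with_set(data_set, ids)

common_with = make_intersector([1, 2, 3, 4])   # hypothetical predefined list
print(sorted(common_with([3, 4, 5])))          # -> [3, 4]
print(common_with(None))                       # -> []
```

The `udf(lambda ...)` wrapper in the fixed code plays the same role as the returned lambda here: it closes over the set built by the caller.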

With this change, the execution time dropped significantly, from 6 hours to less than 1 minute.

This is a very simple example, but it shows that even a small change in a UDF can have a significant impact on the whole job. Spend some time optimizing your UDFs and you may see huge performance gains.