Usage of broadcast variables when using only the Spark SQL API

How do broadcast variables work internally?

The broadcast data is serialized and physically shipped to all executors. The documentation on Broadcast Variables says:

“Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.”
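
For illustration, here is a minimal sketch of that mechanism in the Core API, assuming an existing SparkSession named spark and a hypothetical RDD of strings called lines:

val allowedValues = Set("a", "b", "c")  // hypothetical lookup set
val broadcastSet = spark.sparkContext.broadcast(allowedValues)

// Each task reads the executor-local copy through .value instead of
// shipping the set inside every task closure
val filtered = lines.filter(line => broadcastSet.value.contains(line))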

Is there a way to take advantage of this mechanism in Spark SQL?

Yes, there is a way to take advantage of it. By default, Spark applies a broadcast hash join when joining a big and a small DataFrame.

The book “Learning Spark – 2nd edition” says:

“By default Spark will use a broadcast join if the smaller data set is less than 10 MB. This configuration is set in spark.sql.autoBroadcastJoinThreshold; you can decrease or increase the size depending on how much memory you have on each executor and in the driver.”
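
The threshold is given in bytes and can be changed at runtime. A quick sketch, assuming an existing SparkSession named spark:

// Raise the auto-broadcast threshold to 50 MB (the value is in bytes)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

// Setting it to -1 disables automatic broadcast joins altogether
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)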

In your case you need to collect all unique allowed values into a small DataFrame (called allowedValuesDF here) with a single column (called allowedValues) and use a join to filter your DataFrame.

Something like this:

import org.apache.spark.sql.functions.broadcast

// Broadcast the small side and join on an explicit column equality
val result = dataframe.join(broadcast(allowedValuesDF),
  dataframe("mycol") === allowedValuesDF("allowedValues"))

Actually, you could leave out the broadcast hint, since Spark will choose a broadcast join on its own as long as allowedValuesDF stays below the threshold.
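
To confirm which strategy the optimizer actually chose, you can print the physical plan (a quick check, reusing the result DataFrame from above):

// Look for "BroadcastHashJoin" in the output; a shuffle-based
// join would appear as "SortMergeJoin" instead
result.explain()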

Edit:

In later versions of Spark you can also use join hints in the SQL syntax to tell the execution engine which strategy to use. Details are in the SQL documentation, and an example follows:

-- Join Hints for broadcast join 
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
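
The same hint is available in the DataFrame API via Dataset.hint (a sketch, reusing the hypothetical DataFrames from above):

// Equivalent broadcast hint in the DataFrame API (Spark 2.2+)
val hinted = dataframe.join(allowedValuesDF.hint("broadcast"),
  dataframe("mycol") === allowedValuesDF("allowedValues"))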
