Shuffling isn’t just a fancy dance move! Shuffling happens when you aggregate or join datasets in distributed environments like Spark or BigQuery. One time when I was working at Facebook, we had a 50 TB table joining with a 150 TB table. The shuffle caused by that join took up 30% of all of our compute! I eliminated that shuffle by bucketing both tables ahead of time and then joining!
In this article we will be going over:
Why shuffle happens, and which SQL keywords trigger it (and which don’t)
Some techniques you can use to minimize shuffle, especially in Apache Spark
Why does shuffle happen?
Shuffle happens when we have data on one machine that needs to be on another machine.
Imagine you have 10 files of data and you’re counting the number of likes by country.
If it were one file, it’d be easy, you loop through the file and build up a hash map of country → count. Simple!
For 10 files, it’s a bit more complicated. Imagine you set the shuffle partitions to 26: then, conceptually, all the A countries get shuffled into one partition, all the B countries into another, and so on. (In practice the assignment is a hash of the key rather than the alphabet, but the idea is the same.)
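In Spark, that partition count is just a session setting; for example:
-- Spark SQL: the shuffle partition count is session-level config (the default is 200)
SET spark.sql.shuffle.partitions = 26;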
Shuffling gets more expensive with scale in a non-linear fashion. So for this trivial example, the shuffle is probably fine, and you get a nice, intuitive query that looks something like:
SELECT country, COUNT(*)
FROM like_table
GROUP BY country
This brings me to the first SQL keyword that triggers shuffle in distributed environments: GROUP BY.
The good news about GROUP BY is that its shuffle isn’t as painful as others! Engines like Spark pre-aggregate on each machine first (a partial aggregation, sometimes called a map-side combine), so only one row per country per partition ever crosses the network.
There are more painful shuffling keywords than GROUP BY. The next most painful keyword is (drum roll please)… JOIN!
So now imagine you have two datasets, each with 10 files, and you need to join them on user_id. We’ll use 100 shuffle partitions to keep the math simple.
In that case, the JOIN will need to:
Shuffle both the left and right side of the JOIN so that rows with the same user_id land on the same machine
Make the comparisons to match rows (and filter out non-matching rows if it’s an INNER JOIN)
So now you get the pain of shuffling times two!
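To make this concrete, here’s a sketch of a query that shuffles both inputs (the users and likes table names are illustrative; we’ll reuse them later in this article):
-- Both inputs get repartitioned (shuffled) on user_id before any rows can be matched
SELECT u.country, l.post_id
FROM users u
JOIN likes l ON u.user_id = l.user_id;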
But what if I told you JOIN isn’t the worst? The worst is yet to come!
When it comes to shuffle, the absolute worst keyword to use is ORDER BY!
Why is ORDER BY so bad? Think about what a global sort requires: every row’s final position depends on every other row, so the data can’t simply be sorted file by file. Just because files 1 through 10 are each sorted does not mean the entire data set is sorted!
ORDER BY takes the worst parts of shuffling and forces all the data to funnel through a single point to produce one globally ordered output. It removes the distributed part from distributed compute, and you might as well just be on a single-node Postgres instance crying your eyes out!
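If you only need rows sorted within each partition or output file (say, for compression or merge-friendly reads) rather than one globally ordered result, Spark SQL’s SORT BY keyword is worth knowing. A quick sketch using the like_table from earlier:
-- ORDER BY: one globally sorted result; every row’s final position must be coordinated
SELECT * FROM like_table ORDER BY country;

-- SORT BY: each partition sorts independently; far cheaper, but not globally ordered
SELECT * FROM like_table SORT BY country;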
Shuffle minimization techniques
If you’re in a distributed environment, there are a few options to minimize shuffle.
Eliminating shuffle for GROUP BY or JOIN by bucketing your data
Bucketing your data will help eliminate shuffling for GROUP BY and JOIN.
To be clear, writing the bucketed table causes a shuffle of its own (though usually a cheaper one than a JOIN), so the technique isn’t literally shuffle-free. It pays off when you amortize that one-time cost across multiple different aggregations and JOINs against the table.
Bucketing works by putting your data into files with guarantees about what data lives in them. For example, at Facebook, we would bucket our tables into 1024 buckets on user_id all the time.
This made it so that any aggregation doing a GROUP BY user_id, or any JOIN on user_id with another table bucketed the same way, could be done without shuffling at all!
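Here’s roughly what that setup looks like in Spark SQL. Treat this as a sketch: the table names are illustrative, and both tables must be bucketed on the same column with compatible bucket counts.
-- One-time shuffle to lay each table out by user_id
CREATE TABLE users_bucketed
USING parquet
CLUSTERED BY (user_id) INTO 1024 BUCKETS
AS SELECT * FROM users;

CREATE TABLE likes_bucketed
USING parquet
CLUSTERED BY (user_id) INTO 1024 BUCKETS
AS SELECT * FROM likes;

-- Matching user_ids already live in matching buckets, so no exchange is needed
SELECT u.country, COUNT(*) AS num_likes
FROM users_bucketed u
JOIN likes_bucketed l ON u.user_id = l.user_id
GROUP BY u.country;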
One rule here with bucketing: always pick a power of two for your number of buckets. This is because a bucket join can still skip the shuffle when one table’s bucket count is a multiple of the other’s, and powers of two guarantee that relationship between any two of your bucketed tables.
Imagine two tables, one with 4 buckets and one with 8, both bucketed on the user_id column.
Buckets 1 and 5 of the 8-bucket table hold the user_ids that land in Bucket 1 of the 4-bucket table, Buckets 2 and 6 map to Bucket 2, and so on (since hash(user_id) mod 8 determines hash(user_id) mod 4).
This multiple trick is awesome when doing bucket joins of tables of varying sizes!
Using a broadcast join
This trick works great and avoids shuffle entirely, but only if one side of the join is “small,” meaning roughly 5 gigabytes or less. Instead of repartitioning both sides, the engine ships a full copy of the small table to every machine that holds a piece of the big one.
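In Spark you can ask for this explicitly with a join hint. A sketch with made-up table names (the BROADCAST hint and the spark.sql.autoBroadcastJoinThreshold setting are real Spark SQL features):
-- Ship a full copy of the small lookup table to every executor instead of shuffling both sides
SELECT /*+ BROADCAST(ip) */
  logs.request_id, ip.app_name
FROM network_logs logs
JOIN ip_lookup ip ON logs.ip_address = ip.ip_address;
-- Without the hint, Spark only auto-broadcasts tables smaller than
-- spark.sql.autoBroadcastJoinThreshold (just 10 MB by default).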
When I worked at Netflix, we had a 2-petabyte-per-day pipeline of network logs. We needed to join an IP address lookup table to these logs to see which microservice app each request belonged to. When we did the lookups with IPv4, it worked great because the lookup table was ~4 GB and we could broadcast join it with Spark. When we wanted to enhance this pipeline with IPv6 addresses, we couldn’t use Spark anymore because a broadcast join was no longer an option!
So, what we did to minimize shuffle after upgrading to IPv6 was the next technique below!
Log the needed data at the point of generation
When data is logged, it often has access to a rich context of fields such as device, IP address, location, etc. Capturing the context you need at this point can avoid painful downstream joins.
At Netflix, we had teams adopt sidecar proxies to log the needed app information with each network request. This made the join requirement obsolete, and the pipeline became much more efficient!
Model your data with One Big Table data modeling (I did a 42-minute video explaining this here)
Nowadays in data engineering, compute is more expensive than storage. So maybe you model your data in such a way that the join already happened?
Imagine you had a join between users and likes tables:
users
user_id BIGINT
country STRING
gender STRING
operating_system STRING
likes
post_id BIGINT
user_id BIGINT
timestamp TIMESTAMP
Every time you wanted to count the number of likes that happened on a Sunday in India, you’d run something like:
SELECT COUNT(1) AS num_likes
FROM users u
JOIN likes l ON u.user_id = l.user_id
WHERE u.country = 'India'
  AND DAY_OF_WEEK(l.timestamp) = 7 -- Sunday is 7 in Trino
The problem with this is the shuffle happening every day across more and more data as likes and users grow!
Imagine a schema like this:
user_likes
user_id BIGINT
country STRING
gender STRING
operating_system STRING
post_likes ARRAY<STRUCT<post_id: BIGINT, timestamp: TIMESTAMP>>
With this schema you could do a query like:
SELECT
  SUM(
    REDUCE(
      post_likes,
      0,
      (s, x) -> IF(DAY_OF_WEEK(x.timestamp) = 7, s + 1, s), -- Sunday is 7 in Trino
      s -> s
    )
  ) AS num_likes
FROM user_likes
WHERE country = 'India'
You’ll see that with this query there is no JOIN, and it gives you the same result while being incredibly fast! Keep in mind the syntax here is for Trino!
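One caveat worth being upfront about: the join still has to happen once to build user_likes, so you pay its shuffle one time instead of in every downstream query. A Trino-style sketch of that one-time build, assuming the users and likes schemas above:
INSERT INTO user_likes
SELECT
  u.user_id,
  u.country,
  u.gender,
  u.operating_system,
  -- Collect each user's likes into one array of (post_id, timestamp) rows
  ARRAY_AGG(CAST(ROW(l.post_id, l.timestamp) AS ROW(post_id BIGINT, timestamp TIMESTAMP))) AS post_likes
FROM users u
JOIN likes l ON u.user_id = l.user_id
GROUP BY u.user_id, u.country, u.gender, u.operating_system;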
We cover shuffle and data modeling in much more detail in my DataExpert.io boot camp that starts on May 6th of this year! If you use code SHUFFLE15 at checkout, you can get 15% off!
Also, the DataExpert.io SQL training experience is now in open beta! You can check out our 50 questions here!
What other techniques have you used to reduce shuffle in your pipelines? If you enjoyed this content, please share with your friends and coworkers!