Spark Join Types With Examples
When we deal with a lot of data coming from different sources, joining two or more datasets to get the required information is a common use case. So it is a good thing Spark supports multiple join types. In this blog, we will learn Spark join types with examples.
Spark Join Types
Like SQL, Spark offers a variety of join types.
- Inner join – keeps rows from the left and right data frames where the key exists in both
- Outer join – keeps rows from the left and right data frames where the key exists in either data frame
- Left outer join – keeps all rows from the left data frame, matched against the right where possible
- Right outer join – keeps all rows from the right data frame, matched against the left where possible
- Left semi join – keeps only rows from the left data frame that have a matching key in the right data frame
- Left anti join – keeps only rows from the left data frame that have no matching key in the right data frame
- Natural join – joins two data frames on their identically named columns
- Cross join – joins every row from the left data frame with every row in the right data frame
Now that we know all the Spark join types, let us work through each of them with code and example data to understand them better.
Setting Up Data
Before we start, we need data frames on which we can test the join types. Instead of reading a lot of data, we will set up small data frames that we can easily validate.
emp = spark.createDataFrame([
    (0, "John C", "2000-01-01", "D101"),
    (1, "Tom D", "2002-02-01", "D102"),
    (2, "Max", "2003-04-01", "D104"),
    (3, "Peter J", "2005-06-01", "D104"),
    (4, "Mark P", "2007-01-01", "D1022")
]).toDF("id", "name", "joining_date", "dept_id")

emp.show()
+---+-------+------------+-------+
| id|   name|joining_date|dept_id|
+---+-------+------------+-------+
|  0| John C|  2000-01-01|   D101|
|  1|  Tom D|  2002-02-01|   D102|
|  2|    Max|  2003-04-01|   D104|
|  3|Peter J|  2005-06-01|   D104|
|  4| Mark P|  2007-01-01|  D1022|
+---+-------+------------+-------+
dept = spark.createDataFrame([
    ("D101", "Support"),
    ("D102", "HR"),
    ("D103", "Marketing"),
    ("D104", "Sells")
]).toDF("id", "dept_name")

dept.show()
+----+---------+
|  id|dept_name|
+----+---------+
|D101|  Support|
|D102|       HR|
|D103|Marketing|
|D104|    Sells|
+----+---------+
We have set up two simple data frames, one for employees and one for departments, with department id as the common column between them.
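As a quick sanity check, printSchema() shows what Spark inferred for each frame; everything except the numeric employee id comes in as a string, since we passed the dates as plain strings.

# verify the inferred column names and types of both frames
emp.printSchema()
dept.printSchema()

With the data in place, let us start with the Spark joins.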
Inner Join
Inner join returns data from the left and right data frames where the join key is present in both. For example, it will return rows from the employee and department data frames where the same department id is present in both of them. Let us see that in an example.
emp.join(dept, emp["dept_id"] == dept["id"], "inner").show()
+---+-------+------------+-------+----+---------+
| id|   name|joining_date|dept_id|  id|dept_name|
+---+-------+------------+-------+----+---------+
|  2|    Max|  2003-04-01|   D104|D104|    Sells|
|  3|Peter J|  2005-06-01|   D104|D104|    Sells|
|  0| John C|  2000-01-01|   D101|D101|  Support|
|  1|  Tom D|  2002-02-01|   D102|D102|       HR|
+---+-------+------------+-------+----+---------+
We can also format this code for better readability by extracting the join condition and the join type into separate variables.
joinType = "inner" joinCondition = emp["dept_id"] == dept["id"] emp.join(dept, joinCondition, joinType).show() |
By default, Spark performs an inner join. So when we want an inner join, we can skip the join type parameter and still get the same result.
emp.join(dept, joinCondition).show()
+---+-------+------------+-------+----+---------+
| id|   name|joining_date|dept_id|  id|dept_name|
+---+-------+------------+-------+----+---------+
|  2|    Max|  2003-04-01|   D104|D104|    Sells|
|  3|Peter J|  2005-06-01|   D104|D104|    Sells|
|  0| John C|  2000-01-01|   D101|D101|  Support|
|  1|  Tom D|  2002-02-01|   D102|D102|       HR|
+---+-------+------------+-------+----+---------+
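One thing worth noticing in the output above: both frames contribute a column called "id", so referring to "id" after the join would be ambiguous. A minimal sketch of one way around this (deptRenamed and dept_key are just illustrative names, not part of the original example) is to rename the department key before joining.

# rename the department "id" column ("dept_key" is an illustrative name)
# so the joined result has unique column names
deptRenamed = dept.withColumnRenamed("id", "dept_key")
emp.join(deptRenamed, emp["dept_id"] == deptRenamed["dept_key"], "inner").show()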
Outer Join
In case of outer joins, Spark takes data from both data frames. If the matching key is not present in the left or right data frame, Spark puts "null" for that side. (Spark also accepts "full" and "full_outer" as synonyms for this join type.)
joinType = "outer" joinCondition = emp["dept_id"] == dept["id"] emp.join(dept, joinCondition, joinType).show() +----+-------+------------+-------+----+---------+ | id| name|joining_date|dept_id| id|dept_name| +----+-------+------------+-------+----+---------+ | 2| Max| 2003-04-01| D104|D104| Sells| | 3|Peter J| 2005-06-01| D104|D104| Sells| | 4| Mark P| 2007-01-01| D1022|null| null| |null| null| null| null|D103|Marketing| | 0| John C| 2000-01-01| D101|D101| Support| | 1| Tom D| 2002-02-01| D102|D102| HR| +----+-------+------------+-------+----+---------+ |
We can see that Spark has picked all rows from both data frames, and where it did not find a matching key, like department id "D1022", it has inserted null for the data coming from departments.
Left Outer Join
Spark will pick all rows from the left data frame. For keys that match in the right data frame, it will bring in that data, and where there is no matching key in the right data frame, it will insert null.
joinType = "left_outer" joinCondition = emp["dept_id"] == dept["id"] emp.join(dept, joinCondition, joinType).show() +---+-------+------------+-------+----+---------+ | id| name|joining_date|dept_id| id|dept_name| +---+-------+------------+-------+----+---------+ | 2| Max| 2003-04-01| D104|D104| Sells| | 3|Peter J| 2005-06-01| D104|D104| Sells| | 4| Mark P| 2007-01-01| D1022|null| null| | 0| John C| 2000-01-01| D101|D101| Support| | 1| Tom D| 2002-02-01| D102|D102| HR| +---+-------+------------+-------+----+---------+ |
Here every row from the employee data frame is kept, and for department id "D1022", as there is no matching record in the department data frame, Spark has put null for that record.
Right Outer Join
A right outer join works like a left outer join with the sides reversed: Spark keeps every record from the right data frame, and if there is no matching key in the left data frame, it inserts null.
joinType = "right_outer" joinCondition = emp["dept_id"] == dept["id"] emp.join(dept, joinCondition, joinType).show() +----+-------+------------+-------+----+---------+ | id| name|joining_date|dept_id| id|dept_name| +----+-------+------------+-------+----+---------+ | 2| Max| 2003-04-01| D104|D104| Sells| | 3|Peter J| 2005-06-01| D104|D104| Sells| |null| null| null| null|D103|Marketing| | 0| John C| 2000-01-01| D101|D101| Support| | 1| Tom D| 2002-02-01| D102|D102| HR| +----+-------+------------+-------+----+---------+ |
Left Semi Join
In case of a left semi join, Spark only picks data from the left data frame for which it finds a matching key in the right data frame. Like most join types, this is better understood with an example.
joinType = "left_semi" joinCondition = emp["dept_id"] == dept["id"] emp.join(dept, joinCondition, joinType).show() +---+-------+------------+-------+ | id| name|joining_date|dept_id| +---+-------+------------+-------+ | 2| Max| 2003-04-01| D104| | 3|Peter J| 2005-06-01| D104| | 0| John C| 2000-01-01| D101| | 1| Tom D| 2002-02-01| D102| +---+-------+------------+-------+ |
Notice that the result contains columns only from the employee data frame, and that Spark has not picked employee id 4, as department id "D1022" is not present in the department data frame.
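One way to think about a left semi join is as an inner join that keeps only the left-hand columns and never duplicates left rows. A rough sketch of that equivalence (the dropDuplicates call guards against one employee key matching several department rows, which cannot happen with our data but can in general):

# approximate a left semi join with an inner join:
# keep only the employee columns and de-duplicate on the employee id
emp.join(dept, emp["dept_id"] == dept["id"], "inner") \
    .select(emp["*"]) \
    .dropDuplicates(["id"]) \
    .show()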
Left Anti Join
Here, Spark picks data from the left data frame only for those rows where it does not find a matching key in the right data frame. In our example, it should only pick employee id 4, as it does not have a matching dept_id in the department data frame.
joinType = "left_anti" joinCondition = emp["dept_id"] == dept["id"] emp.join(dept, joinCondition, joinType).show() +---+------+------------+-------+ | id| name|joining_date|dept_id| +---+------+------------+-------+ | 4|Mark P| 2007-01-01| D1022| +---+------+------------+-------+ |
Natural Join
In case of natural joins, Spark matches columns with the same name in both data frames and performs an inner join on them. Relying on such an implicit join condition is always dangerous. In our example, Spark will join the employee id to the department id, which is clearly wrong.
emp.createOrReplaceTempView("emp")
dept.createOrReplaceTempView("dept")

spark.sql("select * from emp NATURAL JOIN dept").show()
+---+----+------------+-------+---------+
| id|name|joining_date|dept_id|dept_name|
+---+----+------------+-------+---------+
+---+----+------------+-------+---------+

# no rows are returned because "id" means different things in each data frame
In most cases we should avoid this join and specify our own join condition.
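For instance, with the temp views we registered above, the same query with an explicit ON clause joins on the intended columns and gives back the inner join result we saw earlier:

# an explicit join condition instead of relying on matching column names
spark.sql("select * from emp JOIN dept ON emp.dept_id = dept.id").show()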
Cross Join
In a cross join, Spark joins each row from the left data frame with every row in the right data frame. This can create a huge amount of data in the cluster, so we should be careful while using cross joins, as they can break our application.
spark.sql("select * from emp CROSS JOIN dept").show() +---+-------+------------+-------+----+---------+ | id| name|joining_date|dept_id| id|dept_name| +---+-------+------------+-------+----+---------+ | 0| John C| 2000-01-01| D101|D101| Support| | 0| John C| 2000-01-01| D101|D102| HR| | 0| John C| 2000-01-01| D101|D103|Marketing| | 0| John C| 2000-01-01| D101|D104| Sells| | 1| Tom D| 2002-02-01| D102|D101| Support| | 1| Tom D| 2002-02-01| D102|D102| HR| | 1| Tom D| 2002-02-01| D102|D103|Marketing| | 1| Tom D| 2002-02-01| D102|D104| Sells| | 2| Max| 2003-04-01| D104|D101| Support| | 2| Max| 2003-04-01| D104|D102| HR| | 2| Max| 2003-04-01| D104|D103|Marketing| | 2| Max| 2003-04-01| D104|D104| Sells| | 3|Peter J| 2005-06-01| D104|D101| Support| | 3|Peter J| 2005-06-01| D104|D102| HR| | 3|Peter J| 2005-06-01| D104|D103|Marketing| | 3|Peter J| 2005-06-01| D104|D104| Sells| | 4| Mark P| 2007-01-01| D1022|D101| Support| | 4| Mark P| 2007-01-01| D1022|D102| HR| | 4| Mark P| 2007-01-01| D1022|D103|Marketing| | 4| Mark P| 2007-01-01| D1022|D104| Sells| +---+-------+------------+-------+----+---------+ |
The cross join has created twenty rows from just our two small data frames. You can imagine what will happen if you do a cross join on very large data, so be careful when using this join type.
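If you prefer the DataFrame API over SQL, PySpark also exposes a dedicated crossJoin method that produces the same Cartesian product:

# same cross join via the DataFrame API
emp.crossJoin(dept).show()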
Conclusion
In this blog, we have gone through the Spark join types and written code for each of them. I hope you have found this useful. You can get all the code in this blog on GitHub. See you in the next article. Until then, keep learning.