Aggregation Functions in Spark
Aggregation functions are an important part of big data analytics. When processing data, we need a lot of different functions, so it is a good thing Spark provides many of them built in. In this blog, we are going to learn about aggregation functions in Spark.
Count
This is one of the most basic functions: we can count the total number of records, or specify a column to count. Let us see an example.
flights_df = spark.read\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .csv("D:\\code\\spark\\spark-basics\\data\\flight-data\\csv")
# register the DataFrame as a temp view so SQL queries can reference it
# (registerTempTable is deprecated; createOrReplaceTempView replaces it)
flights_df.createOrReplaceTempView("flights")
flights_df.count()
1506
spark.sql("select count(*) from flights").show()
+--------+
|count(1)|
+--------+
| 1506|
+--------+
We can also use count in a select expression on the DataFrame.
from pyspark.sql.functions import count
flights_df.select(count("DEST_COUNTRY_NAME")).show()
+------------------------+
|count(DEST_COUNTRY_NAME)|
+------------------------+
| 1506|
+------------------------+
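One subtlety worth knowing: count on a specific column skips null values, while count("*") counts every row. They match here because DEST_COUNTRY_NAME has no nulls, but the sketch below shows how to compare the two.
# count("*") counts all rows; count(col) ignores nulls in that column
flights_df.select(
    count("*").alias("all_rows"),
    count("DEST_COUNTRY_NAME").alias("non_null_dest")
).show()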
If you want to know more about how to run SQL queries on Spark DataFrames, you can read Running SQL queries on Spark DataFrames.
Count Distinct
We can also count the number of distinct values in a column. For example, when we count the number of distinct countries, surely we should not get 1506. Here, we need the countDistinct function.
from pyspark.sql.functions import countDistinct
flights_df.select(
    countDistinct("DEST_COUNTRY_NAME").alias("dest_countries")
).show()
+--------------+
|dest_countries|
+--------------+
| 170|
+--------------+
spark.sql("""select
count(distinct DEST_COUNTRY_NAME) as dest_countries
from flights""").show()
+--------------+
|dest_countries|
+--------------+
| 170|
+--------------+
Approximate Count Distinct
When we are dealing with huge data sets, many times we do not need an exact distinct count; an approximate value is good enough. The approx_count_distinct function runs much faster than countDistinct.
from pyspark.sql.functions import approx_count_distinct
flights_df.select(
    approx_count_distinct("DEST_COUNTRY_NAME").alias("approx countries")
).show()
+----------------+
|approx countries|
+----------------+
| 164|
+----------------+
We can see that the approximate count of 164 is reasonably close to the actual value of 170. approx_count_distinct also accepts a second parameter, the maximum relative standard deviation allowed for the estimate; a larger value trades accuracy for speed.
flights_df.select(
    approx_count_distinct("DEST_COUNTRY_NAME", 0.25).alias("approx countries")
).show()
+----------------+
|approx countries|
+----------------+
| 160|
+----------------+
We can also run a SQL query to get the approximate count.
spark.sql("""
SELECT approx_count_distinct(DEST_COUNTRY_NAME,0.1)
as approx_countries
FROM flights
""").show()
+----------------+
|approx_countries|
+----------------+
| 162|
+----------------+
First and Last
With the first and last functions, we can get the first and last values of a column in the DataFrame.
from pyspark.sql.functions import first, last
flights_df.select(
    first("DEST_COUNTRY_NAME"),
    last("DEST_COUNTRY_NAME")
).show()
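Both functions also take an optional ignorenulls flag, which skips null values when picking a row. It makes no difference on this column, since DEST_COUNTRY_NAME has no nulls, but here is a quick sketch:
# skip nulls when picking first/last values (useful on sparser columns)
flights_df.select(
    first("DEST_COUNTRY_NAME", ignorenulls=True),
    last("DEST_COUNTRY_NAME", ignorenulls=True)
).show()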
Min and Max
With these aptly named functions, we can find the minimum and maximum values of a column in the DataFrame.
from pyspark.sql.functions import min, max
flights_df.select(
    min("count"),
    max("count")
).show()
+----------+----------+
|min(count)|max(count)|
+----------+----------+
| 1| 348113|
+----------+----------+
# SQL query to get the same result
spark.sql("select min(count),max(count) from flights").show()
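One caveat: importing min and max this way shadows Python's built-in min and max functions. A common way around that, sketched below, is to import the functions module under an alias:
from pyspark.sql import functions as F
# F.min and F.max are the Spark column functions,
# leaving Python's built-in min and max untouched
flights_df.select(F.min("count"), F.max("count")).show()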
Sum
Another function available is sum, which we can use to add up all values in a column.
from pyspark.sql.functions import sum
flights_df.select(sum("count")).show()
# with SQL query
spark.sql("select sum(count) from flights").show()
# output
+----------+
|sum(count)|
+----------+
| 422269|
+----------+
Sum Distinct
Like countDistinct, we can also sum only the distinct values of a column. The example below does not make a lot of analytical sense, but it should give you an idea of how to use the sumDistinct function.
from pyspark.sql.functions import sumDistinct
flights_df.select(sumDistinct("count")).show()
+-------------------+
|sum(DISTINCT count)|
+-------------------+
| 419432|
+-------------------+
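As a side note, in Spark 3.2 and later this function was renamed to sum_distinct; the camel-case sumDistinct still works but is deprecated:
# Spark 3.2+: snake_case name for the same function
from pyspark.sql.functions import sum_distinct
flights_df.select(sum_distinct("count")).show()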
Average
Though we could calculate the average of a column ourselves by dividing its sum by its count, there is a built-in average function available as well.
from pyspark.sql.functions import avg
flights_df.select(avg("count")).show()
# SQL code
spark.sql("select avg(count) from flights").show()
# output
+-----------------+
| avg(count)|
+-----------------+
|1655.956862745098|
+-----------------+
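We can sanity-check this against the manual calculation; here is a small sketch combining the sum and count functions from earlier, which should print the same value:
from pyspark.sql.functions import sum, count
# sum(count) / count(count) should match avg(count)
flights_df.select(
    (sum("count") / count("count")).alias("manual_avg")
).show()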
Collect Set and Collect List
We can also aggregate the values of a column using the collect_set and collect_list functions. Both create an array from all values of that column. The only difference is that collect_set removes duplicates, whereas collect_list keeps them.
from pyspark.sql.functions import collect_list, collect_set
flights_df.select(
    collect_list("DEST_COUNTRY_NAME"),
    collect_set("DEST_COUNTRY_NAME")
).show()
# SQL
spark.sql("""select
    collect_list(DEST_COUNTRY_NAME),
    collect_set(DEST_COUNTRY_NAME)
from flights""").show()
+-------------------------------+------------------------------+
|collect_list(DEST_COUNTRY_NAME)|collect_set(DEST_COUNTRY_NAME)|
+-------------------------------+------------------------------+
| [United States, U...| [Italy, Slovakia,...|
+-------------------------------+------------------------------+
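The collected arrays are hard to read in full, and because they pull every value into a single row they can get memory-heavy on large columns. One handy trick, sketched below, is to wrap collect_set in size, which reproduces the distinct count from earlier:
from pyspark.sql.functions import size
# size(collect_set(...)) equals the distinct count, i.e. 170 here
flights_df.select(
    size(collect_set("DEST_COUNTRY_NAME")).alias("dest_countries")
).show()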
Grouping Data
So far we have performed aggregations at the level of the whole DataFrame. We can also split the data into groups based on the values of some column and compute aggregates per group. Below are some examples.
flights_df.groupBy("DEST_COUNTRY_NAME").count().show(5)
# SQL
spark.sql("""
select DEST_COUNTRY_NAME, count(*) from
flights group by DEST_COUNTRY_NAME
limit 5""").show()
+-----------------+--------+
|DEST_COUNTRY_NAME|count(1)|
+-----------------+--------+
| Anguilla| 1|
| Russia| 1|
| Paraguay| 1|
| Senegal| 1|
| Sweden| 1|
+-----------------+--------+
flights_df.groupBy("DEST_COUNTRY_NAME").avg("count").show(5)
+-----------------+----------+
|DEST_COUNTRY_NAME|avg(count)|
+-----------------+----------+
| Anguilla| 21.0|
| Russia| 152.0|
| Paraguay| 90.0|
| Senegal| 29.0|
| Sweden| 65.0|
+-----------------+----------+
only showing top 5 rows
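Finally, groupBy is not limited to a single aggregate. With agg, we can compute several of the functions above per group in one pass; a minimal sketch:
from pyspark.sql import functions as F
# several aggregates per destination country in a single pass
flights_df.groupBy("DEST_COUNTRY_NAME").agg(
    F.sum("count").alias("total_flights"),
    F.avg("count").alias("avg_flights"),
    F.max("count").alias("busiest_route")
).show(5)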
Conclusion
In this blog, we have gone through the basic aggregation functions in Spark. There are many more functions available, and we will go over them in the next few blogs. You can find the code written for this blog on GitHub. See you in the next article.