Aggregation Functions in Spark
Aggregation functions are an important part of big data analytics. When processing data we need many different kinds of computations, so it is good that Spark provides a rich set of built-in functions. In this blog, we are going to learn about aggregation functions in Spark.
Count
This is one of the most basic functions: we count the number of records, or specify a column to count. Let us see an example.
flights_df = spark.read\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .csv("D:\\code\\spark\\spark-basics\\data\\flight-data\\csv")

flights_df.registerTempTable("flights")

flights_df.count()
1506

spark.sql("select count(*) from flights").show()

+--------+
|count(1)|
+--------+
|    1506|
+--------+
We can also use count with a select expression on the data frame.
from pyspark.sql.functions import count

flights_df.select(count("DEST_COUNTRY_NAME")).show()

+------------------------+
|count(DEST_COUNTRY_NAME)|
+------------------------+
|                    1506|
+------------------------+
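One detail worth calling out here: count(*) counts every row, while count on a specific column skips null values. Below is a minimal sketch, assuming a small hypothetical in-memory data frame, just to show the difference.

from pyspark.sql.functions import count

# hypothetical toy DataFrame with a null value
df = spark.createDataFrame([("US",), (None,), ("IN",)], ["country"])

# count("*") counts all 3 rows; count("country") skips the null and gives 2
df.select(count("*"), count("country")).show()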
If you want to know more about how to run SQL queries on Spark data frames, you can read Running SQL queries on Spark DataFrames.
Count Distinct
We can also count the number of distinct values in a column. For example, when we count the number of distinct countries, we certainly should not get 1506. For this we need the count distinct function.
from pyspark.sql.functions import countDistinct

flights_df.select(
    countDistinct("DEST_COUNTRY_NAME").alias("dest_countries")
).show()

+--------------+
|dest_countries|
+--------------+
|           170|
+--------------+

spark.sql("""select count(distinct DEST_COUNTRY_NAME) as dest_countries
             from flights""").show()

+--------------+
|dest_countries|
+--------------+
|           170|
+--------------+
Approximate Count Distinct
When we are dealing with huge data sets, we often do not need an exact distinct count; an approximate value is good enough. Computing it runs much faster than the count distinct function.
from pyspark.sql.functions import approx_count_distinct

flights_df.select(
    approx_count_distinct("DEST_COUNTRY_NAME").alias("approx countries")
).show()

+----------------+
|approx countries|
+----------------+
|             164|
+----------------+
We can see that the approximate count of 164 is reasonably close to the actual value of 170. We can also pass a second parameter to the approx count distinct function, which sets the maximum estimation error allowed while calculating the distinct count.
flights_df.select(
    approx_count_distinct("DEST_COUNTRY_NAME", 0.25).alias("approx countries")
).show()

+----------------+
|approx countries|
+----------------+
|             160|
+----------------+
We can also run a SQL query to get the approximate count.
spark.sql(""" SELECT approx_count_distinct(DEST_COUNTRY_NAME,0.1) as approx_countries FROM flights """).show() +----------------+ |approx_countries| +----------------+ | 162| +----------------+ |
First and Last
With the first and last functions we can get the first and last value of a column from the data frame.
from pyspark.sql.functions import first, last

flights_df.select(
    first("DEST_COUNTRY_NAME"),
    last("DEST_COUNTRY_NAME")
).show()
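Keep in mind that first and last depend on the row order of the data frame, which is not guaranteed unless you sort explicitly. Both functions also accept an ignorenulls flag; a quick sketch on the same flights_df:

from pyspark.sql.functions import first, last

# skip null values while picking the first and last value
flights_df.select(
    first("DEST_COUNTRY_NAME", ignorenulls=True),
    last("DEST_COUNTRY_NAME", ignorenulls=True)
).show()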
Min and Max
With these aptly named functions, we can find the minimum and maximum values of a column in the data frame.
from pyspark.sql.functions import min, max

flights_df.select(
    min("count"),
    max("count")
).show()

+----------+----------+
|min(count)|max(count)|
+----------+----------+
|         1|    348113|
+----------+----------+
# SQL query to get the same result
spark.sql("select min(count), max(count) from flights").show()
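Note that min and max are not limited to numeric columns; on string columns they compare values lexicographically. A small sketch:

# lexicographic min and max on a string column
flights_df.select(min("DEST_COUNTRY_NAME"), max("DEST_COUNTRY_NAME")).show()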
Sum
Another available function is sum, which we can use to add up all values in a column.
from pyspark.sql.functions import sum

flights_df.select(sum("count")).show()

# with SQL query
spark.sql("select sum(count) from flights").show()

# output
+----------+
|sum(count)|
+----------+
|    422269|
+----------+
Sum Distinct
Like count distinct, we can also sum only the distinct values of a column. The example below may not make a lot of business sense, but it should give you an idea of how to use the sum distinct function.
from pyspark.sql.functions import sumDistinct

flights_df.select(sumDistinct("count")).show()

+-------------------+
|sum(DISTINCT count)|
+-------------------+
|             419432|
+-------------------+
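The SQL equivalent simply adds the DISTINCT keyword inside sum:

# SQL query to get the same result
spark.sql("select sum(distinct count) from flights").show()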
Average
Though we could calculate the average of a column as the sum of its values divided by their count, there is a built-in average function available as well.
from pyspark.sql.functions import avg

flights_df.select(avg("count")).show()

# SQL code
spark.sql("select avg(count) from flights").show()

# output
+-----------------+
|       avg(count)|
+-----------------+
|1655.956862745098|
+-----------------+
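To connect this back to the earlier functions, here is a sketch that computes the same average by hand as sum divided by count using an SQL expression; it should match the built-in avg.

from pyspark.sql.functions import expr

# average computed manually as sum/count
flights_df.select(expr("sum(count) / count(count)").alias("manual_avg")).show()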
Collect Set and Collect List
We can also aggregate values from a column using the collect set and collect list functions. Both create an array from all values of that column. The only difference is that collect set drops duplicates, whereas collect list keeps duplicate values as well.
from pyspark.sql.functions import collect_list, collect_set

flights_df.select(
    collect_list("DEST_COUNTRY_NAME"),
    collect_set("DEST_COUNTRY_NAME")
).show()

# SQL
spark.sql("""select collect_set(DEST_COUNTRY_NAME),
             collect_list(DEST_COUNTRY_NAME) from flights""").show()

+-------------------------------+------------------------------+
|collect_list(DEST_COUNTRY_NAME)|collect_set(DEST_COUNTRY_NAME)|
+-------------------------------+------------------------------+
|           [United States, U...|          [Italy, Slovakia,...|
+-------------------------------+------------------------------+
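As a sanity check, we can wrap the collected arrays in the size function: the set should have as many elements as the distinct count we saw earlier (170), and the list as many as the total row count (1506). A sketch:

from pyspark.sql.functions import size, collect_list, collect_set

# sizes of the collected arrays
flights_df.select(
    size(collect_set("DEST_COUNTRY_NAME")).alias("set_size"),
    size(collect_list("DEST_COUNTRY_NAME")).alias("list_size")
).show()

Also note that both functions pull every value of the column into a single array, so they can get expensive on really large data sets.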
Grouping Data
Till now we have performed aggregations at the level of the whole data frame. We can also split the data into groups based on the values of some column and compute aggregates per group. Below are some examples.
flights_df.groupBy("DEST_COUNTRY_NAME").count().show(5) # SQL spark.sql(""" select DEST_COUNTRY_NAME, count(*) from flights group by DEST_COUNTRY_NAME limit 5""").show() +-----------------+--------+ |DEST_COUNTRY_NAME|count(1)| +-----------------+--------+ | Anguilla| 1| | Russia| 1| | Paraguay| 1| | Senegal| 1| | Sweden| 1| +-----------------+--------+ |
flights_df.groupBy("DEST_COUNTRY_NAME").avg("count").show(5) +-----------------+----------+ |DEST_COUNTRY_NAME|avg(count)| +-----------------+----------+ | Anguilla| 21.0| | Russia| 152.0| | Paraguay| 90.0| | Senegal| 29.0| | Sweden| 65.0| +-----------------+----------+ only showing top 5 rows |
Conclusion
In this blog, we have gone through the basic aggregation functions in Spark. There are many more functions available, and we will go over them in the next few blogs. You can find the code from this blog on GitHub. See you in the next article.