Read CSV Data in Spark
CSV (Comma-Separated Values) is one of the most common file formats for exchanging data. That is why, when you are working with Spark, having a good grasp of how to process CSV files is a must. Spark provides out-of-the-box support for CSV. In this blog, we will learn how to read CSV data in Spark and the different options available with this method.
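All the examples below assume a SparkSession object named spark, which the Spark shell and most notebooks create for you. If you are running a standalone script, a minimal setup sketch looks like this; the application name is just a placeholder.

```
from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession; the app name is an arbitrary placeholder
spark = SparkSession.builder \
    .appName("read-csv-examples") \
    .getOrCreate()
```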
Reading CSV File
Spark has built-in support for reading CSV files. We can use the spark.read API to read the CSV data and get back a DataFrame.
```
df = spark.read.csv("data\\flight-data\\csv\\2010-summary.csv")

df.printSchema()
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)

df.show(2)
+-----------------+-------------------+-----+
|              _c0|                _c1|  _c2|
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
|    United States|            Romania|    1|
+-----------------+-------------------+-----+
```
We call the read CSV function and pass it the path to our CSV file. Spark reads the file and returns a DataFrame. There are other, more generic ways to read a CSV file as well.
```
# reading data using spark format option
df2 = spark.read.format("csv").load("data/flight-data/csv/2010-summary.csv")

# we can also pass path as option to spark read
df3 = spark.read.format("csv")\
    .option("path", "data/flight-data/csv/2010-summary.csv")\
    .load()
```
You can use either method to read the CSV file. In the end, Spark returns the same DataFrame.
Handling Headers in CSV
More often than not, your CSV file will have a header row. If you read the CSV directly, Spark treats that header as a normal data row.
When we print the DataFrame above using the show command, we can see that the column names are _c0, _c1 and _c2, and that our first data row is DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count.
To handle the header in a CSV file, we can pass the header option as true while reading the data.
```
df = spark.read\
    .option("header", "true")\
    .csv("data/flight-data/csv/2010-summary.csv")

df.show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows
```
We can see that Spark has now handled the header row properly.
Reading CSV files in Folder
While reading CSV files in Spark, we can also pass the path of a folder that contains CSV files. Spark will then read all the CSV files in that folder.
```
df = spark.read\
    .option("header", "true")\
    .csv("data/flight-data/csv")

df.count()
1502
```
You need to be careful when passing a directory path. If the directory contains other files (in any format), Spark will treat them as input data too, and you may see wrong results or exceptions while processing the resulting DataFrame.
To avoid this, we can instead pass a list of specific paths while reading the data.
```
df = spark.read\
    .option("header", "true")\
    .csv(["data/flight-data/csv/2010-summary.csv",
          "data/flight-data/csv/2011-summary.csv"])
```
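Spark's file sources also accept glob patterns in the path, so another way to skip unrelated files in a folder is to match only the file names you expect. A small sketch, assuming the yearly files all end in -summary.csv:

```
# Read only the files whose names match the glob pattern,
# ignoring anything else that may live in the folder
df = spark.read \
    .option("header", "true") \
    .csv("data/flight-data/csv/*-summary.csv")
```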
DataFrame schema while reading CSV files
If you print the schema of our current DataFrame, you will notice that all the column names are correct, having been picked up from the header row. However, the data type of every column is string.
```
df.printSchema()
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: string (nullable = true)
```
By default, Spark reads every CSV column as a string. If we want Spark to identify the data types itself, we can pass the inferSchema option while reading the file.
```
df = spark.read\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .csv("data/flight-data/csv/2010-summary.csv")

df.printSchema()
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)
```
Now we can see that the data type of the "count" column has been correctly identified as integer. When we pass inferSchema as true, Spark makes an extra pass over the data and samples it so that it can work out the data type of each column.
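That extra pass can be expensive on large files. If an approximate inference is acceptable, the CSV reader also exposes a samplingRatio option that limits how many rows are used for inference; a small sketch:

```
# Infer the schema from roughly 10% of the rows instead of scanning everything
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("samplingRatio", 0.1) \
    .csv("data/flight-data/csv/2010-summary.csv")
```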
Though in most cases Spark identifies the column data types correctly, in production workloads it is recommended to pass a custom schema while reading the file. We can do that using Spark's "StructType" and "StructField" classes.
```
from pyspark.sql.types import StructField, StructType, StringType, LongType

custom_schema = StructType([
    StructField("destination", StringType(), True),
    StructField("source", StringType(), True),
    StructField("total_flights", LongType(), True),
])

df = spark.read.format("csv") \
    .schema(custom_schema) \
    .option("header", True) \
    .load("data/flight-data/csv/2010-summary.csv")

df.show(2)
+-------------+-------+-------------+
|  destination| source|total_flights|
+-------------+-------+-------------+
|United States|Romania|           15|
|United States|Croatia|            1|
+-------------+-------+-------------+
only showing top 2 rows
```
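If building a StructType feels verbose, schema() also accepts a DDL-formatted string in recent Spark versions (2.3 onwards); a minimal sketch with the same column names:

```
# Same schema expressed as a DDL-style string instead of a StructType
df = spark.read.format("csv") \
    .schema("destination STRING, source STRING, total_flights LONG") \
    .option("header", True) \
    .load("data/flight-data/csv/2010-summary.csv")
```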
If you want to learn more about how to add a custom schema while reading files in Spark, you can check the article Adding Custom Schema to Spark DataFrame.
Reading CSV with different delimiter
Sometimes the file uses a delimiter other than the comma ",". In such cases we can specify the separator character while reading the CSV file. Below is an example that reads a pipe (|) delimited file, but you can use whichever character applies in your case.
```
df = spark.read\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .option("sep", "|") \
    .csv("data/flight-data/csv/piped_data.csv")
```
Handling comma in column value
Sometimes a column value itself contains a comma. When Spark tries to read such a file, it cannot correctly split each row into column values. Consider the sample data below.
```
name,address,age
abc,123 some road, city, 30
```
There is a comma inside the address, so when Spark reads this file it gives us the result below, which is incorrect.
```
df = spark.read\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .csv("data/flight-data/csv/sample_data.csv")

df.show()
+----+-------------+-----+
|name|      address|  age|
+----+-------------+-----+
| abc|123 some road| city|
+----+-------------+-----+
```
To handle such cases, we need to make sure values that contain commas are enclosed in quotes. Then we can set the escapeQuotes option to true while reading the data.
```
df = spark.read\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .option("escapeQuotes", "true")\
    .csv("data/flight-data/csv/sample_data.csv")

df.show()
+----+-------------------+-----+
|name|            address|  age|
+----+-------------------+-----+
| abc|      123 some road| city|
| abc|123 some road, city|   30|
+----+-------------------+-----+

# data in file
name,address,age
abc,123 some road, city, 30
"abc","123 some road, city", 30
```
We can see that when the value is enclosed in quotes, Spark correctly keeps the whole address in one column and the age column gets the correct value.
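Spark uses the double quote (") as its quoting character by default. If your file wraps values in a different character, you can point Spark at it with the quote option; a sketch assuming a hypothetical file that uses single quotes:

```
# Tell Spark that values are wrapped in single quotes instead of double quotes
# (single_quoted_data.csv is a hypothetical example file)
df = spark.read \
    .option("header", "true") \
    .option("quote", "'") \
    .csv("data/flight-data/csv/single_quoted_data.csv")
```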
Few important options while reading CSV files
Though CSV is one of the most common formats for storing data, it is also one of the trickiest to process. There are many cases we have to account for, such as extra spaces, date formats, null values, and so on.
Apart from the options we have already discussed, below are a few more that you may find useful while dealing with CSV data in Spark, followed by a short example that combines several of them. If you need more details about any of these options, let me know.
CSV Option | Acceptable Values | Purpose |
---|---|---|
ignoreLeadingWhiteSpace | true or false | Removes leading white space from values |
ignoreTrailingWhiteSpace | true or false | Removes trailing white space from values |
nullValue | Any string | Specifies which string represents null values in the data |
nanValue | Any string | Specifies which string represents NaN values in the data |
dateFormat | A Java SimpleDateFormat pattern (e.g. yyyy-MM-dd) | Date format used in the data |
maxColumns | Any integer | Maximum number of columns to read from the file |
maxCharsPerColumn | Any integer | Maximum number of characters to read for each column |
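To show how these compose, here is a sketch that combines a few of them on the same summary file; the nullValue and dateFormat values are illustrative assumptions rather than something this particular file needs:

```
# Combine several of the options above in a single read;
# the nullValue and dateFormat settings here are purely illustrative
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("ignoreLeadingWhiteSpace", "true") \
    .option("ignoreTrailingWhiteSpace", "true") \
    .option("nullValue", "NA") \
    .option("dateFormat", "yyyy-MM-dd") \
    .csv("data/flight-data/csv/2010-summary.csv")
```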
Conclusion
In this blog, we have written Spark code to read CSV data. We have also looked at the different options that help deal with common pitfalls of CSV files. You can find the code in this git repo. I hope you have found this useful. See you in the next blog.