In the last blog, we loaded our data into a Spark DataFrame. We also used the "inferSchema" option to let Spark figure out the schema of the DataFrame on its own. But in many cases you will want to specify a schema yourself. This gives you much better control over column names and, especially, data types. Let us see how we can add a custom schema while reading data in Spark.
In Spark, a schema is a StructType, which is an array of StructField objects. Each StructField takes four parameters: the column name, its data type, a nullable flag, and an optional metadata dictionary.
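As a quick reference, here is a minimal sketch of the StructField constructor from pyspark.sql.types (the comments describe each parameter):

# StructField(name, dataType, nullable=True, metadata=None)
#   name:     column name as a string
#   dataType: a pyspark.sql.types.DataType, e.g. StringType() or LongType()
#   nullable: whether the column may contain null values
#   metadata: an optional dict of extra key-value information about the column
StructField("destination", StringType(), True)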
Let us write our custom schema for our flights data.
from pyspark.sql.types import StructField, StructType, StringType, LongType

custom_schema = StructType([
    StructField("destination", StringType(), True),
    StructField("source", StringType(), True),
    StructField("total_flights", LongType(), True),
])

df = spark.read.format("csv") \
    .schema(custom_schema) \
    .option("header", True) \
    .load("data/flights.csv")

df.show(2)

+-------------+-------+-------------+
|  destination| source|total_flights|
+-------------+-------+-------------+
|United States|Romania|           15|
|United States|Croatia|            1|
+-------------+-------+-------------+
only showing top 2 rows
Now our DataFrame has the column names and data types we specified. To print the schema of any DataFrame, use the function below.
df.printSchema()
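For the flights DataFrame above, this prints a tree like the following (the exact formatting may vary slightly between Spark versions):

root
 |-- destination: string (nullable = true)
 |-- source: string (nullable = true)
 |-- total_flights: long (nullable = true)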
We can attach extra information to columns using the metadata field. It takes key-value pairs, and we can choose any number of keys and values depending on our needs.
custom_schema_with_metadata = StructType([
    StructField("destination", StringType(), True,
                metadata={"desc": "destination country for flight"}),
    StructField("source", StringType(), True,
                metadata={"desc": "source country of flight"}),
    StructField("total_flights", LongType(), True,
                metadata={"desc": "Number of flights"}),
])

df = spark.read.format("csv") \
    .schema(custom_schema_with_metadata) \
    .option("header", True) \
    .load("data/flights.csv")
We can check our DataFrame and its schema now. Note that printSchema does not display metadata. If you want to inspect the schema along with its metadata, use the following code. You can read the whole schema as JSON, or read the metadata of a single column.
df.schema.json()
df.schema.fields[0].metadata["desc"]
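For the schema defined above, the second line returns the metadata of the first field. You can also look up a field by name instead of by position, since StructType supports indexing by column name. This is a small sketch, assuming the flights schema from earlier:

df.schema.fields[0].metadata["desc"]          # 'destination country for flight'
df.schema["total_flights"].metadata["desc"]   # 'Number of flights'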
This is how we can add a custom schema to our DataFrames. I hope this helps. See you in the next blog.