Reading Data From SQL Tables in Spark
SQL databases, or relational databases, have been around for decades now, and many systems store their data in an RDBMS. Often we have to connect Spark to one of these relational databases and process that data. In this article, we are going to learn about reading data from SQL tables into Spark data frames. In this blog, I am using MySQL, but you can use any relational database the same way.
Setting Up MySQL Connector
When we want Spark to communicate with an RDBMS, we need a compatible connector. For MySQL, you can download its connector at this link: MySQL Connector. Once you download it, we have to pass the jar to Spark when we create the SparkSession.
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SQL Tables")\
    .config("spark.jars", "D:\\code\\spark\\spark-basics\\python-code\\mysql-connector-java.jar")\
    .getOrCreate()
If this does not work for you, you can also use the method below, which passes the connector jar through the driver and executor classpaths.
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SQL Tables")\
    .config("spark.executor.extraClassPath", "D:\\code\\spark\\spark-basics\\python-code\\mysql-connector-java.jar")\
    .config("spark.driver.extraClassPath", "D:\\code\\spark\\spark-basics\\python-code\\mysql-connector-java.jar")\
    .getOrCreate()
Once we initialize Spark correctly, we can communicate with the MySQL server and read table data.
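If you want to confirm that the jar was picked up, one quick sanity check (a minimal sketch, assuming you used the “spark.jars” approach above) is to inspect the Spark configuration:

# Print the value of spark.jars from the running session's configuration.
# An empty string means the key was never set.
print(spark.sparkContext.getConf().get("spark.jars", ""))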
Reading a Table From MySQL Using Spark
Let us see how to read an entire table from MySQL and create a data frame from it in Spark. I have the employees database, with an employees table in it, on my MySQL server.
show tables;
+----------------------+
| Tables_in_employees  |
+----------------------+
| current_dept_emp     |
| departments          |
| dept_emp             |
| dept_emp_latest_date |
| dept_manager         |
| employees            |
| salaries             |
| titles               |
+----------------------+

select count(*) from employees.employees;
+----------+
| count(*) |
+----------+
|   300024 |
+----------+
1 row in set (0.02 sec)
In Spark, we can use the read format “jdbc” along with the database URL, username, and password to read the same table.
url = "jdbc:mysql://localhost:3306/employees" driver = "com.mysql.jdbc.Driver" user = "db_user" password = "db_password" |
df = spark.read\
    .format("jdbc")\
    .option("driver", driver)\
    .option("url", url)\
    .option("user", user)\
    .option("password", password)\
    .option("dbtable", "employees")\
    .load()

df.count()
300024
We can see that the data frame based on the employees table gives us the same row count as the table itself.
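Beyond the row count, it can be worth checking the schema Spark derived from the table metadata; the exact output depends on how your table is defined, so this is just a quick inspection step:

# Print the schema Spark built from the JDBC table metadata.
# Column names and types mirror the MySQL table definition.
df.printSchema()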
Reading Data From a SQL Query
Spark does not limit us to reading an entire table at a time. We can also pass any query to the Spark read function, and we will get the query result back as a data frame.
query="(select * from employees where gender = 'F') as emp" df = spark.read\ .format("jdbc")\ .option("driver", driver)\ .option("url", url)\ .option("user", user)\ .option("password", password)\ .option("dbtable", query)\ .load() df.show(5) +------+----------+----------+---------+------+----------+ |emp_no|birth_date|first_name|last_name|gender| hire_date| +------+----------+----------+---------+------+----------+ | 10002|1964-06-02| Bezalel| Simmel| F|1985-11-21| | 10006|1953-04-20| Anneke| Preusig| F|1989-06-02| | 10007|1957-05-23| Tzvetan|Zielinski| F|1989-02-10| | 10009|1952-04-19| Sumant| Peac| F|1985-02-18| | 10010|1963-06-01| Duangkaew| Piveteau| F|1989-08-24| +------+----------+----------+---------+------+----------+ |
Remember to wrap the query in parentheses and give it an alias so that it works as a subquery. Spark places the “dbtable” value into the FROM clause of the queries it generates, so without the parentheses and alias it will not work.
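As a side note, Spark 2.4 and later also accept a “query” option that takes the bare SQL and handles the subquery wrapping for you; a minimal sketch, assuming you are on such a version:

# On Spark 2.4+, the "query" option accepts plain SQL directly;
# Spark wraps it as a subquery internally, so no manual alias is needed.
# Note: "query" and "dbtable" are mutually exclusive.
df = spark.read\
    .format("jdbc")\
    .option("driver", driver)\
    .option("url", url)\
    .option("user", user)\
    .option("password", password)\
    .option("query", "select * from employees where gender = 'F'")\
    .load()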
Reading From the Database in Parallel
When we are reading a large table, we would like to read it in parallel, which can dramatically improve read performance. We can pass the “numPartitions” option to the Spark read function, which controls the maximum parallelism while reading data.
df = spark.read\
    .format("jdbc")\
    .option("driver", driver)\
    .option("url", url)\
    .option("user", user)\
    .option("password", password)\
    .option("dbtable", "employees")\
    .option("numPartitions", 10)\
    .load()

df.rdd.getNumPartitions()
1
In our case, it still shows only 1 partition. This is because “numPartitions” on its own only sets an upper limit on parallelism; without a partition column and its bounds, Spark has no way to split the table and falls back to reading it in a single partition.
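To actually split the read, we have to tell Spark which column to partition on and what range it covers. Here is a sketch using emp_no as the partition column; the bounds are assumptions based on the sample employees data, so check your table's actual range first:

# Parallel JDBC read: Spark issues one query per partition, splitting
# the emp_no range between lowerBound and upperBound across numPartitions.
# The bounds below are assumed values; verify them with
# "select min(emp_no), max(emp_no) from employees" before relying on them.
df = spark.read\
    .format("jdbc")\
    .option("driver", driver)\
    .option("url", url)\
    .option("user", user)\
    .option("password", password)\
    .option("dbtable", "employees")\
    .option("partitionColumn", "emp_no")\
    .option("lowerBound", 10001)\
    .option("upperBound", 500000)\
    .option("numPartitions", 10)\
    .load()

df.rdd.getNumPartitions()  # should now report 10

Note that the bounds only control how the range is split, not which rows are read; rows outside the bounds still end up in the first or last partition.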
Conclusion
In this article, we have connected Spark with an RDBMS and created data frames from its tables. This is a powerful feature that allows us to process SQL data easily. You can find the code from this article on GitHub. I hope you find this useful. See you in the next blog.