In this post, we will see how to explode columns containing arrays into rows in a Spark DataFrame using Scala.
First, load the files from Databricks dbfs into a dataframe.
import org.apache.spark.sql.functions._
val df = spark.read.option("multiline", "true").json("dbfs:/FileStore/files/json/json_file_1.txt")
Next, issue the following commands one by one.
display(df.select("source_id", "data"))
display(df.select("source_id", "data.sensor1", "data.sensor2"))
display(df.select("source_id", "data.sensor1.c02_level"))
display(df.select("source_id", "data.sensor1.c02_level").withColumn("c02_level", explode(col("c02_level"))))
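The input file itself is not shown above; based on the select statements, a plausible shape for json_file_1.txt might look like the following (field values are illustrative assumptions, not the actual file):

```json
{
  "source_id": "device-001",
  "data": {
    "sensor1": { "c02_level": [1100, 1200, 1350] },
    "sensor2": { "c02_level": [900, 950] }
  }
}
```

With this shape, the last command would produce one row per element of the sensor1 c02_level array, with source_id repeated on each row.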
Cheers!
Thursday, July 30, 2020
Wednesday, July 29, 2020
Load Nested JSON with Array into a Spark Dataframe using Scala
In this article, we will load nested JSON data with an array into a Spark DataFrame using Scala.
The nested json file with an array is provided below for reference.
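The file contents are not reproduced here; a plausible shape for nested-data.json, inferred from the columns selected in the program below (values are illustrative assumptions):

```json
{
  "Department": "IT",
  "Employees": [
    { "Id": 1, "Age": 34, "Name": { "FirstName": "John" } },
    { "Id": 2, "Age": 28, "Name": { "FirstName": "Jane" } }
  ]
}
```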
The SBT library dependencies are shown below for reference.
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0"
The Scala program is provided below.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object NestedJsonReader extends App {

  System.setProperty("hadoop.home.dir", "C:\\intellij.winutils")

  val spark = SparkSession.builder()
    .master("local")
    .appName("JsonFileReader")
    .getOrCreate()

  val df = spark.read
    .format("json")
    .option("multiline", "true")
    .load("C:\\data\\nested-data.json")

  // show the dataframe, truncating values that exceed the default column display width
  df.show()

  // don't truncate; expand the columns so the entire value can be viewed
  df.show(false)

  // explode the Employees array into rows
  df.withColumn("Employees", explode(col("Employees"))).show(false)

  // select each column from the exploded dataframe
  df.withColumn("Employees", explode(col("Employees")))
    .select("Department", "Employees.Id", "Employees.Age", "Employees.Name", "Employees.Name.FirstName")
    .show(false)
}

Here is the output after running - df.show().
Here is the output after running - df.show(false).
Here is the output after running - df.withColumn("Employees", explode(col("Employees"))).show(false).
Here is the output after running - df.withColumn("Employees", explode(col("Employees"))).select("Department", "Employees.Id", "Employees.Age", "Employees.Name", "Employees.Name.FirstName").show(false).
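Conceptually, explode behaves like flatMap over ordinary Scala collections: each element of the array becomes its own row, and the other columns are repeated alongside it. A minimal plain-Scala sketch of that semantics (no Spark required; the case classes and sample data here are illustrative, not from the post):

```scala
// Each record pairs a department with a list of employees,
// mirroring the nested structure loaded above.
case class Employee(id: Int, age: Int, firstName: String)
case class Record(department: String, employees: List[Employee])

val rows = List(
  Record("IT", List(Employee(1, 34, "John"), Employee(2, 28, "Jane"))),
  Record("HR", List(Employee(3, 45, "Dana")))
)

// "explode": one output row per element of the array column,
// duplicating the non-array column (department) for each element
val exploded: List[(String, Employee)] =
  rows.flatMap(r => r.employees.map(e => (r.department, e)))

exploded.foreach(println)
```

This is only an analogy: Spark's explode runs distributed over partitions, but the row multiplication it performs is the same flatMap-style expansion.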
Thanks. That is all for now!