Getting Started with Data Ingestion Using Spark
Overview
You can use the Apache Spark open-source data engine to work with data in the platform. This tutorial demonstrates how to run Spark jobs for reading and writing data in different formats (converting the data format), and for running SQL queries on the data. For more information about Spark, see the Spark v3.2.3 quick-start guide.
Before You Begin
To follow this tutorial, you must first ingest some data, such as a CSV or Parquet file, into the platform (i.e., write data to a platform data container). For information about the available data-ingestion methods, see the Ingesting and Preparing Data and Ingesting and Consuming Files tutorials.
Data Formats
The Spark jobs in this tutorial process data in the following data formats:
- Comma-Separated Values (CSV)
- Parquet — an Apache columnar storage format that can be used in Apache Hadoop. For more information about Parquet, see https://parquet.apache.org/. For more information about Hadoop, see the Apache Hadoop web site.
- NoSQL — the platform's NoSQL format. A NoSQL table is a collection of items (objects) and their attributes: items are the equivalent of NoSQL database rows, and attributes are the equivalent of NoSQL database columns.
All items in the platform share one common attribute, which serves as an item's name and primary key. The value of this attribute must be unique to each item within a given NoSQL table. The primary key enables unique identification of specific items in the table and efficient sharding of the table items.
For more information, see Working with NoSQL Data. You can use Spark Datasets or the platform's NoSQL Web API to add, retrieve, and remove NoSQL table items. You can also use the platform's Spark API extensions or NoSQL Web API to extend the basic functionality of Spark Datasets (for example, to conditionally update an item in a NoSQL table). For more information, see the Spark Datasets API Reference and the NoSQL Web API Reference. (A brief Python sketch of the item model follows this list.)
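To make the item model more concrete, here is a minimal Python sketch that builds a small DataFrame and writes it as a NoSQL table, designating the id column as the shared primary-key attribute. The container path, table name, and column names are illustrative only; the write format and key option are the same ones used in the Writing NoSQL Data examples later in this tutorial:
# Assumes the Spark session ("spark") created as shown later in this tutorial.
# Each DataFrame row becomes a NoSQL item; each column becomes an attribute.
rows = [("A001", "cargo", 12.5), ("A002", "passenger", 7.0)]
itemsDF = spark.createDataFrame(rows, ["id", "category", "weight"])
# The "key" option names the column whose values serve as the items'
# primary-key attribute; the values must be unique within the table.
itemsDF.write.format("io.iguaz.v3io.spark.sql.kv") \
    .option("key", "id") \
    .save("v3io://projects/examples/my-items")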
Using a Web Notebook
A common way to run Spark data jobs is from a web notebook used for interactive data analytics, such as Jupyter Notebook. You create a web notebook with notes that define Spark jobs for interacting with the data, and then run the jobs from the web notebook. The code can be written in any of the supported language interpreters; this tutorial contains examples in Scala and Python. For more information about Jupyter Notebook, see the product documentation. See also Running Spark Jobs from a Web Notebook in the Spark reference overview.
The examples in this tutorial were tested with Spark v3.2.3.
Selecting the Programming Language and Creating a Spark Session
In JupyterLab, create a new Scala or Python notebook.
Then, add the following code in a notebook cell to perform the required imports and create a new Spark session; you are encouraged to change the appName string to a more unique description:
Scala:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types._
val spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
Python:
import sys
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
At the end of your code flow, add a cell/paragraph with the following code to stop the Spark session and release its resources:
Scala:
spark.stop()
Python:
spark.stop()
Sample Workflows
Following are some possible workflows that use the Spark jobs outlined in this tutorial (a combined Python sketch of the first workflow follows the list):
- Write a CSV file to a platform data container.
- Convert the CSV file into a Parquet table.
- Run SQL queries on the data in the Parquet table.
- Write a Parquet table to a platform data container.
- Convert the Parquet table into a NoSQL table.
- Run SQL queries on the data in the NoSQL table.
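For example, here is a minimal Python sketch of the first workflow, under the assumption that a CSV file with a header row was ingested into the projects container (the file and table paths are placeholders; the individual steps are detailed in the sections that follow):
# Assumes the Spark session ("spark") created above.
# Step 1: the CSV file has already been written to the platform data container.
# Step 2: read the CSV file (inferring the schema) and convert it to a Parquet table.
csvDF = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("v3io://projects/mydata/example.csv")
csvDF.write.parquet("v3io://projects/mydata/example-parquet")
# Step 3: run an SQL query on the Parquet data.
parquetDF = spark.read.parquet("v3io://projects/mydata/example-parquet")
parquetDF.createOrReplaceTempView("example")
spark.sql("select count(1) as row_count from example").show()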
Reading the Data
Reading CSV Data
Use the following code to read data in CSV format.
You can read both CSV files and CSV directories.
The examples define an explicit schema. Alternatively, you can let Spark infer the schema of the read data by setting the inferSchema option to true (option("inferSchema", "true") in Scala or csv(..., inferSchema="true") in Python); note that inferSchema requires an extra pass over the data. The header and delimiter options are optional; header defaults to false and delimiter defaults to a comma (,).
Scala:
val <schema variable> = StructType(List(
StructField("<column name>", <column type>, nullable = <Boolean value>),
<additional StructField() columns of the same format, one for each column>))
val <DF variable> = spark.read.schema(<schema variable>)
.option("header", "<true/false>")
.option("delimiter", "<delimiter>")
.csv("v3io://<container name>/<path to CSV data>")
Python:
<schema variable> = StructType([
StructField("<column name>", <column type>, <is-nullable Boolean value>),
<additional StructField() columns of the same format, one for each column>])
<DF variable> = spark.read.schema(<schema variable>) \
.option("header", "<true/false>") \
.option("delimiter", "<delimiter>") \
.csv("v3io://<container name>/<path to CSV data>")
The following example reads a nycTaxi.csv file from the mydata directory in the projects container into a myDF DataFrame variable:
Scala:
val schema = StructType(List(
StructField("pickup_time", LongType, nullable = true),
StructField("dropoff_time", LongType, nullable = true),
StructField("passenger_count", LongType, nullable = true),
StructField("trip_distance", DoubleType, nullable = true),
StructField("payment_type", LongType, nullable = true),
StructField("fare_amount", DoubleType, nullable = true),
StructField("tip_amount", DoubleType, nullable = true),
StructField("tolls_amount", DoubleType, nullable = true),
StructField("total_amount", DoubleType, nullable = true)
))
val myDF = spark.read.schema(schema)
.option("header", "false")
.option("delimiter", "|")
.csv("v3io://projects/mydata/nycTaxi.csv")
Python:
schema = StructType([
StructField("pickup_time", LongType(), True),
StructField("dropoff_time", LongType(), True),
StructField("passenger_count", LongType(), True),
StructField("trip_distance", DoubleType(), True),
StructField("payment_type", LongType(), True),
StructField("fare_amount", DoubleType(), True),
StructField("tip_amount", DoubleType(), True),
StructField("tolls_amount", DoubleType(), True),
StructField("total_amount", DoubleType(), True)
])
myDF = spark.read.schema(schema) \
.option("header", "false") \
.option("delimiter", "|") \
.csv("v3io://projects/mydata/nycTaxi.csv")
Reading Parquet Data
Use the following code to read data from a Parquet table.
Scala:
val <DF variable> = spark.read.parquet("v3io://<container name>/<path to Parquet data>")
Python:
<DF variable> = spark.read.parquet("v3io://<container name>/<path to Parquet data>")
The following example reads a my-parquet-table Parquet table from the mydata directory in the projects container into a myDF DataFrame variable:
Scala:
val myDF = spark.read.parquet("v3io://projects/mydata/my-parquet-table")
Python:
myDF = spark.read.parquet("v3io://projects/mydata/my-parquet-table")
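To verify the read, you can inspect the DataFrame's schema and preview a few rows; for example, in Python:
myDF.printSchema()  # the schema is taken from the Parquet files' metadata
myDF.show(5)        # display the first five rows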
Reading NoSQL Data
Use the following code to read data from a NoSQL table.
The examples define an explicit table schema. Alternatively, you can let the platform infer the table schema by setting the inferSchema option to true (option("inferSchema", "true")). For more information, see Defining the Table Schema in the Spark NoSQL DataFrame reference.
Scala:
val <schema variable> = StructType(List(
StructField("<column name>", <column type>, nullable = <Boolean value>),
<additional StructField() columns of the same format, one for each column>))
val <DF variable> = spark.read.schema(<schema variable>)
.format("io.iguaz.v3io.spark.sql.kv")
.load("v3io://<container name>/<path to a NoSQL table>")
Python:
<schema variable> = StructType([
StructField("<column name>", <column type>, <is-nullable Boolean value>),
<additional StructField() columns of the same format, one for each column>])
<DF variable> = spark.read.schema(<schema variable>) \
.format("io.iguaz.v3io.spark.sql.kv") \
.load("v3io://<container name>/<path to a NoSQL table>")
The following example reads a flights NoSQL table from the mydata directory in the projects container into a myDF DataFrame variable:
Scala:
val schema = StructType(List(
StructField("id", StringType, nullable = false),
StructField("origin_country", StringType, nullable = true),
StructField("call_sign", StringType, nullable = true),
StructField("velocity", DoubleType, nullable = true),
StructField("altitude", DoubleType, nullable = true),
StructField("__mtime_secs", LongType, nullable = true)
))
val myDF = spark.read.schema(schema)
.format("io.iguaz.v3io.spark.sql.kv")
.load("v3io://projects/mydata/flights")
Python:
schema = StructType([
StructField("id", StringType(), False),
StructField("origin_country", StringType(), True),
StructField("call_sign", StringType(), True),
StructField("velocity", DoubleType(), True),
StructField("altitude", DoubleType(), True),
StructField("__mtime_secs", LongType(), True)
])
myDF = spark.read.schema(schema) \
.format("io.iguaz.v3io.spark.sql.kv") \
.load("v3io://projects/mydata/flights")
Writing the Data (Converting the Format)
Writing Parquet Data
Use the following code to write data as a Parquet table.
Scala:
<DF variable>.write.parquet("v3io://<container name>/<path to Parquet data>")
Python:
<DF variable>.write.parquet("v3io://<container name>/<path to Parquet data>")
The following example converts the data that is currently associated with the myDF DataFrame variable into a my-parquet-table Parquet table in the mydata directory in the projects container:
Scala:
myDF.write.parquet("v3io://projects/mydata/my-parquet-table")
Python:
myDF.write.parquet("v3io://projects/mydata/my-parquet-table")
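If the target Parquet directory might already exist, or if you want to organize the output by a column's values, you can combine the write with Spark's standard save-mode and partitioning options; a Python sketch (the payment_type column is taken from the CSV example earlier in this tutorial):
# Overwrite any existing data at the target path and partition the output
# files by the values of the payment_type column.
myDF.write \
    .mode("overwrite") \
    .partitionBy("payment_type") \
    .parquet("v3io://projects/mydata/my-parquet-table")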
Writing NoSQL Data
Use the following code to write data as a NoSQL table.
Scala:
<DF variable>.write.format("io.iguaz.v3io.spark.sql.kv")
.option("key", <key column>)
.save("v3io://<container name>/<path to a NoSQL table>")
Python:
<DF variable>.write.format("io.iguaz.v3io.spark.sql.kv") \
.option("key", <key column>) \
.save("v3io://<container name>/<path to a NoSQL table>")
The following example converts the data that is currently associated with the myDF DataFrame variable into a my-nosql-table NoSQL table in the mydata directory in the projects container, using the ID column as the table's primary key:
Scala:
myDF.write.format("io.iguaz.v3io.spark.sql.kv")
.option("key", "ID").save("v3io://projects/mydata/my-nosql-table")
Python:
myDF.write.format("io.iguaz.v3io.spark.sql.kv") \
.option("key", "ID").save("v3io://projects/mydata/my-nosql-table")
Writing CSV Data
Use the following code to write data in CSV format.
You can write both CSV files and CSV directories.
The header and delimiter options are optional; header defaults to false and delimiter defaults to a comma (,).
Scala:
<DF variable>.write
.option("header", "<true/false>")
.option("delimiter", "<delimiter>")
.csv("v3io://<container name>/<path to CSV data>")
Python:
<DF variable>.write \
.option("header", "<true/false>") \
.option("delimiter", "<delimiter>") \
.csv("v3io://<container name>/<path to CSV data>")
The following example converts the data that is currently associated with the myDF DataFrame variable into a my-csv-data directory of CSV files in the mydata directory in the projects container:
Scala:
myDF.write.option("header", "true").option("delimiter", ",")
.csv("v3io://projects/mydata/my-csv-data")
Python:
myDF.write.option("header", "true").option("delimiter", ",") \
.csv("v3io://projects/mydata/my-csv-data")
Running SQL Data Queries
Use the following syntax to run an SQL query on your data.
The call to createOrReplaceTempView registers the DataFrame as a temporary SQL table (view), and the spark.sql call runs an SQL query on this table; show() displays the query results.
Scala:
<DF variable>.createOrReplaceTempView("<SQL table name>")
spark.sql("<SQL query string>").show()
Python:
<DF variable>.createOrReplaceTempView("<SQL table name>")
spark.sql("<SQL query string>").show()
The following example creates a temporary myTable SQL table for the data associated with the myDF DataFrame variable, and runs an SQL query on this table:
Scala:
myDF.createOrReplaceTempView("myTable")
spark.sql("select column1, count(1) as count from myTable where column2='xxx' group by column1").show()
Python:
myDF.createOrReplaceTempView("myTable")
spark.sql("select column1, \
count(1) as count from myTable where column2='xxx' group by column1") \
.show()
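Because spark.sql returns a regular DataFrame, you can also keep the query result and write it back to the platform instead of only displaying it; for example, in Python (the output path is a placeholder):
# Keep the query result as a DataFrame and persist it as a Parquet table.
resultDF = spark.sql(
    "select column1, count(1) as count from myTable "
    "where column2='xxx' group by column1")
resultDF.write.parquet("v3io://projects/mydata/my-query-results")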