Getting Started with Data Ingestion Using Spark

Overview

You can use the Apache Spark open-source data engine to work with data in the platform. This tutorial demonstrates how to run Spark jobs for reading and writing data in different formats (converting the data format), and for running SQL queries on the data. For more information about Spark, see the Spark v3.2.3 quick-start guide.

Before You Begin

To follow this tutorial, you must first ingest some data, such as a CSV or Parquet file, into the platform (i.e., write data to a platform data container). For information about the available data-ingestion methods, see the Ingesting and Preparing Data and Ingesting and Consuming Files tutorials.

Data Formats

The Spark jobs in this tutorial process data in the following data formats:

  • Comma-separated values (CSV)

  • Parquet — an Apache columnar storage format that can be used in Apache Hadoop.
    For more information about Parquet, see https://parquet.apache.org/.
    For more information about Hadoop, see the Apache Hadoop web site.

  • NoSQL — the platform's NoSQL format. A NoSQL table is a collection of items (objects) and their attributes: items are the equivalent of database-table rows, and attributes are the equivalent of database-table columns.
    All items in the platform share one common attribute, which serves as an item's name and primary key. The value of this attribute must be unique to each item within a given NoSQL table. The primary key enables unique identification of specific items in the table, and efficient sharding of the table items.
    For more information, see Working with NoSQL Data.

    You can use the Spark Datasets API or the platform's NoSQL Web API to add, retrieve, and remove NoSQL table items. You can also use the platform's Spark API extensions or NoSQL Web API to extend the basic functionality of Spark Datasets (for example, to conditionally update an item in a NoSQL table). For more information, see the related API references.

Using a Web Notebook

A common way to run Spark data jobs is from a web notebook, such as Jupyter Notebook, which is used for interactive data analytics. You create a web notebook with notes that define Spark jobs for interacting with the data, and then run the jobs from the web notebook. The code can be written in any of the supported language interpreters; this tutorial contains examples in Scala and Python, with each code snippet shown first in its Scala version and then in its Python version. For more information about Jupyter Notebook, see the product documentation. See also Running Spark Jobs from a Web Notebook in the Spark reference overview.

The examples in this tutorial were tested with Spark v3.2.3.

Selecting the Programming Language and Creating a Spark Session

In JupyterLab, create a new Python or Scala notebook.

Scala Jupyter Notebooks
Version 3.6.1 of the platform doesn't support Scala Jupyter notebooks. See the Software Specifications and Restrictions.

Then, add the following code in a Jupyter notebook cell to perform the required imports and create a new Spark session; you're encouraged to change the appName string to a more descriptive and unique application name:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("My Spark Application").getOrCreate()

import sys
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("My Spark Application").getOrCreate()

At the end of your code flow, add a cell/paragraph with the following code to stop the Spark session and release its resources:

spark.stop()

spark.stop()

Sample Workflows

Following are some possible workflows that use the Spark jobs outlined in this tutorial; a minimal code sketch that strings both workflows together appears after the outlines:

Workflow 1: Convert a CSV File into a Partitioned Parquet Table
  1. Write a CSV file to a platform data container.

  2. Convert the CSV file into a Parquet table.

  3. Run SQL queries on the data in the Parquet table.

Workflow 2: Convert a Parquet Table into a NoSQL Table
  1. Write a Parquet table to a platform data container.

  2. Convert the Parquet table into a NoSQL table.

  3. Run SQL queries on the data in the NoSQL table.
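
A minimal Python sketch that strings both workflows together, built only from the read, write, and query snippets detailed in the rest of this tutorial, might look as follows. The "projects" container, the paths, the column names, and the choice of the pickup_time column as the NoSQL primary key are placeholders; replace them with values that match your own data.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

spark = SparkSession.builder.appName("My Spark Application").getOrCreate()

# Workflow 1: CSV file -> partitioned Parquet table -> SQL queries
schema = StructType([
    StructField("pickup_time", LongType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("payment_type", LongType(), True)
])
csv_df = spark.read.schema(schema) \
    .option("header", "false") \
    .option("delimiter", "|") \
    .csv("v3io://projects/mydata/nycTaxi.csv")
csv_df.write.partitionBy("payment_type") \
    .parquet("v3io://projects/mydata/my-parquet-table")
parquet_df = spark.read.parquet("v3io://projects/mydata/my-parquet-table")
parquet_df.createOrReplaceTempView("myParquetTable")
spark.sql("select payment_type, count(1) as count from myParquetTable "
          "group by payment_type").show()

# Workflow 2: Parquet table -> NoSQL table -> SQL queries
# pickup_time serves here as the item's primary key (a placeholder choice);
# the key column's value must be unique for each item in the table.
parquet_df.write.format("io.iguaz.v3io.spark.sql.kv") \
    .option("key", "pickup_time") \
    .save("v3io://projects/mydata/my-nosql-table")
nosql_df = spark.read.format("io.iguaz.v3io.spark.sql.kv") \
    .load("v3io://projects/mydata/my-nosql-table")
nosql_df.createOrReplaceTempView("myNoSqlTable")
spark.sql("select count(1) as count from myNoSqlTable").show()

spark.stop()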

Reading the Data

Reading CSV Data

Use the following code to read data in CSV format.
You can read both CSV files and CSV directories.

Defining the Table Schema
To read CSV data using a Spark DataFrame, Spark needs to be aware of the schema of the data. You can either define the schema programmatically as part of the read operation, as demonstrated in this section, or let Spark infer the schema, as outlined in the Spark SQL and DataFrames documentation (for example, option("inferSchema", "true") in Scala or csv(..., inferSchema="true") in Python). Note that inferring the schema requires an extra pass over the data.
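
For comparison, a minimal Python sketch of a schema-inferring read might look as follows; it assumes the Spark session created above, and the path and option values are placeholders:

# Let Spark infer the column types from the data itself; with "header" set to
# "true", the column names are also taken from the file's header row.
# Inferring the schema requires an extra pass over the data.
myDF = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .csv("v3io://projects/mydata/my-csv-data")
myDF.printSchema()
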
Note
Before running the read job, ensure that the referenced data source exists.
Syntax

The header and delimiter options are optional.

val <schema variable> = StructType(List(
    StructField("<column name>", <column type>, nullable = <Boolean value>),
    <additional StructField() columns of the same format, one for each column>))
val <DF variable> = spark.read.schema(<schema variable>)
    .option("header", "<true/false>")
    .option("delimiter", "<delimiter>")
    .csv("v3io://<container name>/<path to CSV data>")

<schema variable> = StructType([
    StructField("<column name>", <column type>, <is-nullable Boolean value>),
    <additional StructField() columns of the same format, one for each column>])
<DF variable> = spark.read.schema(<schema variable>) \
    .option("header", "<true/false>") \
    .option("delimiter", "<delimiter>") \
    .csv("v3io://<container name>/<path to CSV data>")

Example

The following example reads a /mydata/nycTaxi.csv CSV file from the "projects" container into a myDF DataFrame variable.

val schema = StructType(List(
    StructField("pickup_time", LongType, nullable = true),
    StructField("dropoff_time", LongType, nullable = true),
    StructField("passenger_count", LongType, nullable = true),
    StructField("trip_distance", DoubleType, nullable = true),
    StructField("payment_type", LongType, nullable = true),
    StructField("fare_amount", DoubleType, nullable = true),
    StructField("tip_amount", DoubleType, nullable = true),
    StructField("tolls_amount", DoubleType, nullable = true),
    StructField("total_amount", DoubleType, nullable = true)
))
val myDF = spark.read.schema(schema)
    .option("header", "false")
    .option("delimiter", "|")
    .csv("v3io://projects/mydata/nycTaxi.csv")

schema = StructType([
    StructField("pickup_time", LongType(), True),
    StructField("dropoff_time", LongType(), True),
    StructField("passenger_count", LongType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("payment_type", LongType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("total_amount", DoubleType(), True)
])
myDF = spark.read.schema(schema) \
    .option("header", "false") \
    .option("delimiter", "|") \
    .csv("v3io://projects/mydata/nycTaxi.csv")

Reading Parquet Data

Use the following code to read data as a Parquet database table.

Note
Before running the read job, ensure that the referenced data source exists.
Syntax
val <DF variable> = spark.read.parquet("v3io://<container name>/<path to Parquet data>")

<DF variable> = spark.read.parquet("v3io://<container name>/<path to Parquet data>")

Example

The following example reads a /mydata/my-parquet-table Parquet database table from the "projects" container into a myDF DataFrame variable.

val myDF = spark.read.parquet("v3io://projects/mydata/my-parquet-table")

myDF = spark.read.parquet("v3io://projects/mydata/my-parquet-table")

Reading NoSQL Data

Use the following code to read data as a NoSQL table.

Defining the Table Schema
When using a Spark DataFrame to read data that was written in the platform using a NoSQL Spark DataFrame, the table schema is automatically identified and retrieved (unless you choose to explicitly define the schema for the read operation). However, to read NoSQL data that was written to a table in another way, you first need to define the table schema. You can either define the schema programmatically as part of the read operation, as demonstrated in this section, or let the platform infer the schema by using the inferSchema option (option("inferSchema", "true")). For more information, see Defining the Table Schema in the Spark NoSQL DataFrame reference.
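
For example, when the table was written with the NoSQL Spark DataFrame, a Python read without an explicit schema might look as follows; the path is a placeholder and the Spark session from the earlier section is assumed:

# The table schema is identified and retrieved automatically because the table
# was written with the NoSQL Spark DataFrame.
myDF = spark.read.format("io.iguaz.v3io.spark.sql.kv") \
    .load("v3io://projects/mydata/flights")
myDF.printSchema()
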
Note
Before running the read job, ensure that the referenced data source exists.
Syntax
val <schema variable> = StructType(List(
    StructField("<column name>", <column type>, nullable = <Boolean value>),
    <additional StructField() columns of the same format, one for each column>))
val <DF variable> = spark.read.schema(<schema variable>)
    .format("io.iguaz.v3io.spark.sql.kv")
    .load("v3io://<container name>/<path to a NoSQL table>")

<schema variable> = StructType([
    StructField("<column name>", <column type>, <is-nullable Boolean value>),
    <additional StructField() columns of the same format, one for each column>])
<DF variable> = spark.read.schema(<schema variable>) \
    .format("io.iguaz.v3io.spark.sql.kv") \
    .load("v3io://<container name>/<path to a NoSQL table>")

Example

The following example reads a /mydata/flights NoSQL table from the "projects" container into a myDF DataFrame variable.

val schema = StructType(List(
    StructField("id", StringType, nullable = false),
    StructField("origin_country", StringType, nullable = true),
    StructField("call_sign", StringType, nullable = true),
    StructField("velocity", DoubleType, nullable = true),
    StructField("altitude", DoubleType, nullable = true),
    StructField("__mtime_secs", LongType, nullable = true)
))
val myDF = spark.read.schema(schema)
    .format("io.iguaz.v3io.spark.sql.kv")
    .load("v3io://projects/mydata/flights")

schema = StructType([
    StructField("id", StringType(), False),
    StructField("origin_country", StringType(), True),
    StructField("call_sign", StringType(), True),
    StructField("velocity", DoubleType(), True),
    StructField("altitude", DoubleType(), True),
    StructField("__mtime_secs", LongType(), True)
])
myDF = spark.read.schema(schema) \
    .format("io.iguaz.v3io.spark.sql.kv") \
    .load("v3io://projects/mydata/flights")

Writing the Data (Converting the Format)

Writing Parquet Data

Use the following code to write data as a Parquet database table.

Syntax
<DF variable>.write.parquet("v3io://<container name>/<path to Parquet data>")

<DF variable>.write.parquet("v3io://<container name>/<path to Parquet data>")

Example

The following example converts the data that is currently associated with the myDF DataFrame variable into a /mydata/my-parquet-table Parquet database table in the "projects" container.

myDF.write.parquet("v3io://projects/mydata/my-parquet-table")

myDF.write.parquet("v3io://projects/mydata/my-parquet-table")
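
Workflow 1 refers to a partitioned Parquet table. To partition the written data by one or more columns, you can add the standard Spark partitionBy call to the write operation. The following Python sketch assumes a payment_type column in myDF and uses a placeholder output path:

# Partition the Parquet output by the hypothetical payment_type column; each
# distinct value is written to its own subdirectory under the target path.
myDF.write.partitionBy("payment_type") \
    .parquet("v3io://projects/mydata/my-partitioned-parquet-table")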

Writing NoSQL Data

Use the following code to write data as a NoSQL table.

Syntax
<DF variable>.write.format("io.iguaz.v3io.spark.sql.kv")
    .option("key", <key column>)
    .save("v3io://<container name>/<path to a NoSQL table>")

<DF variable>.write.format("io.iguaz.v3io.spark.sql.kv") \
    .option("key", <key column>) \
    .save("v3io://<container name>/<path to a NoSQL table>")

Example

The following example converts the data that is currently associated with the myDF DataFrame variable into a /mydata/my-nosql-table NoSQL table in the "projects" container.

myDF.write.format("io.iguaz.v3io.spark.sql.kv")
    .option("key", "ID").save("v3io://projects/mydata/my-nosql-table")

myDF.write.format("io.iguaz.v3io.spark.sql.kv") \
    .option("key", "ID").save("v3io://projects/mydata/my-nosql-table")

Writing CSV Data

Use the following code to write data in CSV format.
You can write both CSV files and CSV directories.

Syntax

The header and delimiter options are optional.

<DF variable>.write
    .option("header", "<true/false>")
    .option("delimiter", "<delimiter>")
    .csv("v3io://<container name>/<path to CSV data>")

<DF variable>.write \
    .option("header", "<true/false>") \
    .option("delimiter", "<delimiter>") \
    .csv("v3io://<container name>/<path to CSV data>")

Example

The following example converts the data that is currently associated with the myDF DataFrame variable into /mydata/my-csv-data CSV data in the "projects" container.

myDF.write.option("header", "true").option("delimiter", ",")
    .csv("v3io://projects/mydata/my-csv-data")

myDF.write.option("header", "true").option("delimiter", ",") \
    .csv("v3io://projects/mydata/my-csv-data")

Running SQL Data Queries

Use the following syntax to run an SQL query on your data.

Syntax

The call to show is optional.

<DF variable>.createOrReplaceTempView("<SQL table name>")
spark.sql("<SQL query string>").show()

<DF variable>.createOrReplaceTempView("<SQL table name>")
spark.sql("<SQL query string>").show()

Example

The following example creates a temporary myTable SQL table for the data associated with the myDF DataFrame variable, and runs an SQL query on this table:

myDF.createOrReplaceTempView("myTable")
spark.sql("select column1, count(1) as count from myTable
    where column2='xxx' group by column1").show()

myDF.createOrReplaceTempView("myTable")
spark.sql("select column1, \
    count(1) as count from myTable where column2='xxx' group by column1") \
    .show()

See Also