The NoSQL Spark DataFrame
Introduction
The platform includes the Iguazio Spark connector, which defines a custom Spark data source for reading and writing NoSQL data in the platform's NoSQL store using Spark DataFrames. A Spark DataFrame of this data-source format is referred to in the documentation as a NoSQL DataFrame. This data source supports data pruning and filtering (predicate pushdown), which allows Spark queries to operate on a smaller amount of data; only the data that is required by the active job is loaded. The data source also allows you to work with partitioned tables; perform "replace" mode and conditional item updates; define specific item attributes as counter attributes and easily increment or decrement their values; and perform optimized range scans.
Data Source
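To use the Iguazio Spark connector to read or write NoSQL data in the platform, use the Spark DataFrame format method to set the DataFrame's data-source format to "io.iguaz.v3io.spark.sql.kv", as shown in the following Scala and Python examples: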
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
val df = spark.read.format("io.iguaz.v3io.spark.sql.kv")
.load("v3io://mycontainer/src_table")
df.write.format("io.iguaz.v3io.spark.sql.kv")
.option("key", "id").save("v3io://mycontainer/dest_table")
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
df = spark.read.format("io.iguaz.v3io.spark.sql.kv") \
.load("v3io://mycontainer/src_table")
df.write.format("io.iguaz.v3io.spark.sql.kv") \
.option("key", "id").save("v3io://mycontainer/dest_table")
Table Paths
Specify the path to the NoSQL table that is associated with the DataFrame as a fully qualified v3io path of the following format, where <container name> is the name of the table's parent data container and <table path> is the relative path to the table within the specified container (see Data Paths in the Spark Datasets overview):
v3io://<container name>/<table path>
Examples
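The following example uses a Spark DataFrame to create a NoSQL table named "cars" in a "mydata" directory in the "projects" container, and then reads the contents of the table into another DataFrame and displays them.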
val nosql_source = "io.iguaz.v3io.spark.sql.kv"
var table_path = "v3io://projects/mydata/cars/"
val writeDF = Seq(
("7843321", "Honda", "Accord", "silver", 123000),
("2899941", "Ford", "Mustang", "white", 72531),
("6689123", "Kia", "Picanto", "red", 29320)
)
writeDF.toDF("reg_license", "vendor", "model", "color", "odometer")
.write.format(nosql_source)
.option("key", "reg_license")
.mode("overwrite")
.save(table_path)
val readDF = spark.read.format(nosql_source).load(table_path)
readDF.show()
import sys
from pyspark.sql import *
from pyspark.sql.functions import *
nosql_source = "io.iguaz.v3io.spark.sql.kv"
table_path = "v3io://projects/mydata/cars/"
writeDF = spark.createDataFrame([
("7843321", "Honda", "Accord", "silver", 123000),
("2899941", "Ford", "Mustang", "white", 72531),
("6689123", "Kia", "Picanto", "red", 29320),
], ["reg_license", "vendor", "model", "color", "odometer"])
writeDF.write.format(nosql_source) \
.option("key", "reg_license") \
.mode("overwrite") \
.save(table_path)
readDF = spark.read.format(nosql_source).load(table_path)
readDF.show()
The following code shows several equivalent alternatives for changing the table path from the previous example to a "cars" table in the running-user directory of the "users" container, assuming that the running user is "iguazio". (Note that the Python code requires that you add import os.)
table_path = "v3io://users/iguazio/cars"
table_path = "v3io://users/" + System.getenv("V3IO_USERNAME") + "/cars"
table_path = "v3io://" + System.getenv("V3IO_HOME") + "/cars"
table_path = System.getenv("V3IO_HOME_URL") + "/cars"
table_path = "v3io://users/iguazio/cars"
table_path = "v3io://users/" + os.getenv("V3IO_USERNAME") + "/cars"
table_path = "v3io://" + os.getenv("V3IO_HOME") + "/cars"
table_path = os.getenv("V3IO_HOME_URL") + "/cars"
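The alternatives above can be wrapped in a small helper. The following is a minimal sketch and not part of the platform API: the user_table_path function name and its env parameter are hypothetical, and the sketch assumes that the V3IO_HOME_URL and V3IO_USERNAME environment variables hold the values implied by the examples above.

```python
import os


def user_table_path(table, env=None):
    """Build a v3io table path under the running user's home directory.

    Hypothetical helper: prefers V3IO_HOME_URL and falls back to V3IO_USERNAME.
    """
    env = os.environ if env is None else env
    home_url = env.get("V3IO_HOME_URL")
    if home_url:
        return home_url.rstrip("/") + "/" + table
    username = env.get("V3IO_USERNAME")
    if username:
        return "v3io://users/" + username + "/" + table
    raise KeyError("neither V3IO_HOME_URL nor V3IO_USERNAME is set")


# Illustrative environment values only; on the platform, omit the second argument.
print(user_table_path("cars", {"V3IO_USERNAME": "iguazio"}))
# prints v3io://users/iguazio/cars
```

Passing the environment as a parameter keeps the helper easy to check outside the platform; the resulting string can be used as the table_path value in the read and write examples.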
For additional examples, see the Examples section on this page.
Save Modes
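The Iguazio Spark connector supports the standard Spark DataFrame save modes, which can be set using the Spark DataFrame mode method when writing a NoSQL DataFrame to a table (for example, mode("append") or mode("overwrite")).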
Options
Use the Spark DataFrame option method to configure the following custom NoSQL DataFrame options:
- inferSchema
  Set this option to true (option("inferSchema", "true")) to instruct the platform to infer the schema of the NoSQL data that is being read. See Inferring the Table Schema.
  - Type: Boolean
  - Requirement: Optional
- key
  The name of the table's sharding-key attribute (for example, username). This option is used together with the optional sorting-key option to define the table's primary key, which uniquely identifies items within the table (see Spark DataFrames and Tables).
  For example, for a DataFrame item (row) with a username attribute (column) whose value is "johnd", calling option("key", "username") without setting the sorting-key option defines a simple username primary key and sets the item's primary-key value (name) to johnd.
  Note
    - The written DataFrame must contain a compatible attribute (column) whose name matches the value of the key option. Do not modify the value of this attribute after the item's ingestion, as this will result in a mismatch with the item's name and primary-key value (which remains unchanged).
    - The value of the sharding-key attribute cannot contain periods, because the leftmost period in an item's primary-key value is assumed to be a separator between sharding and sorting keys.
    - See the primary-key guidelines in the Best Practices for Defining Primary Keys and Distributing Data Workloads guide.
  - Type: String
  - Requirement: Required
- sorting-key
  The name of the table's sorting-key attribute (for example, login-date). This option can optionally be used together with the key option to define a compound primary key, which uniquely identifies items within the table (see Spark DataFrames and Tables).
  For example, for a DataFrame item (row) with a username attribute whose value is "johnd" and a login-date attribute whose value is "20180125", calling both option("key", "username") and option("sorting-key", "login-date") defines a compound username.login-date primary key and sets the item's primary-key value (name) to johnd.20180125. When using the even-distribution write option, the item's primary-key value will be johnd_<n>.20180125 (for example, johnd_2.20180125); see Even Workload Distribution.
  Note
    - The written DataFrame must contain a compatible attribute (column) whose name matches the value of the sorting-key option. Do not modify the value of this attribute after the item's ingestion, as this will result in a mismatch with the item's name and primary-key value.
    - You must set this option if you wish to define a compound <sharding key>.<sorting key> table primary key. Note that support for range scans requires a compound primary key and that range scans for tables with a string sorting-key attribute are more efficient. For more information and best-practice guidelines, see Best Practices for Defining Primary Keys and Distributing Data Workloads.
  - Type: String
  - Requirement: Optional
- allow-overwrite-schema
  Set this option to true (option("allow-overwrite-schema", "true")) to instruct the platform to overwrite the current schema of the target table (if it exists) with the schema that is automatically inferred from the contents of the DataFrame. By default, if the inferred schema differs from an existing schema for the same table, the existing schema isn't overwritten and the write fails (see Overwriting an Existing Table Schema).
  - Type: Boolean
  - Requirement: Optional
- columnUpdate
  Set this option to true (option("columnUpdate", "true")) together with the append save mode for a custom replace mode: append new items and overwrite existing items (similar to the update logic of the counter option). See the replace-mode write example.
  - Type: Boolean
  - Requirement: Optional
- condition
  A Boolean condition expression that defines a conditional logic for executing the write operation. See Condition Expression for syntax details and examples. As explained in the expression reference documentation, attributes in the target table item are referenced in the expression by using the attribute name. To reference a column (attribute) in the write DataFrame from within the expression, use the syntax ${<column name>}. For example, option("condition", "${version} > version") will write to the table only if the table has a matching item (identified by its name; see the key option) whose current version attribute value is lower than the value of the version column (attribute) in the Spark DataFrame. For more information, see Conditional Updates and the conditional-update write example.
  - Type: String
  - Requirement: Optional
- counter
  A comma-separated list of one or more attribute (column) names that identify counter attributes. For example, option("counter", "odometer, loops") identifies odometer and loops as counter attributes. For more information, see Counter Attributes and the counter-attributes write example.
  Note
    - Counter attributes must have numeric values.
    - The counter option is supported only with the NoSQL DataFrame append save mode, which for counter attributes functions as a custom replace mode (similar to the update logic of the columnUpdate option).
    - The DataFrame should contain a value for each of the specified counter attributes. This value will be added to or subtracted from the attribute's current value, or used as the initial attribute value if the attribute doesn't already exist in the table.
  - Type: String
  - Requirement: Optional
- partition
  [Tech Preview] A comma-separated list of one or more attribute (column) names that identify partition attributes. The written items are saved to <table path>/<attribute>=<value>[/<attribute>=<value>/...] partition directories according to the values of the items' partition attributes. Note that the order of the partition attribute names in the option string determines the partitioning hierarchy. For example, option("partition", "year, month, day, hour") identifies year, month, day, and hour as partition attributes and saves items in year=<value>/month=<value>/day=<value>/hour=<value> partitions (such as mytable/year=2018/month=2/day=12/hour=21) within the root table directory. For more information and examples, see Partitioned Tables.
  - Type: String
  - Requirement: Optional
- range-scan-even-distribution
  Set this option to true (option("range-scan-even-distribution", "true")) to instruct the platform to distribute items with the same sharding-key attribute value among multiple data slices, to achieve a more even distribution of non-uniform data. This option is applicable only for tables with a compound <sharding key>.<sorting key> primary key, which can be created by using both the key and sorting-key write options. For more information, see Even Workload Distribution.
  - Type: Boolean
  - Requirement: Optional
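The way the key and sorting-key options determine an item's name can be illustrated with a short plain-Python sketch. This is only a model of the naming rules described for these options, not the connector's implementation; the item_name and split_item_name function names are hypothetical.

```python
def item_name(sharding_value, sorting_value=None):
    """Compose an item name (primary-key value) from its key attribute values."""
    sharding = str(sharding_value)
    if "." in sharding:
        # The leftmost period separates the sharding key from the sorting key,
        # so the sharding-key value itself cannot contain periods.
        raise ValueError("sharding-key value cannot contain periods")
    if sorting_value is None:
        return sharding  # simple primary key
    return sharding + "." + str(sorting_value)  # compound primary key


def split_item_name(name):
    """Split an item name at its leftmost period into (sharding, sorting)."""
    sharding, separator, sorting = name.partition(".")
    return sharding, (sorting if separator else None)


print(item_name("johnd"))              # prints johnd
print(item_name("johnd", "20180125"))  # prints johnd.20180125
print(split_item_name("johnd.20180125"))
```

Splitting at the leftmost period shows why a period in the sharding-key value would be misread as the start of the sorting key.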
Defining the Table Schema
Spark DataFrames handle structured data. Therefore, Spark needs to be aware of the schema of the data structure. When writing NoSQL data by using the platform's Spark DataFrame or Frames APIs, the schema of the data table is automatically identified and saved and then retrieved when reading the data with a Spark DataFrame, Frames, or Presto (unless you select to explicitly define the schema for the read operation). However, to use a Spark DataFrame, Frames, or Presto to read NoSQL data that was written to a table in another way, you first need to define the table schema. You can use either of the following alternative methods to define or update the schema of a NoSQL table as part of a NoSQL DataFrame read operation:
- Use the custom inferSchema option to infer the schema (recommended).
- Define the schema programmatically as part of the Spark DataFrame read operation. (You can also do this for data that was written using a Spark DataFrame in the platform, although it's not required.)
For more information, see the NoSQL Table Schema Reference.
Overwriting an Existing Table Schema
By default, if the schema inferred from the DataFrame's contents during a write operation differs from the table's current schema — as defined in its schema file (if such a file exists) — the write fails.
This is designed to protect against inadvertent schema changes.
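However, you can override this default behavior by using the custom allow-overwrite-schema write option, which forces an overwrite of the current table schema with the inferred schema.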
Table Schema-Overwrite Examples
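The following example creates a "mytable" table in a "mycontainer" data container with AttrA and AttrB attributes of type string and an AttrC attribute of type long, and then overwrites the table schema to change the type of AttrC to double: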
import org.apache.spark.sql.SparkSession
var df = Seq(("a", "z", 123), ("b", "y", 456))
.toDF("AttrA", "AttrB", "AttrC")
df.write.format("io.iguaz.v3io.spark.sql.kv")
.mode("overwrite")
.option("key", "AttrA")
.save("v3io://mycontainer/mytable/")
df = Seq(("c", "x", 32.12), ("d", "v", 45.2))
.toDF("AttrA", "AttrB", "AttrC")
df.write.format("io.iguaz.v3io.spark.sql.kv")
.mode("append")
.option("key", "AttrA")
.option("allow-overwrite-schema", "true")
.save("v3io://mycontainer/mytable/")
from pyspark.sql import SparkSession
df = spark.createDataFrame([
("a", "z", 123),
("b", "y", 456)
], ["AttrA", "AttrB", "AttrC"])
df.write.format("io.iguaz.v3io.spark.sql.kv") \
.mode("overwrite") \
.option("key", "AttrA") \
.save("v3io://mycontainer/mytable/")
df = spark.createDataFrame([
("c", "x", 32.12),
("d", "v", 45.2)
], ["AttrA", "AttrB", "AttrC"])
df.write.format("io.iguaz.v3io.spark.sql.kv") \
.mode("append") \
.option("key", "AttrA") \
.option("allow-overwrite-schema", "true") \
.save("v3io://mycontainer/mytable/")
If you remove or comment out the option("allow-overwrite-schema", "true") call in the second write command, the write will fail with the following schema-mismatch error:
java.lang.RuntimeException: Note you are about the rewrite existing schema file.
old schema = Schema(List(Field(AttrA,string,false,None), Field(AttrB,string,true,None), Field(AttrC,long,false,None)),AttrA,None,0)
new schema = Schema(ArraySeq(Field(AttrA,string,false,None), Field(AttrB,string,true,None), Field(AttrC,double,false,None)),AttrA,None,0).
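The default protection and its override can be modeled in a few lines of plain Python. This is a simplified sketch of the behavior described in this section, not the platform's schema logic: schemas are represented as hypothetical dictionaries that map attribute names to type names, and resolve_schema is an illustrative function name.

```python
def resolve_schema(existing, inferred, allow_overwrite=False):
    """Return the schema to save, or fail on a mismatch unless overwriting is allowed."""
    if existing is None or existing == inferred:
        return inferred
    if not allow_overwrite:
        changed = sorted(
            name for name in set(existing) | set(inferred)
            if existing.get(name) != inferred.get(name)
        )
        raise RuntimeError("schema mismatch for attributes: " + ", ".join(changed))
    return inferred


old = {"AttrA": "string", "AttrB": "string", "AttrC": "long"}
new = {"AttrA": "string", "AttrB": "string", "AttrC": "double"}

print(resolve_schema(old, new, allow_overwrite=True)["AttrC"])  # prints double
try:
    resolve_schema(old, new)
except RuntimeError as err:
    print(err)  # prints schema mismatch for attributes: AttrC
```

The two schemas mirror the old and new schemas in the error message above, where only the type of AttrC changes from long to double.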
Inferring the Table Schema
You can use the custom NoSQL DataFrame inferSchema read option to automatically infer the schema of the read table from its contents.
Infer-Schema Examples
The following example uses a Spark DataFrame to read data from a NoSQL "employees" table in a "department_store" data container.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
val myDF = spark.read.format("io.iguaz.v3io.spark.sql.kv")
.option("inferSchema", "true")
.load("v3io://department_store/employees")
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
myDF = spark.read.format("io.iguaz.v3io.spark.sql.kv") \
.option("inferSchema", "true") \
.load("v3io://department_store/employees")
To generate a schema file from the inferred schema, you can write the content of the read DataFrame back to the same table using the append save mode. Call limit(0) on the DataFrame to write only the schema file:
myDF.limit(0).write.format("io.iguaz.v3io.spark.sql.kv")
.mode("append")
.option("key", "employee_id")
.save("v3io://department_store/employees")
myDF.limit(0).write.format("io.iguaz.v3io.spark.sql.kv") \
.mode("append") \
.option("key", "employee_id") \
.save("v3io://department_store/employees")
Defining the Table Schema Programmatically
You can define a NoSQL DataFrame's table schema programmatically by using the Spark DataFrame schema method as part of a read operation. For example, if the schema is defined in a schema variable, you can call schema(schema).
See The Item-Attributes Schema Object ('fields') reference and the following examples.
Programmatic Table-Schema Definition Examples
The following example uses a Spark DataFrame to read data from a NoSQL "employees" table in a "department_store" data container.
The table has five attributes (columns), which are depicted using the schema variable:
- "id": a numeric employee ID, which serves as the table's primary key and isn't nullable.
- "firstname": the employee's first name, depicted as a string.
- "lastname": the employee's last name, depicted as a string.
- "department": the department to which the employee belongs, depicted as a string.
- "managerid": the numeric ID of the employee's manager.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
val spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
val schema = StructType(List(
StructField("id", LongType, nullable = false),
StructField("firstname", StringType, nullable = true),
StructField("lastname", StringType, nullable = true),
StructField("department", StringType, nullable = true),
StructField("managerid", LongType, nullable = true)))
val myDF = spark.read.schema(schema)
.format("io.iguaz.v3io.spark.sql.kv")
.load("v3io://department_store/employees")
import sys
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
schema = StructType([
StructField("id", LongType(), False),
StructField("firstname", StringType(), True),
StructField("lastname", StringType(), True),
StructField("department", StringType(), True),
StructField("managerid", LongType(), True)])
myDF = spark.read.schema(schema) \
.format("io.iguaz.v3io.spark.sql.kv") \
.load("v3io://department_store/employees")
Conditional Updates
You can use the custom condition write option of the NoSQL Spark DataFrame to perform conditional item updates, whereby the write operation is executed only if the specified condition expression evaluates to true.
The condition expression is evaluated against the table item to be updated, if it exists. If the condition evaluates to true, the write operation is executed and the item is updated or created; otherwise, the operation completes successfully without an update.
- If the expression references a non-existent item attribute, the condition processing stops and the operation completes successfully without updating or creating the item.
- If the item doesn't exist and the condition expression doesn't reference any attributes (for example, a "1==1" or "2==3" expression, which could potentially be auto generated in some programming scenarios), the operation completes successfully and the item is updated or created only if the condition evaluates to true.
See the NoSQL DataFrame conditional-update write example.
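The conditional-update rules can be traced with a small plain-Python model. This is a sketch under simplifying assumptions and not the platform's implementation: the table is a hypothetical dictionary of items, the condition is a Python callable rather than a condition-expression string, and a KeyError stands in for a reference to a non-existent item attribute.

```python
def conditional_write(table, name, row, condition):
    """Model a conditional write; return True if the item was written.

    table     -- dict that maps item names to attribute dicts
    condition -- callable(item, row) -> bool; raises KeyError when it
                 references an attribute that the table item doesn't have
    """
    item = table.get(name, {})
    try:
        passed = bool(condition(item, row))
    except KeyError:
        return False  # non-existent attribute: complete without writing
    if passed:
        table[name] = {**item, **row}  # update or create the item
    return passed


def newer(item, row):
    # Python stand-in for the "${version} > version" condition expression
    return row["version"] > item["version"]


table = {"johnd": {"username": "johnd", "version": 2}}
print(conditional_write(table, "johnd", {"username": "johnd", "version": 3}, newer))  # True
print(conditional_write(table, "johnd", {"username": "johnd", "version": 1}, newer))  # False
print(conditional_write(table, "maryp", {"username": "maryp", "version": 1}, newer))  # False
print(conditional_write(table, "maryp", {"username": "maryp", "version": 1},
                        lambda item, row: 1 == 1))                                    # True
```

The third call leaves the table unchanged because the item doesn't exist and the condition references its version attribute, while the last call creates the item because its condition references no attributes.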
Counter Attributes
The Iguazio Spark connector enhances the standard Spark DataFrame by allowing you to define numeric attributes (columns) in a table as counter attributes and easily increment or decrement the attributes' values.
This is done by using the custom counter write option to name one or more attributes as counter attributes.
The counter option is supported only with the append save mode, which behaves as follows when this option is set:
- If a DataFrame attribute isn't already found in the table, the attribute is added to the table and initialized to the value set for it in the DataFrame. If the table or item doesn't exist, it's created and updated with the DataFrame's contents. This is the standard append save-mode behavior and it's the same for both counter and non-counter attributes.
- If a DataFrame counter attribute is already found in the table, its value is incremented or decremented according to the value that was set for this attribute in the DataFrame; i.e., the attribute value indicates the increment or decrement step (a negative value = decrement).
- If a DataFrame non-counter attribute is already found in the table, its value is overwritten with the value that was set for it in the DataFrame but other attributes in the table remain unaffected (i.e., replace mode, similar to an append write with the columnUpdate option).
See the NoSQL DataFrame counter-attributes write example.
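The three counter rules can be expressed as a short plain-Python model. This is a sketch of the described semantics only, not the connector's code: the table is a hypothetical dictionary of items, and append_item is an illustrative function name.

```python
def append_item(table, name, row, counters=()):
    """Model an append-mode write in which some attributes are counters."""
    item = table.setdefault(name, {})  # create the item if it doesn't exist
    for attr, value in row.items():
        if attr in counters and attr in item:
            item[attr] += value  # existing counter: increment or decrement
        else:
            item[attr] = value   # new attribute, or non-counter overwrite
    return item


table = {}
append_item(table, "7843321", {"color": "silver", "odometer": 123000},
            counters=("odometer",))
append_item(table, "7843321", {"color": "black", "odometer": 500},
            counters=("odometer",))
print(table["7843321"])  # prints {'color': 'black', 'odometer': 123500}
```

The first write initializes both attributes; the second overwrites the non-counter color attribute and adds 500 to the odometer counter. A negative value would decrement the counter instead.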
Partitioned Tables [Tech Preview]
Table partitioning is a common technique for optimizing physical data layout and related queries. In a partitioned table, some item attributes (columns) are used to create partition directories within the root table directory using the format <table path>/<attribute>=<value>[/<attribute>=<value>/...], and each item is then saved to the partition directory that matches its attribute values.
The Iguazio Spark connector supports table partitioning for the NoSQL DataFrame:
- Creating a partitioned table: the custom NoSQL DataFrame partition option allows you to select specific item attributes (columns) in a write DataFrame to be used as partitions. When using this option, the platform creates the necessary partition directory path for each written item. (Note that after you define partitions for a table, you need to specify the same partitions whenever you write to this table unless you decide to overwrite it.)
- Querying a partitioned table: a partitioned table is queried like any other table, with the table path set to the root table directory and not to a specific partition directory.
  Version 3.5.5 of the platform doesn't support using wild cards in the table path, such as "mytable/year=*/month=5" to search the month=5 directories in all mytable/year=<value> directories. However, you can easily restrict the query to specific partition directories by using the Spark DataFrame filter method with a filter that references one of the partition attributes. In such cases, the platform searches the root table directory that is specified in the read command for nested directories of the format <attribute>=<value>. If it finds such directories, it searches only the partition directories that match the query. For example, for a table partitioned by year and month attributes, a month == 12 filter will return only the items from the month=12 partition directories in all year=* directories.
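The partition-directory layout and the way a filter narrows the search can be sketched in plain Python. This is an illustration of the path format only, not the platform's implementation; the partition_dir and matching_dirs function names are hypothetical.

```python
def partition_dir(table_path, item, partition_option):
    """Build the partition directory of an item from a "partition" option string."""
    names = [name.strip() for name in partition_option.split(",")]
    parts = ["{}={}".format(name, item[name]) for name in names]
    return "/".join([table_path.rstrip("/")] + parts)


def matching_dirs(dirs, **wanted):
    """Keep the directories whose <attribute>=<value> segments match the filter."""
    selected = []
    for path in dirs:
        segments = dict(s.split("=", 1) for s in path.split("/") if "=" in s)
        if all(segments.get(k) == str(v) for k, v in wanted.items()):
            selected.append(path)
    return selected


item = {"year": 2018, "month": 2, "day": 12, "hour": 21}
print(partition_dir("mytable", item, "year, month, day, hour"))
# prints mytable/year=2018/month=2/day=12/hour=21

dirs = ["mytable/year=2016/month=3", "mytable/year=2016/month=12",
        "mytable/year=2017/month=12"]
print(matching_dirs(dirs, month=12))
# prints ['mytable/year=2016/month=12', 'mytable/year=2017/month=12']
```

The order of the names in the option string sets the directory hierarchy, and a filter on a single partition attribute selects the matching directories under every value of the other attributes.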
Table-Partitioning Examples
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Table-Partitioning Example").getOrCreate()
val table_path = "v3io://mycontainer/weather/"
val df = Seq(
(2016, 3, 25, 6, 16, 0.00, 55),
(2016, 3, 25, 17, 19, 0.10, 62),
(2016, 7, 24, 7, 20, 0.00, 52),
(2016, 12, 24, 9, 10, 0.05, 47),
(2016, 12, 24, 19, 8, 0.20, 47),
(2017, 5, 7, 14, 21, 0.00, 70),
(2017, 11, 1, 10, 16, 0.00, 34),
(2017, 11, 1, 22, 13, 0.01, 41),
(2017, 12, 12, 16, 12, 0.00, 47),
(2017, 12, 24, 17, 11, 1.00, 50),
(2018, 1, 18, 5, 8, 0.00, 37),
(2018, 1, 18, 17, 10, 2.00, 45),
(2018, 5, 20, 15, 24, 0.00, 62),
(2018, 5, 20, 21, 20, 0.00, 59),
(2018, 11, 1, 11, 11, 0.12, 65)
).toDF("year", "month", "day", "hour", "degrees_cel", "rain_ml", "humidity_per")
val df_with_key = df.withColumn("time", concat($"year", $"month", $"day", $"hour"))
df_with_key.write.format("io.iguaz.v3io.spark.sql.kv")
.mode("overwrite")
.option("key", "time")
.option("partition", "year, month, day")
.save(table_path)
var readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
readDF.show()
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
.filter($"month" < 7)
readDF.show()
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
.filter($"month" === 12 && $"day" === 24)
readDF.show()
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
.filter($"month" > 6 && $"hour" >= 8 && $"hour" <= 20)
readDF.show()
import sys
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("Table-Partitioning Example").getOrCreate()
table_path = "v3io://mycontainer/weather/"
df = spark.createDataFrame([
(2016, 3, 25, 6, 16, 0.00, 55),
(2016, 3, 25, 17, 19, 0.10, 62),
(2016, 7, 24, 7, 20, 0.00, 52),
(2016, 12, 24, 9, 10, 0.05, 47),
(2016, 12, 24, 19, 8, 0.20, 47),
(2017, 5, 7, 14, 21, 0.00, 70),
(2017, 11, 1, 10, 16, 0.00, 34),
(2017, 11, 1, 22, 13, 0.01, 41),
(2017, 12, 12, 16, 12, 0.00, 47),
(2017, 12, 24, 17, 11, 1.00, 50),
(2018, 1, 18, 5, 8, 0.00, 37),
(2018, 1, 18, 17, 10, 2.00, 45),
(2018, 5, 20, 15, 24, 0.00, 62),
(2018, 5, 20, 21, 20, 0.00, 59),
(2018, 11, 1, 11, 11, 0.12, 65)
], ["year", "month", "day", "hour", "degrees_cel", "rain_ml", "humidity_per"])
df_with_key = df.withColumn(
"time", concat(df["year"], df["month"], df["day"], df["hour"]))
df_with_key.write.format("io.iguaz.v3io.spark.sql.kv") \
.mode("overwrite") \
.option("key", "time") \
.option("partition", "year, month, day") \
.save(table_path)
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
readDF.show()
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
.filter("month < 7")
readDF.show()
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
.filter("month == 12 AND day == 24")
readDF.show()
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
.filter("month > 6 AND hour >= 8 AND hour <= 20")
readDF.show()
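This example creates a partitioned "weather" table in a "mycontainer" data container.
The option("partition", "year, month, day") write option partitions the table by the year, month, and day item attributes. For example, the first item (with attribute values 2016, 3, 25, 6, 16, 0.00, 55) is saved to a year=2016/month=3/day=25 partition directory within the weather table directory.
Following is the output of the example's show commands for each read: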
Full table read:
+----+-----+---+----+-----------+-------+------------+----------+
|year|month|day|hour|degrees_cel|rain_ml|humidity_per|      time|
+----+-----+---+----+-----------+-------+------------+----------+
|2016|   12| 24|   9|         10|   0.05|          47| 201612249|
|2016|   12| 24|  19|          8|    0.2|          47|2016122419|
|2016|    3| 25|   6|         16|    0.0|          55|  20163256|
|2016|    3| 25|  17|         19|    0.1|          62| 201632517|
|2016|    7| 24|   7|         20|    0.0|          52|  20167247|
|2017|   11|  1|  22|         13|   0.01|          41| 201711122|
|2017|   11|  1|  10|         16|    0.0|          34| 201711110|
|2017|   12| 12|  16|         12|    0.0|          47|2017121216|
|2017|   12| 24|  17|         11|    1.0|          50|2017122417|
|2017|    5|  7|  14|         21|    0.0|          70|  20175714|
|2018|    1| 18|   5|          8|    0.0|          37|  20181185|
|2018|    1| 18|  17|         10|    2.0|          45| 201811817|
|2018|   11|  1|  11|         11|   0.12|          65| 201811111|
|2018|    5| 20|  15|         24|    0.0|          62| 201852015|
|2018|    5| 20|  21|         20|    0.0|          59| 201852021|
+----+-----+---+----+-----------+-------+------------+----------+
"month < 7" filter: retrieve all data for the first six months of each year:
+----+-----+---+----+-----------+-------+------------+---------+
|year|month|day|hour|degrees_cel|rain_ml|humidity_per|     time|
+----+-----+---+----+-----------+-------+------------+---------+
|2016|    3| 25|   6|         16|    0.0|          55| 20163256|
|2016|    3| 25|  17|         19|    0.1|          62|201632517|
|2017|    5|  7|  14|         21|    0.0|          70| 20175714|
|2018|    1| 18|   5|          8|    0.0|          37| 20181185|
|2018|    1| 18|  17|         10|    2.0|          45|201811817|
|2018|    5| 20|  15|         24|    0.0|          62|201852015|
|2018|    5| 20|  21|         20|    0.0|          59|201852021|
+----+-----+---+----+-----------+-------+------------+---------+
"month == 12 AND day == 24" filter: retrieve all hours on Dec 24 each year:
+----+-----+---+----+-----------+-------+------------+----------+
|year|month|day|hour|degrees_cel|rain_ml|humidity_per|      time|
+----+-----+---+----+-----------+-------+------------+----------+
|2016|   12| 24|   9|         10|   0.05|          47| 201612249|
|2016|   12| 24|  19|          8|    0.2|          47|2016122419|
|2017|   12| 24|  17|         11|    1.0|          50|2017122417|
+----+-----+---+----+-----------+-------+------------+----------+
"month > 6 AND hour >= 8 AND hour <= 20" filter: retrieve 08:00–20:00 data for every day in the last six months of each year:
+----+-----+---+----+-----------+-------+------------+----------+
|year|month|day|hour|degrees_cel|rain_ml|humidity_per|      time|
+----+-----+---+----+-----------+-------+------------+----------+
|2016|   12| 24|   9|         10|   0.05|          47| 201612249|
|2016|   12| 24|  19|          8|    0.2|          47|2016122419|
|2017|   11|  1|  10|         16|    0.0|          34| 201711110|
|2017|   12| 12|  16|         12|    0.0|          47|2017121216|
|2017|   12| 24|  17|         11|    1.0|          50|2017122417|
|2018|   11|  1|  11|         11|   0.12|          65| 201811111|
+----+-----+---+----+-----------+-------+------------+----------+
Range Scans
A NoSQL Spark DataFrame table query that uses supported sharding-key and optional sorting-key filters to retrieve items with the same sharding-key value, is processed by performing a range scan, which is more efficient than the standard full table scan. See NoSQL Range Scans. Note that the support for Spark DataFrame range scans requires a table schema that was inferred with a NoSQL Spark DataFrame, Frames, or the Iguazio Trino connector.
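The NoSQL DataFrame uses a range scan for queries on tables with a compound primary key that apply the equal-to (=) or IN (IN) operator to the sharding-key attribute, and optionally also apply a comparison operator (=/>/>=/</<=) to the sorting-key attribute.
Range-Scan Examples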
Example 1 — Basic Range Scan
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Range-Scan Example").getOrCreate()
val table_path = "v3io://mycontainer/mytaxis/rides/"
var writeDF = Seq(
(24, "20180601", 8, 332.0, 18),
(24, "20180602", 5, 260.0, 11),
(24, "20180701", 7, 352.1, 21),
(1, "20180601", 25, 125.0, 40),
(1, "20180602", 20, 106.0, 46),
(1, "20180701", 28, 106.4, 42),
(16, "20180601", 1, 224.2, 8),
(16, "20180602", 10, 244.0, 45),
(16, "20180701", 6, 193.2, 24)
).toDF("driver_id", "date", "num_rides", "total_km", "total_passengers")
writeDF = writeDF
.withColumn("avg_ride_km", $"total_km" / $"num_rides")
.withColumn("avg_ride_passengers", $"total_passengers" / $"num_rides")
writeDF.write.format("io.iguaz.v3io.spark.sql.kv")
.mode("overwrite")
.option("key", "driver_id")
.option("sorting-key", "date")
.save(table_path)
// Range-scan queries
var readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
.filter($"driver_id" === 1)
readDF.show()
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
.filter($"driver_id" === 24 && $"date" >= "20180101" && $"date" < "20180701")
readDF.show()
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
.filter($"driver_id".isin(1, 16, 24) && $"avg_ride_passengers" >= 3)
readDF.show()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Range-Scan Example").getOrCreate()
table_path = "v3io://mycontainer/mytaxis/rides/"
writeDF = spark.createDataFrame([
(24, "20180601", 8, 332.0, 18),
(24, "20180602", 5, 260.0, 11),
(24, "20180701", 7, 352.1, 21),
(1, "20180601", 25, 125.0, 40),
(1, "20180602", 20, 106.0, 46),
(1, "20180701", 28, 106.4, 42),
(16, "20180601", 1, 224.2, 8),
(16, "20180602", 10, 244.0, 45),
(16, "20180701", 6, 193.2, 24)
], ["driver_id", "date", "num_rides", "total_km", "total_passengers"])
writeDF = writeDF.withColumn(
"avg_ride_km", writeDF["total_km"] / writeDF["num_rides"]) \
.withColumn(
"avg_ride_passengers", writeDF["total_passengers"] / writeDF["num_rides"])
writeDF.write.format("io.iguaz.v3io.spark.sql.kv") \
.mode("overwrite") \
.option("key", "driver_id") \
.option("sorting-key", "date") \
.save(table_path)
# Range-scan queries
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
.filter("driver_id == 1")
readDF.show()
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
.filter("driver_id == 24 AND date >= '20180101' AND date < '20180701'")
readDF.show()
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
.filter("driver_id IN (1, 16, 24) AND avg_ride_passengers >= 3")
readDF.show()
This example creates a "rides" table in a "mytaxis" directory in a "mycontainer" data container. The option("key", "driver_id") and option("sorting-key", "date") write options define the driver_id attribute as the table's sharding key and the date attribute as the table's sorting key. All items have primary-key values (item names) of the format <sharding-key value>.<sorting-key value>, as demonstrated in the following image (for example, 16.20180602):
All of the read queries in this example filter on the sharding-key attribute, and are therefore processed as range scans rather than full table scans. Following is the output of each query:
"driver_id == 1" filter: retrieve all items with a driver_id sharding-key attribute value of 1 (regardless of the sorting-key value):
+---------+--------+---------+--------+----------------+------------------+-------------------+
|driver_id|    date|num_rides|total_km|total_passengers|       avg_ride_km|avg_ride_passengers|
+---------+--------+---------+--------+----------------+------------------+-------------------+
|        1|20180601|       25|   125.0|              40|               5.0|                1.6|
|        1|20180602|       20|   106.0|              46|               5.3|                2.3|
|        1|20180701|       28|   106.4|              42|3.8000000000000003|                1.5|
+---------+--------+---------+--------+----------------+------------------+-------------------+
"driver_id == 24 AND date >= '20180101' AND date < '20180701'" filter: retrieve all items with a driver_id sharding-key attribute value of 24 and a date sorting-key attribute value within the first six months of 2018:
+---------+--------+---------+--------+----------------+-----------+-------------------+
|driver_id|    date|num_rides|total_km|total_passengers|avg_ride_km|avg_ride_passengers|
+---------+--------+---------+--------+----------------+-----------+-------------------+
|       24|20180601|        8|   332.0|              18|       41.5|               2.25|
|       24|20180602|        5|   260.0|              11|       52.0|                2.2|
+---------+--------+---------+--------+----------------+-----------+-------------------+
"driver_id IN (1, 16, 24) AND avg_ride_passengers >= 3" filter: retrieve all items with a driver_id sharding-key attribute value of 1, 16, or 24 (regardless of the sorting-key value) and an avg_ride_passengers attribute value that is greater than or equal to 3:
+---------+--------+---------+--------+----------------+------------------+-------------------+
|driver_id|    date|num_rides|total_km|total_passengers|       avg_ride_km|avg_ride_passengers|
+---------+--------+---------+--------+----------------+------------------+-------------------+
|       16|20180601|        1|   224.2|               8|             224.2|                8.0|
|       16|20180602|       10|   244.0|              45|              24.4|                4.5|
|       16|20180701|        6|   193.2|              24|32.199999999999996|                4.0|
|       24|20180701|        7|   352.1|              21|50.300000000000004|                3.0|
+---------+--------+---------+--------+----------------+------------------+-------------------+
Example 2 — Even-Distribution Range Scan
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Even-Distribution Range-Scan Example").getOrCreate()
import spark.implicits._ // Required for toDF and the $"column" syntax outside the Spark shell
val table_path = "v3io://mycontainer/mytaxis/even_distribution_range_scan/rides/"
var writeDF = Seq(
(24, "20180601", 8, 332.0, 18),
(24, "20180602", 5, 260.0, 11),
(24, "20180701", 7, 352.1, 21),
(1, "20180101", 4, 90.0, 14),
(1, "20180102", 14, 141.4, 28),
(1, "20180202", 8, 220.8, 22),
(1, "20180601", 25, 125.0, 40),
(1, "20180602", 20, 106.0, 46),
(1, "20180701", 28, 106.4, 42),
(16, "20180601", 1, 224.2, 8),
(16, "20180602", 10, 244.0, 45),
(16, "20180701", 6, 193.2, 24)
).toDF("driver_id", "date", "num_rides", "total_km", "total_passengers")
.withColumn("avg_ride_km", $"total_km" / $"num_rides")
.withColumn("avg_ride_passengers", $"total_passengers" / $"num_rides")
writeDF.write.format("io.iguaz.v3io.spark.sql.kv")
.mode("overwrite")
.option("key", "driver_id")
.option("sorting-key", "date")
.option("range-scan.even-distribution", "true")
.save(table_path)
// Range-scan queries
var readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
.filter($"driver_id" === 1)
readDF.show()
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
.filter($"driver_id" === 24 && $"date" >= "20180101" && $"date" < "20180701")
readDF.show()
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
.filter($"driver_id".isin(1, 16, 24) && $"avg_ride_passengers" >= 3)
readDF.show()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Even-Distribution Range-Scan Example").getOrCreate()
table_path = "v3io://mycontainer/mytaxis/even_distribution_range_scan/rides/"
writeDF = spark.createDataFrame([
(24, "20180601", 8, 332.0, 18),
(24, "20180602", 5, 260.0, 11),
(24, "20180701", 7, 352.1, 21),
(1, "20180101", 4, 90.0, 14),
(1, "20180102", 14, 141.4, 28),
(1, "20180202", 8, 220.8, 22),
(1, "20180601", 25, 125.0, 40),
(1, "20180602", 20, 106.0, 46),
(1, "20180701", 28, 106.4, 42),
(16, "20180601", 1, 224.2, 8),
(16, "20180602", 10, 244.0, 45),
(16, "20180701", 6, 193.2, 24)
], ["driver_id", "date", "num_rides", "total_km", "total_passengers"])
writeDF = writeDF.withColumn(
"avg_ride_km", writeDF["total_km"] / writeDF["num_rides"]) \
.withColumn(
"avg_ride_passengers", writeDF["total_passengers"] / writeDF["num_rides"])
writeDF.write.format("io.iguaz.v3io.spark.sql.kv") \
.mode("overwrite") \
.option("key", "driver_id") \
.option("sorting-key", "date") \
.option("range-scan.even-distribution", "true") \
.save(table_path)
# Range-scan queries
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
.filter("driver_id == 1")
readDF.show()
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
.filter("driver_id == 24 AND date >= '20180101' AND date < '20180701'")
readDF.show()
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
.filter("driver_id IN (1, 16, 24) AND avg_ride_passengers >= 3")
readDF.show()
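This example creates a "rides" table in a "mytaxis/even_distribution_range_scan" directory in a "mycontainer" data container. The writeDF write command includes an additional option, option("range-scan.even-distribution", "true"), to recalculate the items' sharding-key values and distribute the items more evenly across multiple data slices.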
See Even Workload Distribution.
The read queries remain the same as in the basic range-scan example.
However, if you browse the container in the dashboard after running the example, you'll see that the names of the files in the table directory have the format <original sharding-key value>_<n>.<sorting-key value>, and not <original sharding-key value>.<sorting-key value> as in the basic example (for example, 16_36.20180602 instead of 16.20180602).
Following is the output of each of the example's read queries:
"driver_id == 1"
filter: retrieve all items with a driver_id sharding-key value of 1 (regardless of the sorting-key value):
+---------+--------+---------+--------+----------------+------------------+-------------------+
|driver_id| date|num_rides|total_km|total_passengers| avg_ride_km|avg_ride_passengers|
+---------+--------+---------+--------+----------------+------------------+-------------------+
| 1|20180202| 8| 220.8| 22| 27.6| 2.75|
| 1|20180102| 14| 141.4| 28| 10.1| 2.0|
| 1|20180101| 4| 90.0| 14| 22.5| 3.5|
| 1|20180602| 20| 106.0| 46| 5.3| 2.3|
| 1|20180701| 28| 106.4| 42|3.8000000000000003| 1.5|
| 1|20180601| 25| 125.0| 40| 5.0| 1.6|
+---------+--------+---------+--------+----------------+------------------+-------------------+
"driver_id == 24 AND date >= '20180101' AND date < '20180701'"
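filter: retrieve all items with a driver_id sharding-key value of 24 and a date sorting-key value in the first six months of 2018 (from 20180101 up to, but not including, 20180701):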
+---------+--------+---------+--------+----------------+-----------+-------------------+
|driver_id| date|num_rides|total_km|total_passengers|avg_ride_km|avg_ride_passengers|
+---------+--------+---------+--------+----------------+-----------+-------------------+
| 24|20180601| 8| 332.0| 18| 41.5| 2.25|
| 24|20180602| 5| 260.0| 11| 52.0| 2.2|
+---------+--------+---------+--------+----------------+-----------+-------------------+
"driver_id IN (1, 16, 24) AND avg_ride_passengers >= 3"
filter: retrieve all items with a driver_id sharding-key value of 1, 16, or 24 (regardless of the sorting-key value) and an avg_ride_passengers value that is greater than or equal to 3:
+---------+--------+---------+--------+----------------+------------------+-------------------+
|driver_id| date|num_rides|total_km|total_passengers| avg_ride_km|avg_ride_passengers|
+---------+--------+---------+--------+----------------+------------------+-------------------+
| 1|20180101| 4| 90.0| 14| 22.5| 3.5|
| 16|20180602| 10| 244.0| 45| 24.4| 4.5|
| 16|20180701| 6| 193.2| 24|32.199999999999996| 4.0|
| 16|20180601| 1| 224.2| 8| 224.2| 8.0|
| 24|20180701| 7| 352.1| 21|50.300000000000004| 3.0|
+---------+--------+---------+--------+----------------+------------------+-------------------+
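The three query outputs above can be cross-checked without a cluster. The following is a minimal plain-Python sketch, not connector code: it rebuilds the example's rows in memory and applies the same three filters. The `raw`, `rows`, and `select` names are illustrative only and aren't part of the platform's API.

```python
# Rows from the even-distribution example:
# (driver_id, date, num_rides, total_km, total_passengers)
raw = [
    (24, "20180601", 8, 332.0, 18),
    (24, "20180602", 5, 260.0, 11),
    (24, "20180701", 7, 352.1, 21),
    (1, "20180101", 4, 90.0, 14),
    (1, "20180102", 14, 141.4, 28),
    (1, "20180202", 8, 220.8, 22),
    (1, "20180601", 25, 125.0, 40),
    (1, "20180602", 20, 106.0, 46),
    (1, "20180701", 28, 106.4, 42),
    (16, "20180601", 1, 224.2, 8),
    (16, "20180602", 10, 244.0, 45),
    (16, "20180701", 6, 193.2, 24),
]

# Add the two derived columns, as the withColumn calls do in the example.
rows = [
    {
        "driver_id": driver_id,
        "date": date,
        "avg_ride_km": total_km / num_rides,
        "avg_ride_passengers": total_passengers / num_rides,
    }
    for driver_id, date, num_rides, total_km, total_passengers in raw
]


def select(predicate):
    """Return the sorted (driver_id, date) pairs of the rows that match."""
    return sorted((r["driver_id"], r["date"]) for r in rows if predicate(r))


q1 = select(lambda r: r["driver_id"] == 1)
q2 = select(lambda r: r["driver_id"] == 24
            and "20180101" <= r["date"] < "20180701")
q3 = select(lambda r: r["driver_id"] in (1, 16, 24)
            and r["avg_ride_passengers"] >= 3)

print(len(q1), len(q2), len(q3))  # prints: 6 2 5
```

The date filter is a string comparison, which orders the values correctly here only because every date uses the same fixed-width yyyymmdd format.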
Even Workload Distribution
The NoSQL Spark DataFrame offers custom support for even distribution of ingested items across the available data slices for the parent container. The objective is to improve the system's performance when working with a non-uniform data set — see the Recalculating Sharding-Key Values for Even Workload Distribution best-practice guidelines.
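When writing (ingesting) data to a table with a compound <sharding key>.<sorting key> primary key (see the key and sorting-key write options), you can also set the range-scan.even-distribution write option to "true". The platform then recalculates each item's sharding-key value, so that the primary-key value has the format <original sharding-key value>_<n>.<sorting-key value> (for example, johnd_1.20180602), as demonstrated in the even-distribution range-scan example.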
When submitting a NoSQL Spark DataFrame or Presto sharding-key query for a table that was created with the even-distribution Spark DataFrame option or by using similar calculations, use the original sharding-key value. Behind the scenes, the platform searches for all the primary-key values that were derived from the original sharding-key value. Note that this custom support requires a table schema that was inferred with a NoSQL Spark DataFrame or with the Iguazio Presto connector.
For more information on the behind-the-scenes implementation to support this feature, see the Using a NoSQL Spark DataFrame for Even Workload Distribution best-practices documentation.
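To make the naming scheme concrete, the following plain-Python sketch models the idea described in this section. It's an illustration under stated assumptions, not the platform's implementation: the bucket count (`NUM_BUCKETS`), the use of a CRC-32 hash of the sorting-key value to choose `<n>`, and both function names are hypothetical.

```python
import zlib

# Hypothetical number of derived sharding-key values per original value;
# the value that the platform actually uses isn't stated in this section.
NUM_BUCKETS = 64


def recalculated_item_name(sharding_key, sorting_key):
    """Build a name of the form <original sharding key>_<n>.<sorting key>.

    Assumption: <n> is derived from a hash of the sorting-key value.
    """
    n = zlib.crc32(str(sorting_key).encode("utf-8")) % NUM_BUCKETS + 1
    return f"{sharding_key}_{n}.{sorting_key}"


def candidate_sharding_keys(original_sharding_key):
    """List every derived sharding-key value that a query for the
    original sharding-key value would need to cover."""
    return [f"{original_sharding_key}_{n}" for n in range(1, NUM_BUCKETS + 1)]


name = recalculated_item_name("johnd", "20180602")
derived_key = name.split(".")[0]

print(name)
print(derived_key in candidate_sharding_keys("johnd"))  # prints: True
```

The point of the sketch is the lookup direction: the caller filters on the original value ("johnd"), and the search fans out to all of the derived values.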
Examples
Following are some examples of using the NoSQL Spark DataFrame. For schema-definition, table-partitioning, and range-scan examples, see the Defining the Table Schema, Partitioned Tables, and Range Scans sections, respectively; the range-scan examples also demonstrate the support for even workload distribution.
Read Examples
- For a NoSQL DataFrame read example with an explicit table-schema definition, see the examples in the Defining the Table Schema Programmatically section.
- The conditional-update write example also demonstrates reading from a NoSQL table to validate the execution of the write commands.
Example 1 — Basic Read and Related Queries
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit
val spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
import spark.implicits._ // Required for the $"column" syntax outside the Spark shell
val df = spark.read.format("io.iguaz.v3io.spark.sql.kv")
.option("inferSchema", "true")
.load("v3io://mycontainer/WebData")
// Use =!= for column inequality; != compares the Column objects themselves
df.select($"url", $"pages" + $"ads" as "views")
.where($"browser" =!= lit("Chrome"))
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
spark = SparkSession.builder.getOrCreate()
df = spark.read.format("io.iguaz.v3io.spark.sql.kv") \
.option("inferSchema", "true") \
.load("v3io://mycontainer/WebData")
# Use alias to name the computed column
df.select(df["url"], (df["pages"] + df["ads"]).alias("views")) \
.where(df["browser"] != lit("Chrome"))
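This example reads page-click data from a "WebData" table in a "mycontainer" data container.
The inferSchema option is set to "true" to infer the table schema from the table data. The query selects the url attribute and a computed views column, which is the sum of the pages and ads attributes, for all items whose browser attribute value isn't "Chrome".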
Write Examples
Example 1 — Simple Append-Mode Write
val df = Seq(("ians", "Ian Smith", 25), ("dorisb", "Doris Brown", 31))
.toDF("username", "name", "age")
df.write.format("io.iguaz.v3io.spark.sql.kv")
.mode("append")
.option("key", "username")
.save("v3io://mycontainer/IT/Users/")
df = spark.createDataFrame([
("ians", "Ian Smith", 25),
("dorisb", "Doris Brown", 31)
], ["username", "name", "age"])
df.write.format("io.iguaz.v3io.spark.sql.kv") \
.mode("append") \
.option("key", "username") \
.save("v3io://mycontainer/IT/Users/")
This example writes two items (rows) to an "IT/Users" table in a "mycontainer" data container whose primary-key attribute is username. The save mode is set to append.
Example 2 — Replace-Mode Write
val df = Seq(("ians", "Ian Smith", 26), ("janed", "Jane Doe", 42))
.toDF("username", "name", "age")
df.write.format("io.iguaz.v3io.spark.sql.kv")
.mode("append")
.option("columnUpdate", "true")
.option("key", "username")
.save("v3io://mycontainer/IT/Users/")
df = spark.createDataFrame([
("ians", "Ian Smith", 26),
("janed", "Jane Doe", 42)
], ["username", "name", "age"])
df.write.format("io.iguaz.v3io.spark.sql.kv") \
.mode("append") \
.option("columnUpdate", "true") \
.option("key", "username") \
.save("v3io://mycontainer/IT/Users/")
This example writes two items (rows) to the same table as in the previous simple append-mode write example: an "IT/Users" table in a "mycontainer" data container whose primary-key attribute is username. The save mode is set to append and the columnUpdate option is set to "true".
Therefore, assuming the code is run after the simple append-mode write example, the new "janed" item (which doesn't exist in the table) will be appended to the table; the existing "ians" item, which was included in the previous write example, will be overwritten with the item from the new write DataFrame (and the value of the age attribute will change from 25 to 26). The existing "dorisb" item, which isn't included in the new write DataFrame, remains unchanged.
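The effect of running the two write examples in sequence can be modeled with a plain Python dictionary keyed by the primary-key value. This is only a sketch of the outcome described above, not of how the connector performs the write; the `table` dictionary and the `write_items` helper are hypothetical names.

```python
def write_items(table, items, key):
    """Model an item-level write: each row creates the item with the
    matching primary-key value or replaces it if it already exists."""
    for item in items:
        table[item[key]] = dict(item)


table = {}

# Example 1: simple append-mode write
write_items(table, [
    {"username": "ians", "name": "Ian Smith", "age": 25},
    {"username": "dorisb", "name": "Doris Brown", "age": 31},
], key="username")

# Example 2: replace-mode write
write_items(table, [
    {"username": "ians", "name": "Ian Smith", "age": 26},
    {"username": "janed", "name": "Jane Doe", "age": 42},
], key="username")

print(sorted(table))         # prints: ['dorisb', 'ians', 'janed']
print(table["ians"]["age"])  # prints: 26
```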
Example 3 — Counter Attributes
val df = Seq((532, 5, "IP-953481-35", "Jenny Penny", 7866689))
.toDF("kms", "num_drives", "policy", "primary_driver", "reg_num")
df.write.format("io.iguaz.v3io.spark.sql.kv")
.mode("append")
.option("key", "reg_num")
.option("counter", "kms, num_drives")
.save("v3io://mycontainer/Cars/")
df = spark.createDataFrame([
(532, 5, "IP-953481-35", "Jenny Penny", 7866689)
], ["kms", "num_drives", "policy", "primary_driver", "reg_num"])
df.write.format("io.iguaz.v3io.spark.sql.kv") \
.mode("append") \
.option("key", "reg_num") \
.option("counter", "kms, num_drives") \
.save("v3io://mycontainer/Cars/")
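This example writes an item (row) to a "Cars" table in a "mycontainer" data container whose primary-key attribute is reg_num.
The DataFrame contains a primary-key reg_num attribute (7866689); kms (532) and num_drives (5) attributes, which the counter option defines as counter attributes; and policy ("IP-953481-35") and primary_driver ("Jenny Penny") attributes, which aren't counters.
Assume that a matching item (reg_num=7866689) already exists in the table.
For each counter attribute that the table item already has, the current attribute value is incremented by the respective DataFrame value (for example, an existing kms value is increased by 532).
Both non-counter attributes in the DataFrame will be added to the table item with the respective DataFrame values, overwriting any existing values: the value of the policy attribute will be "IP-953481-35" and the value of the primary_driver attribute will be "Jenny Penny".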
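The counter behavior can be sketched in plain Python. The starting item below is hypothetical (its kms and policy values are invented for the illustration), and the rule that a counter attribute missing from the item is created with the DataFrame value is an assumption about the connector's behavior. `apply_counter_write` is an illustrative helper, not a platform API.

```python
def apply_counter_write(item, row, counters):
    """Return a copy of the table item after writing a DataFrame row.

    Counter attributes that already exist in the item are incremented;
    all other attributes are set to the row's values.
    """
    updated = dict(item)
    for name, value in row.items():
        if name in counters and name in updated:
            updated[name] += value  # existing counter: increment
        else:
            updated[name] = value   # new counter or non-counter: set
    return updated


# Parse the option value the same way it's written in the example.
counters = {c.strip() for c in "kms, num_drives".split(",")}

# Hypothetical existing table item
existing = {"reg_num": 7866689, "kms": 73000, "policy": "IP-000000-00"}

row = {
    "kms": 532,
    "num_drives": 5,
    "policy": "IP-953481-35",
    "primary_driver": "Jenny Penny",
    "reg_num": 7866689,
}

result = apply_counter_write(existing, row, counters)
print(result["kms"], result["num_drives"], result["policy"])
# prints: 73532 5 IP-953481-35
```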
Example 4 — Conditional Update
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
import spark.implicits._ // Required for toDF outside the Spark shell
var writeDF = Seq(("7843321", "Honda", 29321))
.toDF("reg_license", "model", "odometer")
writeDF.write.format("io.iguaz.v3io.spark.sql.kv")
.option("key", "reg_license")
.mode("overwrite").save("v3io://mycontainer/cars/")
var readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv")
.load("v3io://mycontainer/cars/")
readDF.show()
writeDF = Seq(("7843321", "Honda", 31718))
.toDF("reg_license", "model", "odometer")
writeDF.write.format("io.iguaz.v3io.spark.sql.kv")
.option("key", "reg_license")
.option("condition", "${odometer} > odometer")
.mode("append").save("v3io://mycontainer/cars/")
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv")
.load("v3io://mycontainer/cars/")
readDF.show()
writeDF = Seq(("7843321", "Ford", 40001))
.toDF("reg_license", "model", "odometer")
writeDF.write.format("io.iguaz.v3io.spark.sql.kv")
.option("key", "reg_license")
.option("condition", "${model} == model")
.mode("append").save("v3io://mycontainer/cars/")
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv")
.load("v3io://mycontainer/cars/")
readDF.show()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
writeDF = spark.createDataFrame([("7843321", "Honda", 29321)],
["reg_license", "model", "odometer"])
writeDF.write.format("io.iguaz.v3io.spark.sql.kv") \
.option("key", "reg_license") \
.mode("overwrite").save("v3io://mycontainer/cars/")
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv") \
.load("v3io://mycontainer/cars/")
readDF.show()
writeDF = spark.createDataFrame([("7843321", "Honda", 31718)],
["reg_license", "model", "odometer"])
writeDF.write.format("io.iguaz.v3io.spark.sql.kv") \
.option("key", "reg_license") \
.option("condition", "${odometer} > odometer") \
.mode("append").save("v3io://mycontainer/cars/")
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv") \
.load("v3io://mycontainer/cars/")
readDF.show()
writeDF = spark.createDataFrame([("7843321", "Ford", 40001)],
["reg_license", "model", "odometer"])
writeDF.write.format("io.iguaz.v3io.spark.sql.kv") \
.option("key", "reg_license") \
.option("condition", "${model} == model") \
.mode("append").save("v3io://mycontainer/cars/")
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv") \
.load("v3io://mycontainer/cars/")
readDF.show()
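This example demonstrates how to conditionally update NoSQL table items by using the condition write option.
The first write command writes an item (row) to a "cars" table in a "mycontainer" data container.
The item's reg_license primary-key attribute is set to 7843321, its model attribute is set to "Honda", and its odometer attribute is set to 29321, as shown when reading the item from the table: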
+-----------+-----+--------+
|reg_license|model|odometer|
+-----------+-----+--------+
| 7843321|Honda| 29321|
+-----------+-----+--------+
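The second write command uses the condition option, option("condition", "${odometer} > odometer"), to update the item only if the odometer value in the write DataFrame is greater than the current value of the item's odometer attribute.
The DataFrame value (31718) is greater than the table value (29321), so the condition evaluates to true and the write is executed, updating the value of the item's odometer attribute to 31718, as shown when reading the item from the table:
+-----------+-----+--------+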
|reg_license|model|odometer|
+-----------+-----+--------+
| 7843321|Honda| 31718|
+-----------+-----+--------+
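The third write command uses the condition option, option("condition", "${model} == model"), to update the item only if the model value in the write DataFrame matches the current value of the item's model attribute.
Because the value of model in the DataFrame ("Ford") differs from the value in the table item ("Honda"), the condition evaluates to false and the write isn't executed (i.e., the table item isn't updated), as shown when reading the item from the table:
+-----------+-----+--------+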
|reg_license|model|odometer|
+-----------+-----+--------+
| 7843321|Honda| 31718|
+-----------+-----+--------+
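The sequence of conditional writes can be traced with a small plain-Python model. This is a sketch of the evaluation logic only, under the reading that `${name}` refers to the value in the write DataFrame and the bare name refers to the value in the table item; `conditional_write` is a hypothetical helper, not part of the connector.

```python
def conditional_write(item, row, condition):
    """Return the updated item if condition(row, item) is true;
    otherwise return the item unchanged."""
    if condition(row, item):
        return {**item, **row}
    return item


# State after the first (unconditional) write
item = {"reg_license": "7843321", "model": "Honda", "odometer": 29321}

# Second write, modeling "${odometer} > odometer"
item = conditional_write(
    item,
    {"reg_license": "7843321", "model": "Honda", "odometer": 31718},
    lambda new, old: new["odometer"] > old["odometer"],
)

# Third write, modeling "${model} == model"
item = conditional_write(
    item,
    {"reg_license": "7843321", "model": "Ford", "odometer": 40001},
    lambda new, old: new["model"] == old["model"],
)

print(item["model"], item["odometer"])  # prints: Honda 31718
```

The third write is rejected as a whole: because the condition fails, neither the model nor the odometer value from that DataFrame reaches the item.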