The NoSQL Spark DataFrame

Introduction

The platform includes the Iguazio Spark connector, which defines a custom Spark data source for reading and writing NoSQL data in the platform's NoSQL store using Spark DataFrames. A Spark DataFrame of this data-source format is referred to in the documentation as a NoSQL DataFrame. This data source supports data pruning and filtering (predicate pushdown), which allows Spark queries to operate on a smaller amount of data; only the data that is required by the active job is loaded. The data source also allows you to work with partitioned tables; perform "replace" mode and conditional item updates; define specific item attributes as counter attributes and easily increment or decrement their values; and perform optimized range scans.

Data Source

To use the Iguazio Spark connector to read or write NoSQL data in the platform, use the format method to set the DataFrame's data-source format to the platform's custom NoSQL data source — "io.iguaz.v3io.spark.sql.kv". See the following read and write examples:

    // Scala
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
    val df = spark.read.format("io.iguaz.v3io.spark.sql.kv")
        .load("v3io://mycontainer/src_table")
    df.write.format("io.iguaz.v3io.spark.sql.kv")
        .option("key", "id").save("v3io://mycontainer/dest_table")

    # Python
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
    df = spark.read.format("io.iguaz.v3io.spark.sql.kv") \
        .load("v3io://mycontainer/src_table")
    df.write.format("io.iguaz.v3io.spark.sql.kv") \
        .option("key", "id").save("v3io://mycontainer/dest_table")

    Table Paths

    Specify the path to the NoSQL table that is associated with the DataFrame as a fully qualified v3io path of the following format, where <container name> is the name of the table's parent data container and <table path> is the relative path to the table within that container (see Data Paths in the Spark Datasets overview):

    v3io://<container name>/<table path>
    

    Examples

    The following example uses a Spark DataFrame to create a NoSQL table named "cars" in a mydata directory in the "projects" container, and then reads the contents of the table into a DataFrame:

      // Scala
      val nosql_source = "io.iguaz.v3io.spark.sql.kv"
      var table_path = "v3io://projects/mydata/cars/"
      
      val writeDF = Seq(
          ("7843321", "Honda", "Accord", "silver", 123000),
          ("2899941", "Ford", "Mustang", "white", 72531),
          ("6689123", "Kia", "Picanto", "red", 29320)
      )
      writeDF.toDF("reg_license", "vendor", "model", "color", "odometer")
          .write.format(nosql_source)
          .option("key", "reg_license")
          .mode("overwrite")
          .save(table_path)
      
      val readDF = spark.read.format(nosql_source).load(table_path)
      readDF.show()
      
      # Python
      import sys
      from pyspark.sql import *
      from pyspark.sql.functions import *
      
      nosql_source = "io.iguaz.v3io.spark.sql.kv"
      table_path = "v3io://projects/mydata/cars/"
      
      writeDF = spark.createDataFrame([
          ("7843321", "Honda", "Accord", "silver", 123000),
          ("2899941", "Ford", "Mustang", "white", 72531),
          ("6689123", "Kia", "Picanto", "red", 29320),
      ], ["reg_license", "vendor", "model", "color", "odometer"])
      writeDF.write.format(nosql_source) \
          .option("key", "reg_license") \
          .mode("overwrite") \
          .save(table_path)
      
      readDF = spark.read.format(nosql_source).load(table_path)
      readDF.show()
      

      The following code shows several equivalent alternatives for changing the table path from the previous example to a "cars" table in the running-user directory of the "users" container (note that the Python variants require importing os). All variations except the first use environment variables instead of explicitly specifying the name of the running user ("iguazio" in the example):

        table_path = "v3io://users/iguazio/cars"
        table_path = "v3io://users/" + System.getenv("V3IO_USERNAME") + "/cars"
        table_path = "v3io://" + System.getenv("V3IO_HOME") + "/cars"
        table_path = System.getenv("V3IO_HOME_URL") + "/cars"
        
        table_path = "v3io://users/iguazio/cars"
        table_path = "v3io://users/" + os.getenv("V3IO_USERNAME") + "/cars"
        table_path = "v3io://" + os.getenv("V3IO_HOME") + "/cars"
        table_path = os.getenv("V3IO_HOME_URL") + "/cars"
        

        For additional examples, see the Examples section on this page.

        Save Modes

        The Iguazio Spark connector supports the standard Spark DataFrame save modes, which can be set using the Spark DataFrame mode method when writing data from a NoSQL DataFrame to a table. For more information and examples, see the Spark SQL, DataFrames and Datasets Guide and the NoSQL DataFrame write examples.

        Note
        In some situations — such as when using the columnUpdate or counter write options — the append save mode behaves differently when used with the Iguazio Spark connector, as outlined in the platform documentation.
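
        For illustration, the following minimal Python sketch sets the save mode on a NoSQL DataFrame write; the "mycontainer/mytable" table path and the DataFrame contents are hypothetical:

          from pyspark.sql import SparkSession

          spark = SparkSession.builder.appName("Save-Mode Example").getOrCreate()
          df = spark.createDataFrame([("a1", 1), ("a2", 2)], ["id", "value"])

          # "overwrite" deletes the existing table (if any) before writing the DataFrame
          df.write.format("io.iguaz.v3io.spark.sql.kv") \
              .mode("overwrite") \
              .option("key", "id") \
              .save("v3io://mycontainer/mytable/")

          # "append" adds new items; see the note above for connector-specific append behavior
          df.write.format("io.iguaz.v3io.spark.sql.kv") \
              .mode("append") \
              .option("key", "id") \
              .save("v3io://mycontainer/mytable/")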

        Options

        Use the Spark DataFrame option method to configure relevant options. See the Apache Spark Datasets documentation for the built-in options. In addition, the Iguazio Spark connector supports the following custom NoSQL DataFrame options:

        Read Options
        inferSchema

        Set this option to true (option("inferSchema", "true")) to instruct the platform to infer the schema of the NoSQL data that is being read. See Inferring the Table Schema.

        • Type: Boolean
        • Requirement: Optional
        Write Options
        key

        The name of the table's sharding-key attribute (for example, username). This option is used together with the optional sorting-key option to define the table's primary key, which uniquely identifies items within the table (see Spark DataFrames and Tables).

        For example, for a DataFrame item (row) with a username attribute (column) whose value is "johnd", calling option("key", "username") without setting the sorting-key option defines a simple username primary key and sets the item's primary-key value (name) to johnd.

        Note
        • The written DataFrame must contain a compatible attribute (column) whose name matches the value of the key option. Do not modify the value of this attribute after the item's ingestion, as this will result in a mismatch with the item's name and primary-key value (which remains unchanged).

        • The value of the sharding-key attribute cannot contain periods, because the leftmost period in an item's primary-key value is assumed to be a separator between sharding and sorting keys.

        • See the primary-key guidelines in the Best Practices for Defining Primary Keys and Distributing Data Workloads guide.

        • Type: String
        • Requirement: Required
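
        As a hedged Python sketch of the example described above (the "mycontainer/users_table" table path and the DataFrame columns are hypothetical):

          from pyspark.sql import SparkSession

          spark = SparkSession.builder.appName("Key-Option Example").getOrCreate()
          df = spark.createDataFrame([("johnd", "John", "Doe")],
                                     ["username", "first_name", "last_name"])

          # "username" is the sharding-key attribute; with no sorting key, the item's
          # primary-key value (name) is simply "johnd"
          df.write.format("io.iguaz.v3io.spark.sql.kv") \
              .mode("append") \
              .option("key", "username") \
              .save("v3io://mycontainer/users_table/")
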
        sorting-key

        The name of the table's sorting-key attribute (for example, login-date). This option can optionally be used together with the key option to define a compound primary key, which uniquely identifies items within the table (see Spark DataFrames and Tables).

        For example, for a DataFrame item (row) with a username attribute whose value is "johnd" and a login-date attribute whose value is "20180125", calling both option("key", "username") and option("sorting-key", "login-date") defines a compound username.login-date primary key and sets the item's primary-key value (name) to johnd.20180125. When using the even-distribution write option, the item's primary-key value will be johnd_<n>.20180125 (for example, johnd_2.20180125) — see Even Workload Distribution.

        Note
        • The written DataFrame must contain a compatible attribute (column) whose name matches the value of the sorting-key option. Do not modify the value of this attribute after the item's ingestion, as this will result in a mismatch with the item's name and primary-key value.

        • You must set this option if you wish to define a compound <sharding key>.<sorting key> table primary key. Note that support for range scans requires a compound primary key and that range scans for tables with a string sorting-key attribute are more efficient. For more information and best-practice guidelines, see Best Practices for Defining Primary Keys and Distributing Data Workloads.

        • Type: String
        • Requirement: Optional
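
        A hedged Python sketch of a compound-key write, following the example described above (the "mycontainer/logins_table" table path and the DataFrame columns are hypothetical):

          from pyspark.sql import SparkSession

          spark = SparkSession.builder.appName("Compound-Key Example").getOrCreate()
          df = spark.createDataFrame([("johnd", "20180125", 3)],
                                     ["username", "login-date", "logins"])

          # Compound username.login-date primary key; the item's primary-key value
          # (name) becomes "johnd.20180125"
          df.write.format("io.iguaz.v3io.spark.sql.kv") \
              .mode("append") \
              .option("key", "username") \
              .option("sorting-key", "login-date") \
              .save("v3io://mycontainer/logins_table/")
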
        allow-overwrite-schema

        Set this option to true (option("allow-overwrite-schema", "true")) to instruct the platform to overwrite the current schema of the target table (if exists) with the schema that is automatically inferred from the contents of the DataFrame. By default, if the inferred schema differs from an existing schema for the same table, the existing schema isn't overwritten and the write fails — see Overwriting an Existing Table Schema.

        • Type: Boolean
        • Requirement: Optional
        columnUpdate

        Set this option to true (option("columnUpdate", "true")) together with the append save mode for a custom replace mode — append new items and overwrite existing items (similar to the update logic of the counter option). See the replace-mode write example.

        • Type: Boolean
        • Requirement: Optional
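
        For illustration, a hedged Python sketch of such a replace-mode write; the "mycontainer/cars" table path and the DataFrame columns are hypothetical:

          from pyspark.sql import SparkSession

          spark = SparkSession.builder.appName("columnUpdate Example").getOrCreate()
          df = spark.createDataFrame([("7843321", "silver")], ["reg_license", "color"])

          # Append with columnUpdate: new items are added, and for existing items only
          # the written attributes are overwritten
          df.write.format("io.iguaz.v3io.spark.sql.kv") \
              .mode("append") \
              .option("key", "reg_license") \
              .option("columnUpdate", "true") \
              .save("v3io://mycontainer/cars/")
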
        condition

        A Boolean condition expression that defines conditional logic for executing the write operation. See Condition Expression for syntax details and examples. As explained in the expression reference documentation, attributes in the target table item are referenced in the expression by their names. To reference a column (attribute) of the write DataFrame from within the expression, use the syntax ${<column name>}. For example, option("condition", "${version} > version") writes to the table only if the table has a matching item (identified by its name — see the key option) whose current version attribute value is lower than the value of the version column (attribute) in the Spark DataFrame. For more information, see Conditional Updates and the conditional-update write example.

        • Type: String
        • Requirement: Optional
        counter

        A comma-separated list of one or more attribute (column) names that identify counter attributes. For example, option("counter", "odometer, loops") identifies odometer and loops as counter attributes. For more information, see Counter Attributes and the counter-attributes write example.

        Note
        • Counter attributes must have numeric values.
        • The counter option is supported only with the NoSQL DataFrame append save mode, which for counter attributes functions as a custom replace mode (similar to the update logic of the columnUpdate option).
        • The DataFrame should contain a value for each of the specified counter attributes. This value will be added to or subtracted from the attribute's current value, or used as the initial attribute value if the attribute doesn't already exist in the table.
        • Type: String
        • Requirement: Optional
        partition

        [Tech Preview] A comma-separated list of one or more attribute (column) names that identify partition attributes. The written items are saved to <table path>/<attribute>=<value>[/<attribute>=<value>/...] partition directories according to the values of the items' partition attributes. Note that the order of the partition attribute names in the option string determines the partitioning hierarchy. For example, option("partition", "year, month, day, hour") identifies year, month, day, and hour as partition attributes and saves items in year=<value>/month=<value>/day=<value>/hour=<value> partitions (such as mytable/year=2018/month=2/day=12/hour=21) within the root table directory. For more information and examples, see Partitioned Tables.

        • Type: String
        • Requirement: Optional
        range-scan-even-distribution

        Set this option to true (option("range-scan-even-distribution", "true")) to instruct the platform to distribute items with the same sharding-key attribute value among multiple data slices, to achieve a more even distribution of non-uniform data. This option is applicable only for tables with a compound <sharding key>.<sorting key> primary key, which can be created by using both the key and sorting-key write options. For more information, see Even Workload Distribution.

        • Type: Boolean
        • Requirement: Optional

        Defining the Table Schema

        Spark DataFrames handle structured data, so Spark needs to know the schema of the data structure. When you write NoSQL data by using the platform's Spark DataFrame or Frames APIs, the table schema is automatically identified and saved, and is then retrieved when you read the data with a Spark DataFrame, Frames, or Presto (unless you choose to explicitly define the schema for the read operation). However, to use a Spark DataFrame, Frames, or Presto to read NoSQL data that was written to a table in another way, you first need to define the table schema. You can use either of the following methods to define or update the schema of a NoSQL table as part of a NoSQL DataFrame read operation:

        • Use the custom inferSchema option to infer the schema (recommended).
        • Define the schema programmatically as part of the Spark DataFrame read operation. (You can also do this for data that was written using a Spark DataFrame in the platform, although it's not required.)

        For more information, see the NoSQL Table Schema Reference.

        Overwriting an Existing Table Schema

        By default, if the schema inferred from the DataFrame's contents during a write operation differs from the table's current schema — as defined in its schema file (if such a file exists) — the write fails. This is designed to protect against inadvertent schema changes. However, you can override this default behavior by using the custom allow-overwrite-schema write option, which forces an overwrite of the current table schema with the inferred schema.

        Table Schema-Overwrite Examples

        The following example creates a "mytable" table in a "mycontainer" data container with AttrA and AttrB attributes of type string and an AttrC attribute of type long, and then overwrites the table schema to change the type of AttrC to double:

          // Scala
          import org.apache.spark.sql.SparkSession
          
          val spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
          import spark.implicits._  // enables the toDF() calls below
          
          var df = Seq(("a", "z", 123), ("b", "y", 456))
              .toDF("AttrA", "AttrB", "AttrC")
          df.write.format("io.iguaz.v3io.spark.sql.kv")
              .mode("overwrite")
              .option("key", "AttrA")
              .save("v3io://mycontainer/mytable/")
          
          df = Seq(("c", "x", 32.12), ("d", "v", 45.2))
              .toDF("AttrA", "AttrB", "AttrC")
          df.write.format("io.iguaz.v3io.spark.sql.kv")
              .mode("append")
              .option("key", "AttrA")
              .option("allow-overwrite-schema", "true")
              .save("v3io://mycontainer/mytable/")
          
          # Python
          from pyspark.sql import SparkSession
          
          spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
          
          df = spark.createDataFrame([
              ("a", "z", 123),
              ("b", "y", 456)
          ], ["AttrA", "AttrB", "AttrC"])
          df.write.format("io.iguaz.v3io.spark.sql.kv") \
              .mode("overwrite") \
              .option("key", "AttrA") \
              .save("v3io://mycontainer/mytable/")
              
          df = spark.createDataFrame([
              ("c", "x", 32.12),
              ("d", "v", 45.2)
          ], ["AttrA", "AttrB", "AttrC"])
          df.write.format("io.iguaz.v3io.spark.sql.kv") \
              .mode("append") \
              .option("key", "AttrA") \
              .option("allow-overwrite-schema", "true") \
              .save("v3io://mycontainer/mytable/")
          

          Note

          If you remove or comment out the option("allow-overwrite-schema", "true") call in the second write command, the write will fail with the following schema-mismatch error:

          java.lang.RuntimeException: Note you are about the rewrite existing schema file.
                     old schema = Schema(List(Field(AttrA,string,false,None), Field(AttrB,string,true,None), Field(AttrC,long,false,None)),AttrA,None,0)
                     new schema = Schema(ArraySeq(Field(AttrA,string,false,None), Field(AttrB,string,true,None), Field(AttrC,double,false,None)),AttrA,None,0).
          

          Inferring the Table Schema

          You can use the custom NoSQL DataFrame inferSchema read option to automatically infer the schema of the read table from its contents.

          Infer-Schema Examples

          The following example uses a Spark DataFrame to read data from a NoSQL "employees" table in a "department_store" data container.

            // Scala
            import org.apache.spark.sql.SparkSession
            
            val spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
            
            val myDF = spark.read.format("io.iguaz.v3io.spark.sql.kv")
                .option("inferSchema", "true")
                .load("v3io://department_store/employees")
            
            # Python
            from pyspark.sql import SparkSession
            
            spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
            
            myDF = spark.read.format("io.iguaz.v3io.spark.sql.kv") \
                .option("inferSchema", "true") \
                .load("v3io://department_store/employees")
            

            To generate a schema file from the inferred schema, you can write back the content of the read DataFrame to the same table using the append save mode; the write operation automatically creates a schema file if it doesn't exist. For efficiency, use limit(0) to write only the schema file:

              // Scala
              myDF.limit(0).write.format("io.iguaz.v3io.spark.sql.kv")
                  .mode("append")
                  .option("key", "employee_id")
                  .save("v3io://department_store/employees")
              
              # Python
              myDF.limit(0).write.format("io.iguaz.v3io.spark.sql.kv") \
                  .mode("append") \
                  .option("key", "employee_id") \
                  .save("v3io://department_store/employees")
              

              Defining the Table Schema Programmatically

              You can define a NoSQL DataFrame's table schema programmatically by using the Spark DataFrame schema method as part of a read operation: in your code, you can define a schema variable of the relevant list type, and populate it with structures that provide the required information about the table's attributes (columns). Then, pass the variable as a parameter to the DataFrame schema read method — for example, for a schema variable, you can call schema(schema). See The Item-Attributes Schema Object ('fields') reference and the following examples.

              Note
              Programmatically created table schemas don't support range-scan or even-distribution table queries.

              Programmatic Table-Schema Definition Examples

               The following example uses a Spark DataFrame to read data from a NoSQL "employees" table in a "department_store" data container. The table has five attributes (columns), which are defined in the schema variable:

              • "id" — a numeric employee ID, which serves as the table's primary key and isn't nullable.
              • "firstname" — the employee's first name, depicted as a string.
              • "lastname" — the employee's last name, depicted as a string.
              • "department" — the department to which the employee belongs, depicted as a string .
              • "managerid" — the numeric ID of the employee's manager.

                 // Scala
                 import org.apache.spark.sql.SparkSession
                import org.apache.spark.sql.types._
                
                val spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
                
                val schema = StructType(List(
                    StructField("id", LongType, nullable = false),
                    StructField("firstname", StringType, nullable = true),
                    StructField("lastname", StringType, nullable = true),
                    StructField("department", StringType, nullable = true),
                    StructField("managerid", LongType, nullable = true)))
                val myDF = spark.read.schema(schema)
                    .format("io.iguaz.v3io.spark.sql.kv")
                    .load("v3io://department_store/employees")
                
                 # Python
                 import sys
                from pyspark.sql import SparkSession
                from pyspark.sql import *
                from pyspark.sql.types import *
                from pyspark.sql.functions import *
                
                spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
                
                schema = StructType([
                    StructField("id", LongType(), False),
                    StructField("firstname", StringType(), True),
                    StructField("lastname", StringType(), True),
                    StructField("department", StringType(), True),
                    StructField("managerid", LongType(), True)])
                myDF = spark.read.schema(schema) \
                    .format("io.iguaz.v3io.spark.sql.kv") \
                    .load("v3io://department_store/employees")
                

                Conditional Updates

                You can use the custom condition write option of the NoSQL Spark DataFrame to perform conditional item updates, whereby the write operation will be executed only if the specified condition expression evaluates to true.

                The condition expression is evaluated against the table item to be updated, if it exists. If the condition evaluates to true, the write operation is executed and the item is updated or created; otherwise, the operation completes successfully without an update.

                Note
                • If the expression references a non-existent item attribute, the condition processing stops and the operation completes successfully without updating or creating the item.
                • If the item doesn't exist and the condition expression doesn't reference any attributes (for example, a "1==1" or "2==3" expression, which could potentially be auto generated in some programming scenarios), the operation completes successfully and the item is updated or created only if the condition evaluates to true.

                See the NoSQL DataFrame conditional-update write example.
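
                 For illustration, a hedged Python sketch of a conditional update; the "mycontainer/mytable" table path and the id, version, and payload attributes are hypothetical:

                   from pyspark.sql import SparkSession

                   spark = SparkSession.builder.appName("Conditional-Update Example").getOrCreate()
                   df = spark.createDataFrame([("item1", 3, "new data")],
                                              ["id", "version", "payload"])

                   # Update the matching table item only if its current 'version' attribute
                   # value is lower than the 'version' column in the DataFrame
                   df.write.format("io.iguaz.v3io.spark.sql.kv") \
                       .mode("append") \
                       .option("key", "id") \
                       .option("condition", "${version} > version") \
                       .save("v3io://mycontainer/mytable/")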

                Counter Attributes

                The Iguazio Spark connector enhances the standard Spark DataFrame by allowing you to define numeric attributes (columns) in a table as counter attributes and easily increment or decrement the attributes' values. This is done by using the custom counter write option of the NoSQL DataFrame to name one or more attributes as counter attributes.

                The counter option is supported only with the NoSQL DataFrame append save mode. However, the significance of this mode when using the counter option is different than the standard Spark DataFrame behavior (for both counter and non-counter attributes):

                 • If a DataFrame attribute isn't already found in the table, the attribute is added to the table and initialized to the value set for it in the DataFrame. If the table or item doesn't exist, it's created and updated with the DataFrame's contents. This is the standard append save-mode behavior and it's the same for both counter and non-counter attributes.
                • If a DataFrame counter attribute is already found in the table, its value is incremented or decremented according to the value that was set for this attribute in the DataFrame — i.e., the attribute value indicates the increment or decrement step (a negative value = decrement).
                 • If a DataFrame non-counter attribute is already found in the table, its value is overwritten with the value that was set for it in the DataFrame, but other attributes in the table remain unaffected (i.e., replace mode, similar to an append write with the columnUpdate option).

                See the NoSQL DataFrame counter-attributes write example.
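
                 For illustration, a hedged Python sketch that treats an odometer attribute as a counter; the "mycontainer/cars" table path and the column names are hypothetical:

                   from pyspark.sql import SparkSession

                   spark = SparkSession.builder.appName("Counter-Attributes Example").getOrCreate()

                   # The odometer value (1500) is an increment step, not an absolute value;
                   # if the item or attribute doesn't exist yet, 1500 becomes the initial value
                   df = spark.createDataFrame([("7843321", 1500)], ["reg_license", "odometer"])
                   df.write.format("io.iguaz.v3io.spark.sql.kv") \
                       .mode("append") \
                       .option("key", "reg_license") \
                       .option("counter", "odometer") \
                       .save("v3io://mycontainer/cars/")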

                Partitioned Tables [Tech Preview]

                Table partitioning is a common technique for optimizing physical data layout and related queries. In a partitioned table, some item attributes (columns) are used to create partition directories within the root table directory using the format <table path>/<attribute>=<value>[/<attribute>=<value>/...], and each item is then saved to the respective partition directory based on its attribute values. For example, for a "mytable" table with year and month attribute partitions, an item with attributes year = 2018 and month = 1 will be saved to a mytable/year=2018/month=1/ directory. This allows for more efficient data queries that search for the data only in the relevant partition directories instead of scanning the entire table. This technique is used, for example, by Hive, and is supported for all the built-in Spark Dataset file-based data sources (such as Parquet, CSV, and JSON) via the partitionBy write method. See also the Partitioned Tables documentation on the Working with NoSQL Data page, including best practices.
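
                 For comparison, the following hedged Python sketch uses the standard partitionBy write method with a built-in file-based data source (Parquet); the output path and DataFrame contents are hypothetical:

                   from pyspark.sql import SparkSession

                   spark = SparkSession.builder.appName("partitionBy Example").getOrCreate()
                   df = spark.createDataFrame([(2018, 1, 0.0), (2018, 2, 1.5)],
                                              ["year", "month", "rain_ml"])

                   # Writes Parquet files under .../year=<value>/month=<value>/ directories
                   df.write.partitionBy("year", "month") \
                       .parquet("v3io://mycontainer/weather_parquet/")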

                The Iguazio Spark connector supports table partitioning for the NoSQL DataFrame [Tech Preview]:

                 • Creating a partitioned table — the custom NoSQL DataFrame partition option allows you to select specific item attributes (columns) in a write DataFrame to be used as partitions. When using this option, the platform creates the necessary partition directory path for each written item. (Note that after you define partitions for a table, you need to specify the same partitions whenever you write to this table, unless you decide to overwrite it.)
                 • Querying a partitioned table — a partitioned table is queried like any other table, with the table path set to the root table directory and not to a specific partition directory. Version 3.6.1 of the platform doesn't support using wildcards in the table path, such as "mytable/year=*/month=5" to search the month=5 directories in all mytable/year="value" directories. However, you can easily restrict the query to specific partition directories by using the Spark DataFrame filter method with a filter that references one of the partition attributes. In such cases, the platform searches the root table directory that is specified in the read command for nested directories of the format <attribute>=<value>. If it finds such directories, it searches only the partition directories that match the query. For example, for a table partitioned by year and month attributes, a month == 12 filter will return only the items from the month=12 partition directories in all year=* directories.

                Table-Partitioning Examples

                  // Scala
                  import org.apache.spark.sql.SparkSession
                  import org.apache.spark.sql.functions.concat
                  
                  val spark = SparkSession.builder.appName("Table-Partitioning Example").getOrCreate()
                  import spark.implicits._  // enables toDF() and the $"..." column syntax
                  val table_path = "v3io://mycontainer/weather/"
                  
                  val df = Seq(
                      (2016,  3, 25,  6, 16, 0.00, 55),
                      (2016,  3, 25, 17, 19, 0.10, 62),
                      (2016,  7, 24,  7, 20, 0.00, 52),
                      (2016, 12, 24,  9, 10, 0.05, 47),
                      (2016, 12, 24, 19,  8, 0.20, 47),
                      (2017,  5,  7, 14, 21, 0.00, 70),
                      (2017, 11,  1, 10, 16, 0.00, 34),
                      (2017, 11,  1, 22, 13, 0.01, 41),
                      (2017, 12, 12, 16, 12, 0.00, 47),
                      (2017, 12, 24, 17, 11, 1.00, 50),
                      (2018,  1, 18,  5,  8, 0.00, 37),
                      (2018,  1, 18, 17, 10, 2.00, 45),
                      (2018,  5, 20, 15, 24, 0.00, 62),
                      (2018,  5, 20, 21, 20, 0.00, 59),
                      (2018, 11,  1, 11, 11, 0.12, 65)
                  ).toDF("year", "month", "day", "hour", "degrees_cel", "rain_ml", "humidity_per")
                  val df_with_key = df.withColumn("time", concat($"year", $"month", $"day", $"hour"))
                  df_with_key.write.format("io.iguaz.v3io.spark.sql.kv")
                      .mode("overwrite")
                      .option("key", "time")
                      .option("partition", "year, month, day")
                      .save(table_path)
                  
                  var readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
                  readDF.show()
                  
                  readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
                      .filter($"month" < 7)
                  readDF.show()
                  
                  readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
                      .filter($"month" === 12 && $"day" === 24)
                  readDF.show()
                  
                  readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
                      .filter($"month" > 6 && $"hour" >= 8 && $"hour" <= 20)
                  readDF.show()
                  
                  # Python
                  import sys
                  from pyspark.sql import SparkSession
                  from pyspark.sql import *
                  from pyspark.sql.functions import *
                  
                  spark = SparkSession.builder.appName("Table-Partitioning Example").getOrCreate()
                  table_path = "v3io://mycontainer/weather/"
                  
                  df = spark.createDataFrame([
                      (2016,  3, 25,  6, 16, 0.00, 55),
                      (2016,  3, 25, 17, 19, 0.10, 62),
                      (2016,  7, 24,  7, 20, 0.00, 52),
                      (2016, 12, 24,  9, 10, 0.05, 47),
                      (2016, 12, 24, 19,  8, 0.20, 47),
                      (2017,  5,  7, 14, 21, 0.00, 70),
                      (2017, 11,  1, 10, 16, 0.00, 34),
                      (2017, 11,  1, 22, 13, 0.01, 41),
                      (2017, 12, 12, 16, 12, 0.00, 47),
                      (2017, 12, 24, 17, 11, 1.00, 50),
                      (2018,  1, 18,  5,  8, 0.00, 37),
                      (2018,  1, 18, 17, 10, 2.00, 45),
                      (2018,  5, 20, 15, 24, 0.00, 62),
                      (2018,  5, 20, 21, 20, 0.00, 59),
                      (2018, 11,  1, 11, 11, 0.12, 65)
                  ], ["year", "month", "day", "hour", "degrees_cel", "rain_ml", "humidity_per"])
                  df_with_key = df.withColumn(
                      "time", concat(df["year"], df["month"], df["day"], df["hour"]))
                  df_with_key.write.format("io.iguaz.v3io.spark.sql.kv") \
                      .mode("overwrite") \
                      .option("key", "time") \
                      .option("partition", "year, month, day, hour") \
                      .save(table_path)
                  
                  readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
                  readDF.show()
                  
                  readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
                      .filter("month < 7")
                  readDF.show()
                  
                  readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
                      .filter("month == 12 AND day == 24")
                  readDF.show()
                  
                  readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
                      .filter("month > 6 AND hour >= 8 AND hour <= 20")
                  readDF.show()
                  
                  

                   This example creates a partitioned "weather" table in a "mycontainer" data container. The option("partition", "year, month, day") write option partitions the table by the year, month, and day item attributes. As demonstrated in the following image, if you browse the container in the dashboard after running the example, you'll see that the weather directory has year=<value>/month=<value>/day=<value> partition directories that match the written items. If you select any of the nested day partition directories, you can see the written items and their attributes. For example, the first item (with attribute values 2016, 3, 25, 6, 16, 0.00, 55) is saved to a 20163256 file in a weather/year=2016/month=3/day=25 partition directory.

                  Dashboard partition-table browse

                  Following is the output of the example's show commands for each read. The filtered results are gathered by scanning only the partition directories that match the filter criteria.

                  Full table read

                  +----+-----+---+----+-----------+-------+------------+----------+
                  |year|month|day|hour|degrees_cel|rain_ml|humidity_per|      time|
                  +----+-----+---+----+-----------+-------+------------+----------+
                  |2016|   12| 24|   9|         10|   0.05|          47| 201612249|
                  |2016|   12| 24|  19|          8|    0.2|          47|2016122419|
                  |2016|    3| 25|   6|         16|    0.0|          55|  20163256|
                  |2016|    3| 25|  17|         19|    0.1|          62| 201632517|
                  |2016|    7| 24|   7|         20|    0.0|          52|  20167247|
                  |2017|   11|  1|  22|         13|   0.01|          41| 201711122|
                  |2017|   11|  1|  10|         16|    0.0|          34| 201711110|
                  |2017|   12| 12|  16|         12|    0.0|          47|2017121216|
                  |2017|   12| 24|  17|         11|    1.0|          50|2017122417|
                  |2017|    5|  7|  14|         21|    0.0|          70|  20175714|
                  |2018|    1| 18|   5|          8|    0.0|          37|  20181185|
                  |2018|    1| 18|  17|         10|    2.0|          45| 201811817|
                  |2018|   11|  1|  11|         11|   0.12|          65| 201811111|
                  |2018|    5| 20|  15|         24|    0.0|          62| 201852015|
                  |2018|    5| 20|  21|         20|    0.0|          59| 201852021|
                  +----+-----+---+----+-----------+-------+------------+----------+
                  

                  month < 7 filter — retrieve all data for the first six months of each year:

                  +----+-----+---+----+-----------+-------+------------+---------+
                  |year|month|day|hour|degrees_cel|rain_ml|humidity_per|     time|
                  +----+-----+---+----+-----------+-------+------------+---------+
                  |2016|    3| 25|   6|         16|    0.0|          55| 20163256|
                  |2016|    3| 25|  17|         19|    0.1|          62|201632517|
                  |2017|    5|  7|  14|         21|    0.0|          70| 20175714|
                  |2018|    1| 18|   5|          8|    0.0|          37| 20181185|
                  |2018|    1| 18|  17|         10|    2.0|          45|201811817|
                  |2018|    5| 20|  15|         24|    0.0|          62|201852015|
                  |2018|    5| 20|  21|         20|    0.0|          59|201852021|
                  +----+-----+---+----+-----------+-------+------------+---------+
                  

                  month == 12 AND day == 24 filter — retrieve all hours on Dec 24 each year:

                  
                  +----+-----+---+----+-----------+-------+------------+----------+
                  |year|month|day|hour|degrees_cel|rain_ml|humidity_per|      time|
                  +----+-----+---+----+-----------+-------+------------+----------+
                  |2016|   12| 24|   9|         10|   0.05|          47| 201612249|
                  |2016|   12| 24|  19|          8|    0.2|          47|2016122419|
                  |2017|   12| 24|  17|         11|    1.0|          50|2017122417|
                  +----+-----+---+----+-----------+-------+------------+----------+
                  

                  month > 6 AND hour >= 8 AND hour <= 20 filter — retrieve 08:00–20:00 data for every day in the last six months of each year:

                  +----+-----+---+----+-----------+-------+------------+----------+
                  |year|month|day|hour|degrees_cel|rain_ml|humidity_per|      time|
                  +----+-----+---+----+-----------+-------+------------+----------+
                  |2016|   12| 24|   9|         10|   0.05|          47| 201612249|
                  |2016|   12| 24|  19|          8|    0.2|          47|2016122419|
                  |2017|   11|  1|  10|         16|    0.0|          34| 201711110|
                  |2017|   12| 12|  16|         12|    0.0|          47|2017121216|
                  |2017|   12| 24|  17|         11|    1.0|          50|2017122417|
                  |2018|   11|  1|  11|         11|   0.12|          65| 201811111|
                  +----+-----+---+----+-----------+-------+------------+----------+
                  

                  Range Scans

                   A NoSQL Spark DataFrame table query that uses supported sharding-key and optional sorting-key filters to retrieve items with the same sharding-key value is processed by performing a range scan, which is more efficient than a standard full table scan. See NoSQL Range Scans. Note that support for Spark DataFrame range scans requires a table schema that was inferred with a NoSQL Spark DataFrame, Frames, or the Iguazio Trino connector.

                  Range-Scan Operators
                  The NoSQL Spark DataFrame uses range scan for compound primary-key table queries that apply the equal-to (=) or IN (IN/isin) operator to the sharding-key attribute, and optionally also apply a comparison operator (=/>/>=/</<=) to the sorting-key attribute.

                  Range-Scan Examples

                  Example 1 — Basic Range Scan

                    // Scala
                    import org.apache.spark.sql.SparkSession
                    
                    val spark = SparkSession.builder.appName("Range-Scan Example").getOrCreate()
                    import spark.implicits._  // enables toDF() and the $"..." column syntax
                    val table_path = "v3io://mycontainer/mytaxis/rides/"
                    
                    var writeDF = Seq(
                        (24, "20180601",  8, 332.0, 18),
                        (24, "20180602",  5, 260.0, 11),
                        (24, "20180701",  7, 352.1, 21),
                        (1,  "20180601", 25, 125.0, 40),
                        (1,  "20180602", 20, 106.0, 46),
                        (1,  "20180701", 28, 106.4, 42),
                        (16, "20180601",  1, 224.2,  8),
                        (16, "20180602", 10, 244.0, 45),
                        (16, "20180701",  6, 193.2, 24)
                    ).toDF("driver_id", "date", "num_rides", "total_km", "total_passengers")
                    writeDF = writeDF
                        .withColumn("avg_ride_km", $"total_km" / $"num_rides")
                        .withColumn("avg_ride_passengers", $"total_passengers" / $"num_rides")
                    writeDF.write.format("io.iguaz.v3io.spark.sql.kv")
                        .mode("overwrite")
                        .option("key", "driver_id")
                        .option("sorting-key", "date")
                        .save(table_path)
                    
                    // Range-scan queries
                    var readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
                        .filter($"driver_id" === 1)
                    readDF.show()
                    
                    readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
                        .filter($"driver_id" === 24 && $"date" >= "20180101" && $"date" < "20180701")
                    readDF.show()
                    
                    readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
                        .filter($"driver_id".isin(1, 16, 24) && $"avg_ride_passengers" >= 3)
                    readDF.show()
                    
                    # Python
                    from pyspark.sql import SparkSession
                    
                    spark = SparkSession.builder.appName("Range-Scan Example").getOrCreate()
                    table_path = "v3io://mycontainer/mytaxis/rides/"
                    
                    writeDF = spark.createDataFrame([
                        (24, "20180601",  8, 332.0, 18),
                        (24, "20180602",  5, 260.0, 11),
                        (24, "20180701",  7, 352.1, 21),
                        (1,  "20180601", 25, 125.0, 40),
                        (1,  "20180602", 20, 106.0, 46),
                        (1,  "20180701", 28, 106.4, 42),
                        (16, "20180601",  1, 224.2,  8),
                        (16, "20180602", 10, 244.0, 45),
                        (16, "20180701",  6, 193.2, 24)
                    ], ["driver_id", "date", "num_rides", "total_km", "total_passengers"])
                    writeDF = writeDF.withColumn(
                        "avg_ride_km", writeDF["total_km"] / writeDF["num_rides"]) \
                                     .withColumn(
                        "avg_ride_passengers", writeDF["total_passengers"] / writeDF["num_rides"])
                    writeDF.write.format("io.iguaz.v3io.spark.sql.kv") \
                        .mode("overwrite") \
                        .option("key", "driver_id") \
                        .option("sorting-key", "date") \
                        .save(table_path)
                    
                    # Range-scan queries
                    readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
                        .filter("driver_id == 1")
                    readDF.show()
                    
                    readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
                        .filter("driver_id == 24 AND date >= '20180101' AND date < '20180701'")
                    readDF.show()
                    
                    readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
                        .filter("driver_id IN (1, 16, 24) AND avg_ride_passengers >= 3")
                    readDF.show()
                    
                    

                    This example creates a "rides" table in a mytaxis directory in a "mycontainer" data container. The option("key", "driver_id") and option("sorting-key", "date") write options define the driver_id attribute as the table's sharding key and the date attribute as the table's sorting key; all items in the DataFrame define these attributes. If you browse the container in the dashboard after running the example, you'll see that the names of the files in the mytaxis/rides directory are of the format <sharding-key value>.<sorting-key value>, as demonstrated in the following image (for example, 16.20180602):

                    Dashboard basic range-scan table browse

                     All of the read commands result in faster range scans (compared to standard full-table scans) because they include a range-scan sharding-key filter and, in some cases, also a sorting-key filter. Following is the output of the example's show commands for each read:

                    "driver_id == 1" filter — retrieve all items with a driver_id sharding-key attribute value of 1 (regardless of the sorting-key value):

                    +---------+--------+---------+--------+----------------+------------------+-------------------+
                    |driver_id|    date|num_rides|total_km|total_passengers|       avg_ride_km|avg_ride_passengers|
                    +---------+--------+---------+--------+----------------+------------------+-------------------+
                    |        1|20180601|       25|   125.0|              40|               5.0|                1.6|
                    |        1|20180602|       20|   106.0|              46|               5.3|                2.3|
                    |        1|20180701|       28|   106.4|              42|3.8000000000000003|                1.5|
                    +---------+--------+---------+--------+----------------+------------------+-------------------+
                    

                    "driver_id == 24 AND date >= '20180101' AND date < '20180701'" filter — retrieve all items with a driver_id sharding-key attribute value of 24 and a date sorting-key attribute value within the first six months of 2018:

                    +---------+--------+---------+--------+----------------+-----------+-------------------+
                    |driver_id|    date|num_rides|total_km|total_passengers|avg_ride_km|avg_ride_passengers|
                    +---------+--------+---------+--------+----------------+-----------+-------------------+
                    |       24|20180601|        8|   332.0|              18|       41.5|               2.25|
                    |       24|20180602|        5|   260.0|              11|       52.0|                2.2|
                    +---------+--------+---------+--------+----------------+-----------+-------------------+
                    

                    "driver_id IN (1, 16, 24) AND avg_ride_passengers >= 3" filter — retrieve all items with a driver_id sharding-key attribute value of 1, 16, or 24 (regardless of the sorting-key value) and an avg_ride_passengers attribute value that is greater or equal to 3:

                    +---------+--------+---------+--------+----------------+------------------+-------------------+
                    |driver_id|    date|num_rides|total_km|total_passengers|       avg_ride_km|avg_ride_passengers|
                    +---------+--------+---------+--------+----------------+------------------+-------------------+
                    |       16|20180601|        1|   224.2|               8|             224.2|                8.0|
                    |       16|20180602|       10|   244.0|              45|              24.4|                4.5|
                    |       16|20180701|        6|   193.2|              24|32.199999999999996|                4.0|
                    |       24|20180701|        7|   352.1|              21|50.300000000000004|                3.0|
                    +---------+--------+---------+--------+----------------+------------------+-------------------+
                    

                    Example 2 — Even-Distribution Range Scan

                      // Scala
                      import org.apache.spark.sql.SparkSession
                      
                      val spark = SparkSession.builder.appName("Even-Distribution Range-Scan Example").getOrCreate()
                      import spark.implicits._  // enables toDF() and the $"..." column syntax
                      val table_path = "v3io://mycontainer/mytaxis/even_distribution_range_scan/rides/"
                      
                      var writeDF = Seq(
                          (24, "20180601",  8, 332.0, 18),
                          (24, "20180602",  5, 260.0, 11),
                          (24, "20180701",  7, 352.1, 21),
                          (1,  "20180101",  4,  90.0, 14),
                          (1,  "20180102", 14, 141.4, 28),
                          (1,  "20180202",  8, 220.8, 22),
                          (1,  "20180601", 25, 125.0, 40),
                          (1,  "20180602", 20, 106.0, 46),
                          (1,  "20180701", 28, 106.4, 42),
                          (16, "20180601",  1, 224.2,  8),
                          (16, "20180602", 10, 244.0, 45),
                          (16, "20180701",  6, 193.2, 24)
                      ).toDF("driver_id", "date", "num_rides", "total_km", "total_passengers")
                          .withColumn("avg_ride_km", $"total_km" / $"num_rides")
                          .withColumn("avg_ride_passengers", $"total_passengers" / $"num_rides")
                      writeDF.write.format("io.iguaz.v3io.spark.sql.kv")
                          .mode("overwrite")
                          .option("key", "driver_id")
                          .option("sorting-key", "date")
                          .option("range-scan.even-distribution", "true")
                          .save(table_path)
                      
                      // Range-scan queries
                      var readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
                          .filter($"driver_id" === 1)
                      readDF.show()
                      
                      readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
                          .filter($"driver_id" === 24 && $"date" >= "20180101" && $"date" < "20180701")
                      readDF.show()
                      
                      readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
                          .filter($"driver_id".isin(1, 16, 24) && $"avg_ride_passengers" >= 3)
                      readDF.show()
                      
                      # Python
                      from pyspark.sql import SparkSession
                      
                      spark = SparkSession.builder.appName("Even-Distribution Range-Scan Example").getOrCreate()
                      table_path = "v3io://mycontainer/mytaxis/even_distribution_range_scan/rides/"
                      
                      writeDF = spark.createDataFrame([
                          (24, "20180601",  8, 332.0, 18),
                          (24, "20180602",  5, 260.0, 11),
                          (24, "20180701",  7, 352.1, 21),
                          (1,  "20180101",  4,  90.0, 14),
                          (1,  "20180102", 14, 141.4, 28),
                          (1,  "20180202",  8, 220.8, 22),
                          (1,  "20180601", 25, 125.0, 40),
                          (1,  "20180602", 20, 106.0, 46),
                          (1,  "20180701", 28, 106.4, 42),
                          (16, "20180601",  1, 224.2,  8),
                          (16, "20180602", 10, 244.0, 45),
                          (16, "20180701",  6, 193.2, 24)
                      ], ["driver_id", "date", "num_rides", "total_km", "total_passengers"])
                      writeDF = writeDF.withColumn(
                          "avg_ride_km", writeDF["total_km"] / writeDF["num_rides"]) \
                                       .withColumn(
                          "avg_ride_passengers", writeDF["total_passengers"] / writeDF["num_rides"])
                      writeDF.write.format("io.iguaz.v3io.spark.sql.kv") \
                          .mode("overwrite") \
                          .option("key", "driver_id") \
                          .option("sorting-key", "date") \
                          .option("range-scan.even-distribution", "true") \
                          .save(table_path)
                      
                      # Range-scan queries
                      readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
                          .filter("driver_id == 1")
                      readDF.show()
                      
                      readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
                          .filter("driver_id == 24 AND date >= '20180101' AND date < '20180701'")
                      readDF.show()
                      
                      readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
                          .filter("driver_id IN (1, 16, 24) AND avg_ride_passengers >= 3")
                      readDF.show()
                      
                      

                      This example creates a "rides" table in a mytaxis/even_distribution_range_scan directory in a "mycontainer" data container. The content of the table is similar to that of the "rides" table in the basic range-scan example but with additional items with a sharding-key value of 1. For demonstration purposes, the assumption is that the data in the table is expected to become less uniform as more items are added, with most items having the same sharding-key value (for example, 1). Therefore, the range-scan-even-distribution write option is added to the writeDF write command — option("range-scan.even-distribution", "true") — to recalculate the items' sharding-key values and distribute the items more evenly across multiple data slices. See Even Workload Distribution.

                      The read queries remain the same as in the basic range-scan example. However, if you browse the container in the dashboard after running the example, you'll see that the names of the files in the mytaxis/even_distribution_range_scan/rides directory are of the format <original sharding-key value>_<n>.<sorting-key value>, as demonstrated in the following image, and not <original sharding-key value>.<sorting-key value> as in the basic example (for example, 16_36.20180602 instead of 16.20180602):

                      Dashboard range-scan even-distribution table browse

                       Following is the output of the example's show commands for each read. If you compare the output to that of the basic range-scan example, you'll see that it's similar, except that the even-distribution query results include additional items with a sharding-key value of 1 that aren't in the basic-example table, and that the sort order differs.

                      "driver_id == 1" filter — retrieve all items with a driver_id sharding-key attribute value of 1 (regardless of the sorting-key value):

                      +---------+--------+---------+--------+----------------+------------------+-------------------+
                      |driver_id|    date|num_rides|total_km|total_passengers|       avg_ride_km|avg_ride_passengers|
                      +---------+--------+---------+--------+----------------+------------------+-------------------+
                      |        1|20180202|        8|   220.8|              22|              27.6|               2.75|
                      |        1|20180102|       14|   141.4|              28|              10.1|                2.0|
                      |        1|20180101|        4|    90.0|              14|              22.5|                3.5|
                      |        1|20180602|       20|   106.0|              46|               5.3|                2.3|
                      |        1|20180701|       28|   106.4|              42|3.8000000000000003|                1.5|
                      |        1|20180601|       25|   125.0|              40|               5.0|                1.6|
                      +---------+--------+---------+--------+----------------+------------------+-------------------+
                      

                      "driver_id == 24 AND date >= '20180101' AND date < '20180701'" filter — retrieve all items with a driver_id sharding-key attribute value of 24 and a date sorting-key attribute value within the first six months of 2018:

                      +---------+--------+---------+--------+----------------+-----------+-------------------+
                      |driver_id|    date|num_rides|total_km|total_passengers|avg_ride_km|avg_ride_passengers|
                      +---------+--------+---------+--------+----------------+-----------+-------------------+
                      |       24|20180601|        8|   332.0|              18|       41.5|               2.25|
                      |       24|20180602|        5|   260.0|              11|       52.0|                2.2|
                      +---------+--------+---------+--------+----------------+-----------+-------------------+
                      

                      "driver_id IN (1, 16, 24) AND avg_ride_passengers >= 3" filter — retrieve all items with a driver_id sharding-key attribute value of 1, 16, or 24 (regardless of the sorting-key value) and an avg_ride_passengers attribute value that is greater or equal to 3:

                      +---------+--------+---------+--------+----------------+------------------+-------------------+
                      |driver_id|    date|num_rides|total_km|total_passengers|       avg_ride_km|avg_ride_passengers|
                      +---------+--------+---------+--------+----------------+------------------+-------------------+
                      |        1|20180101|        4|    90.0|              14|              22.5|                3.5|
                      |       16|20180602|       10|   244.0|              45|              24.4|                4.5|
                      |       16|20180701|        6|   193.2|              24|32.199999999999996|                4.0|
                      |       16|20180601|        1|   224.2|               8|             224.2|                8.0|
                      |       24|20180701|        7|   352.1|              21|50.300000000000004|                3.0|
                      +---------+--------+---------+--------+----------------+------------------+-------------------+
                      

                      Even Workload Distribution

                      The NoSQL Spark DataFrame offers custom support for even distribution of ingested items across the available data slices for the parent container. The objective is to improve the system's performance when working with a non-uniform data set — see the Recalculating Sharding-Key Values for Even Workload Distribution best-practice guidelines.

                      When writing (ingesting) data to a table with a compound <sharding key>.<sorting key> primary key (see the key and sorting-key write options), you can optionally also set the custom range-scan-even-distribution option. This option instructs the platform to recalculate the primary-key value for each of the ingested items, by splitting the item's original sharding-key value into multiple values, according to the number configured in the platform's v3io.kv.range-scan.hashing-bucket-num configuration property (default = 64). As a result, items with the same original sharding-key value (which remains stored in the items' sharding-key attribute) are distributed across multiple data slices, based on the value of the items' sorting key, instead of being stored on the same slice. This is done implicitly, although if you browse the table directory you can see the new primary-key values (item names) — of the format <original sharding-key value>_<n>.<sorting-key value> (for example, johnd_1.20180602) — as demonstrated in the even-distribution range-scan example.
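
                      For example, the following Scala sketch ingests taxi-ride items (taken from the range-scan example data) with a compound driver_id.date primary key and the even-distribution option. The container name and table path are illustrative, and setting the option value to "true" is an assumption of this sketch rather than a statement of the option's full syntax:

                          import org.apache.spark.sql.SparkSession

                          val spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
                          import spark.implicits._

                          // Ride items keyed by driver_id (sharding key) and date (sorting key)
                          val ridesDF = Seq(
                              (1, "20180601", 25, 125.0, 40),
                              (1, "20180602", 20, 106.0, 46)
                          ).toDF("driver_id", "date", "num_rides", "total_km", "total_passengers")

                          // The range-scan-even-distribution option instructs the platform to
                          // recalculate the sharding-key values so that items with the same
                          // driver_id are spread across multiple data slices
                          ridesDF.write.format("io.iguaz.v3io.spark.sql.kv")
                              .option("key", "driver_id")
                              .option("sorting-key", "date")
                              .option("range-scan-even-distribution", "true")
                              .mode("overwrite")
                              .save("v3io://mycontainer/mytaxis/even_distribution_range_scan/rides/")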

                      When submitting a NoSQL Spark DataFrame or Presto sharding-key query for a table that was created with the even-distribution Spark DataFrame option or by using similar calculations, use the original sharding-key value. Behind the scenes, the platform searches for all the primary-key values that were derived from the original sharding-key value. Note that this custom support requires a table schema that was inferred with a NoSQL Spark DataFrame or with the Iguazio Trino connector.
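
                      For example, the following sketch (continuing the write sketch above) queries the table by the original driver_id sharding-key value; behind the scenes, the platform resolves all the recalculated primary-key values that were derived from it:

                          val readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv")
                              .load("v3io://mycontainer/mytaxis/even_distribution_range_scan/rides/")

                          // Filter on the original sharding-key value (1), not on the recalculated
                          // "1_<n>" values that appear in the item names
                          readDF.where("driver_id == 1 AND date >= '20180601'").show()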

                      For more information on the behind-the-scenes implementation to support this feature, see the Using a NoSQL Spark DataFrame for Even Workload Distribution best-practices documentation.

                      Examples

                      Following are some examples of using the NoSQL Spark DataFrame. For schema-definition, table-partitioning, and range-scan examples, see the Defining the Table Schema, Partitioned Tables, and Range Scans sections, respectively; the range-scan examples also demonstrate the support for even workload distribution.

                      Read Examples

                        import org.apache.spark.sql.SparkSession
                        
                        val spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
                        import spark.implicits._
                        
                        val df = spark.read.format("io.iguaz.v3io.spark.sql.kv")
                            .option("inferSchema", "true")
                            .load("v3io://mycontainer/WebData")
                        
                        df.select($"url", $"pages" + $"ads" as "views")
                            .where($"browser" =!= "Chrome")
                        
                        from pyspark.sql import SparkSession
                        
                        spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
                        
                        df = spark.read.format("io.iguaz.v3io.spark.sql.kv") \
                            .option("inferSchema", "true") \
                            .load("v3io://mycontainer/WebData")
                        
                        df.select(df["url"], (df["pages"] + df["ads"]).alias("views")) \
                            .where(df["browser"] != "Chrome")
                        

                        This example reads page-click data from a "WebData" table in a "mycontainer" data container. The inferSchema option is used to infer the schema of the table (in case the table doesn't have a schema file).

                        Note
                        By using Spark's predicate pushdown, the select and where operations are handed over to the platform's NoSQL store, and pruned/filtered data is returned to Spark.

                        Write Examples

                        Example 1 — Simple Append-Mode Write

                          val df = Seq(("ians", "Ian Smith", 25), ("dorisb", "Doris Brown", 31))
                              .toDF("username", "name", "age")
                          df.write.format("io.iguaz.v3io.spark.sql.kv")
                              .mode("append")
                              .option("key", "username")
                              .save("v3io://mycontainer/IT/Users/")
                          
                          df = spark.createDataFrame([
                              ("ians", "Ian Smith", 25),
                              ("dorisb", "Doris Brown", 31)
                          ], ["username", "name", "age"])
                          df.write.format("io.iguaz.v3io.spark.sql.kv") \
                              .mode("append") \
                              .option("key", "username") \
                              .save("v3io://mycontainer/IT/Users/")
                          

                          This example writes two items (rows) to an "IT/Users" table in a "mycontainer" data container whose primary-key attribute is username. The save mode is set to append. Therefore, DataFrame items that don't already exist in the table will be added in full, and existing items (based on the specified primary-key attribute values) will be updated only to add missing attributes, but values of existing item attributes won't be modified.

                          Example 2 — Replace-Mode Write

                            val df = Seq(("ians", "Ian Smith", 26), ("janed", "Jane Doe", 42))
                                .toDF("username", "name", "age")
                            df.write.format("io.iguaz.v3io.spark.sql.kv")
                                .mode("append")
                                .option("columnUpdate", "true")
                                .option("key", "username")
                                .save("v3io://mycontainer/IT/Users/")
                            
                            df = spark.createDataFrame([
                                ("ians", "Ian Smith", 26),
                                ("janed", "Jane Doe", 42)
                            ], ["username", "name", "age"])
                            df.write.format("io.iguaz.v3io.spark.sql.kv") \
                                .mode("append") \
                                .option("columnUpdate", "true") \
                                .option("key", "username") \
                                .save("v3io://mycontainer/IT/Users/")
                            

                            This example writes two items (rows) to the same table as in the previous simple append-mode write example — "IT/Users" table in a "mycontainer" data container whose primary-key attribute is username. The save mode is set to append and the columnUpdate option is set to "true". Therefore, assuming the code is run after the simple append-mode write example, the new "janed" item (which doesn't exist in the table) will be appended to the table; the existing "ians" item, which was included in the previous write example, will be overwritten with the item from the new write DataFrame (and the value of the age attribute will change from 25 to 26); and the existing "dorisb" item, which was written only in the previous example, will remain unchanged.
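
                            To verify the result, you can read the table back after running both write examples; the following is a minimal sketch, with the expected table contents (derived from the two examples above) noted in a comment:

                                spark.read.format("io.iguaz.v3io.spark.sql.kv")
                                    .load("v3io://mycontainer/IT/Users/")
                                    .show()

                                // Expected items: "ians" (age updated from 25 to 26), "dorisb" (unchanged, age 31),
                                // and the newly added "janed" (age 42)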

                            Example 3 — Counter Attributes

                              val df = Seq((532, 5, "IP-953481-35", "Jenny Penny", 7866689))
                                  .toDF("kms", "num_drives", "policy", "primary_driver", "reg_num")
                              df.write.format("io.iguaz.v3io.spark.sql.kv")
                                  .mode("append")
                                  .option("key", "reg_num")
                                  .option("counter", "kms, num_drives")
                                  .save("v3io://mycontainer/Cars/")
                              
                              df = spark.createDataFrame([
                                  (532, 5, "IP-953481-35", "Jenny Penny", 7866689)
                              ], ["kms", "num_drives", "policy", "primary_driver", "reg_num"])
                              df.write.format("io.iguaz.v3io.spark.sql.kv") \
                                  .mode("append") \
                                  .option("key", "reg_num") \
                                  .option("counter", "kms, num_drives") \
                                  .save("v3io://mycontainer/Cars/")
                              

                              This example writes an item (row) to a "Cars" table in a "mycontainer" data container whose primary-key attribute is reg_num. The save mode is set to append, which is the required mode when defining Counter Attributes. The example demonstrates the special significance of this mode when used together with the counter option.

                              The DataFrame contains a primary-key reg_num attribute (7866689); num_drives (5) and kms (532) attributes, which are defined as counter attributes by using the counter option; and regular (non-counter) policy ("IP-953481-35") and primary_driver ("Jenny Penny") attributes.
                              Assume that a matching item (reg_num=7866689) already exists in the table and that it has a num_drives attribute with the value 95 and a primary_driver attribute with the value "Harry Carey", but no kms or policy attributes.

                              Because the table item already has the num_drives counter attribute, its current value (95) will be incremented by the specified attribute value (5), updating the attribute's value to 100. Because the kms counter attribute is new, it will be added to the item and initialized to its DataFrame value — 532.
                              Both non-counter attributes in the DataFrame will be added to the table item with the respective DataFrame values, overwriting any existing values: the value of the primary_driver attribute will change from "Harry Carey" to "Jenny Penny", and a policy attribute with the value "IP-953481-35" will be added to the item. (This behavior is different when using the append or overwrite save modes without the counter option for the same non-counter attributes.)
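
                              The following is a minimal sketch of reading the item back after the counter-attribute write, with the expected attribute values from the scenario described above noted in a comment:

                                  spark.read.format("io.iguaz.v3io.spark.sql.kv")
                                      .load("v3io://mycontainer/Cars/")
                                      .where("reg_num == 7866689")
                                      .show()

                                  // Expected item: num_drives=100 (95 incremented by 5), kms=532 (newly initialized),
                                  // primary_driver="Jenny Penny", policy="IP-953481-35"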

                              Example 4 — Conditional Update

                                import org.apache.spark.sql.SparkSession
                                
                                val spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
                                
                                var writeDF = Seq(("7843321", "Honda", 29321))
                                    .toDF("reg_license", "model", "odometer")
                                writeDF.write.format("io.iguaz.v3io.spark.sql.kv")
                                    .option("key", "reg_license")
                                    .mode("overwrite").save("v3io://mycontainer/cars/")
                                var readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv")
                                    .load("v3io://mycontainer/cars/")
                                readDF.show()
                                
                                writeDF = Seq(("7843321", "Honda", 31718))
                                    .toDF("reg_license", "model", "odometer")
                                writeDF.write.format("io.iguaz.v3io.spark.sql.kv")
                                    .option("key", "reg_license")
                                    .option("condition", "${odometer} > odometer")
                                    .mode("append").save("v3io://mycontainer/cars/")
                                readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv")
                                    .load("v3io://mycontainer/cars/")
                                readDF.show()
                                
                                writeDF = Seq(("7843321", "Ford", 40001))
                                    .toDF("reg_license", "model", "odometer")
                                writeDF.write.format("io.iguaz.v3io.spark.sql.kv")
                                    .option("key", "reg_license")
                                    .option("condition", "${model} == model")
                                    .mode("append").save("v3io://mycontainer/cars/")
                                readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv")
                                    .load("v3io://mycontainer/cars/")
                                readDF.show()
                                
                                from pyspark.sql import SparkSession
                                
                                spark = SparkSession.builder.getOrCreate()
                                
                                writeDF = spark.createDataFrame([("7843321", "Honda", 29321)],
                                                                ["reg_license", "model", "odometer"])
                                writeDF.write.format("io.iguaz.v3io.spark.sql.kv") \
                                    .option("key", "reg_license") \
                                    .mode("overwrite").save("v3io://mycontainer/cars/")
                                readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv") \
                                    .load("v3io://mycontainer/cars/")
                                readDF.show()
                                
                                writeDF = spark.createDataFrame([("7843321", "Honda", 31718)],
                                                                ["reg_license", "model", "odometer"])
                                writeDF.write.format("io.iguaz.v3io.spark.sql.kv") \
                                    .option("key", "reg_license") \
                                    .option("condition", "${odometer} > odometer") \
                                    .mode("append").save("v3io://mycontainer/cars/")
                                readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv") \
                                    .load("v3io://mycontainer/cars/")
                                readDF.show()
                                
                                writeDF = spark.createDataFrame([("7843321", "Ford", 40001)],
                                                                ["reg_license", "model", "odometer"])
                                writeDF.write.format("io.iguaz.v3io.spark.sql.kv") \
                                    .option("key", "reg_license") \
                                    .option("condition", "${model} == model") \
                                    .mode("append").save("v3io://mycontainer/cars/")
                                readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv") \
                                    .load("v3io://mycontainer/cars/")
                                readDF.show()
                                

                                This example demonstrates how to conditionally update NoSQL table items by using the condition write option. Each write call in the example is followed by matching read and show calls to read and display the value of the updated item in the target table after the write operation.

                                The first write command writes an item (row) to a "cars" table in a "mycontainer" data container. The item's reg_license primary-key attribute is set to 7843321, the model attribute is set to "Honda", and the odometer attribute is set to 29321. The overwrite save mode is used to overwrite the table if it already exists and create it otherwise. Reading the item from the table produces this output:

                                +-----------+-----+--------+
                                |reg_license|model|odometer|
                                +-----------+-----+--------+
                                |    7843321|Honda|   29321|
                                +-----------+-----+--------+
                                

                                The second write command uses the condition option to update the value of the item's odometer attribute only if this value is higher than the current value of this attribute in the table — option("condition", "${odometer} > odometer"). The append save mode is used to update the specified item rather than overwriting the table. Because the value of odometer in the write DataFrame (31718) is higher than the current value of this attribute in the table (29321), the condition evaluates to true and the write is executed, updating the value of the item's odometer attribute in the table, as shown when reading the item from the table:

                                +-----------+-----+--------+
                                |reg_license|model|odometer|
                                +-----------+-----+--------+
                                |    7843321|Honda|   31718|
                                +-----------+-----+--------+
                                

                                The third write command uses the condition option (again with the append save mode) to update the value of the item's odometer attribute to 40001 only if the value of the model attribute remains the same — option("condition", "${model} == model"). Because the value of model in the write DataFrame ("Ford") is different from the current value of this attribute in the table ("Honda"), the condition evaluates to false and the write isn't executed (i.e., the table item isn't updated), as shown when reading the item from the table:

                                +-----------+-----+--------+
                                |reg_license|model|odometer|
                                +-----------+-----+--------+
                                |    7843321|Honda|   31718|
                                +-----------+-----+--------+
                                

                                See Also