V3IOUtils Object
Description
A singleton utility object for creating Spark input streams that can be used to consume platform stream records via the Spark Streaming API.
Summary
object V3IOUtils
- def createDirectStream[V: ClassTag, VD <: Decoder[V]: ClassTag, R: ClassTag](ssc: StreamingContext, v3ioParams: Map[String, String], streamNames: Set[String], messageHandler: RecordAndMetadata[V] => R): InputDStream[R]
Prototype
object V3IOUtils
createDirectStream Method
Creates a Spark input-stream object that can be used to consume record data and metadata from the specified platform streams, using the Spark Streaming API (v3.2.3). The new input stream pulls stream records by querying the platform directly, without using a receiver.
Syntax
createDirectStream[
V: ClassTag,
VD <: Decoder[V] : ClassTag,
R: ClassTag](
ssc: StreamingContext,
v3ioParams: Map[String, String],
streamNames: Set[String],
messageHandler: RecordAndMetadata[V] => R)
: InputDStream[R]
Parameters
- ssc
  A Spark streaming-context object.
  - Type: Spark StreamingContext
  - Requirement: Required
- v3ioParams
  An optional map of platform configuration properties that configure the creation and behavior of the returned Spark input stream. For example, Map("default-data-block-size" -> "524288") configures the block size for read operations to 0.5 MB (524288 bytes).
  - Type: Map[String, String] — a map of platform-property name and value string pairs
  - Requirement: Optional
  Note: All property values are provided as strings. The "Property Type" information in the following descriptions indicates the property type that the string represents.

  Start-Position Configuration Properties
  You can optionally set the following properties to determine the start position for the new Spark input stream (see also the construction sketch after the parameter descriptions):
  - spark.streaming.v3io.streamStartPosition — the stream start-position type.
    - Property Type: String
    - Default Value: "latest"

    The following property values are supported. The start-position information applies to each of the shards in the source V3IO streams (see streamNames):
    - "earliest" — start at the location of the earliest ingested record in the shard.
    - "time" — start at the location of the earliest record in the shard that was ingested at or after the base time configured in the spark.streaming.v3io.streamStartTimeMS property. For shards without a matching record ingestion time (i.e., if all records in the shard arrived before the configured base time), the start position is set to the end of the shard.
    - "<record sequence number>" — start at the location of the record whose sequence number matches the property value (for example, "1"). For shards without a matching record sequence number, the start position is set to the end of the shard.
  - spark.streaming.v3io.streamStartTimeMS — the base time for a time-based stream start position (spark.streaming.v3io.streamStartPosition = "time"), as a Unix timestamp in milliseconds. For example, 1511260205000 sets the base time for determining the start position to 21 Nov 2017 at 10:30:05 AM UTC.
    - Property Type: Long
    - Default Value: 0 (Unix Epoch — 1 Jan 1970 at 00:00:00 UTC)
- streamNames
  A set of one or more fully qualified V3IO stream paths of the format v3io://<container name>/<stream path> — where <container name> is the name of the stream's parent container and <stream path> is the path to the stream within the specified container. For example, v3io://mycontainer/mydata/metrics_stream.

  Note: All stream paths in the streamNames set must point to existing V3IO streams that reside in the same container.
  - Type: Set[String]
  - Requirement: Required
- messageHandler
  An optional record-handler function for converting a stream record of type RecordAndMetadata into the desired input-stream record type, which is indicated by the handler's return type (R). The default record handler returns the record's data (also known as the record payload or value), decoded as the value type of the RecordAndMetadata class instance (V).
  - Type: A record-handler function of the form messageHandler: RecordAndMetadata[V] => R
  - Requirement: Optional
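The following is a minimal construction sketch for the v3ioParams and streamNames arguments. The property names and example values are taken from the parameter descriptions above; the container name and the second stream name are placeholders, not part of the original reference.

// Sketch only: configure a time-based start position for each shard, using the
// example base time from the description above
// (1511260205000 = 21 Nov 2017, 10:30:05 AM UTC). All values are strings.
val v3ioParams = Map(
  "spark.streaming.v3io.streamStartPosition" -> "time",
  "spark.streaming.v3io.streamStartTimeMS"   -> "1511260205000"
)

// Fully qualified V3IO stream paths; all streams in the set must reside in the
// same container ("mycontainer" and "events_stream" are placeholders).
val streamNames = Set(
  "v3io://mycontainer/mydata/metrics_stream",
  "v3io://mycontainer/mydata/events_stream"
)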
Type Parameters
- V — the type of the input stream's record data ("value").
- VD — the decoder class to use for converting the record data into the specified type — the input stream's value type, as set in the method's V type parameter. See Stream-Data Encoding and Decoding Types.
- R — the input stream's record type, which is the type of the record handler's return value (see messageHandler). This type parameter is applicable only when providing a record handler in the method call. The default input-stream record type is the decoding type of the record data, as set in the method's V type parameter.
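To make the mapping concrete, the call from Example 1 below can be annotated as follows (illustration only; the identifiers are those defined in Example 1):

// V  = String: the record data is decoded as a string
// VD = StringDecoder: the decoder class that produces the string value
// R  = PayloadWithMetadata[String]: the return type of the custom record handler
V3IOUtils.createDirectStream[String, StringDecoder, PayloadWithMetadata[String]](
  ssc, cfgProperties, streamNames, recordHandler)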
Return Value
Returns an instance of a Spark input stream (InputDStream[R]).
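As an illustrative sketch (assuming an input stream created with the default record handler, so that its records are strings), the returned stream supports the standard Spark Streaming DStream operations:

// Sketch only: inputDStream is assumed to be an InputDStream[String] returned
// by createDirectStream; count the records received in each micro-batch.
inputDStream.foreachRDD { rdd =>
  println(s"Received ${rdd.count()} records in this batch")
}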
Examples
Example 1
Create a Spark input stream for "/DriverStream" and "/Passengers" V3IO streams in a "mycontainer" container, using the default configuration properties (an empty properties map) and a custom record handler that returns each record's data together with its metadata (PayloadWithMetadata[String]):
val batchInterval = Seconds(1)
val sparkConf = new SparkConf()
.setAppName("My Car Streams Application")
val ssc = new StreamingContext(sparkConf, batchInterval)
val cfgProperties = Map.empty[String, String]
val streamNames = Set("/DriverStream", "/Passengers")
val carsInputDStream = {
val recordHandler = (rmd: RecordAndMetadata[String]) =>
rmd.payloadWithMetadata()
V3IOUtils.createDirectStream[String, StringDecoder,
PayloadWithMetadata[String]](
ssc,
cfgProperties,
streamNames,
recordHandler)
}
Example 2
Create a Spark input stream for "/WebClicks" and "/ServerLogs" V3IO streams within a "/Web/Streams/" directory in a "mycontainer" container, starting at the earliest ingested record found in each of the stream shards. Use the default record handler to create an input stream that returns the record data as a string:
val batchInterval = Seconds(2)
val sparkConf = new SparkConf()
.setAppName("My Web Streams Application")
val ssc = new StreamingContext(sparkConf, batchInterval)
val cfgProperties = Map("spark.streaming.v3io.streamStartPosition" -> "earliest")
val streamNames =
Set("/Web/Streams/WebClicks", "/Web/Streams/ServerLogs")
val webInputDStream = {
V3IOUtils.createDirectStream[String, StringDecoder](
ssc,
cfgProperties,
streamNames)
}