Spark Streaming for Beginners – Towards Data Science

Spark is a fast engine for processing large volumes of data, and it has been reported to run up to 100 times faster than Hadoop MapReduce for some in-memory workloads. It distributes the work: data is split into smaller partitions that are processed in parallel across machines, which saves time. It also keeps data in memory where possible instead of writing intermediate results to disk, which speeds up computation further.

Spark Streaming is one of the most important parts of the Big Data ecosystem. It is a component of Apache Spark, an Apache Software Foundation project for processing Big Data. Basically, it ingests data from sources like Twitter in real time, processes it using functions and algorithms, and pushes the results out to databases and other storage.

Spark Configuration

First we set up Spark and tell it where to run: on the local machine, a Spark standalone cluster, a Mesos cluster, or a Kubernetes cluster. If you're not familiar with these terms, don't worry. Basically, these are cluster managers that handle tasks such as checking node status and scheduling jobs. If you choose local as the master, you specify the number of cores on your local machine that you want Spark to use. In general, the more cores you give it, the faster it runs. If you specify *, Spark uses all the cores on your system. With a receiver-based source such as a socket, use at least two local cores, since the receiver itself occupies one. Then we set the application name, which is the name we give to our Spark application.

SparkConf conf = new SparkConf().setAppName("SparkApp").setMaster("local[*]");

Creating the Streaming Context Object

We then create a JavaStreamingContext object, which is the entry point for the stream. It provides methods for creating JavaDStream and JavaPairDStream objects from input sources, which we'll discuss later. When creating the JavaStreamingContext, we need to specify the batch interval. Basically, Spark Streaming splits the incoming data into batches, so the final result is also generated in batches. The batch interval tells Spark how long to collect data for each batch: with an interval of 1 minute, each batch holds the data received during the last minute.

The data then starts pouring in as a stream of batches; this continuous stream of data is called a DStream. Each batch of the DStream is a collection of items that can be processed in parallel; this collection is called an RDD.
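To make the batch interval concrete, here is a minimal plain-Java sketch (no Spark involved) of how timestamped records could be grouped into fixed-length batches. The class name BatchIntervalSketch, the method toBatches, and the sample arrival times are made up for illustration; Spark's real receiver and scheduler are more involved than this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; this is not how Spark is implemented internally.
class BatchIntervalSketch {

    // Groups record arrival times (in seconds) into batches of intervalSeconds.
    // Key 0 covers [0, interval), key 1 covers [interval, 2 * interval), and so on.
    static Map<Long, List<Long>> toBatches(List<Long> arrivalSeconds, long intervalSeconds) {
        Map<Long, List<Long>> batches = new TreeMap<>();
        for (long t : arrivalSeconds) {
            long batchIndex = t / intervalSeconds;
            batches.computeIfAbsent(batchIndex, k -> new ArrayList<>()).add(t);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Three records arrive in the first minute, none in the second, one in the third.
        List<Long> arrivals = Arrays.asList(5L, 20L, 59L, 130L);
        System.out.println(toBatches(arrivals, 60)); // prints {0=[5, 20, 59], 2=[130]}
    }
}
```

With a 60-second interval, the first three records land in the same batch and the last one in a later batch. The interval in between has no entry in this sketch, which corresponds to a batch with nothing to process.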

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(60));

To receive data, the streaming context provides methods for reading from a TCP socket connection or from files as input sources. File sources can be file systems like HDFS, S3, etc. To read text files, there is the textFileStream method of JavaStreamingContext.

JavaDStream<String> lines = jssc.textFileStream("C:\\Users\\HP\\Downloads\\Spark_Streams");

But it will not read files that were already in the directory before the streaming context started, as it only picks up newly created files.

So here I will receive the data through a socket connection on port 9999 and create a JavaReceiverInputDStream from it.

JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

Now, if you open a socket connection, type something in the terminal, and run the DStream, you will see the text appear in the console.
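socketTextStream connects to the given host and port as a client, so something has to be listening on port 9999 before the stream starts. On Linux or macOS a tool like netcat (nc -lk 9999) is commonly used for this. If you don't have it, the following is a rough plain-Java stand-in. LineServerSketch, serveOnce, and loopback are hypothetical names of my own, not part of Spark, and the sketch serves only a single client.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spark: forwards text lines to one TCP client.
class LineServerSketch {

    // Accepts one client and forwards every line from source to it until source ends.
    static void serveOnce(ServerSocket server, BufferedReader source) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(new OutputStreamWriter(
                     client.getOutputStream(), StandardCharsets.UTF_8), true)) {
            String line;
            while ((line = source.readLine()) != null) {
                out.println(line);
            }
        }
    }

    // Self-check without Spark: serve the lines on a free port and read them back
    // with an ordinary client socket, which plays the role of the streaming receiver.
    static List<String> loopback(List<String> lines) {
        try (ServerSocket server = new ServerSocket(0)) {
            BufferedReader source = new BufferedReader(new StringReader(String.join("\n", lines)));
            Thread serverThread = new Thread(() -> {
                try {
                    serveOnce(server, source);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            serverThread.start();
            List<String> received = new ArrayList<>();
            try (Socket client = new Socket("localhost", server.getLocalPort());
                 BufferedReader in = new BufferedReader(new InputStreamReader(
                         client.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    received.add(line);
                }
            }
            serverThread.join();
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 9999 matches the socketTextStream call in this article.
        try (ServerSocket server = new ServerSocket(9999)) {
            System.out.println("Waiting for a client on port 9999; type lines, end input to stop.");
            serveOnce(server, new BufferedReader(new InputStreamReader(System.in)));
        }
    }
}
```

Run the main method first, then start the Spark application; whatever you type into this program's console is sent to the stream line by line.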

Note: To run a Java streaming context, we need to tell Spark to start it, wait for the computation to terminate, and then stop it. And we need to print the DStream with the print() method.

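lines.print();            // print the first elements of each batch
jssc.start();             // start receiving and processing data
jssc.awaitTermination();  // block until the computation is stopped
jssc.stop();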

Input through TCP socket
Output in the console

Notice that output is printed at time t1, but no output is printed at times t2 and t3, because data is collected once every minute. In those later batch intervals Spark did not receive any input, so it does not print anything.

Now I'll show you how we can apply some transformations to these DStreams using lambda functions.

The map transformation applies the function we specify to each element of the DStream and produces one output value for each input value. So it basically transforms one stream into another. Here, I want the length of each line of text, so I will use the map transformation.

JavaDStream<Integer> length = lines.map(x -> x.length());

Input
Counts the length of the line of text
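If you want to try the lambda without starting a stream, here is a small plain-Java sketch that applies the same function to one in-memory batch using java.util.stream. It only mimics what map does to the elements of a single batch; MapSketch and lineLengths are illustrative names, not Spark APIs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java stand-in for one batch; no Spark classes are used here.
class MapSketch {

    // Same lambda as in lines.map(x -> x.length()): one output per input line.
    static List<Integer> lineLengths(List<String> batch) {
        return batch.stream().map(x -> x.length()).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(lineLengths(Arrays.asList("hello", "how are you?"))); // prints [5, 12]
    }
}
```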

The flatMap transformation also applies a function to each element of the DStream, but it can produce zero or more output values for each input value. So, if I want to transform the RDD in such a way that one input produces several values, I will use the flatMap transformation.

Here I gave the line of text 'hello, how are you?' as input, and I want to break it down into individual words. I used a lambda function for that. A flatMap transformation returns an arbitrary number of values, depending on the RDD and the applied function, so the function must return a sequence of values (here, an iterator).

JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());

Divide the line into words
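The same splitting logic can be tried on an ordinary list. This is a plain-Java sketch of what flatMap does to one batch, with names I made up (FlatMapSketch, splitIntoWords). Note one difference: java.util.stream's flatMap expects the function to return a Stream, whereas the Spark lambda above returns an Iterator.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java stand-in for one batch; no Spark classes are used here.
class FlatMapSketch {

    // Each input line can yield several output words, all flattened into one list.
    static List<String> splitIntoWords(List<String> batch) {
        return batch.stream()
                .flatMap(x -> Arrays.stream(x.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Punctuation stays attached to the words because we only split on spaces.
        System.out.println(splitIntoWords(Arrays.asList("hello, how are you?")));
        // prints [hello,, how, are, you?]
    }
}
```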

The reduce transformation aggregates the elements in each RDD. It takes a function that accepts two arguments of the element type and returns a single value of that type.

Here, after applying the flatMap function and getting the stream of words, I will apply the reduce transformation to get the longest word in each RDD.

JavaDStream<String> reduce = words.reduce((a, b) -> {
    String max_length_word;
    if (a.length() >= b.length()) {
        max_length_word = a;
    } else {
        max_length_word = b;
    }
    return max_length_word;
});

Let's walk through this code: the function takes two arguments of type String, the words in each RDD are compared pairwise by length, and the word with the maximum length is returned.

Input
Returns the word with the maximum length in each batch
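Here is the same pairwise comparison applied to one in-memory batch, as a plain-Java sketch (ReduceSketch and longestWord are names I chose for illustration). It returns an Optional because an empty batch has no longest word.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Plain-Java stand-in for one batch; no Spark classes are used here.
class ReduceSketch {

    // Keeps the longer of two words at every step, so one word is left at the end.
    static Optional<String> longestWord(List<String> words) {
        return words.stream().reduce((a, b) -> a.length() >= b.length() ? a : b);
    }

    public static void main(String[] args) {
        System.out.println(longestWord(Arrays.asList("hi", "there", "world")));
        // prints Optional[there]
    }
}
```

In this sequential sketch a tie goes to the word that comes first. In a distributed reduce the order in which elements are combined is not fixed, so I would not rely on which of two equally long words comes back.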

The filter transformation filters the DStream according to the given function. For example, after the flatMap transformation, let's say I want to filter out the word hello from the stream of words.

JavaDStream<String> filter = words.filter(x -> !x.equals("hello"));

Notice that the word 'Hello' is not filtered out, as it starts with a capital letter that we did not specify in our code.
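If you would rather drop the word regardless of capitalization, one option is String.equalsIgnoreCase. The plain-Java sketch below compares both predicates on one in-memory list; FilterSketch and its method names are illustrative only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java stand-in for one batch; no Spark classes are used here.
class FilterSketch {

    // Same predicate as in the article: only the exact string "hello" is dropped.
    static List<String> dropHelloExact(List<String> words) {
        return words.stream().filter(x -> !x.equals("hello")).collect(Collectors.toList());
    }

    // Case-insensitive variant: drops "hello", "Hello", "HELLO", and so on.
    static List<String> dropHelloAnyCase(List<String> words) {
        return words.stream().filter(x -> !x.equalsIgnoreCase("hello")).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Hello", "hello", "world");
        System.out.println(dropHelloExact(words));   // prints [Hello, world]
        System.out.println(dropHelloAnyCase(words)); // prints [world]
    }
}
```

The same change carries over to the stream: words.filter(x -> !x.equalsIgnoreCase("hello")).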

The mapToPair transformation transforms each input value into a pair of values.

JavaPairDStream<String, Integer> pairs = filter.mapToPair(x -> new Tuple2<String, Integer>(x, 1));

Note here that the object created is a JavaPairDStream instead of a JavaDStream, as we are returning pairs in the stream.

Returns word and integer 1

In a DStream, we can aggregate the RDD elements by key using reduceByKey. Unlike the reduce transformation, it works on RDDs of key-value pairs rather than RDDs of single elements. Here, each pair is a tuple of String and Integer, and we're summing the number of times a word appears in an RDD. The function takes two arguments and returns one. But notice the function's types: we are not specifying Tuple2<String, Integer>, only Integer, since reduceByKey groups by the key itself and applies the specified function to the values.

JavaPairDStream<String, Integer> sum = pairs.reduceByKey((a,b) -> a + b);

For every word, adds up the integer 1 once for each time the word appears
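The pairing and summing steps together amount to a word count. As a rough plain-Java analogy for a single batch (ReduceByKeySketch and countWords are my own names), Map.merge does both at once: it inserts 1 for a new word and otherwise combines the stored value with 1 using the same (a, b) -> a + b function.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for one batch; no Spark classes are used here.
class ReduceByKeySketch {

    static Map<String, Integer> countWords(List<String> words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : words) {
            // Pair the word with 1, then combine values of equal keys with a + b.
            counts.merge(word, 1, (a, b) -> a + b);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords(Arrays.asList("how", "are", "you", "how")));
        // prints {how=2, are=1, you=1}
    }
}
```

Only the Integer values pass through the lambda, which is the point made above about not having to mention the tuple type.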

The count transformation counts the number of elements in each RDD and returns a DStream of single-element RDDs.

Here, after applying the flatMap transformation, we want to know the word count in each RDD.

JavaDStream<Long> count = words.count();

Returns the number of words in the line of text

The number of words in the RDD was 13, so it returned 13 as output. This was after applying flatMap, which split the line of text into individual words. If we apply count without splitting into words, let's see what we get.

Returns the number of lines you receive in each batch

Since we haven't broken the lines down into individual words, Spark treats each entire line as a single item. In the first batch it receives 3 lines, so it returns a count of 3; the next batches receive 2 and 6 lines, so the counts are 2 and 6 respectively.
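The difference between counting lines and counting words is easy to reproduce on a plain list. This is a minimal plain-Java sketch for one batch; CountSketch, countLines, and countWords are illustrative names and the sample lines are invented.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for one batch; no Spark classes are used here.
class CountSketch {

    // Without splitting, every line is a single element.
    static long countLines(List<String> batch) {
        return batch.stream().count();
    }

    // After splitting on spaces, every word is an element of its own.
    static long countWords(List<String> batch) {
        return batch.stream().flatMap(x -> Arrays.stream(x.split(" "))).count();
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("hello how are you", "fine", "thanks a lot");
        System.out.println(countLines(batch)); // prints 3
        System.out.println(countWords(batch)); // prints 8
    }
}
```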

countByValue takes a DStream of type K, counts the number of times each distinct value appears in the RDD, and returns a JavaPairDStream of (K, Long) pairs.

Here, having split the lines into words with flatMap, I applied the countByValue transformation.

JavaPairDStream<String, Long> countByValue = words.countByValue();

input
Counts how many times a word appears in an RDD
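For a feel of the result type, here is a plain-Java sketch of the same idea on one batch, using Collectors.groupingBy with Collectors.counting. Like countByValue, it yields Long counts. The class and method names are mine, not Spark's.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java stand-in for one batch; no Spark classes are used here.
class CountByValueSketch {

    // Groups equal words together and counts the size of each group.
    static Map<String, Long> countByValue(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        // The iteration order of the resulting map is not guaranteed.
        System.out.println(countByValue(Arrays.asList("to", "be", "or", "not", "to", "be")));
    }
}
```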

Now I will show you some operations that we can perform on the RDDs themselves. So basically, we're applying transformations to DStreams that contain RDDs, and the functions we specify in a transformation are applied to those RDDs. Spark also provides operations that we can apply to these RDDs directly.

So let's say I want to sort the key-value pairs I got from countByValue in descending order of count. What I can do is swap the key and value in each pair and then sort by key, which I will show next. Inside the mapToPair transformation, I call the swap method of Tuple2 to turn each pair into a (Long, String) pair.

JavaPairDStream<Long, String> swap = countByValue.mapToPair(x -> x.swap());

Input
The swap method swaps the key-value pairs

Now, to sort the pairs in descending order, I can use the RDD sortByKey transformation. It sorts in ascending or descending order according to the Boolean value passed. If we do not pass a Boolean value, it sorts in ascending order by default.

To use sortByKey, I will use the transformToPair transformation. A transform function returns a new DStream by applying an RDD-to-RDD function to each RDD. Here I want to get back a JavaPairDStream, so I will use transformToPair.

JavaPairDStream<Long, String> sort = swap.transformToPair(x -> x.sortByKey(false));

Sorts the keys in descending order

Now I can swap the pairs again using mapToPair to get words as keys and counts as values.

JavaPairDStream<String, Long> swap_again = sort.mapToPair(x -> x.swap());

Input
Swapped again
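The three steps (swap, sort by key in descending order, swap back) can be traced on one in-memory map of counts. This is a plain-Java sketch under my own naming (SortByCountSketch, sortByCountDescending); it uses AbstractMap.SimpleEntry in place of Tuple2 and is not Spark code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for one batch; no Spark classes are used here.
class SortByCountSketch {

    static List<Map.Entry<String, Long>> sortByCountDescending(Map<String, Long> counts) {
        // Step 1, swap: (word, count) becomes (count, word).
        List<Map.Entry<Long, String>> swapped = new ArrayList<>();
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            swapped.add(new SimpleEntry<>(e.getValue(), e.getKey()));
        }
        // Step 2, like sortByKey(false): the largest count comes first.
        swapped.sort((x, y) -> Long.compare(y.getKey(), x.getKey()));
        // Step 3, swap again: (count, word) becomes (word, count).
        List<Map.Entry<String, Long>> result = new ArrayList<>();
        for (Map.Entry<Long, String> e : swapped) {
            result.add(new SimpleEntry<>(e.getValue(), e.getKey()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new LinkedHashMap<>();
        counts.put("are", 1L);
        counts.put("how", 3L);
        counts.put("you", 2L);
        System.out.println(sortByCountDescending(counts)); // prints [how=3, you=2, are=1]
    }
}
```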

This was a summary of how Spark Streaming works and some examples of how we apply transformations to DStreams. Thanks for reading! If you have questions, you can ask me in the comments section.
