
Qudosoft goes Big Data Part 3

„Qudosoft goes Big Data“ is an ongoing series of blog posts in which we share our experience, the first steps we took, and the best practices we found for our use cases.
In previous posts we discussed Hadoop and its ecosystem as well as our cluster and software setup; this post continues the series with Spark.

Spark

We currently use Spark as our main computing engine for most problems that we cannot solve on our local machines (in terms of data volume or computing power). Spark was introduced by the AMPLab in 2009 and is an open source project of the Apache Software Foundation with more than 800 contributors. It can run standalone or on a cluster and follows the same principle as MapReduce: computing on independent subsets of the data in a stateless manner. This is achieved with RDDs („Resilient Distributed Datasets“), which let you handle a huge dataset as if it were, for example, a list of objects. Operations on RDDs have the same look-and-feel as operations on the familiar collection types of your preferred programming language, so you do not have to think in terms of MapReduce.

Let us have a quick look at the ‚hello world‘ equivalent of the big data world: counting words. The first code snippet is a MapReduce job. As you can see, it needs at least three classes:

  • Mapper: the map phase, in which the values needed for the reduce phase are prepared (see the TokenizerMapper class in the snippet below).
  • Reducer: the actual aggregation of the values prepared in the map phase (see the IntSumReducer class).
  • Driver: the main entry point of a MapReduce job, where you configure the job and can chain multiple MapReduce jobs (the WordCount class itself, in particular its main method).

Every chunk (a subset of the data, in this case a line of a file) is mapped to a tuple of (word, 1). In the reduce phase, identical words are grouped together and the second value is summed up. In the end, you get a list of tuples of each word and the count of its occurrences in the processed files.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

How does Spark solve this problem? The following code snippet does the exact same thing. Scala generally seems less verbose than Java, but beyond that you need one object rather than three classes, and the same operations are simply executed as method calls. Using Spark, you do not have to think in a different paradigm; it feels as if you were programming locally. With Java 8 and lambdas, your Spark code becomes almost as concise as Scala.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
 
object SparkWordCount {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))
    val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))
    val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
    wordCounts.saveAsTextFile(args(1)) // an action is needed to actually trigger the computation
  }
}
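The point about Java 8 lambdas can be illustrated without a cluster: the following sketch (our own illustration, not Spark API) mimics Spark's flatMap/map/reduceByKey chain on a local collection using plain java.util.stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class LocalWordCount {

  // Mirrors flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) on a local list.
  public static Map<String, Long> count(List<String> lines) {
    return lines.stream()
        .flatMap(line -> Arrays.stream(line.split(" ")))  // line -> words
        .collect(Collectors.groupingBy(                   // group identical words
            Function.identity(), Collectors.counting())); // and count them
  }

  public static void main(String[] args) {
    Map<String, Long> counts = count(Arrays.asList("to be or not to be"));
    System.out.println(counts); // word -> number of occurrences
  }
}
```

Of course this runs on one machine only; the point is that the Spark version of the same chain reads almost identically, just distributed.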

You will realize that you are working with a distributed system once you get your first serialization errors, because the lambda you pass to map is not serializable.

RDDs are evaluated lazily, meaning that your code is not executed until a step in which the data you want to extract with your previous statements is really needed. You can define multiple transformations with map, reduce, or other aggregation methods, but these operations will not be executed until you need the results, for example to print a count on the console.
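This laziness can be mimicked locally with java.util.stream, whose intermediate operations are also lazy. The sketch below is our own illustration, not Spark code: the map function records how often it runs, and nothing happens until a terminal operation (the stream equivalent of a Spark action) pulls the data.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyDemo {
  // counts how often the map function actually executes
  public static final AtomicInteger mapCalls = new AtomicInteger();

  // like an RDD transformation: this only describes the work
  public static Stream<Integer> transform() {
    return Stream.of(1, 2, 3)
        .map(x -> { mapCalls.incrementAndGet(); return x * 2; });
  }

  public static void main(String[] args) {
    Stream<Integer> doubled = transform();
    System.out.println("map calls before the action: " + mapCalls.get()); // still 0
    int sum = doubled.reduce(0, Integer::sum); // the terminal operation triggers the work
    System.out.println("sum = " + sum + ", map calls after: " + mapCalls.get());
  }
}
```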

You can also cache an RDD so that the result of its transformations is persisted in memory or, failing that, on disk. Caching is useful if you intend to use an RDD for more than one operation.
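The effect of caching can be sketched locally as well (again our own illustration, not the Spark API): without caching, every action recomputes the whole chain of transformations; with a materialized ("cached") result, the expensive step runs only once however many actions follow.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CacheDemo {
  public static final AtomicInteger computations = new AtomicInteger();

  // stands in for an expensive transformation chain on an RDD
  public static final Supplier<Stream<Integer>> expensive =
      () -> Stream.of(1, 2, 3)
               .map(x -> { computations.incrementAndGet(); return x * x; });

  public static void main(String[] args) {
    // two "actions" without caching: the expensive step runs for every element twice
    int sum = expensive.get().reduce(0, Integer::sum);
    int max = expensive.get().reduce(0, Integer::max);
    System.out.println("sum=" + sum + " max=" + max
        + ", computations without caching: " + computations.get()); // 6

    // "cache": materialize the result once and reuse it for both actions
    List<Integer> cached = expensive.get().collect(Collectors.toList());
    int sum2 = cached.stream().reduce(0, Integer::sum);
    int max2 = cached.stream().reduce(0, Integer::max);
    System.out.println("sum=" + sum2 + " max=" + max2
        + ", additional computations with caching: " + (computations.get() - 6)); // 3
  }
}
```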

DataFrames and Spark SQL

DataFrames are a well-known data structure in R. They were introduced in Spark 1.3 and made Spark SQL possible. A DataFrame represents data the way a relational database table or an Excel sheet does. Python also has libraries like pandas that allow working on DataFrames, because they are really comfortable to work with. Using DataFrames, you can query your data and do its extraction and transformation without using the programming API of Spark itself. From Databricks (the company of the main contributors) we have also heard repeatedly that using the different programming APIs (Scala, Java, Python and R) leads to different performance, but with Spark SQL you get an abstraction that automatically generates a computing plan from your query. This translation of your query works exactly the same in every programming API supported by Spark. For examples, please have a look at DataFrames in R and Spark.

Here you can find an example of using the DataFrame API to count words, and you can read more about the query optimizer by following this link explaining Tungsten.

With Spark SQL you can also create a HiveContext. To do so, copy your hive-site.xml to your Spark folder and instantiate a HiveContext in your Spark application. You then have access to your Hive metastore and can query all databases and their tables with Spark as the computing engine. We are confident that this is faster than plain Hive: with Hive we reached the limits of what multiple joins could handle, because they are translated into many slow MapReduce jobs that use little memory. We did not test whether Spark SQL performance can keep up with Impala, but it gave us the possibility to execute operations on structured data via Spark SQL. For everything else you can still use the programming API of Spark.

Spark Streaming

Spark allows you to do micro-batching. Micro-batching means that you do not get data in real time but rather in little batches: your application waits for a certain period of time and then retrieves the new elements that have arrived at the data source. We never really used Spark Streaming, but we tried Apache Storm in a slack project to display arriving orders on a heatmap of Germany together with their total order value. Maybe Mesos is a better alternative to YARN for streaming applications, because it allows dynamic resource allocation, whereas YARN reserves resources for an undefined time even though you possibly do not need them all the time.
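The micro-batching idea itself is simple enough to sketch with a plain queue (our own simulation, no Spark Streaming API involved): at every tick the application drains whatever has arrived since the last tick and processes it as one small batch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class MicroBatchDemo {

  // Drains everything currently in the source into one batch, like a
  // streaming job processing one interval's worth of new records.
  public static List<String> nextBatch(Queue<String> source) {
    List<String> batch = new ArrayList<>();
    String record;
    while ((record = source.poll()) != null) {
      batch.add(record);
    }
    return batch;
  }

  public static void main(String[] args) {
    Queue<String> source = new ArrayDeque<>();
    source.add("order-1");
    source.add("order-2");
    System.out.println(nextBatch(source)); // first tick: both pending orders
    source.add("order-3");
    System.out.println(nextBatch(source)); // second tick: only the new one
  }
}
```

In a real streaming job, the "tick" is the batch interval you configure, and the framework handles the scheduling and fault tolerance for you.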

There is much more to explore in Spark with GraphX (processing graph data), MLlib (the machine learning library of Spark) and all the possible combinations with Hive, HBase, and Cassandra.

We definitely recommend using Spark, since it is more intuitive than MapReduce and supports the implementation of many use cases. It is a very popular open source project with more than 800 contributors from over 200 companies. This gives us hope that it will be enhanced even further to combine multiple kinds of jobs, such as batch and stream processing, machine learning, and graph processing, in one framework.
