Spark Streaming: Average of all time


I wrote a Spark Streaming application which receives temperature values and calculates the average temperature of all time. For that I used the JavaPairDStream.updateStateByKey transformation to calculate it per device (separated by the pair's key). For state tracking I use the StatCounter class, which maintains running statistics (count, mean, variance, min, max) instead of storing every value, and I re-compute the average for each batch by calling the StatCounter.mean method. Here is my program:

EDIT: I rewrote my whole code, now using StatCounter:
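
For reference, the snippet below relies on roughly these imports (assuming Spark 1.x, where updateStateByKey hands you Guava's Optional; on Spark 2.x it would be org.apache.spark.api.java.Optional. Jackson 2 is assumed for the JSON parsing):

import java.util.List;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.StatCounter;

import scala.Tuple2;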

// Streaming context with a 1-second batch interval
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));

// Checkpointing is required for stateful transformations like updateStateByKey
streamingContext.checkpoint("hdfs://server:8020/spark-history/checkpointing");

// Receive newline-delimited JSON records over a TCP socket
JavaReceiverInputDStream<String> ingoingStream = streamingContext.socketTextStream(serverIp, 11833);

// Parse each JSON line into a SensorData object
JavaDStream<SensorData> sensorDStream = ingoingStream.map(new Function<String, SensorData>() {
    public SensorData call(String json) throws Exception {
        ObjectMapper om = new ObjectMapper();
        return om.readValue(json, SensorData.class);
    }
});

// Key each reading by its sensor id: (sensorId, temperature)
JavaPairDStream<String, Float> temperatureDStream = sensorDStream.mapToPair(new PairFunction<SensorData, String, Float>() {
    public Tuple2<String, Float> call(SensorData sensorData) throws Exception {
        return new Tuple2<String, Float>(sensorData.getIdSensor(), sensorData.getValTemp());
    }
});

// Merge each batch's new temperatures into the per-sensor running StatCounter
JavaPairDStream<String, StatCounter> statCounterDStream = temperatureDStream.updateStateByKey(new Function2<List<Float>, Optional<StatCounter>, Optional<StatCounter>>() {
    public Optional<StatCounter> call(List<Float> newTemperatures, Optional<StatCounter> statsYet) throws Exception {
        // Reuse the existing state, or start with an empty StatCounter
        StatCounter stats = statsYet.or(new StatCounter());

        for (float temp : newTemperatures) {
            stats.merge(temp);
        }

        return Optional.of(stats);
    }
});

// Derive the all-time mean per sensor from the running statistics
JavaPairDStream<String, Double> avgTemperatureDStream = statCounterDStream.mapToPair(new PairFunction<Tuple2<String, StatCounter>, String, Double>() {
    public Tuple2<String, Double> call(Tuple2<String, StatCounter> statCounterTuple) throws Exception {
        String key = statCounterTuple._1();
        double avgValue = statCounterTuple._2().mean();

        return new Tuple2<String, Double>(key, avgValue);
    }
});

avgTemperatureDStream.print();
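
Each batch then prints the current all-time averages in the usual DStream.print() format, roughly like this (sensor ids and values made up):

-------------------------------------------
Time: 1465447155000 ms
-------------------------------------------
(sensor-1,21.53)
(sensor-2,19.87)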

This seems to work fine. But now to the question:

I just found an example online which also shows how to calculate an all-time average, here: https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/total.html

They use AtomicLongs etc. to store the "stateful values" and update them in a foreachRDD call.
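
Adapted to my temperature stream, the idea would look roughly like this (a sketch, not their exact code: runningSum and runningCount are made-up names, AtomicDouble comes from Guava, and the Function<..., Void> variant of foreachRDD matches the Spark 1.x API used above). Note that unlike updateStateByKey, this keeps one global average on the driver instead of one per sensor:

import java.util.concurrent.atomic.AtomicLong;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.DoubleFunction;

final AtomicLong runningCount = new AtomicLong(0);
final AtomicDouble runningSum = new AtomicDouble(0);

temperatureDStream.foreachRDD(new Function<JavaPairRDD<String, Float>, Void>() {
    public Void call(JavaPairRDD<String, Float> rdd) throws Exception {
        // Aggregate the current batch on the executors...
        JavaDoubleRDD temps = rdd.mapToDouble(new DoubleFunction<Tuple2<String, Float>>() {
            public double call(Tuple2<String, Float> tuple) throws Exception {
                return tuple._2();
            }
        });
        long batchCount = temps.count();
        if (batchCount > 0) {
            // ...then fold the batch totals into the driver-side accumulators
            runningSum.addAndGet(temps.sum());
            runningCount.addAndGet(batchCount);
            System.out.println("All-time average: " + runningSum.get() / runningCount.get());
        }
        return null;
    }
});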

My question now is: what is the better solution for a stateful all-time calculation in Spark Streaming? Are there any advantages or disadvantages to one approach over the other? Thank you!
