Java Spark: consume from Kafka and parse JSON

Main Kafka stream

import com.yizhisec.bigdata.deserializer.TrafficDeserializer;
import com.yizhisec.bigdata.model.Traffic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.io.IOException;
import java.util.*;

public class KafkaStream {

    private Map<String, Object> kafkaParams = new HashMap<>();
    private Collection<String> topics;

    private void getKafkaSetting() throws IOException {
        Properties kafkaProp = Config.getProp();
        kafkaParams.put("bootstrap.servers", kafkaProp.getProperty("kafka.broker.list"));
        // SSL configuration
        kafkaParams.put("security.protocol", "SSL");
        kafkaParams.put("ssl.truststore.location", Config.getPath(Config.KAFKA_JKS));
        kafkaParams.put("ssl.truststore.password", kafkaProp.getProperty("kafka.jks.passwd"));
        // Kafka consumer configuration
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", TrafficDeserializer.class);
        kafkaParams.put("group.id", "big_data");
//        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        // topic
        topics = Arrays.asList(kafkaProp.getProperty("kafka.topic").split(","));
    }

    private void run() {
        try {
            this.getKafkaSetting();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        SparkConf sparkConf = new SparkConf().setAppName("Kafka").setMaster("local[2]");
//        sparkConf.set("spark.serializer", KryoSerializer.class.getName());
//        sparkConf.registerKryoClasses((Class<?>[]) Arrays.asList(ConsumerRecord.class).toArray());
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));


        topics = Arrays.asList("traffic");

        JavaInputDStream<ConsumerRecord<String, Traffic>> stream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaParams)
        );

        JavaDStream<Traffic> traffics = stream.map(new Function<ConsumerRecord<String, Traffic>, Traffic>() {
            @Override
            public Traffic call(ConsumerRecord<String, Traffic> record) throws Exception {
                return record.value();
            }
        }).filter(new Function<Traffic, Boolean>() {
            @Override
            public Boolean call(Traffic traffic) throws Exception {
                return traffic != null;
            }
        });

        traffics.foreachRDD(new VoidFunction<JavaRDD<Traffic>>() {
            @Override
            public void call(JavaRDD<Traffic> trafficRdd) throws Exception {
                Traffic traffic = trafficRdd.take(1).get(0);
                System.out.println(traffic);
            }
        });



        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) {
        KafkaStream k = new KafkaStream();
        k.run();
    }

}
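
Before bringing Spark into it, it can help to confirm that the SSL settings and the custom value deserializer work on their own. Below is a minimal sketch (the PlainConsumerCheck class is hypothetical) that reuses the same kafkaParams map with a plain KafkaConsumer; poll(Duration) assumes kafka-clients 2.0+, on 1.x use the poll(long) overload instead:

import com.yizhisec.bigdata.model.Traffic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;

public class PlainConsumerCheck {
    // Reuses the kafkaParams map and topic list built in getKafkaSetting() above
    public static void check(Map<String, Object> kafkaParams, Collection<String> topics) {
        try (KafkaConsumer<String, Traffic> consumer = new KafkaConsumer<>(kafkaParams)) {
            consumer.subscribe(topics);
            ConsumerRecords<String, Traffic> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, Traffic> record : records) {
                // a null value means the deserializer could not parse that record's JSON
                System.out.println(record.value());
            }
        }
    }
}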

model

public class Traffic {
    public Long guid;
    public int time;
    public int end_time;
    public String srcip;
    public String srcmac;
    public int srcport;
    public String destip;
    public String destmac;
    public int destport;
    public String proto;
    public String appproto;
    public Long upsize;
    public Long downsize;

    @Override
    public String toString() {
        return "Traffic{" +
                "guid=" + guid +
                ", time=" + time +
                ", end_time=" + end_time +
                ", srcip='" + srcip + '\'' +
                ", srcmac='" + srcmac + '\'' +
                ", srcport=" + srcport +
                ", destip='" + destip + '\'' +
                ", destmac='" + destmac + '\'' +
                ", destport=" + destport +
                ", proto='" + proto + '\'' +
                ", appproto='" + appproto + '\'' +
                ", upsize=" + upsize +
                ", downsize=" + downsize +
                '}';
    }
}

Deserializer

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yizhisec.bigdata.model.Traffic;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;
import java.util.Map;

public class TrafficDeserializer implements Deserializer<Traffic> {
    private ObjectMapper objectMapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false);

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public Traffic deserialize(String s, byte[] bytes) {
        if (bytes == null) {
            return null;
        } else {
            try {
                return this.objectMapper.readValue(bytes, Traffic.class);
            } catch (IOException e) {
                e.printStackTrace();
                return null;
            }
        }
    }

    @Override
    public void close() {

    }
}
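
The deserializer can likewise be exercised in isolation. A short sketch with a hand-written JSON document whose field names match the Traffic model (the sample values are invented); fields missing from the JSON, such as srcmac, simply stay null, and unknown fields are ignored because FAIL_ON_UNKNOWN_PROPERTIES is disabled:

import com.yizhisec.bigdata.model.Traffic;

import java.nio.charset.StandardCharsets;

public class TrafficDeserializerCheck {
    public static void main(String[] args) {
        String json = "{\"guid\":1,\"time\":1577836800,\"end_time\":1577836860,"
                + "\"srcip\":\"10.0.0.1\",\"srcport\":443,"
                + "\"destip\":\"10.0.0.2\",\"destport\":55000,"
                + "\"proto\":\"TCP\",\"appproto\":\"tls\",\"upsize\":1024,\"downsize\":2048}";
        TrafficDeserializer deserializer = new TrafficDeserializer();
        // the topic argument is unused by this deserializer; malformed JSON returns null
        Traffic traffic = deserializer.deserialize("traffic", json.getBytes(StandardCharsets.UTF_8));
        System.out.println(traffic);
    }
}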

ERROR

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2128)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:587)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:587)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:727)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:263)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:586)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:157)
    at org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.map(JavaDStreamLike.scala:42)
    at com.yizhisec.bigdata.KafkaStream.run(KafkaStream.java:67)
    at com.yizhisec.bigdata.KafkaStream.main(KafkaStream.java:100)
Caused by: java.io.NotSerializableException: com.yizhisec.bigdata.KafkaStream
Serialization stack:
    - object not serializable (class: com.yizhisec.bigdata.KafkaStream, value: com.yizhisec.bigdata.KafkaStream@1bc425e7)
    - field (class: com.yizhisec.bigdata.KafkaStream$2, name: this$0, type: class com.yizhisec.bigdata.KafkaStream)
    - object (class com.yizhisec.bigdata.KafkaStream$2, com.yizhisec.bigdata.KafkaStream$2@426e505c)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 14 more
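
The serialization stack names the actual culprit: the anonymous Function classes (KafkaStream$2) carry an implicit this$0 reference to the enclosing KafkaStream instance, which is not serializable, so Spark's closure cleaner rejects the task. The commented-out Kryo settings cannot help here, because Spark always serializes closures with the Java serializer. The usual fix is to keep the outer instance out of the closures, e.g. with lambdas or method references: Spark's Function interface extends java.io.Serializable, and a lambda that captures nothing serializes fine. A sketch of the transformation section rewritten that way; the isEmpty() guard is an added assumption, since take(1).get(0) throws on an empty micro-batch:

        JavaDStream<Traffic> traffics = stream
                .map(ConsumerRecord::value)           // method reference: no this$0 capture
                .filter(traffic -> traffic != null);  // drop records the deserializer turned into null

        traffics.foreachRDD(trafficRdd -> {
            if (!trafficRdd.isEmpty()) {              // guard: take(1).get(0) fails on empty batches
                System.out.println(trafficRdd.take(1).get(0));
            }
        });

Making KafkaStream implement java.io.Serializable would also silence the exception, but it ships the whole driver-side object with every task; breaking the this$0 capture is the cleaner fix.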

pom.xml

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spark.version>2.4.4</spark.version>
        <scala.version>2.11.12</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>io.snappydata</groupId>
            <artifactId>snappy-spark-streaming-kafka-0.10_2.11</artifactId>
            <version>2.1.1.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>

        <!--        kafka-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-json</artifactId>
            <version>2.3.1</version>
        </dependency>

    </dependencies>
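
As an aside, the dependency list mixes Scala binary versions: spark-core_2.12, spark-streaming_2.12 and spark-sql_2.12 sit next to the _2.11 artifacts snappy-spark-streaming-kafka-0.10_2.11, spark-streaming-kafka_2.11 and kafka_2.11, and spark-streaming-kafka_2.11:1.6.3 is the old Kafka 0.8 integration rather than the kafka010 API the code imports. Aligning everything on one Scala version (the scala.version property suggests 2.11) and using org.apache.spark:spark-streaming-kafka-0-10_2.11 at ${spark.version} would rule out binary-incompatibility failures that are easy to confuse with the serialization error above.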

There are no answers yet.