I'm running a Spark Streaming job (Java) that reads Traffic records from Kafka over SSL, and it fails on startup with "Task not serializable" (full stack trace below). The code, model, deserializer, error, and pom follow.

Main Kafka stream (KafkaStream.java)
import com.yizhisec.bigdata.deserializer.TrafficDeserializer;
import com.yizhisec.bigdata.model.Traffic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import java.io.IOException;
import java.util.*;
public class KafkaStream {
    private Map<String, Object> kafkaParams = new HashMap<>();
    private Collection<String> topics;

    private void getKafkaSetting() throws IOException {
        Properties kafkaProp = Config.getProp();
        kafkaParams.put("bootstrap.servers", kafkaProp.getProperty("kafka.broker.list"));
        // SSL configuration
        kafkaParams.put("security.protocol", "SSL");
        kafkaParams.put("ssl.truststore.location", Config.getPath(Config.KAFKA_JKS));
        kafkaParams.put("ssl.truststore.password", kafkaProp.getProperty("kafka.jks.passwd"));
        // Kafka consumer configuration
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", TrafficDeserializer.class);
        kafkaParams.put("group.id", "big_data");
        // kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        // Topics
        topics = Arrays.asList(kafkaProp.getProperty("kafka.topic").split(","));
    }
    private void run() {
        try {
            this.getKafkaSetting();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }

        SparkConf sparkConf = new SparkConf().setAppName("Kafka").setMaster("local[2]");
        // sparkConf.set("spark.serializer", KryoSerializer.class.getName());
        // sparkConf.registerKryoClasses((Class<?>[]) Arrays.asList(ConsumerRecord.class).toArray());
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

        topics = Arrays.asList("traffic"); // NOTE: overrides the topics loaded in getKafkaSetting()
        JavaInputDStream<ConsumerRecord<String, Traffic>> stream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaParams)
        );

        JavaDStream<Traffic> traffics = stream.map(new Function<ConsumerRecord<String, Traffic>, Traffic>() {
            @Override
            public Traffic call(ConsumerRecord<String, Traffic> record) throws Exception {
                return record.value();
            }
        }).filter(new Function<Traffic, Boolean>() {
            @Override
            public Boolean call(Traffic traffic) throws Exception {
                return traffic != null;
            }
        });

        traffics.foreachRDD(new VoidFunction<JavaRDD<Traffic>>() {
            @Override
            public void call(JavaRDD<Traffic> trafficRdd) throws Exception {
                List<Traffic> first = trafficRdd.take(1);
                if (!first.isEmpty()) { // guard: a micro-batch may be empty
                    System.out.println(first.get(0));
                }
            }
        });

        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        KafkaStream k = new KafkaStream();
        k.run();
    }
}
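For reference, getKafkaSetting() expects Config.getProp() to supply three properties. A hypothetical config file (only the key names come from the code above; the values are made up):

# hypothetical values; key names taken from getKafkaSetting()
kafka.broker.list=broker1:9093,broker2:9093
kafka.jks.passwd=changeit
kafka.topic=traffic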
Model (Traffic.java)
public class Traffic {
    public Long guid;
    public int time;
    public int end_time;
    public String srcip;
    public String srcmac;
    public int srcport;
    public String destip;
    public String destmac;
    public int destport;
    public String proto;
    public String appproto;
    public Long upsize;
    public Long downsize;

    @Override
    public String toString() {
        return "Traffic{" +
                "guid=" + guid +
                ", time=" + time +
                ", end_time=" + end_time +
                ", srcip='" + srcip + '\'' +
                ", srcmac='" + srcmac + '\'' +
                ", srcport=" + srcport +
                ", destip='" + destip + '\'' +
                ", destmac='" + destmac + '\'' +
                ", destport=" + destport +
                ", proto='" + proto + '\'' +
                ", appproto='" + appproto + '\'' +
                ", upsize=" + upsize +
                ", downsize=" + downsize +
                '}';
    }
}
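Independent of the "Task not serializable" error reported below: with Spark's default JavaSerializer, Traffic instances are serialized whenever they travel between executors and the driver (for example when take(1) collects a record), so the model itself likely needs to be serializable too. A minimal sketch:

import java.io.Serializable;

// Sketch: let Spark's default Java serialization ship Traffic instances.
public class Traffic implements Serializable {
    private static final long serialVersionUID = 1L;
    // ... fields and toString() exactly as above ...
}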
Deserializer (TrafficDeserializer.java)
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yizhisec.bigdata.model.Traffic;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;
import java.util.Map;

public class TrafficDeserializer implements Deserializer<Traffic> {
    private ObjectMapper objectMapper = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public Traffic deserialize(String s, byte[] bytes) {
        if (bytes == null) {
            return null;
        } else {
            try {
                return this.objectMapper.readValue(bytes, Traffic.class);
            } catch (IOException e) {
                e.printStackTrace();
                return null;
            }
        }
    }

    @Override
    public void close() {
    }
}
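The deserializer can be sanity-checked outside Spark. A small sketch (class name and JSON payload are hypothetical; with FAIL_ON_UNKNOWN_PROPERTIES disabled, extra JSON fields are simply ignored):

import java.nio.charset.StandardCharsets;

public class TrafficDeserializerCheck {
    public static void main(String[] args) {
        TrafficDeserializer d = new TrafficDeserializer();
        // Hypothetical payload: unknown_field is not on the model and is ignored.
        byte[] payload = "{\"guid\":1,\"srcip\":\"10.0.0.1\",\"destip\":\"10.0.0.2\",\"unknown_field\":42}"
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(d.deserialize("traffic", payload)); // Traffic{guid=1, ...}
        System.out.println(d.deserialize("traffic", null));    // null bytes -> null
    }
}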
Error (stack trace)
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2128)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:587)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:587)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:727)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:263)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:586)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:157)
    at org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.map(JavaDStreamLike.scala:42)
    at com.yizhisec.bigdata.KafkaStream.run(KafkaStream.java:67)
    at com.yizhisec.bigdata.KafkaStream.main(KafkaStream.java:100)
Caused by: java.io.NotSerializableException: com.yizhisec.bigdata.KafkaStream
Serialization stack:
    - object not serializable (class: com.yizhisec.bigdata.KafkaStream, value: com.yizhisec.bigdata.KafkaStream@1bc425e7)
    - field (class: com.yizhisec.bigdata.KafkaStream$2, name: this$0, type: class com.yizhisec.bigdata.KafkaStream)
    - object (class com.yizhisec.bigdata.KafkaStream$2, com.yizhisec.bigdata.KafkaStream$2@426e505c)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 14 more
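The serialization stack pinpoints the cause: the anonymous inner class KafkaStream$2 (the map/filter Function) carries a synthetic this$0 reference to the enclosing KafkaStream instance, and KafkaStream itself is not serializable, so the closure cannot be shipped to executors. A minimal sketch of one way out, assuming Java 8: use lambdas (or static nested classes) whose bodies touch no instance state, so nothing captures this$0. Spark's Java Function interfaces extend Serializable, and a non-capturing lambda serializes cleanly.

// Sketch: lambdas instead of anonymous inner classes; these bodies reference
// no fields of KafkaStream, so no this$0 reference is captured.
JavaDStream<Traffic> traffics = stream
        .map(record -> record.value())
        .filter(traffic -> traffic != null);

traffics.foreachRDD(rdd -> {
    List<Traffic> first = rdd.take(1);
    if (!first.isEmpty()) {
        System.out.println(first.get(0));
    }
});

Alternatively, making KafkaStream implement java.io.Serializable would also silence the exception, but that serializes the whole outer object with every task; keeping closures free of outer-instance state is the cleaner fix.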
pom.xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>2.4.4</spark.version>
    <scala.version>2.11.12</scala.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>io.snappydata</groupId>
        <artifactId>snappy-spark-streaming-kafka-0.10_2.11</artifactId>
        <version>2.1.1.7</version>
    </dependency>
    <!-- Scala binary suffix must match scala.version (2.11) -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.11</artifactId>
        <version>1.6.3</version>
    </dependency>
    <!-- kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>connect-json</artifactId>
        <version>2.3.1</version>
    </dependency>
</dependencies>
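One more dependency hazard: the pom pulls in two different Kafka integrations at once, the 0.8-era spark-streaming-kafka_2.11:1.6.3 and the SnappyData fork of the 0.10 connector, while the code imports org.apache.spark.streaming.kafka010.*. Assuming stock Spark is acceptable, a sketch of the single canonical artifact that provides those imports for Spark 2.4.4 (replacing both of the above):

<!-- Provides org.apache.spark.streaming.kafka010.* for Spark 2.4.x / Scala 2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>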