I am trying to run the WordCount demo from here with the Samza runner. This is my build.gradle:
plugins {
    id 'eclipse'
    id 'java'
    id 'application'
    // 'shadow' allows us to embed all the dependencies into a fat jar.
    id 'com.github.johnrengelman.shadow' version '4.0.3'
}

mainClassName = 'samples.quickstart.WordCount'

repositories {
    maven {
        url = uri('http://packages.confluent.io/maven/')
    }
    mavenCentral()
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

ext.apacheBeamVersion = '2.22.0'

dependencies {
    shadow "org.apache.beam:beam-sdks-java-core:$apacheBeamVersion"
    runtime "org.apache.beam:beam-runners-direct-java:$apacheBeamVersion"
    runtime "org.slf4j:slf4j-api:1.+"
    runtime "org.slf4j:slf4j-jdk14:1.+"
    compile group: 'org.apache.beam', name: 'beam-runners-samza', version: '2.22.0'
    compile group: 'org.apache.samza', name: 'samza-api', version: '1.4.0'
    compile group: 'org.apache.samza', name: 'samza-core_2.11', version: '1.4.0'
    compile group: 'org.apache.samza', name: 'samza-kafka_2.11', version: '1.4.0'
    compile group: 'org.apache.samza', name: 'samza-kv_2.11', version: '1.4.0'
    compile group: 'org.apache.samza', name: 'samza-kv-rocksdb_2.11', version: '1.4.0'
    testCompile "junit:junit:4.+"
}

shadowJar {
    zip64 true
    baseName = 'WordCount' // Name of the fat jar file.
    classifier = null // Set to null, otherwise 'shadow' appends a '-all' to the jar file name.
    manifest {
        attributes('Main-Class': mainClassName) // Specify where the main class resides.
    }
}
My wordcount.java is as follows.
package samples.quickstart;

import org.apache.beam.runners.samza.SamzaRunner;
//import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

import java.util.Arrays;

public class WordCount {
    private static final String jobName = "beamtest";

    public static void main(String[] args) {
        String inputsDir = "data/*";
        String outputsPrefix = "outputs/part";

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        options.setRunner(SamzaRunner.class);
        Pipeline pipeline = Pipeline.create(options);

        pipeline
            .apply("Read lines", TextIO.read().from(inputsDir))
            .apply("Find words", FlatMapElements.into(TypeDescriptors.strings())
                .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
            .apply("Filter empty words", Filter.by((String word) -> !word.isEmpty()))
            .apply("Count words", Count.perElement())
            .apply("Write results", MapElements.into(TypeDescriptors.strings())
                .via((KV<String, Long> wordCount) ->
                    wordCount.getKey() + ": " + wordCount.getValue()))
            .apply(TextIO.write().to(outputsPrefix));

        pipeline.run().waitUntilFinish();
    }
}
I am using Beam version 2.22.0. I tried the following combinations: Samza 1.4 with Beam 2.22; Samza 1.0 with Beam 2.11 and Beam 2.22; and Samza 0.14.1 with Beam 2.11.0. However, when executing the pipeline I get the following error:
java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
I am using Java 1.8. Does anybody have an idea what causes this problem?
Could you paste the build.gradle and the modified wordcount.java that uses the Samza runner here, so that we can investigate whether this is an incompatibility issue or a configuration issue? Thanks for trying out the Samza runner!
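One rough way to tell an incompatibility from a configuration problem is to check which jar each class is actually loaded from at runtime. An IncompatibleClassChangeError generally means that some class was compiled against a different definition of a type than the one present at runtime, so mixed Samza versions on the classpath are one possible cause (this is a guess, not a confirmed diagnosis). The sketch below is only an illustration: the class ClassOrigin and its locate method are hypothetical helpers, not part of Beam or Samza, and the two class names passed in main are simply the ones that appear in the error message and in the imports above. It would need to be run with the same classpath as the job (for example, the fat jar).

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper; not part of Beam or Samza.
public class ClassOrigin {

    /**
     * Describes where the named class was loaded from, or reports
     * that it could not be loaded from the current classpath.
     */
    public static String locate(String className) {
        try {
            // initialize=false: only resolve the class, do not run static initializers.
            Class<?> cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                // JDK classes loaded by the bootstrap loader have no code source.
                return className + " -> (bootstrap or unknown location)";
            }
            return className + " -> " + source.getLocation();
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " -> not loadable: " + e;
        }
    }

    public static void main(String[] args) {
        // Class names taken from the error message and the WordCount imports.
        System.out.println(locate("org.apache.samza.storage.kv.KeyValueStore"));
        System.out.println(locate("org.apache.beam.runners.samza.SamzaRunner"));
    }
}
```

If the printed locations point at Samza jars of a different version than the one the Beam Samza runner was built against, that would suggest a version incompatibility rather than a configuration problem.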