I have Scala code that takes multiple input files from HDFS using wildcards, and each file goes into a function where it is processed individually.
```scala
import de.l3s.boilerpipe.extractors.KeepEverythingExtractor

val data = sc.wholeTextFiles("hdfs://localhost:port/akshat/folder/*/*")
val files = data.map { case (filename, content) => filename }

def doSomething(file: String): (String, String) = {
  // logic of processing a single file comes here
  val logData = sc.textFile(file)
  val c = logData.toLocalIterator.mkString
  val d = KeepEverythingExtractor.INSTANCE.getText(c)
  val e = sc.parallelize(d.split("\n"))
  val recipeName = e.take(10).last // recipe name sits on line 10 of the extracted text
  val prepTime = e.take(18).last   // preparation time sits on line 18
  (recipeName, prepTime)
}

// How are transformations and actions applied here?
```
I am stuck on how to apply further transformations and actions so that all my input files are mapped through `doSomething`, and the output from every input file is stored in a single file using `saveAsTextFile`.
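Roughly, the flow I am aiming for looks like the sketch below, but I am not sure this is the right way to wire it up, since `doSomething` itself calls `sc` (the output path here is made up, and the `port` placeholder is the same as above):

```scala
// collect the file paths to the driver, since doSomething uses sc internally,
// then process each file and write all results out as a single text file
val results = files.collect().map(doSomething)
sc.parallelize(results)
  .coalesce(1) // force a single output part file
  .saveAsTextFile("hdfs://localhost:port/akshat/output") // hypothetical path
```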
So if my understanding is correct, you have an RDD of pairs and you wish to transform it some more, and then save the output for each key to a unique file. Transforming it some more is relatively easy: `mapValues` will allow you to write transformations on just the value, and any other transformation will also work on RDDs of pairs.
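For example, since `wholeTextFiles` already gives you (path, content) pairs, you could do the per-file parsing inside `mapValues` and drop the inner `sc.textFile` call entirely (which wouldn't work inside a transformation anyway, since `sc` only exists on the driver). A minimal sketch, assuming the recipe name and prep time really are on lines 10 and 18 of the extracted text:

```scala
val parsed = data.mapValues { content =>
  val text = KeepEverythingExtractor.INSTANCE.getText(content)
  val lines = text.split("\n")
  (lines(9), lines(17)) // line 10 = recipe name, line 18 = prep time (0-based indexing)
}
```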
Saving the output to a unique file for each key, however, is a bit trickier. One option would be to find a Hadoop OutputFormat which does what you want and then use `saveAsHadoopFile`; another option would be to use `foreach` and just write the code to output each key/value pair as desired.
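A rough sketch of the `foreach` route, writing one file per input path through the HDFS `FileSystem` API (the output directory and file naming are assumptions, and `port` is the placeholder from your question):

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

parsed.foreachPartition { iter =>
  // one FileSystem handle per partition, created on the executor
  val fs = FileSystem.get(new URI("hdfs://localhost:port"), new Configuration())
  iter.foreach { case (path, (recipeName, prepTime)) =>
    val out = fs.create(new Path(s"/akshat/output/${new Path(path).getName}"))
    try out.write(s"$recipeName\t$prepTime\n".getBytes("UTF-8"))
    finally out.close()
  }
}
```

That said, if you actually want everything in one file rather than one file per key, a `coalesce(1)` before `saveAsTextFile`, as in the sketch in your question, is the simpler route.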