I have 3 log files in my folders, like:
foldera = emplog, deptlog, companylog
folderb = emplog, deptlog, companylog
folderc = emplog, deptlog, companylog
I have 3 different Scala program files to extract the data from each of them:
employee.scala
department.scala
companylog.scala
Each of them has code like the below. I want to combine all these files and execute them in a parallel manner.
package com.sample

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.sql.SQLContext
import org.apache.log4j.{Level, Logger}

object logparser {
  def main(args: Array[String]) = {
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)
    // Start the Spark context
    val conf = new SparkConf()
      .setAppName("Parser")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val test = sc.wholeTextFiles("C:\\mkdir\\*\\*")
      .map { l =>
        if (l._1.endsWith("emplog.txt")) {
          empparser(l._2, sc, sqlContext)
        }
        l
      }
      .foreach { println }
  }

  def empparser(record: String, sc: SparkContext, sqlContext: SQLContext) = {
    val emppattern = """[(](\d+)[)]\s([\w\s._]{30})\s+""".r
    import sqlContext.implicits._
    val indrecs = emppattern.findAllIn(record)
      .map { line =>
        val emppattern(eid, ename) = line
        (eid, ename)
      }
      .toSeq
      .toDF("eid", "ename")
      .show()
  }
}
I have tried attaching each method within the same object. Now 2 questions arise.

Q1. When I compile, I get:
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@6b0615ae)
- field (class: com.sample.logparser$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class com.sample.logparser$$anonfun$1, <function1>)
As far as I know (I'm a newbie), the SparkContext can't be serialized. If I don't pass sc as a parameter, I get a NullPointerException. How do I solve this?
Q2. I will insert the Hive-table write code within the empparser method after converting to a DF. Once that is done, I don't want to do anything within my main. But my map code won't execute unless I have an action after it; that's why I have the foreach println there. Is there a way to overcome this issue?
To attempt to answer the question, I'm going to assume that processing an employee or a department results in the same kind of record. I would expect this to be different for each kind of data, so I'm keeping the processing of the different kinds of records separate to allow for this "adjustment with reality".
First, we define a record case class and parsers for the different kinds of record types. (Here I'm copying the same implementation for the sake of simplicity.)
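A minimal sketch of what that could look like; the Record fields, the line format, and the regex are assumptions to adapt to the real logs:

// Define the case class outside main so Spark SQL reflection can find it.
case class Record(id: String, name: String)

// Hypothetical line format "(123) somename"; adjust the regex per log type.
val empPattern = """[(](\d+)[)]\s+(\S+)""".r

val empParser: String => Option[Record] = {
  case empPattern(id, name) => Some(Record(id, name))
  case _                    => None
}
// The same implementation reused for simplicity; in practice each differs.
val deptParser    = empParser
val companyParser = empParser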
We load the data using wholeTextFiles:
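Reusing the path from the question, this gives an RDD of (path, content) pairs:

val logData = sc.wholeTextFiles("C:\\mkdir\\*\\*")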
And then, we process the different kinds of records by filtering the files to obtain the kind of files that we require and applying the parser we defined above. Note how we are practically repeating the same process; this could be abstracted out.
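A sketch under the assumptions above. Note that only the parser functions are captured in these closures, never the SparkContext, which is what avoids the NotSerializableException from Q1:

val empRecords = logData
  .filter { case (path, _) => path.endsWith("emplog.txt") }
  .flatMap { case (_, content) => content.split("\n").flatMap(empParser) }

val deptRecords = logData
  .filter { case (path, _) => path.endsWith("deptlog.txt") }
  .flatMap { case (_, content) => content.split("\n").flatMap(deptParser) }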
We now convert to a DataFrame:
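Since Record is a case class, the implicits on the SQLContext handle the conversion; the table name below is only illustrative:

import sqlContext.implicits._
val empDF = empRecords.toDF()
// With a HiveContext, this could then be written out, e.g.
// empDF.write.saveAsTable("employees")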
And we could do the same for the other record types as well.
There's plenty of room to reduce code duplication in this process depending on whether we can find commonalities in the processes of the different data types.
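For instance, a hypothetical helper parameterized by the filename suffix and a parser would capture the repeated filter-and-parse pattern:

def recordsFor(suffix: String, parse: String => Option[Record]) =
  logData
    .filter { case (path, _) => path.endsWith(suffix) }
    .flatMap { case (_, content) => content.split("\n").flatMap(parse) }

val employees   = recordsFor("emplog.txt", empParser)
val departments = recordsFor("deptlog.txt", deptParser)
val companies   = recordsFor("companylog.txt", companyParser)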