Spark UDF with Maxmind Geo Data


I'm trying to use Snowplow's MaxMind library to pull out geo data for each IP I have in a DataFrame.

We are using Spark SQL (Spark version 2.1.0) and I created a UDF in the following class:

import javax.inject.Inject

import com.snowplowanalytics.maxmind.iplookups.{IpLocation, IpLookups}
import com.typesafe.scalalogging.StrictLogging
import org.apache.spark.SparkFiles
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

class UdfDefinitions @Inject() extends Serializable with StrictLogging {

 // sparkSession, configuration and the LookupIPResult case class
 // are defined elsewhere in our codebase
 sparkSession.sparkContext.addFile("s3n://s3-maxmind-db/latest/GeoIPCity.dat")
 val s3Config = configuration.databases.dataWarehouse.s3
 val lruCacheConst = 20000

 // Build IpLookups once, outside the UDF body, so the file is not
 // reopened for every row
 val ipLookups = IpLookups(
  geoFile = Some(SparkFiles.get(s3Config.geoIPFileName)),
  ispFile = None, orgFile = None, domainFile = None,
  memCache = false, lruCache = lruCacheConst)

 def lookupIP(ip: String): LookupIPResult = {
  val loc: Option[IpLocation] = ipLookups.performLookups(ip)._1
  loc match {
   case None => LookupIPResult("", "", "")
   case Some(x) => LookupIPResult(
    Option(x.countryName).getOrElse(""),
    x.city.getOrElse(""),
    x.regionName.getOrElse(""))
  }
 }

 val lookupIPUDF: UserDefinedFunction = udf(lookupIP _)

}

The intention is to create the pointer to the file (ipLookups) outside the UDF and use it inside, so as not to open the file on every row. This fails with a "Task not serializable" error, and when we move the addFile call inside the UDF instead, we get a "too many open files" error on a large dataset (on a small dataset it does work).
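
For reference, the UDF is applied roughly like this (the DataFrame and column names are hypothetical). The withColumn call is where Spark tries to serialize the UDF's closure, which drags in the non-serializable IpLookups:

val udfs = new UdfDefinitions()
// Spark serializes the closure of lookupIPUDF here, pulling in
// UdfDefinitions and its IpLookups field
val withGeo = df.withColumn("geo", udfs.lookupIPUDF(df("ip")))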

This thread shows how to solve the problem using RDDs, but we would like to use Spark SQL: using maxmind geoip in spark serialized

Any thoughts? Thanks

1 Answer

The problem here is that IpLookups is not Serializable. Yet it performs its lookups against a static file (from what I gathered), so you should be able to fix that: clone the repo and make IpLookups Serializable. Then, to make it work with Spark SQL, wrap everything in a class like you did. In the main Spark job, you can then write something like this:

val IPResolver = new MySerializableIpResolver()
val resolveIP = udf((ip: String) => IPResolver.resolve(ip))
data.withColumn("Result", resolveIP($"IP"))
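
For completeness, here is a rough sketch of what that wrapper could look like. The class name and resolve method come from the snippet above; the file name, the returned field, and the premise that IpLookups has already been patched to be Serializable are all assumptions:

import com.snowplowanalytics.maxmind.iplookups.IpLookups
import org.apache.spark.SparkFiles

// Hypothetical sketch: assumes IpLookups was made Serializable as suggested
class MySerializableIpResolver extends Serializable {

  // Assumes the GeoIP file was shipped with sparkContext.addFile(...)
  private val ipLookups = IpLookups(
    geoFile = Some(SparkFiles.get("GeoIPCity.dat")),
    ispFile = None, orgFile = None, domainFile = None,
    memCache = false, lruCache = 20000)

  // Returns just the country name here; extend as needed
  def resolve(ip: String): String =
    ipLookups.performLookups(ip)._1
      .flatMap(loc => Option(loc.countryName))
      .getOrElse("")
}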

If you do not have that many distinct IP addresses, there is another solution: you could do everything in the driver.

val ipMap = data.select("IP").distinct.collect()
  .map { row =>
    val ip = row.getString(0)
    // calling the non-serializable IpLookups here is OK: we are in the driver
    ip -> lookupIP(ip)
  }
  .toMap
val resolveIP = udf((ip: String) => ipMap(ip))
data.withColumn("Result", resolveIP($"IP"))
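
One further refinement (not part of the original answer, just a standard Spark pattern): if the map of distinct IPs grows large, broadcasting it avoids shipping a copy inside the closure of every task:

// Assumes `spark` is the active SparkSession
val ipMapBc = spark.sparkContext.broadcast(ipMap)
val resolveIP = udf((ip: String) => ipMapBc.value(ip))
data.withColumn("Result", resolveIP($"IP"))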