spline spark agent jar has errors during post processing


I have been trying to run the following code with the new Spline agent jar, za.co.absa.spline.agent.spark:spark-3.0-spline-agent-bundle_2.12:0.6.0, but I keep getting errors specific to UserExtraMetadataProvider, which has been deprecated in the newer versions. I have also tried replacing UserExtraMetadataProvider with UserExtraAppendingPostProcessingFilter, as shown in the second code block below, but I am still getting errors. Can you please validate this and show how to properly write the post-processing filter code using the new Spline bundle?

%scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model._
import scala.util.parsing.json.JSON

val splineConf: Configuration = StandardSplineConfigurationStack(spark)

spark.enableLineageTracking(new DefaultSplineConfigurer(splineConf) {
  //override protected def userExtraMetadataProvider = new UserExtraMetaDataProvider {
  //val test = dbutils.notebook.getContext.notebookPath
  val notebookInformationJson = dbutils.notebook.getContext.toJson
  val outerMap = JSON.parseFull(notebookInformationJson).getOrElse(0).asInstanceOf[Map[String,String]]
  val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]

  val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
  val notebookPath = extraContextMap("notebook_path").split("/")

  val notebookURL = tagMap("browserHostName")+"/?o="+tagMap("orgId")+tagMap("browserHash")
  val user = tagMap("user")
  val name = notebookPath(notebookPath.size-1)

  val notebookInfo = Map("notebookURL" -> notebookURL,
    "user" -> user,
    "name" -> name,
    "mounts" -> dbutils.fs.ls("/mnt").map(_.path),
    "timestamp" -> System.currentTimeMillis)
  val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)

  override protected def userExtraMetadataProvider: UserExtraMetadataProvider = new UserExtraMetadataProvider {
    override def forExecEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
    override def forExecPlan(plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson) // add the mount info to searchAndReplace; this function holds the info
    override def forOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
    override def forOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
    override def forOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
  }
})

Here is the updated code, which still fails with errors:

%scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model._
import play.api.libs.json._

val splineConf: Configuration = StandardSplineConfigurationStack(spark)

spark.enableLineageTracking(new DefaultSplineConfigurer(splineConf) {
  val notebookInformationJson = Json.toJson(dbutils.notebook.getContext)
  val outerMap = Json.toJson(notebookInformationJson).getOrElse(0).asInstanceOf[Map[String,String]]
  val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]

  val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
  val notebookPath = extraContextMap("notebook_path").split("/")

  val notebookURL = tagMap("browserHostName")+"/?o="+tagMap("orgId")+tagMap("browserHash")
  val user = tagMap("user")
  val name = notebookPath(notebookPath.size-1)

  val notebookInfo = Map("notebookURL" -> Json.toJson(notebookURL),
    "user" -> Json.toJson(user),
    "name" -> Json.toJson(name),
    "mounts" -> Json.toJson(dbutils.fs.ls("/mnt").map(_.path)),
    "timestamp" -> Json.toJson(System.currentTimeMillis))

  val notebookInfoJson = Json.toJson(notebookInfo)

  def userExtraMetadataProvider: UserExtraAppendingPostProcessingFilter
  = new UserExtraAppendingPostProcessingFilter

  {
    def processExecutionEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
    def processExecutionPlan (plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
    def processReadOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
    def processWriteOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
    def processDataOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
  }
})

Here are the errors:

    command-2044409137370707:12: error: not enough arguments for constructor DefaultSplineConfigurer: (sparkSession: org.apache.spark.sql.SparkSession, userConfiguration: org.apache.commons.configuration.Configuration)za.co.absa.spline.harvester.conf.DefaultSplineConfigurer.
Unspecified value parameter userConfiguration.
spark.enableLineageTracking(new DefaultSplineConfigurer(splineConf) {
                                ^
command-2044409137370707:32: error: not found: type UserExtraAppendingPostProcessingFilter
 def userExtraMetadataProvider: UserExtraAppendingPostProcessingFilter
                                ^
command-2044409137370707:33: error: not found: type UserExtraAppendingPostProcessingFilter
 = new UserExtraAppendingPostProcessingFilter
       ^
command-2044409137370707:37: error: not found: type ExecutionEvent
    def processExecutionEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
                                     ^
command-2044409137370707:38: error: not found: type ExecutionPlan
    def processExecutionPlan (plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
                                    ^
command-2044409137370707:39: error: not found: type ReadOperation
    def processReadOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
                                 ^
command-2044409137370707:40: error: not found: type WriteOperation
    def processWriteOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
                                  ^
command-2044409137370707:41: error: not found: type DataOperation
    def processDataOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
                                 ^
command-2044409137370707:36: warning: a pure expression does nothing in statement position; multiline expressions may require enclosing parentheses
  {
  ^

Your code doesn't compile for a few reasons:

  1. You're missing some imports (the error log makes this clear):

    import za.co.absa.spline.producer.model.v1_1._
    import za.co.absa.spline.harvester.extra.UserExtraAppendingPostProcessingFilter
    
  2. The correct signature for the extra metadata provider is the following:

    protected def maybeUserExtraMetadataProvider: Option[UserExtraMetadataProvider]
    
  3. UserExtraAppendingPostProcessingFilter is just an adapter for the deprecated UserExtraMetadataProvider. So you still need to create an instance:

    new UserExtraAppendingPostProcessingFilter(new UserExtraMetadataProvider() { 
      // ???
    })
    

Please note that we are working on a declarative solution for capturing extra metadata, so that most of the rules and values can be defined in the configuration and little to no coding is required. See https://github.com/AbsaOSS/spline-spark-agent/issues/169

For now, just use UserExtraMetadataProvider.
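
Putting those points together, here is a rough, untested sketch of how the notebook cell from the question might look against the 0.6.0 bundle. It assumes the DefaultSplineConfigurer constructor quoted in the compiler error (SparkSession first, then the Configuration), the maybeUserExtraMetadataProvider hook from point 2 (wrapped in Some(...) since it returns an Option), and the provider method names used in the question's first code block; the notebook-metadata gathering is likewise taken from the question.

    %scala
    import org.apache.commons.configuration.Configuration
    import za.co.absa.spline.harvester.HarvestingContext
    import za.co.absa.spline.harvester.SparkLineageInitializer._
    import za.co.absa.spline.harvester.conf.{DefaultSplineConfigurer, StandardSplineConfigurationStack}
    import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
    import za.co.absa.spline.producer.model.v1_1._
    import scala.util.parsing.json.JSON

    val splineConf: Configuration = StandardSplineConfigurationStack(spark)

    // Databricks notebook metadata, gathered as in the original question
    val notebookInformationJson = dbutils.notebook.getContext.toJson
    val outerMap = JSON.parseFull(notebookInformationJson).getOrElse(Map.empty).asInstanceOf[Map[String, Any]]
    val tagMap = outerMap("tags").asInstanceOf[Map[String, String]]
    val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String, String]]
    val notebookPath = extraContextMap("notebook_path").split("/")

    val notebookInfo = Map(
      "notebookURL" -> (tagMap("browserHostName") + "/?o=" + tagMap("orgId") + tagMap("browserHash")),
      "user"        -> tagMap("user"),
      "name"        -> notebookPath.last,
      "mounts"      -> dbutils.fs.ls("/mnt").map(_.path),
      "timestamp"   -> System.currentTimeMillis)
    val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)

    // Per the first compiler error, 0.6.0's DefaultSplineConfigurer takes the
    // SparkSession as its first constructor argument.
    spark.enableLineageTracking(new DefaultSplineConfigurer(spark, splineConf) {

      // Hook quoted in point 2; it returns an Option, hence the Some(...) wrapper.
      override protected def maybeUserExtraMetadataProvider: Option[UserExtraMetadataProvider] =
        Some(new UserExtraMetadataProvider {
          override def forExecEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
          override def forExecPlan(plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
          override def forOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
          override def forOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
          override def forOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
        })
    })

If this doesn't compile against your exact agent version, check the DefaultSplineConfigurer source for that release; these hooks have been changing between versions.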


For more details see https://github.com/AbsaOSS/spline-spark-agent/discussions/228#discussioncomment-819620