Need to Re-Write Scala Code for Specific JSON Output


I am trying to register Databricks notebook lineage in Azure Purview through Spline and the Apache Atlas API. There are two versions of the code: 1) the original code, which runs on Databricks Runtime 6.4 and works as expected, but we need to run it on a newer runtime (7.5 or above); and 2) a second version refactored for Runtime 7.5. The new code had to switch to different JSON packages, but its output (see below) does not match the expected output of the original code (also shown below). I need to rewrite the new code so that it produces the correct output, because it currently has errors. Thanks

ORIGINAL OLD CODE: The original code, which uses Databricks Runtime version 6.4, is below

%scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model._
import scala.util.parsing.json.JSON

// Build the standard Spline configuration from the Spark session
val splineConf: Configuration = StandardSplineConfigurationStack(spark)

spark.enableLineageTracking(new DefaultSplineConfigurer(splineConf) {
  //override protected def userExtraMetadataProvider = new UserExtraMetaDataProvider {
  //val test = dbutils.notebook.getContext.notebookPath
  // Parse the notebook context JSON and pull out the tags and extraContext maps
  val notebookInformationJson = dbutils.notebook.getContext.toJson
  val outerMap = JSON.parseFull(notebookInformationJson).getOrElse(0).asInstanceOf[Map[String,String]]
  val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]

  val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
  val notebookPath = extraContextMap("notebook_path").split("/")
  
  val notebookURL = tagMap("browserHostName")+"/?o="+tagMap("orgId")+tagMap("browserHash")
  val user = tagMap("user")
  val name = notebookPath(notebookPath.size-1)
  
  val notebookInfo = Map("notebookURL" -> notebookURL,  
                "user" -> user, 
                "name" -> name,
                "mounts" -> dbutils.fs.ls("/mnt").map(_.path),
                "timestamp" -> System.currentTimeMillis)
  val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)
  
  // Attach custom extras to the harvested lineage entities
  override protected def userExtraMetadataProvider: UserExtraMetadataProvider = new UserExtraMetadataProvider {
    override def forExecEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
    override def forExecPlan(plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
    override def forOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
    override def forOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
    override def forOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
  }
})

ORIGINAL EXPECTED OUTPUT: Here is the expected JSON output from the original code

{
  'id': '618c92e1-ae79-491e-b6fd-b5080dc7ef8d',
  'operations': {
    'write': {
      'outputSource': 'dbfs:/mnt/test_data/parquet/Customers_new',
      'append': False,
      'id': 0,
      'childIds': [
        1
      ],
      'params': {
        'path': 'dbfs:/mnt/test_data/parquet/Customers_new'
      },
      'extra': {
        'name': 'InsertIntoHadoopFsRelationCommand',
        'destinationType': 'Parquet',
        'foo': 'bar4'
      }
    },
    'reads': [
      {
        'inputSources': [
          'dbfs:/mnt/test_data/csv/Customers.csv'
        ],
        'id': 1,
        'schema': [
          'b6112e12-9b90-46db-b919-bcc9c6280759',
          '9f0671fe-813e-4608-a870-adae8386c46e',
          '8dfcf4df-211c-48b0-8dec-1b1486dd0db4'
        ],
        'params': {
          'delimiter': ',',
          'inferschema': 'true',
          'header': 'true',
          'path': 'dbfs:/mnt/test_data/csv/Customers.csv'
        },
        'extra': {
          'name': 'LogicalRelation',
          'sourceType': 'CSV',
          'foo': 'bar3'
        }
      }
    ]
  },
  'systemInfo': {
    'name': 'spark',
    'version': '2.4.5'
  },
  'agentInfo': {
    'name': 'spline',
    'version': '0.5.3'
  },
  'extraInfo': {
    'appName': 'Databricks Shell',
    'dataTypes': [
      {
        '_typeHint': 'dt.Simple',
        'id': 'df02093b-d529-4c8d-b422-9ac468baa765',
        'name': 'integer',
        'nullable': True
      },
      {
        '_typeHint': 'dt.Simple',
        'id': '88f773f8-982c-4d6c-bed3-1600a99c5943',
        'name': 'string',
        'nullable': True
      }
    ],
    'attributes': [
      {
        'id': 'b6112e12-9b90-46db-b919-bcc9c6280759',
        'name': 'CustomerID',
        'dataTypeId': 'df02093b-d529-4c8d-b422-9ac468baa765'
      },
      {
        'id': '9f0671fe-813e-4608-a870-adae8386c46e',
        'name': 'FirstName',
        'dataTypeId': '88f773f8-982c-4d6c-bed3-1600a99c5943'
      },
      {
        'id': '8dfcf4df-211c-48b0-8dec-1b1486dd0db4',
        'name': 'LastName',
        'dataTypeId': '88f773f8-982c-4d6c-bed3-1600a99c5943'
      }
    ],
    'notebookInfo': {
      'obj': {
        'name': 'initialize_spline_original',
        'timestamp': 1623430575561,
        'notebookURL': 'adb-2323242424.azuredatabricks.net/',
        'mounts': [
          'dbfs:/mnt/datalake/',
          'dbfs:/mnt/landing_dde/',
          'dbfs:/mnt/landing_newc/',
          'dbfs:/mnt/landing_sourcedb/',
          'dbfs:/mnt/testmount/',
          'dbfs:/mnt/training/'
        ],
        'user': '[email protected]'
      }
    }
  }
}
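
A side note on the shape of this output: the extra 'obj' level under notebookInfo presumably comes from the scala.util.parsing.json.JSONObject wrapper used in the code above. JSONObject is a case class whose single field is named obj, so when the wrapper itself is serialized the map lands under that key. A rough sketch (the wrapping is from the standard library; the serialization behaviour is my assumption):

%scala
import scala.util.parsing.json.JSONObject

val info = JSONObject(Map("user" -> "user@example.com"))
// info.obj is the underlying Map, so a bean-style serializer renders the wrapper
// as {"obj": {"user": "user@example.com"}}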

NEW CODE: This is the new code, which targets Databricks Runtime version 7.5 and uses the upgraded, refactored JSON packages.

%scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraAppendingPostProcessingFilter
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model._
import play.api.libs.json._
import za.co.absa.spline.producer.model.v1_1._
import za.co.absa.spline.harvester.postprocessing.PostProcessingFilter
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper


val splineConf: Configuration = StandardSplineConfigurationStack(spark)

spark.enableLineageTracking(new DefaultSplineConfigurer(spark, splineConf) {
  // Parse the notebook context JSON with a Jackson mapper that has the Scala module registered
  val notebookInformationJson = dbutils.notebook.getContext.toJson
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)

  val outerMap = mapper.readValue[Map[String, Object]](notebookInformationJson)
  val tagMap = outerMap("tags").asInstanceOf[Map[String, String]]
  val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String, String]]
  val notebookPath = extraContextMap("notebook_path").split("/")
  val notebookURL = tagMap("browserHostName") + "/?o=" + tagMap("orgId") + tagMap("browserHash")
  val user = tagMap("user")
  val name = notebookPath(notebookPath.size - 1)

  val notebookInfo = Map(
    "notebookURL" -> notebookURL,
    "user" -> user,
    "name" -> name,
    "mounts" -> dbutils.fs.ls("/mnt").map(_.path),
    "timestamp" -> System.currentTimeMillis)

  // A second mapper, created without the Scala module, serializes notebookInfo to a JSON string
  val mapper1 = new ObjectMapper()
  val notebookInfoJson = mapper1.writeValueAsString(notebookInfo)

  override protected def maybeUserExtraMetadataProvider: Option[UserExtraMetadataProvider] = Some(new UserExtraMetadataProvider() {
    override def forExecEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
    override def forExecPlan(plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
    override def forOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
    override def forOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
    override def forOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
  })
})

NEW OUTPUT: The JSON output from the new code looks like this:

{
  'id': '12812131038144',
  'operations': {
    'write': {
      'outputSource': 'dbfs:/mnt/test_data/delta/customers_join_orders_new',
      'append': False,
      'id': 0,
      'childIds': [
        1
      ],
      'params': {
        'path': 'dbfs:/mnt/test_data/delta/customers_join_orders_new'
      },
      'extra': {
        'destinationType': 'tahoe',
        'foo': 'bar4',
        'name': 'SaveIntoDataSourceCommand'
      }
    },
    'reads': [
      {
        'childIds': [
          
        ],
        'inputSources': [
          'dbfs:/mnt/test_data/csv/Orders.csv'
        ],
        'id': 4,
        'schema': [
          '2018',
          '2019',
          '2020',
          '2021',
          '2022',
          '2023'
        ],
        'params': {
          'inferschema': 'true',
          'header': 'true',
          'delimiter': ',',
          'path': 'dbfs:/mnt/test_data/csv/Orders.csv'
        },
        'extra': {
          'sourceType': 'csv',
          'foo': 'bar3',
          'name': 'LogicalRelation'
        }
      },
      {
        'childIds': [
          
        ],
        'inputSources': [
          'dbfs:/mnt/test_data/csv/Customers.csv'
        ],
        'id': 6,
        'schema': [
          '2065',
          '2066',
          '2067'
        ],
        'params': {
          'inferschema': 'true',
          'header': 'true',
          'delimiter': ',',
          'path': 'dbfs:/mnt/test_data/csv/Customers.csv'
        },
        'extra': {
          'sourceType': 'csv',
          'foo': 'bar3',
          'name': 'LogicalRelation'
        }
      }
    ],
    'other': [
      {
        'id': 3,
        'childIds': [
          4
        ],
        'schema': [
          '2018',
          '2019',
          '2020',
          '2021',
          '2022',
          '2023'
        ],
        'params': {
          'identifier': 'orders'
        },
        'extra': {
          'foo': 'bar5',
          'name': 'SubqueryAlias'
        }
      },
      {
        'id': 5,
        'childIds': [
          6
        ],
        'schema': [
          '2065',
          '2066',
          '2067'
        ],
        'params': {
          'identifier': 'customers'
        },
        'extra': {
          'foo': 'bar5',
          'name': 'SubqueryAlias'
        }
      },
      {
        'id': 2,
        'childIds': [
          3,
          5
        ],
        'schema': [
          '2018',
          '2019',
          '2020',
          '2021',
          '2022',
          '2023',
          '2065',
          '2066',
          '2067'
        ],
        'params': {
          'condition': {
            '_typeHint': 'expr.Binary',
            'symbol': '=',
            'dataTypeId': '05ffb715-9781-4019-aee3-21c77f80d2a1',
            'children': [
              {
                '_typeHint': 'expr.AttrRef',
                'refId': '2019'
              },
              {
                '_typeHint': 'expr.AttrRef',
                'refId': '2065'
              }
            ]
          },
          'hint': '',
          'joinType': 'INNER'
        },
        'extra': {
          'foo': 'bar5',
          'name': 'Join'
        }
      },
      {
        'id': 1,
        'childIds': [
          2
        ],
        'schema': [
          '2065',
          '2020',
          '2022',
          '2023'
        ],
        'params': {
          'projectList': [
            {
              '_typeHint': 'expr.AttrRef',
              'refId': '2065'
            },
            {
              '_typeHint': 'expr.AttrRef',
              'refId': '2020'
            },
            {
              '_typeHint': 'expr.AttrRef',
              'refId': '2022'
            },
            {
              '_typeHint': 'expr.AttrRef',
              'refId': '2023'
            }
          ]
        },
        'extra': {
          'foo': 'bar5',
          'name': 'Project'
        }
      }
    ]
  },
  'systemInfo': {
    'name': 'spark',
    'version': '3.0.1'
  },
  'agentInfo': {
    'name': 'spline',
    'version': '0.6.0'
  },
  'extraInfo': {
    'appName': 'Databricks Shell',
    'dataTypes': [
      {
        '_typeHint': 'dt.Simple',
        'id': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca',
        'name': 'integer',
        'nullable': True
      },
      {
        '_typeHint': 'dt.Simple',
        'id': 'a982225d-4ad5-49f7-a0d9-dfb90c0ab2be',
        'name': 'double',
        'nullable': True
      },
      {
        '_typeHint': 'dt.Simple',
        'id': 'f102fb63-29bb-475c-b98d-85f9a8ddb2d4',
        'name': 'string',
        'nullable': True
      },
      {
        '_typeHint': 'dt.Simple',
        'id': '05ffb715-9781-4019-aee3-21c77f80d2a1',
        'name': 'boolean',
        'nullable': True
      }
    ],
    'notebookInfo': '{"traversableAgain":true,"empty":false}',
    'attributes': [
      {
        'id': '2018',
        'name': 'SalesOrderID',
        'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
      },
      {
        'id': '2019',
        'name': 'CustomerID',
        'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
      },
      {
        'id': '2020',
        'name': 'OrderQty',
        'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
      },
      {
        'id': '2021',
        'name': 'ProductID',
        'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
      },
      {
        'id': '2022',
        'name': 'UnitPrice',
        'dataTypeId': 'a982225d-4ad5-49f7-a0d9-dfb90c0ab2be'
      },
      {
        'id': '2023',
        'name': 'LineTotal',
        'dataTypeId': 'a982225d-4ad5-49f7-a0d9-dfb90c0ab2be'
      },
      {
        'id': '2065',
        'name': 'CustomerID',
        'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
      },
      {
        'id': '2066',
        'name': 'FirstName',
        'dataTypeId': 'f102fb63-29bb-475c-b98d-85f9a8ddb2d4'
      },
      {
        'id': '2067',
        'name': 'LastName',
        'dataTypeId': 'f102fb63-29bb-475c-b98d-85f9a8ddb2d4'
      }
    ]
  }
}
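
Note the notebookInfo entry in the extraInfo above: it arrives as the literal string '{"traversableAgain":true,"empty":false}' instead of the expected nested object. My suspicion (an assumption, not something I have verified) is that this comes from the second mapper, mapper1, which is created without DefaultScalaModule, so Jackson falls back to bean serialization of the Scala Map and only its boolean getters (isEmpty, isTraversableAgain) survive. A minimal sketch of that behaviour, with a made-up sample map standing in for notebookInfo:

%scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// hypothetical stand-in for notebookInfo
val sample = Map("user" -> "user@example.com", "timestamp" -> System.currentTimeMillis)

// Without the Scala module, Jackson only sees the bean getters of the Scala Map
val plainMapper = new ObjectMapper()
println(plainMapper.writeValueAsString(sample))
// prints {"traversableAgain":true,"empty":false} -- the same value seen in the output above

// With the Scala module registered, the map is serialized key by key
val scalaMapper = new ObjectMapper()
scalaMapper.registerModule(DefaultScalaModule)
println(scalaMapper.writeValueAsString(sample))
// prints {"user":"user@example.com","timestamp":...}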

There is 1 best solution below.


There is no need for explicit JSON serialization/deserialization. All you need to do is pull the necessary values from the notebook context using the appropriate getter methods, pack them into any serializable object (e.g. a Map), and put it into the Spline execution plan extras directly.

import org.apache.commons.configuration._

import za.co.absa.spline.harvester.postprocessing.AbstractPostProcessingFilter
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.HarvestingContext
import za.co.absa.spline.producer.model.v1_1._

import za.co.absa.spline.harvester.ExtraMetadataImplicits._
import za.co.absa.spline.harvester.SparkLineageInitializer._

// Gather notebook metadata directly from the notebook context (no JSON round-trip)
val notebookInfo = {
  val ctx = dbutils.notebook.getContext
  val tags = ctx.tags
  val notebookName = ctx.notebookPath.get.split("/").last
  val notebookURL = s"${tags("browserHostName")}/?o=${tags("orgId")}${tags("browserHash")}"
  val user = tags("user")
  val mounts = dbutils.fs.ls("/mnt").map(_.path)
  
  Map(
    "notebookURL" -> notebookURL,
    "user" -> user,
    "name" -> notebookName,
    "mounts" -> mounts,
    "timestamp" -> System.currentTimeMillis
  )
}

val splineConf = StandardSplineConfigurationStack(spark)

spark.enableLineageTracking(new DefaultSplineConfigurer(spark, splineConf) {
  // Attach the notebook metadata to every harvested execution plan
  override def postProcessingFilter = new AbstractPostProcessingFilter {
    override def processExecutionPlan(plan: ExecutionPlan, ctx: HarvestingContext): ExecutionPlan = {
      plan.withAddedExtra(Map(
        "notebookInfo" -> notebookInfo
      ))
    }
  }
})

Note: I'm not using UserExtraMetadataProvider (it's deprecated and will be removed in the upcoming major release). Instead, I'm using PostProcessingFilter, which is essentially the same abstraction but with expanded responsibilities. (For more details see https://github.com/AbsaOSS/spline-spark-agent#filters)
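
If you also need the per-operation "foo" extras that the original UserExtraMetadataProvider attached, the same filter can carry them. The sketch below is only an assumption-laden illustration: it presumes that AbstractPostProcessingFilter in your agent version exposes processExecutionEvent/processReadOperation/processWriteOperation/processDataOperation callbacks mirroring the old provider, and that ExtraMetadataImplicits adds withAddedExtra to those model classes as it does for ExecutionPlan (check the filters documentation linked above before relying on it):

spark.enableLineageTracking(new DefaultSplineConfigurer(spark, splineConf) {
  override def postProcessingFilter = new AbstractPostProcessingFilter {
    // assumed callback names; they mirror the deprecated UserExtraMetadataProvider methods
    override def processExecutionPlan(plan: ExecutionPlan, ctx: HarvestingContext): ExecutionPlan =
      plan.withAddedExtra(Map("notebookInfo" -> notebookInfo))

    override def processExecutionEvent(event: ExecutionEvent, ctx: HarvestingContext): ExecutionEvent =
      event.withAddedExtra(Map("foo" -> "bar1"))

    override def processReadOperation(op: ReadOperation, ctx: HarvestingContext): ReadOperation =
      op.withAddedExtra(Map("foo" -> "bar3"))

    override def processWriteOperation(op: WriteOperation, ctx: HarvestingContext): WriteOperation =
      op.withAddedExtra(Map("foo" -> "bar4"))

    override def processDataOperation(op: DataOperation, ctx: HarvestingContext): DataOperation =
      op.withAddedExtra(Map("foo" -> "bar5"))
  }
})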