I am trying to register Databricks notebook lineage to Azure Purview through spline and apacheatlas api. There are two versions of the code: 1) is the original code which uses databricks runtime version 6.4 and is working as expected but we need to run this in a newer runtime version of at least version 7.5 and above so there is a 2) second version of the code which is refactored for runtime version 7.5. Specifically the new code needs new JSON packages but the output(see below) does not match the expected output of the original code (also shown below). Needing to re-write the code for it to execute correctly because current new code has errors. Thanks
ORIGINAL OLD CODE: The original code which uses Databricks Runtime version6.4 is below
%scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model._
import scala.util.parsing.json.JSON
val splineConf: Configuration = StandardSplineConfigurationStack(spark)
spark.enableLineageTracking(new DefaultSplineConfigurer(splineConf) {
//override protected def userExtraMetadataProvider = new UserExtraMetaDataProvider {
//val test = dbutils.notebook.getContext.notebookPath
val notebookInformationJson = dbutils.notebook.getContext.toJson
val outerMap = JSON.parseFull(notebookInformationJson).getOrElse(0).asInstanceOf[Map[String,String]]
val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]
val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
val notebookPath = extraContextMap("notebook_path").split("/")
val notebookURL = tagMap("browserHostName")+"/?o="+tagMap("orgId")+tagMap("browserHash")
val user = tagMap("user")
val name = notebookPath(notebookPath.size-1)
val notebookInfo = Map("notebookURL" -> notebookURL,
"user" -> user,
"name" -> name,
"mounts" -> dbutils.fs.ls("/mnt").map(_.path),
"timestamp" -> System.currentTimeMillis)
val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)
override protected def userExtraMetadataProvider: UserExtraMetadataProvider = new UserExtraMetadataProvider {
override def forExecEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
override def forExecPlan(plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
override def forOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
override def forOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
override def forOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
}
})
ORIGINAL EXPECTED OUTPUT: Here is the expected JSON output from the original code
{
'id': '618c92e1-ae79-491e-b6fd-b5080dc7ef8d',
'operations': {
'write': {
'outputSource': 'dbfs:/mnt/test_data/parquet/Customers_new',
'append': False,
'id': 0,
'childIds': [
1
],
'params': {
'path': 'dbfs:/mnt/test_data/parquet/Customers_new'
},
'extra': {
'name': 'InsertIntoHadoopFsRelationCommand',
'destinationType': 'Parquet',
'foo': 'bar4'
}
},
'reads': [
{
'inputSources': [
'dbfs:/mnt/test_data/csv/Customers.csv'
],
'id': 1,
'schema': [
'b6112e12-9b90-46db-b919-bcc9c6280759',
'9f0671fe-813e-4608-a870-adae8386c46e',
'8dfcf4df-211c-48b0-8dec-1b1486dd0db4'
],
'params': {
'delimiter': ',',
'inferschema': 'true',
'header': 'true',
'path': 'dbfs:/mnt/test_data/csv/Customers.csv'
},
'extra': {
'name': 'LogicalRelation',
'sourceType': 'CSV',
'foo': 'bar3'
}
}
]
},
'systemInfo': {
'name': 'spark',
'version': '2.4.5'
},
'agentInfo': {
'name': 'spline',
'version': '0.5.3'
},
'extraInfo': {
'appName': 'Databricks Shell',
'dataTypes': [
{
'_typeHint': 'dt.Simple',
'id': 'df02093b-d529-4c8d-b422-9ac468baa765',
'name': 'integer',
'nullable': True
},
{
'_typeHint': 'dt.Simple',
'id': '88f773f8-982c-4d6c-bed3-1600a99c5943',
'name': 'string',
'nullable': True
}
],
'attributes': [
{
'id': 'b6112e12-9b90-46db-b919-bcc9c6280759',
'name': 'CustomerID',
'dataTypeId': 'df02093b-d529-4c8d-b422-9ac468baa765'
},
{
'id': '9f0671fe-813e-4608-a870-adae8386c46e',
'name': 'FirstName',
'dataTypeId': '88f773f8-982c-4d6c-bed3-1600a99c5943'
},
{
'id': '8dfcf4df-211c-48b0-8dec-1b1486dd0db4',
'name': 'LastName',
'dataTypeId': '88f773f8-982c-4d6c-bed3-1600a99c5943'
}
],
'notebookInfo': {
'obj': {
'name': 'initialize_spline_original',
'timestamp': 1623430575561,
'notebookURL': 'adb-2323242424.azuredatabricks.net/',
'mounts': [
'dbfs:/mnt/datalake/',
'dbfs:/mnt/landing_dde/',
'dbfs:/mnt/landing_newc/',
'dbfs:/mnt/landing_sourcedb/',
'dbfs:/mnt/testmount/',
'dbfs:/mnt/training/'
],
'user': '[email protected]'
}
}
}
}
NEW CODE: This is the new code which is using Databricks runtime version 7.5 and upgraded, and refactored JSON packages.
%scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraAppendingPostProcessingFilter
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model._
import play.api.libs.json._
import za.co.absa.spline.producer.model.v1_1._
import za.co.absa.spline.harvester.postprocessing.PostProcessingFilter
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
val splineConf: Configuration = StandardSplineConfigurationStack(spark)
spark.enableLineageTracking(new DefaultSplineConfigurer(spark,splineConf) {
val notebookInformationJson = dbutils.notebook.getContext.toJson
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
val outerMap = mapper.readValue[Map[String, Object]](notebookInformationJson)
val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]
val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
val notebookPath = extraContextMap("notebook_path").split("/")
val notebookURL = tagMap("browserHostName")+"/?o="+tagMap("orgId")+tagMap("browserHash")
val user = tagMap("user")
val name = notebookPath(notebookPath.size-1)
val notebookInfo = Map("notebookURL" -> (notebookURL),
"user" -> (user),
"name" -> (name),
"mounts" -> (dbutils.fs.ls("/mnt").map(_.path)),
"timestamp" -> (System.currentTimeMillis))
val mapper1 = new ObjectMapper()
val notebookInfoJson = mapper1.writeValueAsString(notebookInfo)
override protected def maybeUserExtraMetadataProvider: Option[UserExtraMetadataProvider] = Some(new UserExtraMetadataProvider() {
override def forExecEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
override def forExecPlan(plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
override def forOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
override def forOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
override def forOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
})
})
NEW OUTPUT: The JSON object output looks like this for the new code:
{
'id': '12812131038144',
'operations': {
'write': {
'outputSource': 'dbfs:/mnt/test_data/delta/customers_join_orders_new',
'append': False,
'id': 0,
'childIds': [
1
],
'params': {
'path': 'dbfs:/mnt/test_data/delta/customers_join_orders_new'
},
'extra': {
'destinationType': 'tahoe',
'foo': 'bar4',
'name': 'SaveIntoDataSourceCommand'
}
},
'reads': [
{
'childIds': [
],
'inputSources': [
'dbfs:/mnt/test_data/csv/Orders.csv'
],
'id': 4,
'schema': [
'2018',
'2019',
'2020',
'2021',
'2022',
'2023'
],
'params': {
'inferschema': 'true',
'header': 'true',
'delimiter': ',',
'path': 'dbfs:/mnt/test_data/csv/Orders.csv'
},
'extra': {
'sourceType': 'csv',
'foo': 'bar3',
'name': 'LogicalRelation'
}
},
{
'childIds': [
],
'inputSources': [
'dbfs:/mnt/test_data/csv/Customers.csv'
],
'id': 6,
'schema': [
'2065',
'2066',
'2067'
],
'params': {
'inferschema': 'true',
'header': 'true',
'delimiter': ',',
'path': 'dbfs:/mnt/test_data/csv/Customers.csv'
},
'extra': {
'sourceType': 'csv',
'foo': 'bar3',
'name': 'LogicalRelation'
}
}
],
'other': [
{
'id': 3,
'childIds': [
4
],
'schema': [
'2018',
'2019',
'2020',
'2021',
'2022',
'2023'
],
'params': {
'identifier': 'orders'
},
'extra': {
'foo': 'bar5',
'name': 'SubqueryAlias'
}
},
{
'id': 5,
'childIds': [
6
],
'schema': [
'2065',
'2066',
'2067'
],
'params': {
'identifier': 'customers'
},
'extra': {
'foo': 'bar5',
'name': 'SubqueryAlias'
}
},
{
'id': 2,
'childIds': [
3,
5
],
'schema': [
'2018',
'2019',
'2020',
'2021',
'2022',
'2023',
'2065',
'2066',
'2067'
],
'params': {
'condition': {
'_typeHint': 'expr.Binary',
'symbol': '=',
'dataTypeId': '05ffb715-9781-4019-aee3-21c77f80d2a1',
'children': [
{
'_typeHint': 'expr.AttrRef',
'refId': '2019'
},
{
'_typeHint': 'expr.AttrRef',
'refId': '2065'
}
]
},
'hint': '',
'joinType': 'INNER'
},
'extra': {
'foo': 'bar5',
'name': 'Join'
}
},
{
'id': 1,
'childIds': [
2
],
'schema': [
'2065',
'2020',
'2022',
'2023'
],
'params': {
'projectList': [
{
'_typeHint': 'expr.AttrRef',
'refId': '2065'
},
{
'_typeHint': 'expr.AttrRef',
'refId': '2020'
},
{
'_typeHint': 'expr.AttrRef',
'refId': '2022'
},
{
'_typeHint': 'expr.AttrRef',
'refId': '2023'
}
]
},
'extra': {
'foo': 'bar5',
'name': 'Project'
}
}
]
},
'systemInfo': {
'name': 'spark',
'version': '3.0.1'
},
'agentInfo': {
'name': 'spline',
'version': '0.6.0'
},
'extraInfo': {
'appName': 'Databricks Shell',
'dataTypes': [
{
'_typeHint': 'dt.Simple',
'id': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca',
'name': 'integer',
'nullable': True
},
{
'_typeHint': 'dt.Simple',
'id': 'a982225d-4ad5-49f7-a0d9-dfb90c0ab2be',
'name': 'double',
'nullable': True
},
{
'_typeHint': 'dt.Simple',
'id': 'f102fb63-29bb-475c-b98d-85f9a8ddb2d4',
'name': 'string',
'nullable': True
},
{
'_typeHint': 'dt.Simple',
'id': '05ffb715-9781-4019-aee3-21c77f80d2a1',
'name': 'boolean',
'nullable': True
}
],
'notebookInfo': '{"traversableAgain":true,"empty":false}',
'attributes': [
{
'id': '2018',
'name': 'SalesOrderID',
'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
},
{
'id': '2019',
'name': 'CustomerID',
'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
},
{
'id': '2020',
'name': 'OrderQty',
'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
},
{
'id': '2021',
'name': 'ProductID',
'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
},
{
'id': '2022',
'name': 'UnitPrice',
'dataTypeId': 'a982225d-4ad5-49f7-a0d9-dfb90c0ab2be'
},
{
'id': '2023',
'name': 'LineTotal',
'dataTypeId': 'a982225d-4ad5-49f7-a0d9-dfb90c0ab2be'
},
{
'id': '2065',
'name': 'CustomerID',
'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
},
{
'id': '2066',
'name': 'FirstName',
'dataTypeId': 'f102fb63-29bb-475c-b98d-85f9a8ddb2d4'
},
{
'id': '2067',
'name': 'LastName',
'dataTypeId': 'f102fb63-29bb-475c-b98d-85f9a8ddb2d4'
}
]
}
}
There is no need for explicit JSON serialization/deserialization. All you need to do is to pull necessary values from the notebook
Context
using appropriate getter methods, pack them into any serializable object (e.g. aMap
) and put it into the Spline execution plan extras directly.Note: I'm not using
UserExtraMetadataProvider
(it's deprecated and is removed from the upcoming major release). Instead, I'm usingPostProcessingFilter
which essentially is almost the same abstraction but with expanded responsibilities. (For more details see - https://github.com/AbsaOSS/spline-spark-agent#filters)