Spline, PySpark: How to get Spline console output in my Python code?


In my PySpark code I'm reading a test CSV file, filtering it, and writing the result. I can see all of those actions in the console as JSON, emitted by LoggingLineageDispatcher, but I want a way to get this data directly in my Python code. I can't find any option for that.

My PySpark code:

from pyspark.sql import DataFrame
from pyspark.sql.functions import col

session = create_spark_session()  # my own helper that builds a SparkSession
test_df: DataFrame = session.read.csv(
    "test.csv",
    sep=",",
    header=True,
)

# keep only the MasterCard rows and write them out
mc_df = test_df.filter(col("Card Type Code") == "MC")
mc_df.write.csv("mc.csv")
session.stop()

I'm running it with:

spark-submit \
--packages za.co.absa.spline.agent.spark:spark-3.2-spline-agent-bundle_2.12:0.7.8 \
--conf "spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener" \
--conf "spark.spline.lineageDispatcher=logging" \
spline_test.py

Example of the console output that I want to get in my Python code:

22/05/27 18:52:50 INFO LoggingLineageDispatcher: ["plan",{"id":"8408ed4b-2f96-5076-aaab-59ac3beb7568","name":"spline_test.py","operations":{"write":{"outputSource":"mc.csv","append":false,"id":"op-0","name":"InsertIntoHadoopFsRelationCommand","childIds":["op-1"],"params":{"path"....

There is 1 answer below.

The short answer is yes, it is doable. But there are some buts, so let me elaborate. The Spline agent is called by the Spark driver from a separate thread, so to pull the lineage content into a variable you need to do it in a concurrent manner.

With Spark 2.x it's easier, because actions were blocking: by the time control is returned to your code, the Spline work has already been done and all the dispatchers have been called, so you can expect the lineage to be captured. In Spark 3+, however, the event listeners are processed asynchronously to the actions, hence you need to implement some sort of synchronization and wait until the lineage content is ready and written into your variable. This is not that straightforward, but it is doable. We do it in our integration tests; take a look at the LineageCaptor class and its usage in some tests, e.g. BasicIntegrationTests.

So, in a nutshell: create a custom LineageDispatcher that takes the lineage information (the execution plan and execution event objects) and puts it into a "global" thread-safe variable that your code also has access to. The variable should be thread-safe, e.g. a Promise or a concurrent collection. Then run the Spark action and let your code wait until the lineage info has arrived in that variable.
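To make that concrete, here is a minimal Scala sketch of such a dispatcher. It is an illustration only: LineageHolder and MyLineageDispatcher are hypothetical names, the constructor taking a Configuration follows the extension-example project linked below, and the exact packages of the LineageDispatcher trait and the ExecutionPlan/ExecutionEvent model classes may differ between agent versions, so verify them against the bundle you actually use.

import scala.concurrent.Promise

import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.dispatcher.LineageDispatcher
import za.co.absa.spline.producer.model.{ExecutionEvent, ExecutionPlan}

// Hypothetical shared, thread-safe holder that the driver code can wait on.
object LineageHolder {
  @volatile var plan: Option[ExecutionPlan] = None
  val promise: Promise[(ExecutionPlan, ExecutionEvent)] = Promise()
}

// The agent instantiates the dispatcher reflectively, passing its configuration.
class MyLineageDispatcher(conf: Configuration) extends LineageDispatcher {

  // the execution plan is sent first, when the write operation is analyzed
  override def send(plan: ExecutionPlan): Unit =
    LineageHolder.plan = Some(plan)

  // the execution event follows when the action completes; publish both
  override def send(event: ExecutionEvent): Unit =
    for (plan <- LineageHolder.plan)
      LineageHolder.promise.trySuccess((plan, event))
}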

An example of a custom LineageDispatcher project can be found here - https://github.com/AbsaOSS/spline-getting-started/tree/main/spark-agent-extension-example. Basically, that project builds a JAR that contains your custom extension. Include that JAR in the Spark driver classpath together with the Spline agent JAR, then register and activate it in the Spline configuration, e.g.

pyspark ... \
    --jars my-extension.jar \
    --packages za.co.absa.spline.agent.spark:spark-2.4-spline-agent-bundle_2.12 \
    --conf "spark.spline.lineageDispatcher.my.className=org.example.MyLineageDispatcher" \
    --conf "spark.spline.lineageDispatcher=my" \
    ...
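Once the dispatcher is registered, the driver-side code can block on the shared holder until the lineage arrives; continuing the hypothetical LineageHolder sketch above:

import scala.concurrent.Await
import scala.concurrent.duration._

// wait up to 30 seconds for the lineage of the completed action
val (plan, event) = Await.result(LineageHolder.promise.future, 30.seconds)

From PySpark there is no direct reference to that JVM object, so in practice you would either expose it through a static getter and call it via the py4j gateway (session.sparkContext._jvm), or have the dispatcher hand the lineage JSON over a channel your Python code can read, e.g. a file or a socket.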