I have the following dataframe, which contains all the paths within a tree after traversing all nodes. For each jump between nodes a row is created, where "dist" is the number of jumps so far, "node" is the current node and "path" is the path so far.

dist   |  node     |  path
0      |     1     |    [1]   
1      |     2     |    [1,2] 
1      |     5     |    [1,5] 
2      |     3     |    [1,2,3] 
2      |     4     |    [1,2,4] 

In the end I just want a dataframe that contains only the complete paths, without the intermediate steps:

dist   |  node     |  path
1      |     5     |    [1,5] 
2      |     3     |    [1,2,3] 
2      |     4     |    [1,2,4]

I also tried storing the path column as a string ("1;2;3") and checking which rows are substrings of other rows, but I could not find a way to do that.
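Comparing strings for substrings is fragile: "1;2" is also a substring of "11;2" and of "1;23". Comparing the paths element-wise as prefixes avoids that. Below is a minimal plain-Python sketch of the check, assuming the rows are available as (dist, node, path) tuples; the names `rows`, `is_proper_prefix` and `complete_paths` are made up for illustration.

```python
# Plain-Python sketch: keep only paths that are not a proper prefix of another path.
# Rows are modelled as (dist, node, path) tuples, mirroring the table above.
rows = [
    (0, 1, [1]),
    (1, 2, [1, 2]),
    (1, 5, [1, 5]),
    (2, 3, [1, 2, 3]),
    (2, 4, [1, 2, 4]),
]


def is_proper_prefix(short, long):
    """True if `short` is a strict prefix of `long` (element-wise, not substring)."""
    return len(short) < len(long) and long[:len(short)] == short


def complete_paths(rows):
    """Return only the rows whose path is not extended by any other row."""
    return [
        row for row in rows
        if not any(is_proper_prefix(row[2], other[2]) for other in rows)
    ]


for row in complete_paths(rows):
    print(row)
# (1, 5, [1, 5])
# (2, 3, [1, 2, 3])
# (2, 4, [1, 2, 4])
```

This compares every pair of rows, so it is quadratic and only meant to show the logic. Because every intermediate path is listed, an equivalent condition in a tree is that a row's node never appears as the second-to-last element of another path; in Spark such a filter could probably be expressed as a `left_anti` join, though that variant is not shown here.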


I found some old code of mine and adapted it to your problem. It uses the Spark graph library GraphFrames. The paths can be determined with a Pregel-like message-aggregation loop.
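The core of such a Pregel-like loop is a single message-aggregation step. Here is a plain-Python model of that step, assuming the semantics used in the Spark code of this answer: each edge sends the message stored on its `dst` vertex back to its `src` vertex, and each vertex collects everything it receives into a set (like `collect_set`). The function `aggregate_to_src` is hypothetical and not part of GraphFrames.

```python
# Plain-Python model of one message-aggregation step:
# every edge (src, dst) sends the dst vertex's messages back to src,
# and all messages arriving at the same src are collected into a set.
from collections import defaultdict


def aggregate_to_src(edges, messages):
    """edges: list of (src, dst); messages: dict vertex -> list of messages.
    Returns dict src -> set of messages received from its children.
    Vertices that receive nothing do not appear in the result."""
    received = defaultdict(set)
    for src, dst in edges:
        for msg in messages.get(dst, []):
            received[src].add(msg)
    return dict(received)


edges = [("0", "1"), ("1", "2"), ("1", "5"), ("2", "3"), ("2", "4")]
# initial message of every vertex: (path, type of the vertex that started it)
messages = {
    "0": [(("0",), "root")],
    "1": [(("1",), "child")],
    "2": [(("2",), "child")],
    "3": [(("3",), "leaf")],
    "4": [(("4",), "leaf")],
    "5": [(("5",), "leaf")],
}
print(aggregate_to_src(edges, messages))
```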

Here is the code. First, import all modules:

from pyspark.sql import SparkSession

import pyspark.sql.functions as f
from graphframes import GraphFrame
from pyspark.sql.types import *

# shortcut for the aggregate message object from the graphframes.lib
from graphframes.lib import AggregateMessages as AM

# to plot the graph
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt

# the graphframes package must be available to this Spark session (e.g. via --packages)
spark = (SparkSession
         .builder
         .getOrCreate())

Then create a sample dataset:

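# create dataframe (all values are strings because the sample mixes digits and letters)
raw_data = [
    ("0", "1"),
    ("1", "2"),
    ("1", "5"),
    ("2", "3"),
    ("2", "4"),
    ("a", "b"),
    ("b", "c"),
    ("c", "d"),
]

schema = ["src", "dst"]
data = spark.createDataFrame(data=raw_data, schema=schema)
# GraphFrames expects the edge columns to be called "src" and "dst"
edges = data
data.show()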

+---+---+
|src|dst|
+---+---+
|  0|  1|
|  1|  2|
|  1|  5|
|  2|  3|
|  2|  4|
|  a|  b|
|  b|  c|
|  c|  d|
+---+---+

For visualisation, run:

plotData_1 = data.select("src", "dst").rdd.collect()
plotData_2 = np.array(plotData_1)

# build a directed networkx graph from the edge list
G = nx.DiGraph()
for row in plotData_2:
    G.add_edge(row[0], row[1])

options = {
    'node_color': 'orange',
    'node_size': 500,
    'width': 2,
    'arrowstyle': '-|>',
    'arrowsize': 20,
}

nx.draw(G, arrows=True, with_labels=True, **options)
plt.show()

[Figure: plot of the graph]

With this message-aggregation algorithm you find the paths you are looking for. If you set the flag show_steps to True, the results of each step are printed, which helps to understand what is happening.
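To tell complete paths from partial ones, each vertex is first classified by its in and out degrees. A plain-Python sketch of that rule, assuming a vertex without incoming edges is a root, one without outgoing edges is a leaf, and everything else is a child; the helper `node_types` is hypothetical:

```python
from collections import Counter


def node_types(edges):
    """Classify every vertex of a (src, dst) edge list as root, child or leaf."""
    out_deg = Counter(src for src, _ in edges)
    in_deg = Counter(dst for _, dst in edges)
    types = {}
    for n in sorted(set(out_deg) | set(in_deg)):
        if in_deg[n] == 0:
            types[n] = "root"    # nothing points to it
        elif out_deg[n] == 0:
            types[n] = "leaf"    # it points to nothing
        else:
            types[n] = "child"   # somewhere in the middle of a path
    return types


edges = [("0", "1"), ("1", "2"), ("1", "5"), ("2", "3"), ("2", "4"),
         ("a", "b"), ("b", "c"), ("c", "d")]
print(node_types(edges))
```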

# if flag is true, print results within the loop for debugging
show_steps = False
# max iterations of the loop, should be larger than the longest expected path
max_iter = 10  # example value, adjust to your data

# create vertices from edge data set
vertices = (edges.select("src")
            .union(edges.select("dst"))
            .distinct()
            .withColumnRenamed("src", "id"))

# create graph to get in and out degrees
gx = GraphFrame(vertices, edges)
# calculate in and out degrees of each node
inDegrees = gx.inDegrees
outDegrees = gx.outDegrees
if show_steps:
    print("in and out degrees")
    inDegrees.show()
    outDegrees.show()

# create initial vertices
init_vertices = (vertices
                 # join out degrees on vertices
                 .join(outDegrees, on="id", how="left")
                 # join in degree on vertices
                 .join(inDegrees, on="id", how="left")
                 # define roots, children in the middle and leaves of the path in order to distinguish full paths later on
                 .withColumn("nodeType", f.when(f.col("inDegree").isNull(), "root")
                                          .when(f.col("outDegree").isNull(), "leaf")
                                          .otherwise("child"))
                 # define message with all information [array(id) and array(nodeType)] to be sent to the next node
                 .withColumn("message", f.array(f.array(f.col("id")), f.array(f.col("nodeType"))))
                 # remove columns that are not used anymore
                 .drop("inDegree", "outDegree"))
if show_steps:
    print("init vertices")
    init_vertices.show(truncate=False)

# update graph object with init vertices
gx = GraphFrame(init_vertices, edges)

# define empty dataframe to append found paths on
results = spark.createDataFrame(
    [], StructType([StructField("paths", ArrayType(StringType()), True)]))

# start loop for message aggregation. max_iter has to be larger than the longest path expected
for iter_ in range(max_iter):
    if show_steps:
        print("iteration step=" + str(iter_))
    # define the message that should be sent. Here we send a message to the source node
    # and take the column message from the destination, i.e. we send backward
    msgToSrc = AM.dst["message"]
    agg = gx.aggregateMessages(
        f.collect_set(AM.msg).alias("aggMess"),  # aggregation function is a collect into an array (attention!! this can be an expensive operation in terms of shuffle)
        sendToSrc=msgToSrc,
        sendToDst=None)
    if show_steps:
        print("aggregated message")
        agg.show(truncate=False)
    # stop loop if no more agg messages collected
    if len(agg.head(1)) == 0:
        print("All paths found in " + str(iter_) + " iterations")
        break
    # get new vertices to send into next round. Here we prepare the next message columns; all _column_names are temporary columns for calculation purposes only
    newVertices = (agg
                   # join initial data to aggregation in order to have the nodeType of the vertex
                   .join(init_vertices.select("id", "nodeType"), on="id", how="left")
                   # explode the nested array with the path and the nodeType
                   .withColumn("_msg", f.explode(f.col("aggMess")))
                   # put the path array into a separate column
                   .withColumn("_path", f.col("_msg").getItem(0))
                   # put the node type into a separate column
                   .withColumn("_typeMsg", f.col("_msg").getItem(1).getItem(0))
                   # decide if a path is complete. A path is complete if the vertex type is root and the message type is leaf
                   .withColumn("pathComplete", f.when(((f.col("nodeType") == "root") & (f.col("_typeMsg") == "leaf")), True).otherwise(False))
                   # put the current vertex id in front of the path array that is sent forward
                   .withColumn("_pathNew", f.concat(f.array(f.col("id")), f.col("_path")))
                   # merge together the path array and the nodeType array for the new message object
                   .withColumn("message", f.array(f.col("_pathNew"), f.array(f.col("_typeMsg")))))
    if show_steps:
        print("new vertices with all temp columns")
        newVertices.show(truncate=False)
    # add complete paths to the result dataframe
    results = results.union(newVertices.where(f.col("pathComplete")).select(f.col("_pathNew").alias("paths")))

    # cache the vertices for the next iteration and only push forward the two relevant columns in order to reduce data shuffling between Spark executors
    cachedNewVertices = AM.getCachedDataFrame(newVertices.select("id", "message"))
    # create new updated graph object for next iteration
    gx = GraphFrame(cachedNewVertices, gx.edges)

print("Collecting result set")
results.show()

It then shows the correct results:

All paths found in 3 iterations
Collecting result set
+------------+
|       paths|
+------------+
|   [0, 1, 5]|
|[0, 1, 2, 3]|
|[0, 1, 2, 4]|
|[a, b, c, d]|
+------------+
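To see why the loop stops after 3 iterations, here is a plain-Python simulation of the same backward message passing. It is a sketch under stated assumptions: every vertex starts with a message holding its own id and node type, each edge sends the `dst` messages to the `src`, the receiving vertex is put in front of the path, and a path counts as complete when a message started by a leaf arrives at a root. `find_full_paths` is a hypothetical helper, not part of any library.

```python
from collections import Counter, defaultdict


def find_full_paths(edges, max_iter=10):
    """Simulate leaf-to-root message passing on a list of (src, dst) edges.

    Returns (complete_paths, iterations_needed)."""
    out_deg = Counter(src for src, _ in edges)
    in_deg = Counter(dst for _, dst in edges)
    nodes = set(out_deg) | set(in_deg)
    # root: no incoming edge, leaf: no outgoing edge, child: everything else
    node_type = {
        n: "root" if in_deg[n] == 0 else "leaf" if out_deg[n] == 0 else "child"
        for n in nodes
    }
    # every vertex starts with one message: (path, type of the vertex that started it)
    messages = {n: {((n,), node_type[n])} for n in nodes}
    complete = []
    for iteration in range(max_iter):
        # every edge sends the messages of its dst vertex back to its src vertex
        received = defaultdict(set)
        for src, dst in edges:
            if dst in messages:
                received[src] |= messages[dst]
        # no messages left to send: every path has reached its root
        if not received:
            return complete, iteration
        new_messages = {}
        for vertex, msgs in received.items():
            new_messages[vertex] = set()
            for path, msg_type in msgs:
                new_path = (vertex,) + path  # put the receiving vertex in front
                # a message started by a leaf arriving at a root is a complete path
                if node_type[vertex] == "root" and msg_type == "leaf":
                    complete.append(list(new_path))
                new_messages[vertex].add((new_path, msg_type))
        messages = new_messages
    return complete, max_iter


edges = [("0", "1"), ("1", "2"), ("1", "5"), ("2", "3"), ("2", "4"),
         ("a", "b"), ("b", "c"), ("c", "d")]
paths, iterations = find_full_paths(edges)
print("All paths found in " + str(iterations) + " iterations")  # All paths found in 3 iterations
for p in sorted(paths):
    print(p)
```

Vertices that receive nothing drop out of `messages`, which is what makes the loop terminate once the last messages have reached the roots.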

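To get your final dataframe, you can join the result back to your original data, or derive the columns directly from the array: dist is the number of jumps (array length minus one) and node is the last element.

final_df = (results
            .withColumn("dist", f.size(f.col("paths")) - 1)
            .withColumn("node", f.element_at(f.col("paths"), -1)))
final_df.show()

+------------+----+----+
|       paths|dist|node|
+------------+----+----+
|   [0, 1, 5]|   2|   5|
|[0, 1, 2, 3]|   3|   3|
|[0, 1, 2, 4]|   3|   4|
|[a, b, c, d]|   3|   d|
+------------+----+----+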
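In the question, dist is the number of jumps (path length minus one) and node is the last element of the path. A small plain-Python sketch of that mapping, applied to the complete paths from the question, reproduces the desired table; `to_rows` is a hypothetical helper name.

```python
def to_rows(paths):
    """Turn complete paths into (dist, node, path) rows as in the question:
    dist = number of jumps (path length minus one), node = last element."""
    return [(len(path) - 1, path[-1], path) for path in paths]


for row in to_rows([[1, 5], [1, 2, 3], [1, 2, 4]]):
    print(row)
# (1, 5, [1, 5])
# (2, 3, [1, 2, 3])
# (2, 4, [1, 2, 4])
```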

I suppose you could also write the same algorithm with the GraphFrames Pregel API.

P.S.: In this form, the algorithm might cause problems if the graph has loops or backward-directed edges. I had another algorithm to clean up loops and cycles first.
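The clean-up algorithm mentioned above is not shown here. As an illustration only, one standard way to check an edge list for loops and cycles before starting the message loop is Kahn's topological sort; the plain-Python helper `has_cycle` below is hypothetical and only detects cycles, it does not remove them.

```python
from collections import Counter, defaultdict, deque


def has_cycle(edges):
    """Return True if the directed graph given as (src, dst) edges contains a cycle.

    Kahn's algorithm: repeatedly remove vertices with in-degree 0. In an acyclic
    graph every vertex is eventually removed; vertices on a cycle never reach 0."""
    nodes = {n for edge in edges for n in edge}
    in_deg = Counter(dst for _, dst in edges)
    children = defaultdict(list)
    for src, dst in edges:
        children[src].append(dst)
    queue = deque(n for n in nodes if in_deg[n] == 0)
    removed = 0
    while queue:
        n = queue.popleft()
        removed += 1
        for child in children[n]:
            in_deg[child] -= 1
            if in_deg[child] == 0:
                queue.append(child)
    return removed < len(nodes)


print(has_cycle([("0", "1"), ("1", "2"), ("2", "3")]))  # False
print(has_cycle([("a", "b"), ("b", "c"), ("c", "a")]))  # True
```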