I have been experimenting with the Graphx APIs of Spark, primarily to learn and have a feel of how to use them. In the process, I have to load an adjacency matrix into a graph. The matrix dataset is here.
From the site, the matrix is described as
A number of employees in a factory was interviewed on a question: “Do you like to work with your co-worker?”. Possible answers are 1 for yes and 0 for no. Each employee gave an answer for each other employee thus creating an adjecancy matrix.
So, I have decided to name the employees as English alphabets ("A" onwards). Employees form the nodes of the graph, and their preferences for their co-workers form the edges. I haven't found any straightforward way in Spark to achieve this; my R-programmer friends tell me that it is quite easy to do so, in their world. So, I set upon writing a naive implementation to do so. Here's the code
val conf = new SparkConf().setMaster("local[*]").setAppName("GraphExploration App")
val spark = SparkSession
.builder()
.appName("Spark SQL: beginners exercise")
.getOrCreate()
val sc = SparkContext.getOrCreate(conf)
val df = spark.read.csv("./BlogInputs/sociogram-employees-un.csv").cache
val allRows = df.toLocalIterator.toIndexedSeq
type EmployeeVertex = (Long,String)
val employeesWithNames = (0 until allRows.length).map(i => (i.toLong,((i + 'A').toChar.toString())))
val columnNames = (0 until allRows.length).map(i => ("_c" + i)).toIndexedSeq // It is a square matrix; rows == columns
val edgesAsCollected = (for {
rowIndex <- 0 until df.count.toInt
colIndex <- 0 until df.count.toInt
if (rowIndex != colIndex)
} yield {
if (allRows(rowIndex).fieldIndex(columnNames(colIndex)) == 1)
Some(Edge(employeesWithNames(rowIndex)._1,employeesWithNames(colIndex)._1,"Likes"))
else
None
}).flatten
val employeeNodes = sc.parallelize(employeesWithNames)
val edges = sc.parallelize(edgesAsCollected)
val employeeGraph = Graph(sc.parallelize(employeesWithNames),edges,"Nobody")
Here is the schema:
scala>df.printSchema
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
|-- _c4: string (nullable = true)
|-- _c5: string (nullable = true)
|-- _c6: string (nullable = true)
|-- _c7: string (nullable = true)
|-- _c8: string (nullable = true)
|-- _c9: string (nullable = true)
|-- _c10: string (nullable = true)
|-- _c11: string (nullable = true)
|-- _c12: string (nullable = true)
|-- _c13: string (nullable = true)
|-- _c14: string (nullable = true)
|-- _c15: string (nullable = true)
|-- _c16: string (nullable = true)
|-- _c17: string (nullable = true)
|-- _c18: string (nullable = true)
|-- _c19: string (nullable = true)
|-- _c20: string (nullable = true)
|-- _c21: string (nullable = true)
|-- _c22: string (nullable = true)
|-- _c23: string (nullable = true)
|-- _c24: string (nullable = true)
.. and first few rows here
scala> df.show
16/12/21 07:12:00 WARN Executor: 1 block locks were not released by TID = 1:
[rdd_8_0]
+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|_c0|_c1|_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|
+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
| 0| 1| 0| 1| 1| 0| 1| 1| 1| 0| 0| 1| 0| 1| 1| 0| 1| 1| 0| 1| 0| 1| 0| 1| 1|
| 1| 0| 0| 1| 0| 0| 1| 0| 1| 0| 0| 1| 0| 0| 1| 0| 1| 0| 1| 0| 0| 1| 0| 1| 0|
| 0| 1| 0| 1| 1| 0| 0| 0| 1| 0| 0| 0| 0| 1| 1| 0| 0| 1| 0| 0| 0| 1| 1| 0| 1|
| 0| 1| 1| 0| 0| 0| 1| 0| 0| 0| 1| 1| 0| 1| 0| 0| 1| 1| 0| 0| 1| 0| 1| 1| 0|
This serves my purpose, but I feel there may be a different way. My very little knowledge of Spark's MLLib APIs is perhaps a barrier. Could someone please comment on this? Better even, could someone show me a better yet simple way (by editing my code, if necessary)?
I find @DanieldePaula's suggestion acceptable as an answer, for the case at hand:
Thanks, Daniel!