Spark GraphFrame Basics

Out of the box, the Spark distribution does not contain the GraphFrames package. You need to download the jar from the GraphFrames page on spark-packages.org and put it into the libexec/jars directory of the Spark installation. Restart Spark if it is already running. Whether the GraphFrames package on PyPI is also needed for Python is not clear to me.
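
Alternatively, you can let Spark fetch the package itself through the spark.jars.packages setting, which makes the jar-copying unnecessary. A minimal sketch; the exact Maven coordinate below is an assumption you must match to your Spark and Scala versions:

from pyspark.sql import SparkSession

# Example coordinate only: pick the graphframes build matching your Spark/Scala versions
spark = (SparkSession.builder
         .appName("GraphFrames")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12")
         .getOrCreate())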

Start Spark in the usual fashion

import findspark, os

# Point Spark at a Java 8 installation (adjust the path to your JDK)
os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_202.jdk/Contents/Home"

# findspark locates the Spark installation; call init() before importing pyspark
findspark.init()
print(findspark.find())

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("GraphFrames").getOrCreate()
sc = spark.sparkContext

and create standard DataFrames for the nodes and the edges

from pyspark.sql import SQLContext

# SQLContext is a legacy entry point, but the GraphFrames examples below expect one
sqlContext = SQLContext(sc)
nodes = sqlContext.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
], ["id", "name", "age"])

edges = sqlContext.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
], ["src", "dst", "relationship"])

A GraphFrame is then simply a combination of these two frames

from graphframes import GraphFrame
g = GraphFrame(nodes, edges)
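
GraphFrame expects the vertex frame to contain an id column and the edge frame src and dst columns; both frames remain accessible as properties of the graph:

# The original DataFrames are available again via the graph
g.vertices.show()
g.edges.show()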

People often wonder whether you can render a GraphFrame graph. This only makes sense if your graph is ‘small’: just as it does not make sense to convert a terabyte Spark DataFrame to a pandas DataFrame, you should not treat a big-data graph frame as something you can visualize in full.
Still, if you want to do it, take the frame of edges and hand it over to NetworkX.

import networkx as nx

# Collect the (small!) edge frame to pandas and build a directed NetworkX graph
gp = nx.from_pandas_edgelist(edges.toPandas(), 'src', 'dst', create_using=nx.DiGraph())
nx.draw(gp, with_labels=True)

(figure: the three-person graph drawn with NetworkX)

On the analytic side, you can from here on enjoy the whole GraphFrames API:

g.inDegrees.show()  

+---+--------+
| id|inDegree|
+---+--------+
|  c|       1|
|  b|       2|
+---+--------+
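
Motif finding, for instance, searches for structural patterns in the graph; the query below returns all pairs of vertices connected in both directions:

# Find mutual relationships, i.e. edges running both ways between two vertices
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
motifs.show()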

The API also includes the PageRank algorithm

# resetProbability is the teleport probability; GraphFrames' default is 0.15
results = g.pageRank(resetProbability=0.01, maxIter=20)
results.vertices.select("id", "pagerank").show()


+-----+--------------------+
|   id|            pagerank|
+-----+--------------------+
| 14,6|   0.294145761242611|
| 10,5|  0.2022338632841195|
| 14,1| 0.07235944148922492|
| 0,15| 0.03654559907802415|
| 0,19| 0.03654604538590314|
| 2,18| 0.10774836041079427|
|12,17|  1.6440001759095513|
|10,19|   3.399398634973527|
| 9,11|   0.543782922230953|
| 16,2| 0.10746807938250608|
| 15,8|  0.6612202246354048|
|  4,8| 0.16755781414237972|
|  7,8| 0.23648950488934758|
| 0,12|0.036542159103863825|
|19,14|   9.203254033641397|
| 16,9|  0.9285135363356551|
|19,12|   6.099750937014576|
| 13,1| 0.07235119679030458|
| 2,16| 0.10746807938250608|
|  3,8| 0.13806308065786485|
+-----+--------------------+
only showing top 20 rows

The vertex ids of the form i,j betray that this output was produced on the 20x20 grid graph constructed at the end of this post; running PageRank on the three-person graph above yields just three rows.

The results of the page ranking can also be visualized with NetworkX, of course. Note that NetworkX ships its own PageRank implementation as well.
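
A minimal sketch of both, reusing the gp and results objects from above; the node-size scaling factor is an arbitrary choice:

# Pull the GraphFrames PageRank scores into a plain dict keyed by vertex id
pr = {row["id"]: row["pagerank"] for row in results.vertices.collect()}

# Scale node sizes with the scores to visualize the ranking
nx.draw(gp, with_labels=True, node_size=[500 * pr[n] for n in gp.nodes()])

# NetworkX's own PageRank, for comparison; alpha is the damping factor, i.e. 1 - resetProbability
print(nx.pagerank(gp, alpha=0.99))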
You can generate some predefined graphs via the examples namespace

from graphframes.examples import Graphs

# gridIsingModel(n) builds an n-by-n grid with vertex ids of the form "i,j"
g = Graphs(sqlContext).gridIsingModel(20)

which looks like this

gp = nx.from_pandas_edgelist(g.edges.toPandas(), 'src', 'dst')

# Compute the spectral layout explicitly and pass the positions to draw()
pos = nx.spectral_layout(gp)
nx.draw(gp, pos, node_size=20)

(figure: spectral drawing of the 20x20 grid graph)

When all is done, shut down the Spark session:

spark.stop()