-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcentrality.py
43 lines (32 loc) · 1.21 KB
/
centrality.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions
from graphframes import *
from pyspark.sql.functions import explode
# Stand-alone driver: a local (single-JVM) Spark context named "degree.py".
# SQLContext wraps it for DataFrame creation — the legacy pre-2.0 entry
# point; presumably kept for the course environment (SparkSession is the
# modern replacement).
sc=SparkContext("local", "degree.py")
sqlContext = SQLContext(sc)
def closeness(g):
    """Compute closeness centrality for every vertex of GraphFrame *g*.

    Closeness of a vertex v is defined here as
    1 / (sum of shortest-path distances from v to all reachable vertices).
    Higher values mean the vertex is "closer" to the rest of the graph.

    Parameters
    ----------
    g : graphframes.GraphFrame
        Graph whose vertices DataFrame has an ``id`` column.

    Returns
    -------
    pyspark.sql.DataFrame
        Two columns: ``id`` and ``closeness`` (a float).
    """
    # Get list of vertices. We'll generate all the shortest paths at
    # once using this list as the landmark set.
    landmarks = [row.id for row in g.vertices.collect()]
    # First get all the path lengths: shortestPaths yields, per vertex,
    # a 'distances' map of {landmark id -> hop count}.
    paths = g.shortestPaths(landmarks=landmarks)
    # Break up the map and group by ID for summing. Exploding a map
    # column produces 'key' / 'value' columns.
    exploded = paths.select('id', explode('distances'))
    # Sum the distances by ID.
    totals = exploded.groupBy('id').agg(
        functions.sum('value').alias('total'))
    # Get the inverses and generate the desired dataframe.
    # NOTE(review): vertices with total == 0 (isolated, if any) would
    # divide by zero; this graph has none.
    return totals.select(
        'id', (1.0 / functions.col('total')).alias('closeness'))
print("Reading in graph for problem 2.")
# Undirected graph, stored as directed edges in both directions.
# Each vertex maps to the string of its neighbors; expanding this
# table reproduces the full 36-edge list.
adjacency = {
    'A': 'BCD',
    'B': 'ACDE',
    'C': 'ABDFH',
    'D': 'ABCEFG',
    'E': 'BDFG',
    'F': 'CDEGH',
    'G': 'DEF',
    'H': 'CFI',
    'I': 'HJ',
    'J': 'I',
}
graph = sc.parallelize(
    [(src, dst) for src, neighbors in adjacency.items() for dst in neighbors])
# Edges DataFrame, then the distinct vertex ids appearing on either end.
e = sqlContext.createDataFrame(graph, ['src', 'dst'])
v = e.selectExpr('src as id').unionAll(e.selectExpr('dst as id')).distinct()
print("Generating GraphFrame.")
g = GraphFrame(v, e)
print("Calculating closeness.")
# Most-central vertices first.
closeness(g).sort('closeness', ascending=False).show()