degree.py
import sys
import pandas
import networkx as nx
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from graphframes import *

sc = SparkContext("local", "degree.py")
sqlContext = SQLContext(sc)
''' Return the simple closure of the graph as a graphframe. '''
def simple(g):
    # Extract edges and make a data frame of "flipped" edges
    # YOUR CODE HERE
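    # One possible way to do it (a sketch, not the official solution; it
    # assumes only the documented g.edges DataFrame with "src"/"dst" columns):
    e = g.edges.select("src", "dst")
    flipped = e.selectExpr("dst as src", "src as dst")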
    # Combine old and new edges. Distinctify to eliminate multi-edges
    # Filter to eliminate self-loops.
    # A multigraph with loops will be closured to a simple graph
    # If we try to undirect an undirected graph, no harm done
    # YOUR CODE HERE
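    # Sketch, continued: union() assumes Spark 2.x (use unionAll() on 1.x);
    # distinct() collapses multi-edges and the filter drops self-loops.
    e2 = e.union(flipped).distinct().filter("src != dst")
    return GraphFrame(g.vertices, e2)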
''' Return a data frame of the degree distribution of the vertices in the provided graphframe '''
def degreedist(g):
    # Generate a DF with degree,count
    # YOUR CODE HERE
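    # A minimal sketch: GraphFrames exposes g.degrees, a DataFrame with
    # columns "id" and "degree", so grouping by degree yields (degree, count).
    return g.degrees.groupBy("degree").count()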
''' Return the degree of vertex ver in an adjacency-matrix graph G,
where G.v is the vertex count and G.diri is the adjacency matrix. '''
def findDegree(G, ver):
    degree = 0
    for i in range(G.v):
        if G.diri[ver][i] == 1:
            degree += 1
    # A self-loop contributes 2 to the degree, so count it a second time.
    if G.diri[ver][ver] == 1:
        degree += 1
    return degree
''' Read in an edgelist file with lines of the format id1<delim>id2
and return a corresponding graphframe. If "large" we assume
a header row and that delim = " ", otherwise no header and
delim = ","'''
def readFile(filename, large, sqlContext=sqlContext):
    lines = sc.textFile(filename)
    if large:
        delim = " "
        # Strip off header row.
        lines = lines.mapPartitionsWithIndex(lambda ind, it: iter(list(it)[1:]) if ind == 0 else it)
    else:
        delim = ","
    # Extract pairs from input file and convert to data frame matching
    # schema for graphframe edges.
    # YOUR CODE HERE
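    # A sketch of one way to build the edge DataFrame (assumes exactly two
    # ids per line and string-typed vertex ids):
    pairs = lines.map(lambda line: tuple(line.split(delim)[:2]))
    eschema = StructType([StructField("src", StringType()), StructField("dst", StringType())])
    e = sqlContext.createDataFrame(pairs, eschema)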
    # Extract all endpoints from input file (hence flatmap) and create
    # data frame containing all those node names in schema matching
    # graphframe vertices
    # YOUR CODE HERE
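    # Sketch, continued: flatMap yields every endpoint; distinct() removes
    # duplicates; each id is wrapped in a 1-tuple to match the one-field schema.
    nodes = lines.flatMap(lambda line: line.split(delim)[:2]).distinct()
    vschema = StructType([StructField("id", StringType())])
    v = sqlContext.createDataFrame(nodes.map(lambda x: (x,)), vschema)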
    # Create graphframe g from the vertices and edges built above.
    g = GraphFrame(v, e)
    return g
# main stuff
# If you got a file, yo, I'll parse it.
if len(sys.argv) > 1:
    filename = sys.argv[1]
    if len(sys.argv) > 2 and sys.argv[2] == 'large':
        large = True
    else:
        large = False
    print("Processing input file " + filename)
    g = readFile(filename, large)
    print("Original graph has " + str(g.edges.count()) + " directed edges and " + str(g.vertices.count()) + " vertices.")
    g2 = simple(g)
    # Each undirected edge is stored as two directed edges, so integer-divide by 2.
    print("Simple graph has " + str(g2.edges.count() // 2) + " undirected edges.")
    distrib = degreedist(g2)
    distrib.show()
    nodecount = g2.vertices.count()
    print("Graph has " + str(nodecount) + " vertices.")
    out = filename.split("/")[-1]
    print("Writing distribution to file " + out + ".csv")
    distrib.toPandas().to_csv(out + ".csv")
# Otherwise, generate some random graphs.
else:
    print("Generating random graphs.")
    vschema = StructType([StructField("id", IntegerType())])
    eschema = StructType([StructField("src", IntegerType()), StructField("dst", IntegerType())])
    gnp1 = nx.gnp_random_graph(100, 0.05, seed=1234)
    gnp2 = nx.gnp_random_graph(2000, 0.01, seed=5130303)
    gnm1 = nx.gnm_random_graph(100, 1000, seed=27695)
    gnm2 = nx.gnm_random_graph(1000, 100000, seed=9999)
    todo = {"gnp1": gnp1, "gnp2": gnp2, "gnm1": gnm1, "gnm2": gnm2}
    for gx in todo:
        print("Processing graph " + gx)
        # Wrap each node id in a 1-tuple so it matches the one-field StructType,
        # and materialize the networkx views as lists before parallelizing.
        v = sqlContext.createDataFrame(sc.parallelize([(n,) for n in todo[gx].nodes()]), vschema)
        e = sqlContext.createDataFrame(sc.parallelize(list(todo[gx].edges())), eschema)
        g = simple(GraphFrame(v, e))
        distrib = degreedist(g)
        print("Writing distribution to file " + gx + ".csv")
        distrib.toPandas().to_csv(gx + ".csv")