python
from operator import add
import numpy as np
import numpy.random
Lets consider reading in the matrices first. In order for the code to be generic, we will detect the dimensions of the matrix from the data itself. We do this by computing the maximum \(i\) and \(j\) indices of our data.
python
atext = sc.textFile("a2/mats/a_100x200.txt").map(lambda s: s.split(" ")).cache()
btext = sc.textFile("a2/mats/b_200x100.txt").map(lambda s: s.split(" ")).cache()
# lets get the dimensions of the matrices (maybe it is sparse)
# assume matrices are mxn and nxp
m = atext.map(lambda s: int(s[0])).reduce(max)
While this might be ok for a single value that we are interested in, it is not so good in the present case where we will need to do it twice. So lets change the code to compute both dimensions in a single pass.
python
# lets do better and do it in one sweep.
(m,n) = atext.map(lambda s: (int(s[0]), int(s[1]))) \
.reduce(lambda a,b: map(max, zip(a,b)))
(n1,p) = btext.map(lambda s: (int(s[0]), int(s[1]))) \
.reduce(lambda a,b: map(max, zip(a,b)))
assert n == n1
m = m+1
n = n+1
p = p+1
print m, n, p
python
# First some dummy code to test ...
# Lets do the two pass approach first
x = sc.parallelize([((0,0),'a'), ((0,1),'b'), ((1,0),'c'), ((1,1),'d')])
y = sc.parallelize([((0,0),'e'), ((0,1),'f'), ((1,0),'g'), ((1,1),'h')])
# j, ('M', i, m_ij)
x1 = x.map(lambda (k,v): (k[1], (k[0],v)))
# j, ('N', k, n_jk)
y1 = y.map(lambda (k,v): (k[0], (k[1],v)))
# now we gather by the common key j
# a function to reduce (j, ((i, m_ij), (k, n_jk))) to ((i,k), m_ij * n_jk)
z = x1.join(y1).map(lambda (k,v): ((v[0][0], v[1][0]), v[0][1]+v[1][1])) \
.reduceByKey(lambda a,b: '+'.join([a,b]))
sorted(z.collect())
Let’s also write a simple sequential check for our code … so that we know our code is correct
python
def load_matrix(fname, sz):
M = np.matrix(np.zeros(sz))
with open(fname) as f:
content = f.readlines()
for l in content:
s = l.split()
M[int(s[0]), int(s[1])] = float(s[2])
return M
A = load_matrix("a2/mats/a_100x200.txt", (100, 200))
B = load_matrix("a2/mats/b_200x100.txt", (200, 100))
C = A*B
print C
python
# lets group the matrices into blocks
block_size = 2
def fill_block(B, x):
B[x[0]%block_size, x[1]%block_size] = x[2]
return B
a = atext.map( lambda s:((int(s[0])/block_size, (int(s[1]))/block_size), (int(s[0]), int(s[1]), float(s[2])))) \
.aggregateByKey(np.matrix(np.zeros((block_size, block_size))), fill_block, add).cache()
b = btext.map(lambda s: ( (int(s[0])/block_size, (int(s[1]))/block_size), (int(s[0]), int(s[1]), float(s[2])))) \
.aggregateByKey(np.matrix(np.zeros((block_size, block_size))), fill_block, add).cache()
Now that we have matrices \(A\) and \(B\) in the block format, we can easily write our matrix-multiplication function. Since our grouped matrices behave like regular matrices, let us write some dummy code that is easy to debug.
Now we can use the same code on our blocked matrices …
python
a1 = a.map(lambda (k,v): (k[1], (k[0],v)))
b1 = b.map(lambda (k,v): (k[0], (k[1],v)))
c = a1.join(b1).map(lambda (k,v): ((v[0][0], v[1][0]), v[0][1]*v[1][1])) \
.reduceByKey(add)
sorted(c.collect())
## Shallow graphs
For the shallow graph test, we need to compute \(A^2 + A\), so let us first compute that. We start by loading the graph as a blocked matrix.
python
gtext = sc.textFile("a2/graphs/Assign2_100.txt").map(lambda s: s.split(" ")).cache()
(m,n) = gtext.map(lambda s: (int(s[0]), int(s[1]))).reduce(lambda a,b: map(max, zip(a,b)))
assert m==n
block_size = 50
def fill_block(B, x):
B[x[0]%block_size, x[1]%block_size] = x[2]
return B
A = gtext.map(lambda s: ( (int(s[0])/block_size, (int(s[1]))/block_size), (int(s[0]), int(s[1]), float(s[2])))) \
.aggregateByKey(np.matrix(np.zeros((block_size, block_size))), fill_block, add).cache()
def is_full(mat):
return np.count_nonzero(mat) == np.prod(mat.shape)
# lets check if A is full ...
A.values().map(is_full).reduce(lambda x,y: x & y)
[((0, 1), matrix([[ 1., 1., 1., …, 1., 1., 1.],
[ 1., 1., 1., …, 1., 1., 1.],
[ 1., 1., 1., …, 1., 1., 1.],
…,
[ 1., 1., 1., …, 1., 1., 1.],
[ 1., 1., 1., …, 1., 1., 1.],
[ 1., 1., 1., …, 1., 1., 1.]])),
((1, 0), matrix([[ 1., 1., 1., …, 1., 1., 1.],
[ 1., 1., 1., …, 1., 1., 1.],
[ 1., 1., 1., …, 1., 1., 1.],
…,
[ 1., 1., 1., …, 1., 1., 1.],
[ 1., 1., 1., …, 1., 1., 1.],
[ 1., 1., 1., …, 1., 1., 1.]]))]
python
# good ... so lets compute A^2 + A
def addI(a):
k,v = a
if (k[0] == k[1]):
v = v + np.matrix(np.identity(block_size))
return (k[0], (k[1],v))
Ap1 = A.map(addI)
A1 = A.map(lambda (k,v): (k[1], (k[0],v)))
AsqpA = A1.join(Ap1).map(lambda (k,v): ((v[0][0], v[1][0]), v[0][1]*v[1][1])) \
.reduceByKey(add)
# now test if this is full ?
AsqpA.values().map(is_full).reduce(lambda x,y: x&y)
True
python
G = load_matrix("a2/graphs/Assign2_100.txt", (100, 100))
g2p1 = G*G + G
is_full(g2p1)