Last active
October 6, 2015 14:01
-
-
Save tanmaykm/364395756f549950038b to your computer and use it in GitHub Desktop.
Twitter link graph
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Elly | |
using HadoopBlocks | |
@everywhere begin
    # Job configuration, defined on every process.
    # Alternative settings used during development:
    #   const INP = "hdfs://root@" * string(getipaddr()) * ":9000/twitter_rv.net"; const COLSEP = '\t'
    #   const INP = "hdfs://tan@localhost:9000/twitter_small.csv"
    #   const YARNHOST = string(getipaddr())
    const INP = "hdfs://root@" * string(getipaddr()) * ":9000/twitter_small.csv"
    const COLSEP = ','
    # Dimension of the square sparse matrix built in `maprow`; every node id must be <= MAXNODE.
    const MAXNODE = 11316811
    const YARNHOST = "localhost"

    # Record finder handed to `MRHdfsFileInput`: delegates to HadoopBlocks.find_rec with
    # '\n' as the row separator and COLSEP as the column separator.
    # NOTE(review): `Matrix` presumably selects the record type returned — confirm in HadoopBlocks.
    function findrow(r::HadoopBlocks.HdfsBlockReader, iter_status)
        return HadoopBlocks.find_rec(r, iter_status, Matrix, '\n', COLSEP)
    end

    # Convert one record field to a node id. Blank strings map to 0, which `maprow`
    # treats as "no id". Both methods always return Int32 so the result type is stable.
    function to_id(v::AbstractString)
        v = strip(v)
        return isempty(v) ? Int32(0) : parse(Int32, v)
    end
    to_id(v::Number) = Int32(v)

    # Add `x` into the running accumulator `acc`; `nothing` means nothing accumulated yet.
    # Shared by `collectrow` and `reducerow`.
    _addresult(acc, x) = acc === nothing ? x : acc .+ x

    # Map step: turn one block of records into a MAXNODE x MAXNODE sparse matrix holding
    # 1.0 at (id in column 2, id in column 1) for every row whose two ids are both positive.
    # Duplicate pairs are summed by `sparse`. Returns a one-element Vector{Any} with the matrix.
    function maprow(rec)
        HadoopBlocks.logmsg("map starting...")
        nrows = size(rec, 1)
        I = Int32[]
        J = Int32[]
        sizehint!(I, nrows)
        sizehint!(J, nrows)
        for idx in 1:nrows
            i = to_id(rec[idx, 2])
            j = to_id(rec[idx, 1])
            # skip rows with a blank (0) or non-positive id
            if (i > 0) && (j > 0)
                push!(I, i)
                push!(J, j)
            end
        end
        S = sparse(I, J, ones(length(I)), MAXNODE, MAXNODE)
        HadoopBlocks.logmsg("map finished.")
        return Any[S]
    end

    # Collect step: add one map output `rec` into `results` (which starts as `nothing`).
    # Empty outputs are ignored.
    function collectrow(results, rec)
        HadoopBlocks.logmsg("collect starting...")
        isempty(rec) && return results
        results = _addresult(results, rec)
        HadoopBlocks.logmsg("collect finished.")
        return results
    end

    # Reduce step: element-wise sum of all partial `results` into `reduced`
    # (which may start as `nothing`).
    function reducerow(reduced, results...)
        HadoopBlocks.logmsg("reduce starting...")
        for res in results
            reduced = _addresult(reduced, res)
        end
        HadoopBlocks.logmsg("reduce finished.")
        return reduced
    end
end
# Block until the job `j_mon` finishes.
# Polls its status every 5 seconds and prints the progress percentage while it reports
# "running". It stops polling on "error" or "complete", then waits on the job and prints
# the timings reported by `times`.
function wait_results(j_mon)
    while true
        sleep(5)
        jstatus, jstatusinfo = status(j_mon, true)
        if jstatus == "running"
            println("$(j_mon): $(jstatusinfo)% complete...")
        end
        if jstatus == "error" || jstatus == "complete"
            break
        end
    end
    wait(j_mon)
    println("time taken (total time, wait time, run time): $(times(j_mon))")
    println("")
end
# Start YARN workers through Elly, run the map/collect/reduce job over INP and print the
# result.
function submit_job()
    # Manager for YARNHOST: resource-manager port 8032, scheduler port 8030.
    manager = YarnManager(yarnhost=YARNHOST, rmport=8032, schedport=8030, launch_timeout=120)
    # Launch 10 workers with JULIA_PKGDIR set to this process's package dir.
    # NOTE(review): the unit of `mem=4096` is defined by Elly — confirm.
    workerenv = Dict("JULIA_PKGDIR" => Pkg.dir())
    addprocs(manager; np=10, env=workerenv, mem=4096)
    println("nworkers: $(nworkers())")
    # Presumably loads this script's @everywhere definitions onto the new workers.
    @everywhere require("demo.jl")
    HadoopBlocks.Scheduler.prep_remotes(true)
    source = MRHdfsFileInput([INP], findrow)
    job = dmapreduce(source, maprow, collectrow, reducerow)
    wait_results(job)
    println(results(job))
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Dataset: Twitter link graph from http://parsa.epfl.ch/cloudsuite/graph.html
# Size: 1,468,365,182 rows, one "id1 id2" edge per row
# Observed preprocessing speed: about 5 secs per 1,000,000 rows
# Serialize `A` into the file "<N>.ser", creating or truncating it.
# The file is closed even if serialization throws. Returns `nothing`.
function store(N, A)
    fname = string(N, ".ser")
    io = open(fname, "w")
    try
        serialize(io, A)
    finally
        close(io)
    end
    return nothing
end
# Read back the value serialized into "<N>.ser" (the counterpart of `store`).
# `open(f, path)` closes the file after `deserialize` returns.
load(N) = open(deserialize, "$N.ser")
# Parse a whitespace-separated edge list, one "id1 id2" pair per line, into two Int32
# vectors. Then persist them with `store` as "I.ser" and "J.ser".
#
# Arguments:
#   fname - input file; defaults to the Twitter dataset file "twitter_rv.net".
#
# Every 1,000,000 lines it prints the line number, that line's edge and the elapsed time
# since the previous report. Blank lines are skipped, and fields after the first two are
# ignored. A line with one field, or a non-integer id, throws (BoundsError / `parse` error).
# The input file is always closed, even on error.
#
# Returns (I, J).
function preprocess(fname="twitter_rv.net")
    I = Int32[]
    J = Int32[]
    lno = 0
    t1 = time()
    f = open(fname)
    try
        for l in eachline(f)
            lno += 1
            fields = split(l)
            # tolerate blank lines (e.g. a stray empty line) instead of failing to destructure
            isempty(fields) && continue
            id1 = parse(Int32, fields[1])
            id2 = parse(Int32, fields[2])
            if lno % 1000000 == 0
                t2 = time()
                println("$lno : $id1 -> $id2 in $(t2-t1) secs")
                t1 = t2
            end
            push!(I, id1)
            push!(J, id2)
        end
    finally
        close(f)
    end
    println("storing...")
    store("I", I)
    store("J", J)
    return I, J
end
# Compute one eigenpair of S with `eigs(S; nev=1)`, save it to "E.ser" via `store`,
# and return it.
function process(S)
    eigresult = eigs(S; nev = 1)
    store("E", eigresult)
    return eigresult
end
# Rebuild the sparse matrix from the "I.ser" and "J.ser" vectors written by `preprocess`.
function load_sparse()
    rows = load("I")
    cols = load("J")
    return load_sparse(rows, cols)
end

# Build a sparse matrix with 1.0 at each (I[k], J[k]). Repeated index pairs are summed by
# `sparse`, and the size is taken from the largest indices.
function load_sparse(I, J)
    println("creating sparse matrix...")
    S = sparse(I, J, fill(1.0, length(I)))
    println("done")
    return S
end
# Reload the eigen-decomposition saved to "E.ser" by `process`.
function load_eig()
    return load("E")
end
# Score every node by |S * v|, where v is the first column of the eigenvector(s) in E[2]
# (E as produced by `process`).
#
# Arguments:
#   S - adjacency matrix
#   E - eigen-decomposition tuple; only E[2] is used
#   n - how many nodes to return at each extreme (default 10). It is clamped to the
#       number of nodes, so small graphs do not throw.
#
# Returns (infl, newb): indices of the n highest-scoring nodes and of the n lowest-scoring
# nodes, both in ascending score order. `infl`/`newb` presumably mean "influencers" and
# "newbies".
# NOTE(review): convert(Array{Float64}, ...) throws InexactError if the eigenvector has
# non-zero imaginary parts.
function find_extreme_pos(S, E, n=10)
    EV = convert(Array{Float64}, E[2])
    # map(abs, ...) works on every Julia version; `abs(::Vector)` was removed after 0.5
    P = map(abs, (S * EV)[:, 1])
    sp = sortperm(P)
    k = clamp(n, 0, length(sp))
    infl = sp[end-k+1:end]
    newb = sp[1:k]
    return infl, newb
end
# For each node id in `ids`, print the number of stored entries (`nnz`) in column `id`
# of S. Returns `nothing`.
function count_connections(S, ids)
    for id in ids
        column = S[:, id]
        println("id:$id connections:$(nnz(column))")
    end
    return nothing
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment