
Commit 81475b9: small style edits
1 parent ee9d86f

File tree: 5 files changed, +37 -41 lines

run-demo-cluster.sh (+1 -1)

@@ -3,7 +3,7 @@
 /root/spark/bin/spark-submit \
   --master `cat /root/spark-ec2/cluster-url` \
   --class "distopt.driver" \
-  --driver-memory 80423M \
+  --driver-memory 8G \
   --driver-java-options "-Dspark.local.dir=/mnt/spark,/mnt2/spark -XX:+UseG1GC" \
   target/scala-2.10/cocoa-assembly-0.1.jar \
   "$@"

src/main/scala/driver.scala (+6 -5)

@@ -33,6 +33,7 @@ object driver {
     val localIterFrac = options.getOrElse("localIterFrac","1.0").toDouble; // fraction of local points to be processed per round, H = localIterFrac * n
     val beta = options.getOrElse("beta","1.0").toDouble; // scaling parameter when combining the updates of the workers (1=averaging)
     val debugIter = options.getOrElse("debugIter","10").toInt // set to -1 to turn off debugging output
+    val seed = options.getOrElse("seed","0").toInt // set seed for debug purposes
 
     // print out inputs
     println("master: " + master); println("trainFile: " + trainFile);
@@ -41,7 +42,7 @@ object driver {
     println("testfile: " + testFile); println("justCoCoA " + justCoCoA);
     println("lambda: " + lambda); println("numRounds: " + numRounds);
     println("localIterFrac:" + localIterFrac); println("beta " + beta);
-    println("debugIter " + debugIter);
+    println("debugIter " + debugIter); println("seed " + seed);
 
     // start spark context
     val conf = new SparkConf().setMaster(master)
@@ -72,22 +73,22 @@ object driver {
 
 
     // run CoCoA
-    val (finalwCoCoA, finalalphaCoCoA) = CoCoA.runCoCoA(sc, data, n, wInit, numRounds, localIters, lambda, beta, chkptIter, testData, debugIter)
+    val (finalwCoCoA, finalalphaCoCoA) = CoCoA.runCoCoA(sc, data, n, wInit, numRounds, localIters, lambda, beta, chkptIter, testData, debugIter, seed)
     OptUtils.printSummaryStatsPrimalDual("CoCoA", data, finalwCoCoA, finalalphaCoCoA, lambda, testData)
 
     // optionally run other methods for comparison
     if(!justCoCoA) {
 
       // run Mini-batch CD
-      val (finalwMbCD, finalalphaMbCD) = MinibatchCD.runMbCD(sc, data, n, wInit, numRounds, localIters, lambda, beta, chkptIter, testData, debugIter)
+      val (finalwMbCD, finalalphaMbCD) = MinibatchCD.runMbCD(sc, data, n, wInit, numRounds, localIters, lambda, beta, chkptIter, testData, debugIter, seed)
       OptUtils.printSummaryStatsPrimalDual("Mini-batch CD", data, finalwMbCD, finalalphaMbCD, lambda, testData)
 
       // run Mini-batch SGD
-      val finalwMbSGD = SGD.runSGD(sc, data, n, wInit, numRounds, localIters, lambda, local=false, beta, chkptIter, testData, debugIter)
+      val finalwMbSGD = SGD.runSGD(sc, data, n, wInit, numRounds, localIters, lambda, local=false, beta, chkptIter, testData, debugIter, seed)
       OptUtils.printSummaryStats("Mini-batch SGD", data, finalwMbSGD, lambda, testData)
 
       // run Local SGD
-      val finalwLocalSGD = SGD.runSGD(sc, data, n, wInit, numRounds, localIters, lambda, local=true, beta, chkptIter, testData, debugIter)
+      val finalwLocalSGD = SGD.runSGD(sc, data, n, wInit, numRounds, localIters, lambda, local=true, beta, chkptIter, testData, debugIter, seed)
       OptUtils.printSummaryStats("Local SGD", data, finalwLocalSGD, lambda, testData)
 
     }
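
Note: the new seed flag is read with the same defaulted-lookup pattern as the existing driver options. A minimal standalone sketch of that pattern (the Map literal below is hypothetical; the diff does not show how options is built from the command line):

    // Hypothetical options map; in the driver it is presumably parsed from args.
    val options = Map("lambda" -> "0.001", "seed" -> "42")

    val lambda = options.getOrElse("lambda", "0.01").toDouble // 0.001 (provided)
    val seed = options.getOrElse("seed", "0").toInt           // 42 (provided)
    val beta = options.getOrElse("beta", "1.0").toDouble      // 1.0 (falls back to default)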

src/main/scala/solvers/CoCoA.scala (+14 -16)

@@ -35,7 +35,8 @@ object CoCoA {
       beta: Double,
       chkptIter: Int,
       testData: RDD[SparseClassificationPoint],
-      debugIter: Int) : (Array[Double], RDD[(Int, Double)]) = {
+      debugIter: Int,
+      seed: Int) : (Array[Double], RDD[(Int, Double)]) = {
 
     val parts = data.partitions.size // number of partitions of the data, K in the paper
     println("\nRunning CoCoA on "+n+" data examples, distributed over "+parts+" workers")
@@ -45,16 +46,16 @@
     var w = wInit
     val scaling = beta / parts;
 
-    for(t <- 1 until numRounds+1){
+    for(t <- 1 to numRounds){
 
       // zip alpha with data
       val zipData = alpha.zip(data)
 
       // find updates to alpha, w
-      val updates = zipData.mapPartitions(partitionUpdate(_,w,localIters,lambda,n,scaling),preservesPartitioning=true).persist()
+      val updates = zipData.mapPartitions(partitionUpdate(_,w,localIters,lambda,n,scaling,seed+t),preservesPartitioning=true).persist()
       alpha = updates.map(kv => kv._2)
       val primalVariables = updates.map(kv => kv._1)
-      val primalUpdates = primalVariables.mapPartitions(singleElementFromPartition,preservesPartitioning=true).reduce(_ plus _)
+      val primalUpdates = primalVariables.mapPartitions(x => Iterator(x.next())).reduce(_ plus _)
       w = primalUpdates.times(scaling).plus(w)
 
       // optionally calculate errors
@@ -75,13 +76,6 @@
     return (w, alpha)
   }
 
-  private def singleElementFromPartition(
-      primalVariables: Iterator[Array[Double]]): Iterator[Array[Double]] = {
-    var wVectorList = List[Array[Double]]()
-    wVectorList = primalVariables.next() :: wVectorList
-    return wVectorList.iterator
-  }
-
   /**
    * Performs one round of local updates using a given local dual algorithm,
    * here localSDCA. Will perform localIters many updates per worker.
@@ -92,6 +86,7 @@
    * @param lambda
    * @param n
    * @param scaling this is the scaling factor beta/K in the paper
+   * @param seed
    * @return
    */
   private def partitionUpdate(
@@ -100,14 +95,15 @@
       localIters: Int,
       lambda: Double,
       n: Int,
-      scaling: Double): Iterator[(Array[Double], (Int, Double))] = {
+      scaling: Double,
+      seed: Int): Iterator[(Array[Double], (Int, Double))] = {
 
     val zipArr = zipData.toArray
     var localData = zipArr.map(x => x._2)
     var alpha = zipArr.map(x => x._1._2)
     val indices = (0 to localData.length-1).map(x => localData(x).index).toArray
     val alphaOld = alpha.clone
-    val (deltaAlpha, deltaW) = localSDCA(localData, wInit, localIters, lambda, n, alpha, alphaOld)
+    val (deltaAlpha, deltaW) = localSDCA(localData, wInit, localIters, lambda, n, alpha, alphaOld, seed)
 
     alpha = alphaOld.plus(deltaAlpha.times(scaling))
     var wArray = Array.fill(localData.length)(Array(0.0))
@@ -134,6 +130,7 @@
    * @param n global number of points (needed for the primal-dual correspondence)
    * @param alpha
    * @param alphaOld
+   * @param seed
    * @return deltaAlpha and deltaW, summarizing the performed local changes, see paper
    */
   def localSDCA(
@@ -143,10 +140,11 @@
       lambda: Double,
       n: Int,
       alpha: Array[Double],
-      alphaOld: Array[Double]): (Array[Double], Array[Double]) = {
+      alphaOld: Array[Double],
+      seed: Int): (Array[Double], Array[Double]) = {
     var w = wInit
     val nLocal = localData.length
-    var r = new scala.util.Random
+    var r = new scala.util.Random(seed)
     var deltaW = Array.fill(wInit.length)(0.0)
 
     // perform local updates
@@ -172,7 +170,7 @@
     val qii = x.dot(x)
     var newAlpha = 1.0
     if (qii != 0.0) {
-      newAlpha = Math.min(Math.max(alpha(idx) - grad / qii, 0.0), 1.0)
+      newAlpha = Math.min(Math.max((alpha(idx) - (grad / qii)), 0.0), 1.0)
     }
 
     // update primal and dual variables
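
Note: every element of a partition carries the same per-partition primal update, so only one representative per partition is needed before the reduce; the inlined mapPartitions(x => Iterator(x.next())) does exactly what the removed singleElementFromPartition helper did. A minimal sketch of the idiom (the local-mode Spark setup is assumed for illustration only; like the original helper, it assumes non-empty partitions):

    import org.apache.spark.{SparkConf, SparkContext}

    // Local-mode context purely for the demo; the repo runs on a cluster.
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("demo"))

    // Four partitions: [1,2], [3,4], [5,6], [7,8].
    val rdd = sc.parallelize(1 to 8, 4)

    // Keep exactly one element per partition, then combine.
    val firsts = rdd.mapPartitions(x => Iterator(x.next()))
    println(firsts.collect().mkString(", ")) // 1, 3, 5, 7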

src/main/scala/solvers/MinibatchCD.scala (+9 -14)

@@ -33,7 +33,8 @@ object MinibatchCD {
       beta: Double,
       chkptIter: Int,
       testData: RDD[SparseClassificationPoint],
-      debugIter: Int) : (Array[Double], RDD[(Int, Double)]) = {
+      debugIter: Int,
+      seed: Int) : (Array[Double], RDD[(Int, Double)]) = {
 
     val parts = data.partitions.size // number of partitions of the data, K in the paper
     println("\nRunning Mini-batch CD on "+n+" data examples, distributed over "+parts+" workers")
@@ -43,16 +44,16 @@
     var w = wInit
     val scaling = beta / (parts * localIters);
 
-    for(t <- 1 until numRounds+1){
+    for(t <- 1 to numRounds){
 
       // zip alpha with data
       val zipData = alpha.zip(data)
 
       // find updates to alpha, w
-      val updates = zipData.mapPartitions(partitionUpdate(_,w,localIters,lambda,n,scaling),preservesPartitioning=true).persist()
+      val updates = zipData.mapPartitions(partitionUpdate(_,w,localIters,lambda,n,scaling,seed+t),preservesPartitioning=true).persist()
       alpha = updates.map(kv => kv._2)
       val primalVariables = updates.map(kv => kv._1)
-      val primalUpdates = primalVariables.mapPartitions(singleElementFromPartition,preservesPartitioning=true).reduce(_ plus _)
+      val primalUpdates = primalVariables.mapPartitions(x => Iterator(x.next())).reduce(_ plus _)
       w = primalUpdates.times(scaling).plus(w)
 
       // optionally calculate errors
@@ -73,13 +74,6 @@
     return (w, alpha)
   }
 
-  private def singleElementFromPartition(
-      primalVariables: Iterator[Array[Double]]): Iterator[Array[Double]] = {
-    var wVectorList = List[Array[Double]]()
-    wVectorList = primalVariables.next() :: wVectorList
-    return wVectorList.iterator
-  }
-
   /**
    * Performs one round of mini-batch CD updates
    *
@@ -97,7 +91,8 @@
       localIters: Int,
       lambda: Double,
       n: Int,
-      scaling: Double): Iterator[(Array[Double], (Int, Double))] = {
+      scaling: Double,
+      seed: Int): Iterator[(Array[Double], (Int, Double))] = {
 
     val zipArr = zipData.toArray
     var localData = zipArr.map(x => x._2)
@@ -106,7 +101,7 @@
     val alphaOld = alpha.clone
     var w = wInit
     val nLocal = localData.length
-    var r = new scala.util.Random
+    var r = new scala.util.Random(seed)
     var deltaW = Array.fill(wInit.length)(0.0)
 
     // perform local updates
@@ -132,7 +127,7 @@
     val qii = x.dot(x)
     var newAlpha = 1.0
     if (qii != 0.0) {
-      newAlpha = Math.min(Math.max(alpha(idx) - grad / qii, 0.0), 1.0)
+      newAlpha = Math.min(Math.max((alpha(idx) - (grad / qii)), 0.0), 1.0)
     }
 
     // update primal and dual variables
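
Note: the seeding scheme is shared by both solvers. The driver-level seed is offset by the round number (seed+t), so each round draws a different coordinate order, while a rerun with the same seed replays the exact same sequence. A self-contained sketch of that property:

    import scala.util.Random

    // One RNG per (seed, round), mirroring r = new Random(seed + t) in the solvers.
    def draws(seed: Int, t: Int, nLocal: Int, k: Int): Seq[Int] = {
      val r = new Random(seed + t)
      Seq.fill(k)(r.nextInt(nLocal))
    }

    assert(draws(0, 1, 100, 5) == draws(0, 1, 100, 5)) // reruns are identical
    assert(draws(0, 1, 100, 5) != draws(0, 2, 100, 5)) // rounds differ (with high probability)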

src/main/scala/solvers/SGD.scala (+7 -5)

@@ -36,7 +36,8 @@ object SGD {
       beta: Double,
       chkptIter: Int,
       testData: RDD[SparseClassificationPoint],
-      debugIter: Int) : Array[Double] = {
+      debugIter: Int,
+      seed: Int) : Array[Double] = {
 
     val parts = data.partitions.size // number of partitions of the data, K in the paper
     println("\nRunning SGD (with local updates = "+local+") on "+n+" data examples, distributed over "+parts+" workers")
@@ -51,7 +52,7 @@
       scaling = beta / (parts * localIters)
     }
 
-    for(t <- 1 until numRounds+1){
+    for(t <- 1 to numRounds){
 
       // update step size
       val step = 1/(lambda*(t))
@@ -63,7 +64,7 @@
       }
 
       // find updates to w
-      val updates = data.mapPartitions(partitionUpdate(_, w, lambda, ((t-1) * localIters * parts), localIters, local, parts), preservesPartitioning = true).persist()
+      val updates = data.mapPartitions(partitionUpdate(_, w, lambda, ((t-1) * localIters * parts), localIters, local, parts, seed+t), preservesPartitioning = true).persist()
      val primalUpdates = updates.reduce(_ plus _)
      if (local) {
        w = primalUpdates.times(scaling).plus(w)
@@ -102,11 +103,12 @@
       t:Double,
       localIters:Int,
       local:Boolean,
-      parts:Int) : Iterator[Array[Double]] = {
+      parts:Int,
+      seed: Int) : Iterator[Array[Double]] = {
 
     val dataArr = localData.toArray
     val nLocal = dataArr.length
-    var r = new scala.util.Random
+    var r = new scala.util.Random(seed)
     var w = wInit.clone
     var deltaW = Array.fill(wInit.length)(0.0)
