@@ -2,80 +2,61 @@ package distopt.solvers
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import distopt.utils.Implicits._
 import distopt.utils._
+import breeze.linalg.{Vector, NumericOps, DenseVector, SparseVector}
+
 
 object CoCoA {
 
   /**
-   * CoCoA - Communication-efficient distributed dual Coordinate Ascent.
+   * CoCoA/CoCoA+ - Communication-efficient distributed dual Coordinate Ascent.
    * Using LocalSDCA as the local dual method. Here implemented for standard
    * hinge-loss SVM. For other objectives, adjust localSDCA accordingly.
    *
-   * @param sc
    * @param data RDD of all data examples
-   * @param wInit initial weight vector (has to be zero)
-   * @param numRounds number of outer iterations T in the paper
-   * @param localIters number of inner localSDCA iterations, H in the paper
-   * @param lambda the regularization parameter
-   * @param beta scaling parameter. beta=1 gives averaging, beta=K=data.partitions.size gives (aggressive) adding
-   * @param chkptIter checkpointing the resulting RDDs from time to time, to ensure persistence and shorter dependencies
-   * @param testData
-   * @param debugIter
-   * @param seed
+   * @param params Algorithmic parameters
+   * @param debug Systems/debugging parameters
+   * @param plus Whether to use the CoCoA+ framework (plus=true) or CoCoA (plus=false)
    * @return
    */
   def runCoCoA(
-    sc: SparkContext,
-    data: RDD[SparseClassificationPoint],
-    n: Int,
-    wInit: Array[Double],
-    numRounds: Int,
-    localIters: Int,
-    lambda: Double,
-    beta: Double,
-    chkptIter: Int,
-    testData: RDD[SparseClassificationPoint],
-    debugIter: Int,
-    seed: Int,
-    plus: Boolean): (Array[Double], RDD[Array[Double]]) = {
+    data: RDD[LabeledPoint],
+    params: Params,
+    debug: DebugParams,
+    plus: Boolean): (Vector[Double], RDD[Vector[Double]]) = {
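Note: the new signature bundles its arguments into Params and DebugParams, whose definitions are not part of this diff. The sketch below is hypothetical, inferred only from the fields this file accesses (params.n, params.wInit, params.numRounds, params.localIters, params.lambda, params.beta, params.gamma; debug.seed, debug.debugIter, debug.testData, debug.chkptIter); the real definitions live elsewhere in distopt.utils and may differ.

    // Hypothetical shapes, reconstructed purely from field accesses in this diff.
    case class Params(
      wInit: Vector[Double], // initial weight vector (has to be zero)
      numRounds: Int,        // number of outer iterations, T in the paper
      localIters: Int,       // number of inner localSDCA iterations, H in the paper
      lambda: Double,        // regularization parameter
      beta: Double,          // CoCoA scaling: beta=1 averages, beta=K adds
      gamma: Double,         // CoCoA+ aggregation parameter
      n: Int)                // total number of data examples

    case class DebugParams(
      testData: RDD[LabeledPoint], // held-out data for test error, may be null
      debugIter: Int,              // evaluate objectives every debugIter rounds
      chkptIter: Int,              // checkpoint RDDs every chkptIter rounds
      seed: Int)                   // base RNG seed for local coordinate sampling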
 
     val parts = data.partitions.size  // number of partitions of the data, K in the paper
     val alg = if (plus) "CoCoA+" else "CoCoA"
-    println("\nRunning " + alg + " on " + n + " data examples, distributed over " + parts + " workers")
+    println("\nRunning " + alg + " on " + params.n + " data examples, distributed over " + parts + " workers")
 
     // initialize alpha, w
     var alphaVars = data.map(x => 0.0).cache()
-    var alpha = alphaVars.mapPartitions(x => Iterator(x.toArray))
+    var alpha = alphaVars.mapPartitions(x => Iterator(Vector(x.toArray)))
     var dataArr = data.mapPartitions(x => Iterator(x.toArray))
-    var w = wInit
-    var scaling = if (plus) beta else 1.0 / parts
+    var w = params.wInit.copy
+    var scaling = if (plus) params.gamma else params.beta / parts
 
-    for (t <- 1 to numRounds){
+    for (t <- 1 to params.numRounds) {
 
       // zip alpha with data
       val zipData = alpha.zip(dataArr)
 
       // find updates to alpha, w
-      val updates = zipData.mapPartitions(partitionUpdate(_, w, localIters, lambda, n, scaling, seed + t, plus, parts * beta), preservesPartitioning = true).persist()
+      val updates = zipData.mapPartitions(partitionUpdate(_, w, params.localIters, params.lambda, params.n, scaling, debug.seed + t, plus, parts * params.gamma), preservesPartitioning = true).persist()
       alpha = updates.map(kv => kv._2)
-      val primalUpdates = updates.map(kv => kv._1).reduce(_ plus _)
-      if (plus) {
-        w = primalUpdates.plus(w)
-      } else {
-        w = primalUpdates.times(scaling).plus(w)
-      }
+      val primalUpdates = updates.map(kv => kv._1).reduce(_ + _)
+      w += (primalUpdates * scaling)
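The unified update above replaces the old plus/else branches: scaling = gamma for CoCoA+ (gamma = 1.0 reproduces the old unscaled adding) and scaling = beta/K for CoCoA (beta = 1.0 reproduces the old 1.0/parts averaging). A self-contained Breeze sketch of the same reduce-and-scale arithmetic, with invented values:

    // Standalone sketch of the aggregation step; all values are invented.
    import breeze.linalg.DenseVector
    val K = 4
    val perWorker = Seq.fill(K)(DenseVector(0.1, -0.2)) // each worker's deltaW
    val summed = perWorker.reduce(_ + _)                // the reduce(_ + _) above
    var w0 = DenseVector.zeros[Double](2)
    w0 += summed * (1.0 / K) // beta = 1.0: plain averaging, as in old CoCoA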
 
       // optionally calculate errors
-      if (debugIter > 0 && t % debugIter == 0) {
+      if (debug.debugIter > 0 && t % debug.debugIter == 0) {
         println("Iteration: " + t)
-        println("primal objective: " + OptUtils.computePrimalObjective(data, w, lambda))
-        println("primal-dual gap: " + OptUtils.computeDualityGap(data, w, alpha, lambda))
-        if (testData != null) { println("test error: " + OptUtils.computeClassificationError(testData, w)) }
+        println("primal objective: " + OptUtils.computePrimalObjective(data, w, params.lambda))
+        println("primal-dual gap: " + OptUtils.computeDualityGap(data, w, alpha, params.lambda))
+        if (debug.testData != null) { println("test error: " + OptUtils.computeClassificationError(debug.testData, w)) }
       }
 
       // optionally checkpoint RDDs
-      if (t % chkptIter == 0) {
+      if (t % debug.chkptIter == 0) {
         zipData.checkpoint()
         alpha.checkpoint()
       }
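One operational note on the checkpointing above: RDD.checkpoint() only works if a checkpoint directory has been configured on the SparkContext, otherwise Spark raises an error when the checkpoint is materialized. Since this commit removes sc from runCoCoA's parameters, that setup has to happen wherever the driver creates the context; a minimal sketch (the path is an invented example):

    // Must be called once on the SparkContext before any .checkpoint() call;
    // the directory below is only an example.
    sc.setCheckpointDir("/tmp/cocoa-checkpoints")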
@@ -84,6 +65,7 @@ object CoCoA {
     return (w, alpha)
   }
 
+
   /**
    * Performs one round of local updates using a given local dual algorithm,
    * here localSDCA. Will perform localIters many updates per worker.
@@ -93,35 +75,35 @@ object CoCoA {
    * @param localIters
    * @param lambda
    * @param n
-   * @param scaling this is the scaling factor beta/K in the paper
+   * @param scaling This is either gamma for CoCoA+ or beta/K for CoCoA
    * @param seed
+   * @param plus
+   * @param sigma sigma' in the CoCoA+ paper
    * @return
    */
   private def partitionUpdate(
-    zipData: Iterator[(Array[Double], Array[SparseClassificationPoint])], // ((Int, Double), SparseClassificationPoint)],
-    wInit: Array[Double],
+    zipData: Iterator[(Vector[Double], Array[LabeledPoint])], // ((Int, Double), SparseClassificationPoint)],
+    wInit: Vector[Double],
     localIters: Int,
     lambda: Double,
     n: Int,
     scaling: Double,
     seed: Int,
     plus: Boolean,
-    sigma: Double): Iterator[(Array[Double], Array[Double])] = {
+    sigma: Double): Iterator[(Vector[Double], Vector[Double])] = {
 
     val zipPair = zipData.next()
     val localData = zipPair._2
     var alpha = zipPair._1
-    val alphaOld = alpha.clone
+    val alphaOld = alpha.copy
+
     val (deltaAlpha, deltaW) = localSDCA(localData, wInit, localIters, lambda, n, alpha, alphaOld, seed, plus, sigma)
-
-    if (plus) {
-      alpha = alphaOld.plus(deltaAlpha)
-    } else {
-      alpha = alphaOld.plus(deltaAlpha.times(scaling))
-    }
+    alpha = alphaOld + (deltaAlpha * scaling)
+
     return Iterator((deltaW, alpha))
   }
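The recurring pattern of this commit is visible above: vector arithmetic previously done through the removed distopt.utils.Implicits helpers on Array[Double] now uses Breeze operators. A small self-contained sketch of the correspondence (the .plus/.times semantics are assumed from how the removed code used them: elementwise add and scalar multiply):

    // Breeze equivalents of the removed Array helpers (assumed semantics).
    import breeze.linalg.DenseVector
    val a = DenseVector(1.0, 2.0)
    val d = DenseVector(0.5, 0.5)
    val s = 0.5
    val res = a + (d * s) // replaces a.plus(d.times(s))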
 
+
   /**
    * This is an implementation of LocalDualMethod, here LocalSDCA (coordinate ascent),
    * with taking the information of the other workers into account, by respecting the
@@ -132,32 +114,35 @@ object CoCoA {
    * regularization parameter C = 1.0/(lambda*numExamples), and re-scaling
    * the alpha variables with 1/C.
    *
-   * @param localData the local data examples
+   * @param localData The local data examples
    * @param wInit
-   * @param localIters number of local coordinates to update
+   * @param localIters Number of local coordinates to update
    * @param lambda
-   * @param n global number of points (needed for the primal-dual correspondence)
+   * @param n Global number of points (needed for the primal-dual correspondence)
    * @param alpha
    * @param alphaOld
    * @param seed
-   * @return deltaAlpha and deltaW, summarizing the performed local changes, see paper
+   * @param plus
+   * @param sigma sigma' in the CoCoA+ paper
+   * @return (deltaAlpha, deltaW) Summarizing the performed local changes
    */
   def localSDCA(
-    localData: Array[SparseClassificationPoint],
-    wInit: Array[Double],
+    localData: Array[LabeledPoint],
+    wInit: Vector[Double],
     localIters: Int,
     lambda: Double,
     n: Int,
-    alpha: Array[Double],
-    alphaOld: Array[Double],
+    alpha: Vector[Double],
+    alphaOld: Vector[Double],
    seed: Int,
     plus: Boolean,
-    sigma: Double): (Array[Double], Array[Double]) = {
+    sigma: Double): (Vector[Double], Vector[Double]) = {
 
     var w = wInit
     val nLocal = localData.length
     var r = new scala.util.Random(seed)
-    var deltaW = Array.fill(wInit.length)(0.0)
+    var deltaW = DenseVector.zeros[Double](wInit.length)
 
     // perform local updates
     for (i <- 1 to localIters) {
@@ -171,37 +156,38 @@ object CoCoA {
       // compute hinge loss gradient
       val grad = {
         if (plus) {
-          (y*(x.dot(w)+sigma*x.dot(deltaW)) - 1.0)*(lambda*n)
+          (y * (x.dot(w) + (sigma * x.dot(deltaW))) - 1.0) * (lambda * n)
         } else {
-          (y*(x.dot(w)) - 1.0)*(lambda*n)
+          (y * (x.dot(w)) - 1.0) * (lambda * n)
         }
       }
 
       // compute projected gradient
       var proj_grad = grad
       if (alpha(idx) <= 0.0)
-        proj_grad = Math.min(grad,0)
+        proj_grad = Math.min(grad, 0)
       else if (alpha(idx) >= 1.0)
-        proj_grad = Math.max(grad,0)
+        proj_grad = Math.max(grad, 0)
 
       if (Math.abs(proj_grad) != 0.0) {
-        val qii = if (plus) x.dot(x)*sigma else x.dot(x)
+        val xnorm = Math.pow(x.norm(2), 2)
+        val qii = if (plus) xnorm * sigma else xnorm
         var newAlpha = 1.0
         if (qii != 0.0) {
          newAlpha = Math.min(Math.max((alpha(idx) - (grad / qii)), 0.0), 1.0)
         }
 
         // update primal and dual variables
-        val update = x.times( y*(newAlpha-alpha(idx))/(lambda*n) )
+        val update = x * (y * (newAlpha - alpha(idx)) / (lambda * n))
         if (!plus) {
-          w = update.plus(w)
+          w = w + update
         }
-        deltaW = update.plus(deltaW)
+        deltaW += update
         alpha(idx) = newAlpha
       }
     }
 
-    val deltaAlpha = (alphaOld.times(-1.0)).plus(alpha)
+    val deltaAlpha = alpha - alphaOld
     return (deltaAlpha, deltaW)
   }
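For readers cross-checking the coordinate update in localSDCA above: for hinge loss the single-coordinate step is a clipped Newton-like step, newAlpha = clip(alpha(idx) - grad/qii, 0, 1), with qii the squared norm of the example (scaled by sigma' for CoCoA+). A toy replay of the same arithmetic with invented numbers:

    // Toy replay of one localSDCA coordinate step (all values invented).
    val (y, xw, lambda, n) = (1.0, 0.3, 0.01, 1000.0)
    val (alphaI, xnormSq) = (0.5, 2.0)
    val grad = (y * xw - 1.0) * (lambda * n) // = -7.0
    val newAlpha = math.min(math.max(alphaI - grad / xnormSq, 0.0), 1.0) // clips to 1.0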