-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreplication.go
171 lines (155 loc) · 5.19 KB
/
replication.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
package pouchdb
import (
"context"
"net/http"
"sync"
"time"
"github.com/gopherjs/gopherjs/js"
kivik "github.com/go-kivik/kivik/v4"
"github.com/go-kivik/kivik/v4/driver"
"github.com/go-kivik/pouchdb/v4/bindings"
)
// replication is the driver-side record of one replication created by
// client.newReplication.
type replication struct {
	// source and target are the endpoint names passed to newReplication.
	source, target string
	// startTime and endTime are filled in by Update the first time the
	// handler reports a non-zero value for them.
	startTime, endTime time.Time
	// state and err hold the outcome recorded by the most recent Update.
	state kivik.ReplicationState
	err   error
	// mu protects the above values
	mu sync.RWMutex

	// client is the client whose replications list this value is stored in.
	client *client
	// rh is the handler built from the JS replication object.
	rh *replicationHandler
}
// Compile-time check that *replication satisfies driver.Replication.
var _ driver.Replication = (*replication)(nil)
// newReplication builds a replication for the given target and source names,
// backed by a handler for the JS replication object rep, appends it to the
// client's replication list while holding replicationsMU, and returns it.
func (c *client) newReplication(target, source string, rep *js.Object) *replication {
	handler := newReplicationHandler(rep)
	created := &replication{
		client: c,
		rh:     handler,
		source: source,
		target: target,
	}

	c.replicationsMU.Lock()
	defer c.replicationsMU.Unlock()
	c.replications = append(c.replications, created)
	return created
}
// readLock takes r.mu for reading and hands back the matching release
// function, which lets callers write `defer r.readLock()()`.
func (r *replication) readLock() func() {
	r.mu.RLock()
	return func() { r.mu.RUnlock() }
}
// ReplicationID always returns the empty string.
func (r *replication) ReplicationID() string {
	return ""
}

// Source returns the source name, read under the read lock.
func (r *replication) Source() string {
	defer r.readLock()()
	return r.source
}

// Target returns the target name, read under the read lock.
func (r *replication) Target() string {
	defer r.readLock()()
	return r.target
}

// StartTime returns the recorded start time, read under the read lock.
func (r *replication) StartTime() time.Time {
	defer r.readLock()()
	return r.startTime
}

// EndTime returns the recorded end time, read under the read lock.
func (r *replication) EndTime() time.Time {
	defer r.readLock()()
	return r.endTime
}

// State returns the recorded replication state as a plain string, read under
// the read lock.
func (r *replication) State() string {
	defer r.readLock()()
	return string(r.state)
}

// Err returns the recorded replication error, if any, read under the read
// lock.
func (r *replication) Err() error {
	defer r.readLock()()
	return r.err
}
// Update asks the replication handler for its current status and records the
// result on r while holding the write lock.
//
// Denied and error events set the state to kivik.ReplicationError and store a
// PouchDB error built from the event info; a complete event sets
// kivik.ReplicationComplete; paused, change, and active events set
// kivik.ReplicationStarted. When the handler supplies info, the start and end
// times are recorded the first time they are non-zero, and the handler's
// document counters (if present) are copied into state.
//
// ctx is not used. Any error from the handler's Status call is returned
// unchanged.
// NOTE(review): state is dereferenced without a nil check — presumably callers
// always pass a non-nil value; confirm.
func (r *replication) Update(ctx context.Context, state *driver.ReplicationInfo) (err error) {
	defer bindings.RecoverError(&err)
	r.mu.Lock()
	defer r.mu.Unlock()

	event, info, err := r.rh.Status()
	if err != nil {
		return err
	}

	switch event {
	case bindings.ReplicationEventDenied, bindings.ReplicationEventError:
		r.state = kivik.ReplicationError
		r.err = bindings.NewPouchError(info.Object)
	case bindings.ReplicationEventComplete:
		r.state = kivik.ReplicationComplete
	case bindings.ReplicationEventPaused, bindings.ReplicationEventChange, bindings.ReplicationEventActive:
		r.state = kivik.ReplicationStarted
	}

	// Without info there are no timestamps or counters to record.
	if info == nil {
		return nil
	}

	// Timestamps are write-once: keep the first non-zero value reported.
	if started := info.StartTime(); r.startTime.IsZero() && !started.IsZero() {
		r.startTime = started
	}
	if ended := info.EndTime(); r.endTime.IsZero() && !ended.IsZero() {
		r.endTime = ended
	}

	if progress := r.rh.state; progress != nil {
		state.DocWriteFailures = progress.DocWriteFailures
		state.DocsRead = progress.DocsRead
		state.DocsWritten = progress.DocsWritten
	}
	return nil
}
// Delete cancels the replication through its handler and then removes r from
// the owning client's replication list. The list is not kept in order: the
// removed slot is filled with the last element. If r is not in the list, a
// kivik.Error with status 404 is returned. ctx is not used.
func (r *replication) Delete(ctx context.Context) (err error) {
	defer bindings.RecoverError(&err)
	r.rh.Cancel()

	c := r.client
	c.replicationsMU.Lock()
	defer c.replicationsMU.Unlock()

	idx := -1
	for i := range c.replications {
		if c.replications[i] == r {
			idx = i
			break
		}
	}
	if idx < 0 {
		return &kivik.Error{Status: http.StatusNotFound, Message: "replication not found"}
	}

	// Swap-remove, clearing the vacated tail slot so it no longer references
	// the moved element.
	tail := len(c.replications) - 1
	c.replications[idx] = c.replications[tail]
	c.replications[tail] = nil
	c.replications = c.replications[:tail]
	return nil
}
// replicationEndpoint resolves one side of a replication to a display name
// and the value to hand to PouchDB.
//
// When object is nil the DSN is used for both. A *js.Object or *bindings.DB
// is treated as a PouchDB database: its "name" property becomes the name and
// the underlying JS object becomes the endpoint. Any other value is passed
// through unchanged with the name "<unknown>".
//
// Panics raised while reading from the JS objects are converted into the
// returned error by bindings.RecoverError.
func replicationEndpoint(dsn string, object interface{}) (name string, obj interface{}, err error) {
	defer bindings.RecoverError(&err)
	if object == nil {
		return dsn, dsn, nil
	}
	switch t := object.(type) {
	case *js.Object:
		tx := object.(*js.Object) // https://github.com/gopherjs/gopherjs/issues/682
		// Assume it's a raw PouchDB object
		return tx.Get("name").String(), tx, nil
	case *bindings.DB:
		// Unwrap the bare object
		return t.Object.Get("name").String(), t.Object, nil
	}
	// Just let it pass through. This must return the input `object`; the
	// named result `obj` is still nil here, so returning it would silently
	// discard the caller-supplied endpoint.
	return "<unknown>", object, nil
}
// Replicate starts a PouchDB replication from the source to the target and
// returns a driver.Replication tracking it.
//
// The "source" and "target" options, when present, are resolved through
// replicationEndpoint in place of the DSN strings (e.g. to supply PouchDB
// objects); both keys are removed from the options before they are forwarded
// to PouchDB. The context argument is ignored.
func (c *client) Replicate(_ context.Context, targetDSN, sourceDSN string, options map[string]interface{}) (driver.Replication, error) {
	opts := c.options(options)

	srcName, src, err := replicationEndpoint(sourceDSN, opts["source"])
	if err != nil {
		return nil, err
	}
	dstName, dst, err := replicationEndpoint(targetDSN, opts["target"])
	if err != nil {
		return nil, err
	}

	// The endpoints are passed positionally below, so drop them from the
	// option set.
	delete(opts, "source")
	delete(opts, "target")

	jsRep, err := c.pouch.Replicate(src, dst, opts)
	if err != nil {
		return nil, err
	}
	tracked := c.newReplication(dstName, srcName, jsRep)
	return tracked, nil
}
// GetReplications returns a snapshot of the replications currently registered
// on the client. Passing any options yields a kivik.Error with status 400,
// since none are supported yet. The context argument is ignored.
func (c *client) GetReplications(_ context.Context, options map[string]interface{}) ([]driver.Replication, error) {
	if len(options) > 0 {
		return nil, &kivik.Error{Status: http.StatusBadRequest, Message: "options not yet supported"}
	}

	c.replicationsMU.RLock()
	defer c.replicationsMU.RUnlock()

	// Copy element by element: the list's element type must be converted to
	// the driver.Replication interface.
	out := make([]driver.Replication, 0, len(c.replications))
	for _, rep := range c.replications {
		out = append(out, rep)
	}
	return out, nil
}