Skip to content

Commit 45d8ca9

Browse files
Update decompress method
1 parent 76ab56b commit 45d8ca9

File tree

6 files changed

+188
-82
lines changed

6 files changed

+188
-82
lines changed

cmd/server/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ require (
120120
github.com/golang-jwt/jwt/v5 v5.2.0 // indirect
121121
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
122122
github.com/golang/protobuf v1.5.4 // indirect
123-
github.com/golang/snappy v0.0.4 // indirect
123+
github.com/golang/snappy v1.0.0 // indirect
124124
github.com/google/s2a-go v0.1.4 // indirect
125125
github.com/googleapis/enterprise-certificate-proxy v0.2.4 // indirect
126126
github.com/googleapis/gax-go/v2 v2.12.0 // indirect

cmd/server/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek
169169
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
170170
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
171171
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
172-
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
173-
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
172+
github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
173+
github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
174174
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
175175
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
176176
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package executorstore
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
7+
"github.com/golang/snappy"
8+
9+
"github.com/uber/cadence/common/types"
10+
)
11+
12+
// compress compresses JSON data using snappy compression
13+
func compress(data []byte) ([]byte, error) {
14+
return snappy.Encode(nil, data), nil
15+
}
16+
17+
// decompress decompresses snappy-compressed data with backward compatibility for uncompressed data
18+
// If decompression fails (indicating legacy uncompressed data), return the data as-is
19+
func decompress(data []byte) ([]byte, error) {
20+
if len(data) == 0 {
21+
return data, nil
22+
}
23+
24+
decompressed, err := snappy.Decode(nil, data)
25+
if err != nil {
26+
// Decompression failed
27+
// Return it as-is for backward compatibility
28+
return data, nil
29+
}
30+
31+
return decompressed, nil
32+
}
33+
34+
func compressedActiveStatus() string {
35+
compressed, _ := compress([]byte(fmt.Sprintf(`"%s"`, types.ExecutorStatusACTIVE)))
36+
return string(compressed)
37+
}
38+
39+
// decompressAndUnmarshal decompresses data (if needed) and unmarshals it into the target
40+
// This function handles both compressed and uncompressed data for backward compatibility
41+
// errorContext is used to provide meaningful error messages
42+
func decompressAndUnmarshal(data []byte, target interface{}, errorContext string) error {
43+
decompressed, err := decompress(data)
44+
if err != nil {
45+
return fmt.Errorf("decompress %s: %w", errorContext, err)
46+
}
47+
if err := json.Unmarshal(decompressed, target); err != nil {
48+
return fmt.Errorf("unmarshal %s: %w", errorContext, err)
49+
}
50+
return nil
51+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package executorstore
2+
3+
import (
4+
"encoding/json"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
10+
"github.com/uber/cadence/common/types"
11+
)
12+
13+
func TestCompressDecompress(t *testing.T) {
14+
original := []byte(`{"status":"ACTIVE","shards":["shard1","shard2"]}`)
15+
16+
compressed, err := compress(original)
17+
require.NoError(t, err)
18+
require.NotNil(t, compressed)
19+
20+
assert.NotEqual(t, original, compressed)
21+
22+
decompressed, err := decompress(compressed)
23+
require.NoError(t, err)
24+
assert.Equal(t, original, decompressed)
25+
}
26+
27+
func TestDecompressBackwardCompatibility(t *testing.T) {
28+
t.Run("Old uncompressed data", func(t *testing.T) {
29+
oldData := []byte(`{"status":"ACTIVE"}`)
30+
31+
result, err := decompress(oldData)
32+
require.NoError(t, err)
33+
assert.Equal(t, oldData, result, "Old uncompressed data should be returned as-is")
34+
35+
var status map[string]string
36+
err = json.Unmarshal(result, &status)
37+
require.NoError(t, err)
38+
assert.Equal(t, "ACTIVE", status["status"])
39+
})
40+
41+
t.Run("New compressed data", func(t *testing.T) {
42+
original := []byte(`{"status":"DRAINING"}`)
43+
compressed, err := compress(original)
44+
require.NoError(t, err)
45+
46+
result, err := decompress(compressed)
47+
require.NoError(t, err)
48+
assert.Equal(t, original, result)
49+
50+
var status map[string]string
51+
err = json.Unmarshal(result, &status)
52+
require.NoError(t, err)
53+
assert.Equal(t, "DRAINING", status["status"])
54+
})
55+
}
56+
57+
func TestDecompressAndUnmarshalBackwardCompatibility(t *testing.T) {
58+
type testData struct {
59+
Status string `json:"status"`
60+
Shards []string `json:"shards"`
61+
}
62+
63+
t.Run("Old uncompressed JSON", func(t *testing.T) {
64+
oldData := []byte(`{"status":"ACTIVE","shards":["shard1","shard2"]}`)
65+
66+
var result testData
67+
err := decompressAndUnmarshal(oldData, &result, "test data")
68+
require.NoError(t, err)
69+
assert.Equal(t, "ACTIVE", result.Status)
70+
assert.Equal(t, []string{"shard1", "shard2"}, result.Shards)
71+
})
72+
73+
t.Run("New compressed data", func(t *testing.T) {
74+
original := testData{
75+
Status: "DRAINING",
76+
Shards: []string{"shard3", "shard4"},
77+
}
78+
originalJSON, _ := json.Marshal(original)
79+
compressed, err := compress(originalJSON)
80+
require.NoError(t, err)
81+
82+
var result testData
83+
err = decompressAndUnmarshal(compressed, &result, "test data")
84+
require.NoError(t, err)
85+
assert.Equal(t, original.Status, result.Status)
86+
assert.Equal(t, original.Shards, result.Shards)
87+
})
88+
89+
t.Run("Invalid JSON in uncompressed data", func(t *testing.T) {
90+
invalidJSON := []byte(`{invalid json}`)
91+
92+
var result testData
93+
err := decompressAndUnmarshal(invalidJSON, &result, "test data")
94+
require.Error(t, err)
95+
assert.Contains(t, err.Error(), "unmarshal test data")
96+
})
97+
}
98+
99+
func TestCompressedActiveStatus(t *testing.T) {
100+
compressed := compressedActiveStatus()
101+
require.NotEmpty(t, compressed)
102+
103+
decompressed, err := decompress([]byte(compressed))
104+
require.NoError(t, err)
105+
106+
var status types.ExecutorStatus
107+
err = json.Unmarshal(decompressed, &status)
108+
require.NoError(t, err)
109+
assert.Equal(t, types.ExecutorStatusACTIVE, status)
110+
}

service/sharddistributor/store/etcd/executorstore/etcdstore.go

Lines changed: 21 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"strconv"
1212
"time"
1313

14-
"github.com/golang/snappy"
1514
clientv3 "go.etcd.io/etcd/client/v3"
1615
"go.uber.org/fx"
1716

@@ -24,26 +23,6 @@ import (
2423
"github.com/uber/cadence/service/sharddistributor/store/etcd/executorstore/shardcache"
2524
)
2625

27-
var (
28-
_executorStatusRunningJSON = fmt.Sprintf(`"%s"`, types.ExecutorStatusACTIVE)
29-
_executorStatusRunningJSONCompressed string
30-
)
31-
32-
func init() {
33-
compressed, _ := compressJSON([]byte(_executorStatusRunningJSON))
34-
_executorStatusRunningJSONCompressed = string(compressed)
35-
}
36-
37-
// compressJSON compresses JSON data using snappy compression
38-
func compressJSON(data []byte) ([]byte, error) {
39-
return snappy.Encode(nil, data), nil
40-
}
41-
42-
// decompressJSON decompresses snappy-compressed data
43-
func decompressJSON(data []byte) ([]byte, error) {
44-
return snappy.Decode(nil, data)
45-
}
46-
4726
type executorStoreImpl struct {
4827
client *clientv3.Client
4928
prefix string
@@ -139,12 +118,12 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec
139118
}
140119

141120
// Compress data before writing to etcd
142-
compressedReportedShards, err := compressJSON(reportedShardsData)
121+
compressedReportedShards, err := compress(reportedShardsData)
143122
if err != nil {
144123
return fmt.Errorf("compress reported shards: %w", err)
145124
}
146125

147-
compressedState, err := compressJSON(jsonState)
126+
compressedState, err := compress(jsonState)
148127
if err != nil {
149128
return fmt.Errorf("compress state: %w", err)
150129
}
@@ -199,31 +178,16 @@ func (s *executorStoreImpl) GetHeartbeat(ctx context.Context, namespace string,
199178
}
200179
heartbeatState.LastHeartbeat = timestamp
201180
case etcdkeys.ExecutorStatusKey:
202-
decompressed, err := decompressJSON(kv.Value)
203-
if err != nil {
204-
return nil, nil, fmt.Errorf("decompress heartbeat state: %w", err)
205-
}
206-
err = json.Unmarshal(decompressed, &heartbeatState.Status)
207-
if err != nil {
208-
return nil, nil, fmt.Errorf("parse heartbeat state: %w", err)
181+
if err := decompressAndUnmarshal(kv.Value, &heartbeatState.Status, "heartbeat state"); err != nil {
182+
return nil, nil, err
209183
}
210184
case etcdkeys.ExecutorReportedShardsKey:
211-
decompressed, err := decompressJSON(kv.Value)
212-
if err != nil {
213-
return nil, nil, fmt.Errorf("decompress reported shards: %w", err)
214-
}
215-
err = json.Unmarshal(decompressed, &heartbeatState.ReportedShards)
216-
if err != nil {
217-
return nil, nil, fmt.Errorf("unmarshal reported shards: %w", err)
185+
if err := decompressAndUnmarshal(kv.Value, &heartbeatState.ReportedShards, "reported shards"); err != nil {
186+
return nil, nil, err
218187
}
219188
case etcdkeys.ExecutorAssignedStateKey:
220-
decompressed, err := decompressJSON(kv.Value)
221-
if err != nil {
222-
return nil, nil, fmt.Errorf("decompress assigned state: %w", err)
223-
}
224-
err = json.Unmarshal(decompressed, &assignedState)
225-
if err != nil {
226-
return nil, nil, fmt.Errorf("unmarshal assigned shards: %w", err)
189+
if err := decompressAndUnmarshal(kv.Value, &assignedState, "assigned state"); err != nil {
190+
return nil, nil, err
227191
}
228192
}
229193
}
@@ -262,31 +226,16 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st
262226
timestamp, _ := strconv.ParseInt(value, 10, 64)
263227
heartbeat.LastHeartbeat = timestamp
264228
case etcdkeys.ExecutorStatusKey:
265-
decompressed, err := decompressJSON(kv.Value)
266-
if err != nil {
267-
return nil, fmt.Errorf("decompress heartbeat state: %w", err)
268-
}
269-
err = json.Unmarshal(decompressed, &heartbeat.Status)
270-
if err != nil {
271-
return nil, fmt.Errorf("parse heartbeat state: %w", err)
229+
if err := decompressAndUnmarshal(kv.Value, &heartbeat.Status, "heartbeat state"); err != nil {
230+
return nil, err
272231
}
273232
case etcdkeys.ExecutorReportedShardsKey:
274-
decompressed, err := decompressJSON(kv.Value)
275-
if err != nil {
276-
return nil, fmt.Errorf("decompress reported shards: %w", err)
277-
}
278-
err = json.Unmarshal(decompressed, &heartbeat.ReportedShards)
279-
if err != nil {
280-
return nil, fmt.Errorf("unmarshal reported shards: %w", err)
233+
if err := decompressAndUnmarshal(kv.Value, &heartbeat.ReportedShards, "reported shards"); err != nil {
234+
return nil, err
281235
}
282236
case etcdkeys.ExecutorAssignedStateKey:
283-
decompressed, err := decompressJSON(kv.Value)
284-
if err != nil {
285-
return nil, fmt.Errorf("decompress assigned state: %w", err)
286-
}
287-
err = json.Unmarshal(decompressed, &assigned)
288-
if err != nil {
289-
return nil, fmt.Errorf("unmarshal assigned shards: %w", err)
237+
if err := decompressAndUnmarshal(kv.Value, &assigned, "assigned state"); err != nil {
238+
return nil, err
290239
}
291240
assigned.ModRevision = kv.ModRevision
292241
}
@@ -358,7 +307,7 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string,
358307
if err != nil {
359308
return fmt.Errorf("marshal assigned shards for executor %s: %w", executorID, err)
360309
}
361-
compressedValue, err := compressJSON(value)
310+
compressedValue, err := compress(value)
362311
if err != nil {
363312
return fmt.Errorf("compress assigned shards for executor %s: %w", executorID, err)
364313
}
@@ -441,12 +390,8 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,
441390
// If the executor already has shards, load its state.
442391
kv := resp.Kvs[0]
443392
modRevision = kv.ModRevision
444-
decompressed, err := decompressJSON(kv.Value)
445-
if err != nil {
446-
return fmt.Errorf("decompress assigned state: %w", err)
447-
}
448-
if err := json.Unmarshal(decompressed, &state); err != nil {
449-
return fmt.Errorf("unmarshal assigned state: %w", err)
393+
if err := decompressAndUnmarshal(kv.Value, &state, "assigned state"); err != nil {
394+
return err
450395
}
451396
} else {
452397
// If this is the first shard, initialize the state map.
@@ -463,7 +408,7 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,
463408
return fmt.Errorf("marshal new assigned state: %w", err)
464409
}
465410

466-
compressedStateValue, err := compressJSON(newStateValue)
411+
compressedStateValue, err := compress(newStateValue)
467412
if err != nil {
468413
return fmt.Errorf("compress new assigned state: %w", err)
469414
}
@@ -472,7 +417,7 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,
472417

473418
// 3. Prepare and commit the transaction with three atomic checks.
474419
// a) Check that the executor's status is ACTIVE.
475-
comparisons = append(comparisons, clientv3.Compare(clientv3.Value(statusKey), "=", _executorStatusRunningJSONCompressed))
420+
comparisons = append(comparisons, clientv3.Compare(clientv3.Value(statusKey), "=", compressedActiveStatus()))
476421
// b) Check that the assigned_state key hasn't been changed by another process.
477422
comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(assignedState), "=", modRevision))
478423
// c) Check that the cache is up to date.
@@ -510,9 +455,9 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,
510455
if err != nil || len(currentStatusResp.Kvs) == 0 {
511456
return store.ErrExecutorNotFound
512457
}
513-
if string(currentStatusResp.Kvs[0].Value) != _executorStatusRunningJSONCompressed {
458+
if string(currentStatusResp.Kvs[0].Value) != compressedActiveStatus() {
514459
// Decompress the status for error message
515-
decompressedStatus, _ := decompressJSON(currentStatusResp.Kvs[0].Value)
460+
decompressedStatus, _ := decompress(currentStatusResp.Kvs[0].Value)
516461
return fmt.Errorf(`%w: executor status is %s"`, store.ErrVersionConflict, string(decompressedStatus))
517462
}
518463

service/sharddistributor/store/etcd/executorstore/etcdstore_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,15 @@ func TestRecordHeartbeat(t *testing.T) {
5757
resp, err = tc.Client.Get(ctx, stateKey)
5858
require.NoError(t, err)
5959
require.Equal(t, int64(1), resp.Count, "State key should exist")
60-
decompressedState, err := decompressJSON(resp.Kvs[0].Value)
60+
decompressedState, err := decompress(resp.Kvs[0].Value)
6161
require.NoError(t, err)
6262
assert.Equal(t, stringStatus(types.ExecutorStatusACTIVE), string(decompressedState))
6363

6464
resp, err = tc.Client.Get(ctx, reportedShardsKey)
6565
require.NoError(t, err)
6666
require.Equal(t, int64(1), resp.Count, "Reported shards key should exist")
6767

68-
decompressedReportedShards, err := decompressJSON(resp.Kvs[0].Value)
68+
decompressedReportedShards, err := decompress(resp.Kvs[0].Value)
6969
require.NoError(t, err)
7070
var reportedShards map[string]*types.ShardStatusReport
7171
err = json.Unmarshal(decompressedReportedShards, &reportedShards)
@@ -357,7 +357,7 @@ func TestSubscribe(t *testing.T) {
357357
// Now update the reported shards, which IS a significant change
358358
reportedShardsKey, err := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "reported_shards")
359359
require.NoError(t, err)
360-
compressedShards, err := compressJSON([]byte(`{"shard-1":{"status":"running"}}`))
360+
compressedShards, err := compress([]byte(`{"shard-1":{"status":"running"}}`))
361361
require.NoError(t, err)
362362
_, err = tc.Client.Put(ctx, reportedShardsKey, string(compressedShards))
363363
require.NoError(t, err)

0 commit comments

Comments
 (0)