Skip to content

Commit

Permalink
Refactor: Use PriorityQueue instead of Queue in queue.go
Browse files Browse the repository at this point in the history
  • Loading branch information
liuzl committed Nov 2, 2024
1 parent d4b6b68 commit e6807a0
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 16 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.18

require (
github.com/golang/glog v1.2.0
github.com/liuzl/ds v0.0.0-20190109073647-d2aafa2ae9eb
github.com/liuzl/ds v0.0.0-20241102124846-854c79ec12e2
github.com/liuzl/store v0.0.0-20190530065605-e2dbcd3c77fc
github.com/syndtr/goleveldb v1.0.0
zliu.org/goutil v0.0.0-20241031150925-efd2494eb218
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/justinas/alice v1.2.0 h1:+MHSA/vccVCF4Uq37S42jwlkvI2Xzl7zTPCN5BnZNVo=
github.com/justinas/alice v1.2.0/go.mod h1:fN5HRH/reO/zrUflLfTN43t3vXvKzvZIENsNEe7i7qA=
github.com/liuzl/ds v0.0.0-20190109073647-d2aafa2ae9eb h1:F/hKBnRcUbW//NrJSoKP0Tn1D/DkrhH0jchye2lp3wg=
github.com/liuzl/ds v0.0.0-20190109073647-d2aafa2ae9eb/go.mod h1:YFlD8yquAegVCxRU3sq3UCMfC1WwKnnj5TYe3UanIGk=
github.com/liuzl/ds v0.0.0-20241102124846-854c79ec12e2 h1:vq2jctPA+imXhecpYnYluwPbIbYr673+2UbI9UXcmFA=
github.com/liuzl/ds v0.0.0-20241102124846-854c79ec12e2/go.mod h1:H4fJlsetwnfon9zXYqR4lRLa/DA/1t7i51txLsj6Fjs=
github.com/liuzl/filestore v0.0.0-20200229104338-5ea723a6a528 h1:g+uxFYnxN+bMSgLu+t7k4zzVIUsRhKykir1C4F5Gp2c=
github.com/liuzl/filestore v0.0.0-20200229104338-5ea723a6a528/go.mod h1:aMgfSMkON/7fp+l9vv8w0xq870iSPVrNs7IqEu3xu5Q=
github.com/liuzl/store v0.0.0-20190530065605-e2dbcd3c77fc h1:mZ1DgWJEXekv8VFCurVYxQdqJ8bgnsx7cFyBAE+ORCE=
Expand Down Expand Up @@ -61,7 +61,5 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
zliu.org/goutil v0.0.0-20241019130207-6c223a2df734 h1:giuCzrxjd/VENibfUqN/nc86tn/uh2x8/Ot6hJJHS/U=
zliu.org/goutil v0.0.0-20241019130207-6c223a2df734/go.mod h1:inD0L5TSPMM0YJgPC3cUPZPnoLk9cHUpL2DlZQeQ4wM=
zliu.org/goutil v0.0.0-20241031150925-efd2494eb218 h1:9L72bA/zDUAQ5nIDpPI9Cz/9Hnv/NfLMXDyg9i+0Yf8=
zliu.org/goutil v0.0.0-20241031150925-efd2494eb218/go.mod h1:inD0L5TSPMM0YJgPC3cUPZPnoLk9cHUpL2DlZQeQ4wM=
27 changes: 16 additions & 11 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ import (
type Queue struct {
Path string `json:"path"`

queue *ds.Queue
queue *ds.PriorityQueue
runningStore *store.LevelStore
exit chan bool
retryLimit int
}

type valueCnt struct {
Value []byte
Cnt int
Value []byte
Priority uint8
Cnt int
}

func NewQueueWithRetryLimit(path string, limit int) (*Queue, error) {
Expand All @@ -39,7 +40,7 @@ func NewQueue(path string) (*Queue, error) {
q := &Queue{Path: path, exit: make(chan bool)}
var err error
queueDir := filepath.Join(path, "queue")
if q.queue, err = ds.OpenQueue(queueDir); err != nil {
if q.queue, err = ds.OpenPriorityQueue(queueDir); err != nil {
return nil, err
}
storeDir := filepath.Join(path, "running")
Expand Down Expand Up @@ -70,14 +71,18 @@ func (q *Queue) Status() map[string]interface{} {
}
}

func (q *Queue) Enqueue(data string) error {
func (q *Queue) EnqueueWithPriority(data string, priority uint8) error {
if q.queue != nil {
_, err := q.queue.EnqueueObject(&valueCnt{Value: []byte(data)})
_, err := q.queue.EnqueueObject(&valueCnt{Value: []byte(data), Priority: priority}, priority)
return err
}
return fmt.Errorf("queue is nil")
}

func (q *Queue) Enqueue(data string) error {
return q.EnqueueWithPriority(data, 128)
}

func (q *Queue) Peek() (string, error) {
if q.queue == nil || q.queue.Length() == 0 {
return "", fmt.Errorf("Queue is empty")
Expand Down Expand Up @@ -111,7 +116,7 @@ func (q *Queue) Dequeue(timeout int64) (string, string, error) {
if timeout > 0 && (q.retryLimit <= 0 || v.Cnt < q.retryLimit) {
now := time.Now().Unix()
key = goutil.TimeStr(now+timeout) + ":" + goutil.ContentMD5(item.Value)
if err = q.addToRunning(key, v.Value, v.Cnt+1); err != nil {
if err = q.addToRunning(key, v.Value, v.Priority, v.Cnt+1); err != nil {
return "", "", err
}
}
Expand Down Expand Up @@ -149,14 +154,14 @@ func (q *Queue) Drop() {
os.RemoveAll(q.Path)
}

func (q *Queue) addToRunning(key string, value []byte, cnt int) error {
func (q *Queue) addToRunning(key string, value []byte, priority uint8, cnt int) error {
if len(value) == 0 {
return fmt.Errorf("empty value")
}
if q.runningStore == nil {
return fmt.Errorf("runningStore is nil")
}
v, err := store.ObjectToBytes(valueCnt{value, cnt})
v, err := store.ObjectToBytes(valueCnt{value, priority, cnt})
if err != nil {
return err
}
Expand All @@ -176,7 +181,7 @@ func (q *Queue) retry() {
if err := store.BytesToObject(value, &v); err != nil {
return false, err
}
if _, err := q.queue.EnqueueObject(v); err != nil {
if _, err := q.queue.EnqueueObject(v, v.Priority); err != nil {
return false, err
}
q.runningStore.Delete(string(key))
Expand Down Expand Up @@ -210,7 +215,7 @@ func (q *Queue) DequeueWithPreviousRetryCount(timeout int64) (string, string, in
if timeout > 0 && (q.retryLimit <= 0 || previousRetryCount < q.retryLimit) {
now := time.Now().Unix()
key = goutil.TimeStr(now+timeout) + ":" + goutil.ContentMD5(item.Value)
if err = q.addToRunning(key, v.Value, previousRetryCount+1); err != nil {
if err = q.addToRunning(key, v.Value, v.Priority, previousRetryCount+1); err != nil {
return "", "", 0, err
}
}
Expand Down

0 comments on commit e6807a0

Please sign in to comment.