Skip to content

Commit

Permalink
Merge pull request #21 from mreiferson/startup_test_21
Browse files Browse the repository at this point in the history
address scheduling issues with 'startup' test
  • Loading branch information
jehiah committed Aug 28, 2012
2 parents 3fdc426 + 171a142 commit 909f2ba
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 33 deletions.
2 changes: 1 addition & 1 deletion nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewChannel(topicName string, channelName string, inMemSize int64, dataPath
name: channelName,
msgTimeout: msgTimeout,
backend: NewDiskQueue(backendName, dataPath, maxBytesPerFile, syncEvery),
incomingMsgChan: make(chan *nsq.Message, 5),
incomingMsgChan: make(chan *nsq.Message, 1),
memoryMsgChan: make(chan *nsq.Message, inMemSize),
clientMsgChan: make(chan *nsq.Message),
exitChan: make(chan int),
Expand Down
16 changes: 15 additions & 1 deletion nsqd/diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ func TestDiskQueueEmpty(t *testing.T) {
for i := 0; i < 3; i++ {
<-dq.ReadChan()
}

for {
if dq.Depth() == 97 {
break
}
time.Sleep(50 * time.Millisecond)
}
assert.Equal(t, dq.Depth(), int64(97))

// cheat and use the lower level method so that the test doesn't
Expand All @@ -92,6 +99,13 @@ func TestDiskQueueEmpty(t *testing.T) {
<-dq.ReadChan()
}

for {
if dq.Depth() == 0 {
break
}
time.Sleep(50 * time.Millisecond)
}

assert.Equal(t, dq.Depth(), int64(0))
assert.Equal(t, dq.(*DiskQueue).readFileNum, dq.(*DiskQueue).writeFileNum)
assert.Equal(t, dq.(*DiskQueue).readPos, dq.(*DiskQueue).writePos)
Expand Down Expand Up @@ -171,7 +185,7 @@ func TestDiskQueueTorture(t *testing.T) {
if dq.Depth() == 0 {
break
}
runtime.Gosched()
time.Sleep(50 * time.Millisecond)
}

log.Printf("closing readExitChan")
Expand Down
28 changes: 1 addition & 27 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"log"
"net"
"os"
"runtime"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -46,34 +45,9 @@ func TestStartup(t *testing.T) {
topic.PutMessage(msg)
}

log.Printf("checking that PUTs are finished")
for {
count := topic.Depth()
log.Printf("there are %d and %d", len(topic.memoryMsgChan), topic.backend.Depth())
if count == int64(iterations-1) { // this will always be one short because the channel buffer is primed
break
}
runtime.Gosched()
}

log.Printf("pulling from channel")
channel1 := topic.GetChannel("ch1")

// channel should drain topic
// topic -> channel is buffered, so this may take a few cycles
for {
topic_count := topic.Depth()
chan_count := channel1.Depth()

log.Printf("%d %d; waiting for channel to drain topic; there are %d and %d in topic and %d and %d in channel",
topic_count, chan_count,
len(topic.memoryMsgChan), topic.backend.Depth(), len(channel1.memoryMsgChan), channel1.backend.Depth())
if topic_count == 0 && chan_count == int64(iterations) {
break
}
runtime.Gosched()
}

log.Printf("read %d msgs", iterations/2)
for i := 0; i < iterations/2; i++ {
msg := <-channel1.clientMsgChan
Expand All @@ -85,7 +59,7 @@ func TestStartup(t *testing.T) {
if channel1.Depth() == int64(iterations/2) {
break
}
runtime.Gosched()
time.Sleep(50 * time.Millisecond)
}

exitChan <- 1
Expand Down
6 changes: 5 additions & 1 deletion nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,11 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
if err != nil {
log.Printf("PROTOCOL(V2): error sending heartbeat - %s", err.Error())
}
case msg := <-client.Channel.clientMsgChan:
case msg, ok := <-client.Channel.clientMsgChan:
if !ok {
goto exit
}

if *verbose {
log.Printf("PROTOCOL(V2): writing msg(%s) to client(%s) - %s",
msg.Id, client, msg.Body)
Expand Down
2 changes: 1 addition & 1 deletion nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewTopic(topicName string, memQueueSize int64, dataPath string, maxBytesPer
name: topicName,
channelMap: make(map[string]*Channel),
backend: NewDiskQueue(topicName, dataPath, maxBytesPerFile, syncEvery),
incomingMsgChan: make(chan *nsq.Message, 5),
incomingMsgChan: make(chan *nsq.Message, 1),
memoryMsgChan: make(chan *nsq.Message, memQueueSize),
memQueueSize: memQueueSize,
dataPath: dataPath,
Expand Down
4 changes: 2 additions & 2 deletions test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ set -e
for dir in nsqd util/pqueue; do
echo "testing $dir"
pushd $dir >/dev/null
go test -test.v -timeout 2s
go test -test.v -timeout 5s
popd >/dev/null
done

Expand All @@ -24,7 +24,7 @@ trap cleanup INT TERM EXIT
popd >/dev/null
pushd nsq >/dev/null
echo "testing nsq"
go test -v -timeout 2s
go test -v -timeout 5s
popd >/dev/null

# no tests, but a build is something
Expand Down

0 comments on commit 909f2ba

Please sign in to comment.