Skip to content

Commit d7c86ea

Browse files
committed
Removed timer, updated batch interfaces to include finalize
1 parent bf3c504 commit d7c86ea

File tree

4 files changed

+20
-96
lines changed

4 files changed

+20
-96
lines changed

batch.go

+14-3
Original file line numberDiff line numberDiff line change
@@ -14,31 +14,42 @@ type Batch struct {
1414
mutex *sync.Mutex
1515
}
1616

17-
// convenience interface - not used directly by this module
17+
// BatchSource is a convenience interface - not used directly by this module
1818
type BatchSource interface {
1919
// when the caller wants to process slices of data
2020
// gives the batch and some context about where in the whole set
2121
GetBatches(
2222
onBatch func(batch []interface{}, batchIndex, batchSize, totalItemCount int) error,
2323
) error
24+
25+
// when the caller wants to close/finalize assets and resources
26+
Finalize() error
2427
}
2528

2629
// convenience interface - not used directly by this module
2730
type BatchSourceFactory func() BatchSource
2831

29-
// convenience interface - not used directly by this module
32+
// BatchDestination is a convenience interface - not used directly by this module
3033
type BatchDestination interface {
34+
35+
// when the caller wants to put a slice of data somewhere
3136
PutBatch([]interface{}) error
37+
38+
// when the caller wants to close/finalize assets and resources
3239
Finalize() error
3340
}
3441

35-
// convenience interface - not used directly by this module
42+
// BytesSource is a convenience interface - not used directly by this module
3643
type BytesSource interface {
44+
3745
// when the caller wants to process bytes of data per batch
3846
// gives the batch and some context about where in the whole set
3947
GetBatches(
4048
onBatch func(bytes []byte, batchIndex, batchSize, totalItemCount int) error,
4149
) error
50+
51+
// when the caller wants to close/finalize assets and resources
52+
Finalize() error
4253
}
4354

4455
// note: io.WriteCloser makes a convenient alternative to "BytesDestination"

dispatcher.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (d *Dispatcher) run() {
6868
go d.sample()
6969
}
7070

71-
// get the number of workers currently running
71+
// RunCount will get the number of workers currently running
7272
func (d *Dispatcher) RunCount() int32 {
7373
var total int32 = 0
7474

@@ -207,7 +207,7 @@ func (d *Dispatcher) IsAnyWorkerIdle() bool {
207207
return int(d.RunCount()) < cap(d.workerPool)
208208
}
209209

210-
// pulls a job from the job queue and adds it to the worker's job queue - a worker will grab it in the worker logic
210+
// dispatch pulls a job from the job queue and adds it to the worker's job queue - a worker will grab it in the worker logic
211211
func (d *Dispatcher) dispatch() {
212212
runCountAtLastLog := -1
213213
for {
@@ -238,7 +238,7 @@ func (d *Dispatcher) dispatch() {
238238
}
239239
}
240240

241-
// periodically check on the workers to get the runcount - if zero, add to the elapsed time count for "all workers idle"
241+
// sample periodically check on the workers to get the runcount - if zero, add to the elapsed time count for "all workers idle"
242242
func (d *Dispatcher) sample() {
243243
ticker := time.NewTicker(d.idlenessSamplerInterval)
244244
d.idlenessSamplerStopChannel = make(chan bool)

timer.go

-87
This file was deleted.

xml/sample/main.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -87,15 +87,15 @@ func tokenRecordsBuilderFunction(reader *xmlWorkerBatch.Reader) func(t xml.Token
8787

8888
func main() {
8989
var (
90-
maxQueueSize = flag.Int("max_queue_size", 100, "The size of job queue")
91-
maxBatchSize = flag.Int("max_batch_size", 100, "The max size of batches sent to workers")
90+
maxQueueSize = flag.Int("max_queue_size", 1, "The size of job queue")
91+
maxBatchSize = flag.Int("max_batch_size", 1, "The max size of batches sent to workers")
9292
maxWorkers = flag.Int("max_workers", 2, "The number of workers to start")
9393
)
9494
flag.Parse()
9595

9696
// allocate the XML batch reader
9797
reader := xmlWorkerBatch.Reader{}
98-
reader.Init(*maxQueueSize, *maxWorkers, *maxBatchSize, doWork, work.JobErrorsFatalLogFunction, work.NoLogFunction) // work.PrintlnFunction is an alternative
98+
reader.Init(*maxQueueSize, *maxWorkers, *maxBatchSize, doWork, work.JobErrorsFatalLogFunction, work.PrintlnFunction) // work.PrintlnFunction is an alternative
9999

100100
// start decoding
101101
if err := reader.Decode("test.xml", tokenRecordsBuilderFunction(&reader)); err != nil {

0 commit comments

Comments
 (0)