Skip to content

Commit

Permalink
Zip writer now supports concurrent access. (#1)
Browse files Browse the repository at this point in the history
Calls to Create() will return a new FileHeader struct which will write
compressed data to a tmpfile. Upon Close() the data will be copied
again to the body of the zip file (while holding the mutex).

This allows multiple Zip writers to write a lot of data concurrently
and amortizes the compression cost over multiple cores.
  • Loading branch information
scudette authored Dec 30, 2020
1 parent ad79af5 commit 29e1725
Show file tree
Hide file tree
Showing 9 changed files with 432 additions and 45 deletions.
27 changes: 27 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
Copyright (c) 2009 The Go Authors. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
62 changes: 62 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# A Concurrent Zip writer

This is a fork of the Go standard library's Zip implementation which
provides a better writer for multiple concurrent writers.

The standard library implementation requires files to be written in
series. This is not very efficient and leads to significant slow downs
when we need to write many files since each file is compressed on a
single core.

This implementation is designed to work with many writers - each
writer compresses the file independently into a zip file and then
copies the compressed file into the archive when the writer is
closed. This allows each file to be compressed concurrently.

The main difference from the standard library is that created file
writer instances need to be explicitly closed to ensure they are added
to the archive (don't forget to check the error status of Close() as it
confirms if the file was added correctly):

```golang
out_fd, err := zip.Create(name)
if err != nil {
...
}
_, err = io.Copy(out_fd, in_reader)
if err != nil {
...
}
err = out_fd.Close()
if err != nil {
...
}
```


## Pool interface

To make using this even easier, there is a CompressorPool
implementation which accepts readers.

Each reader will be compressed and copied to the zip file by a worker
in the pool. A call to pool.Close() will wait until all workers exit
and allow the zip to be safely closed.

```golang
pool := NewCompressorPool(context.Background(), zip, 10)

pool.Compress(&Request{
Reader: reader,
Name: "My filename",
})

err = pool.Close()
if err != nil {
...
}
err = zip.Close()
if err != nil {
...
}
```
7 changes: 7 additions & 0 deletions debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package zip

import "github.com/davecgh/go-spew/spew"

// Debug passes v to spew.Dump, which prints a detailed dump of the
// value. It returns nothing and reports no errors.
func Debug(v interface{}) {
spew.Dump(v)
}
12 changes: 11 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
module www.velocidex.com/golang/zip
module github.com/Velocidex/zip

go 1.13

require (
github.com/alecthomas/assert v0.0.0-20170929043011-405dbfeb8e38
github.com/alecthomas/colour v0.1.0 // indirect
github.com/alecthomas/repr v0.0.0-20201120212035-bb82daffcca2 // indirect
github.com/davecgh/go-spew v1.1.1
github.com/hashicorp/go-multierror v1.1.0
github.com/mattn/go-isatty v0.0.12 // indirect
github.com/sergi/go-diff v1.1.0 // indirect
)
35 changes: 35 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
github.com/alecthomas/assert v0.0.0-20170929043011-405dbfeb8e38 h1:smF2tmSOzy2Mm+0dGI2AIUHY+w0BUc+4tn40djz7+6U=
github.com/alecthomas/assert v0.0.0-20170929043011-405dbfeb8e38/go.mod h1:r7bzyVFMNntcxPZXK3/+KdruV1H5KSlyVY0gc+NgInI=
github.com/alecthomas/colour v0.1.0 h1:nOE9rJm6dsZ66RGWYSFrXw461ZIt9A6+nHgL7FRrDUk=
github.com/alecthomas/colour v0.1.0/go.mod h1:QO9JBoKquHd+jz9nshCh40fOfO+JzsoXy8qTHF68zU0=
github.com/alecthomas/repr v0.0.0-20201120212035-bb82daffcca2 h1:G5TeG64Ox4OWq2YwlsxS7nOedU8vbGgNRTRDAjGvDCk=
github.com/alecthomas/repr v0.0.0-20201120212035-bb82daffcca2/go.mod h1:2kn6fqh/zIyPLmm3ugklbEi5hg5wS435eygvNfaDQL8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42 h1:vEOn+mP2zCOVzKckCZy6YsCtDblrpj/w7B9nxGNELpg=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
117 changes: 117 additions & 0 deletions pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package zip

import (
"context"
"io"
"sync"

"github.com/hashicorp/go-multierror"
)

var (
// pool recycles the 32 KiB scratch buffers used by Copy so that each
// call does not have to allocate a fresh one. Buffers are stored as
// *[]byte, which is the type Copy asserts when it calls Get.
pool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32*1024)
return &buffer
},
}
)

// Request describes one file to be added to the archive by a
// CompressorPool worker.
type Request struct {
// Name is passed to Writer.Create as the name of the archive entry.
Name string
// Reader supplies the data to compress. The worker closes it once it
// has finished handling the request.
Reader io.ReadCloser
}

// CompressorPool is a fixed-size set of worker goroutines that each take
// a Request from a shared queue, create the corresponding entry in the
// zip Writer and copy the request's reader into it.
//
// Create one with NewCompressorPool, feed it with Compress, and call
// Close before closing the underlying zip Writer.
type CompressorPool struct {
	ctx    context.Context
	cancel func()
	c      chan *Request
	wg     sync.WaitGroup
	zip    *Writer

	// mu guards err, which every worker may append to concurrently.
	mu  sync.Mutex
	err error

	// close_once makes Close safe to call more than once.
	close_once sync.Once
}

// check_error records err (if non-nil) in the pool's accumulated error.
// It is safe to call from multiple goroutines.
func (self *CompressorPool) check_error(err error) {
	if err == nil {
		return
	}

	self.mu.Lock()
	defer self.mu.Unlock()
	self.err = multierror.Append(self.err, err)
}

// Compress queues request for one of the workers. It blocks while the
// queue is full.
//
// If the pool's context is cancelled the workers have stopped, so the
// request cannot be serviced: its reader is closed and the context error
// is recorded for Close to return.
//
// Compress must not be called after (or concurrently with) Close; the
// queue is closed by then and sending on it panics.
func (self *CompressorPool) Compress(request *Request) {
	select {
	case self.c <- request:

	case <-self.ctx.Done():
		self.check_error(self.ctx.Err())
		self.check_error(request.Reader.Close())
	}
}

// Close stops accepting requests, waits for the workers to finish every
// request that was already queued, and returns the accumulated errors
// (nil if there were none). Calling Close again returns the same result.
func (self *CompressorPool) Close() error {
	self.close_once.Do(func() {
		// Closing the queue (rather than cancelling first) lets the
		// workers drain everything that was submitted before Close.
		close(self.c)
		self.wg.Wait()

		// Workers only leave requests behind when the context was
		// cancelled underneath them. Release those readers and report
		// that the entries were never written.
		for request := range self.c {
			self.check_error(self.ctx.Err())
			self.check_error(request.Reader.Close())
		}

		self.cancel()
	})

	self.mu.Lock()
	defer self.mu.Unlock()
	return self.err
}

// NewCompressorPool starts size workers that write into zip. The queue
// is buffered to hold size pending requests. A size below 1 is treated
// as 1 so that the pool always has a worker to service requests.
//
// Cancelling ctx makes the workers stop at the next opportunity; the
// caller must still call Close to wait for them and collect errors.
func NewCompressorPool(ctx context.Context, zip *Writer, size int) *CompressorPool {
	if size < 1 {
		size = 1
	}

	sub_ctx, cancel := context.WithCancel(ctx)

	self := &CompressorPool{
		ctx:    sub_ctx,
		cancel: cancel,
		c:      make(chan *Request, size),
		zip:    zip,
	}

	self.wg.Add(size)
	for i := 0; i < size; i++ {
		go self.worker()
	}

	return self
}

// worker services requests until the queue is closed and empty, or the
// pool's context is cancelled.
func (self *CompressorPool) worker() {
	defer self.wg.Done()

	for {
		select {
		case <-self.ctx.Done():
			return

		case request, ok := <-self.c:
			if !ok {
				// Queue closed by Close and fully drained.
				return
			}
			self.compress_one(request)
		}
	}
}

// compress_one writes a single request into the archive, recording any
// error. The request's reader is always closed, even when the entry
// could not be created.
func (self *CompressorPool) compress_one(request *Request) {
	defer func() {
		self.check_error(request.Reader.Close())
	}()

	out_fd, err := self.zip.Create(request.Name)
	if err != nil {
		// There is no usable entry to write to or close.
		self.check_error(err)
		return
	}

	_, err = Copy(self.ctx, out_fd, request.Reader)
	self.check_error(err)
	self.check_error(out_fd.Close())
}

// Copy is an io.Copy() that respects context cancellation.
//
// It streams src into dst through a pooled buffer, checking ctx before
// every read. It returns the total number of bytes written to dst and
// the first error encountered:
//
//   - ctx.Err() if the context was cancelled before src was exhausted,
//     so that a truncated copy is never reported as a success;
//   - the error from src.Read, other than io.EOF, after any bytes
//     returned alongside it have been written;
//   - the error from dst.Write, or io.ErrShortWrite if dst accepted
//     fewer bytes than it was given without reporting an error.
//
// A clean end of input (io.EOF) yields a nil error. As with io.Copy, a
// read that returns (0, nil) is not treated as end of input.
func Copy(ctx context.Context, dst io.Writer, src io.Reader) (n int, err error) {
	buff := pool.Get().(*[]byte)
	defer pool.Put(buff)

	for {
		select {
		case <-ctx.Done():
			return n, ctx.Err()
		default:
		}

		count, read_err := src.Read(*buff)

		// Per the io.Reader contract, consume the bytes that were read
		// before looking at the error that came with them.
		if count > 0 {
			written, write_err := dst.Write((*buff)[:count])
			n += written
			if write_err != nil {
				return n, write_err
			}
			if written < count {
				return n, io.ErrShortWrite
			}
		}

		if read_err == io.EOF {
			return n, nil
		}
		if read_err != nil {
			return n, read_err
		}
	}
}
Loading

0 comments on commit 29e1725

Please sign in to comment.