diff --git a/client.go b/client.go
index acdaeb5..c3f5771 100644
--- a/client.go
+++ b/client.go
@@ -737,10 +737,18 @@ func (c *Client) importRoaringBitmap(uri *URI, field *Field, shard uint64, views
 }
 
 // ExportField exports columns for a field.
-func (c *Client) ExportField(field *Field) (io.Reader, error) {
+func (c *Client) ExportField(field *Field, opts ...ExportOption) (io.Reader, error) {
 	span := c.tracer.StartSpan("Client.ExportField")
 	defer span.Finish()
 
+	exportOpts := &exportOptions{}
+	for _, opt := range opts {
+		err := opt(exportOpts)
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	var shardsMax map[string]uint64
 	var err error
 
@@ -758,7 +766,7 @@ func (c *Client) ExportField(field *Field) (io.Reader, error) {
 		return nil, err
 	}
 
-	return newExportReader(c, shardURIs, field), nil
+	return newExportReader(c, shardURIs, field, exportOpts.timestamps), nil
 }
 
 // Info returns the server's configuration/host information.
@@ -1690,6 +1698,21 @@ func importRecordsFunction(fun func(field *Field,
 	}
 }
 
+type exportOptions struct {
+	timestamps bool
+}
+
+// ExportOption is used when running exports.
+type ExportOption func(options *exportOptions) error
+
+// OptExportTimestamps enables exporting timestamps for time fields.
+func OptExportTimestamps(enable bool) ExportOption {
+	return func(options *exportOptions) error {
+		options.timestamps = enable
+		return nil
+	}
+}
+
 type fragmentNodeRoot struct {
 	URI fragmentNode `json:"uri"`
 }
@@ -1811,14 +1834,16 @@ type exportReader struct {
 	bodyIndex    int
 	currentShard uint64
 	shardCount   uint64
+	timestamps   bool
 }
 
-func newExportReader(client *Client, shardURIs map[uint64]*URI, field *Field) *exportReader {
+func newExportReader(client *Client, shardURIs map[uint64]*URI, field *Field, timestamps bool) *exportReader {
 	return &exportReader{
 		client:     client,
 		shardURIs:  shardURIs,
 		field:      field,
 		shardCount: uint64(len(shardURIs)),
+		timestamps: timestamps,
 	}
 }
 
@@ -1833,9 +1858,14 @@ func (r *exportReader) Read(p []byte) (n int, err error) {
 		headers := map[string]string{
 			"Accept": "text/csv",
 		}
-		path := fmt.Sprintf("/export?index=%s&field=%s&shard=%d",
-			r.field.index.Name(), r.field.Name(), r.currentShard)
-		resp, err := r.client.doRequest(uri, "GET", path, headers, nil)
+		u := url.URL{Path: "/export"}
+		u.RawQuery = url.Values{
+			"index":      {r.field.index.Name()},
+			"field":      {r.field.Name()},
+			"shard":      {strconv.FormatUint(r.currentShard, 10)},
+			"timestamps": {strconv.FormatBool(r.timestamps)},
+		}.Encode()
+		resp, err := r.client.doRequest(uri, "GET", u.String(), headers, nil)
 		if err = anyError(resp, err); err != nil {
 			return 0, errors.Wrap(err, "doing export request")
 		}
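For context, a minimal sketch of how a caller might use the new `OptExportTimestamps` option once this change is applied. The server address, index name ("events"), and field name ("clicks") are placeholders chosen for illustration, error handling is abbreviated, and it assumes the usual go-pilosa import path:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"log"

	pilosa "github.com/pilosa/go-pilosa"
)

func main() {
	// Assumes a Pilosa server reachable at the default address.
	client, err := pilosa.NewClient("localhost:10101")
	if err != nil {
		log.Fatal(err)
	}

	schema, err := client.Schema()
	if err != nil {
		log.Fatal(err)
	}
	// Hypothetical index and time field, used only for illustration.
	index := schema.Index("events")
	field := index.Field("clicks", pilosa.OptFieldTypeTime(pilosa.TimeQuantumYearMonthDayHour))
	if err := client.SyncSchema(schema); err != nil {
		log.Fatal(err)
	}

	// Export the field as CSV; OptExportTimestamps(true) asks the server to
	// append the timestamp as a third column for time fields.
	r, err := client.ExportField(field, pilosa.OptExportTimestamps(true))
	if err != nil {
		log.Fatal(err)
	}
	csv, err := ioutil.ReadAll(r)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Print(string(csv)) // e.g. "1,100,2019-01-01T00:00\n"
}
```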
diff --git a/client_it_test.go b/client_it_test.go
index 39bb917..050be3c 100644
--- a/client_it_test.go
+++ b/client_it_test.go
@@ -852,6 +852,85 @@ func TestImportWithBatchSizeExpectingZero(t *testing.T) {
 	}
 }
 
+func TestExportNoTimestamps(t *testing.T) {
+	client := getClient()
+	schema := NewSchema()
+	index := schema.Index("export-index")
+	field := index.Field("f1")
+	err := client.SyncSchema(schema)
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, err = client.Query(index.BatchQuery(
+		field.Set(1, 100),
+		field.Set(2, 1000),
+	))
+	if err != nil {
+		t.Fatal(err)
+	}
+	r, err := client.ExportField(field)
+	if err != nil {
+		t.Fatal(err)
+	}
+	b, err := ioutil.ReadAll(r)
+	if err != nil {
+		t.Fatal(err)
+	}
+	target := []byte("1,100\n2,1000\n")
+	if !reflect.DeepEqual(target, b) {
+		t.Fatalf("%v != %v", target, b)
+	}
+}
+
+func TestExportTimestamps(t *testing.T) {
+	client := getClient()
+	schema := NewSchema()
+	index := schema.Index("export-index-timestamps")
+	field := index.Field("f1", OptFieldTypeTime(TimeQuantumYearMonthDayHour))
+	err := client.SyncSchema(schema)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	t1 := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
+	t2 := time.Date(2019, 2, 2, 0, 0, 0, 0, time.UTC)
+	_, err = client.Query(index.BatchQuery(
+		field.SetTimestamp(1, 100, t1),
+		field.SetTimestamp(2, 1000, t2),
+	))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// without timestamps
+	r, err := client.ExportField(field)
+	if err != nil {
+		t.Fatal(err)
+	}
+	b, err := ioutil.ReadAll(r)
+	if err != nil {
+		t.Fatal(err)
+	}
+	target := []byte("1,100\n2,1000\n")
+	if !reflect.DeepEqual(target, b) {
+		t.Fatalf("%v != %v", target, b)
+	}
+
+	// with timestamps
+	r, err = client.ExportField(field, OptExportTimestamps(true))
+	if err != nil {
+		t.Fatal(err)
+	}
+	b, err = ioutil.ReadAll(r)
+	if err != nil {
+		t.Fatal(err)
+	}
+	target = []byte("1,100,2019-01-01T00:00\n2,1000,2019-02-02T00:00\n")
+	if !reflect.DeepEqual(target, b) {
+		t.Fatalf("%v != %v", target, b)
+	}
+}
+
 func failingImportColumns(field *Field, shard uint64, records []Record, nodes []fragmentNode, options *ImportOptions, state *importState) error {
 	if len(records) > 0 {
 		return errors.New("some error")
@@ -886,7 +965,7 @@ func TestExportReaderFailure(t *testing.T) {
 		0: uri,
 	}
 	client, _ := NewClient(uri, OptClientRetries(0))
-	reader := newExportReader(client, shardURIs, field)
+	reader := newExportReader(client, shardURIs, field, false)
 	buf := make([]byte, 1000)
 	_, err = reader.Read(buf)
 	if err == nil {
@@ -904,7 +983,7 @@ func TestExportReaderReadBodyFailure(t *testing.T) {
 	field := index.Field("exportfield")
 	shardURIs := map[uint64]*URI{0: uri}
 	client, _ := NewClient(uri, OptClientRetries(0))
-	reader := newExportReader(client, shardURIs, field)
+	reader := newExportReader(client, shardURIs, field, false)
 	buf := make([]byte, 1000)
 	_, err = reader.Read(buf)
 	if err == nil {
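The tests above expect export rows of the form `row,column` and, with `OptExportTimestamps(true)`, `row,column,timestamp`. A minimal sketch of consuming that CSV on the client side, assuming exactly the layout and `2006-01-02T15:04` timestamp format asserted in `TestExportTimestamps` (the hard-coded input stands in for the reader returned by `ExportField`):

```go
package main

import (
	"encoding/csv"
	"fmt"
	"log"
	"strconv"
	"strings"
	"time"
)

func main() {
	// In practice this reader would come from client.ExportField(...).
	exported := strings.NewReader("1,100,2019-01-01T00:00\n2,1000,2019-02-02T00:00\n")

	cr := csv.NewReader(exported)
	// Records have two or three fields depending on OptExportTimestamps, so
	// disable the per-record field-count check.
	cr.FieldsPerRecord = -1
	records, err := cr.ReadAll()
	if err != nil {
		log.Fatal(err)
	}
	for _, rec := range records {
		rowID, _ := strconv.ParseUint(rec[0], 10, 64)
		columnID, _ := strconv.ParseUint(rec[1], 10, 64)
		if len(rec) == 3 {
			ts, err := time.Parse("2006-01-02T15:04", rec[2])
			if err != nil {
				log.Fatal(err)
			}
			fmt.Printf("row=%d column=%d timestamp=%s\n", rowID, columnID, ts.Format(time.RFC3339))
		} else {
			fmt.Printf("row=%d column=%d\n", rowID, columnID)
		}
	}
}
```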