Skip to content
This repository was archived by the owner on Sep 28, 2022. It is now read-only.

Added support for timestamp export #242

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,10 +737,18 @@ func (c *Client) importRoaringBitmap(uri *URI, field *Field, shard uint64, views
}

// ExportField exports columns for a field.
func (c *Client) ExportField(field *Field) (io.Reader, error) {
func (c *Client) ExportField(field *Field, opts ...ExportOption) (io.Reader, error) {
span := c.tracer.StartSpan("Client.ExportField")
defer span.Finish()

exportOpts := &exportOptions{}
for _, opt := range opts {
err := opt(exportOpts)
if err != nil {
return nil, err
}
}

var shardsMax map[string]uint64
var err error

Expand All @@ -758,7 +766,7 @@ func (c *Client) ExportField(field *Field) (io.Reader, error) {
return nil, err
}

return newExportReader(c, shardURIs, field), nil
return newExportReader(c, shardURIs, field, exportOpts.timestamps), nil
}

// Info returns the server's configuration/host information.
Expand Down Expand Up @@ -1690,6 +1698,21 @@ func importRecordsFunction(fun func(field *Field,
}
}

type exportOptions struct {
timestamps bool
}

// ExportOption is used when running exports.
type ExportOption func(options *exportOptions) error

// OptExportTimestamps enables exporting timestamps for time fields.
func OptExportTimestamps(enable bool) ExportOption {
return func(options *exportOptions) error {
options.timestamps = enable
return nil
}
}

type fragmentNodeRoot struct {
URI fragmentNode `json:"uri"`
}
Expand Down Expand Up @@ -1811,14 +1834,16 @@ type exportReader struct {
bodyIndex int
currentShard uint64
shardCount uint64
timestamps bool
}

func newExportReader(client *Client, shardURIs map[uint64]*URI, field *Field) *exportReader {
func newExportReader(client *Client, shardURIs map[uint64]*URI, field *Field, timestamps bool) *exportReader {
return &exportReader{
client: client,
shardURIs: shardURIs,
field: field,
shardCount: uint64(len(shardURIs)),
timestamps: timestamps,
}
}

Expand All @@ -1833,9 +1858,14 @@ func (r *exportReader) Read(p []byte) (n int, err error) {
headers := map[string]string{
"Accept": "text/csv",
}
path := fmt.Sprintf("/export?index=%s&field=%s&shard=%d",
r.field.index.Name(), r.field.Name(), r.currentShard)
resp, err := r.client.doRequest(uri, "GET", path, headers, nil)
u := url.URL{Path: "/export"}
u.RawQuery = url.Values{
"index": {r.field.index.Name()},
"field": {r.field.Name()},
"shard": {strconv.FormatUint(r.currentShard, 10)},
"timestamps": {strconv.FormatBool(r.timestamps)},
}.Encode()
resp, err := r.client.doRequest(uri, "GET", u.String(), headers, nil)
if err = anyError(resp, err); err != nil {
return 0, errors.Wrap(err, "doing export request")
}
Expand Down
83 changes: 81 additions & 2 deletions client_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,85 @@ func TestImportWithBatchSizeExpectingZero(t *testing.T) {
}
}

func TestExportNoTimestamps(t *testing.T) {
client := getClient()
schema := NewSchema()
index := schema.Index("export-index")
field := index.Field("f1")
err := client.SyncSchema(schema)
if err != nil {
t.Fatal(err)
}
_, err = client.Query(index.BatchQuery(
field.Set(1, 100),
field.Set(2, 1000),
))
if err != nil {
t.Fatal(err)
}
r, err := client.ExportField(field)
if err != nil {
t.Fatal(err)
}
b, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
target := []byte("1,100\n2,1000\n")
if !reflect.DeepEqual(target, b) {
t.Fatalf("%v != %v", target, b)
}
}

func TestExportTimestamps(t *testing.T) {
client := getClient()
schema := NewSchema()
index := schema.Index("export-index-timestamps")
field := index.Field("f1", OptFieldTypeTime(TimeQuantumYearMonthDayHour))
err := client.SyncSchema(schema)
if err != nil {
t.Fatal(err)
}

t1 := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
t2 := time.Date(2019, 2, 2, 0, 0, 0, 0, time.UTC)
_, err = client.Query(index.BatchQuery(
field.SetTimestamp(1, 100, t1),
field.SetTimestamp(2, 1000, t2),
))
if err != nil {
t.Fatal(err)
}

// without timestamps
r, err := client.ExportField(field)
if err != nil {
t.Fatal(err)
}
b, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
target := []byte("1,100\n2,1000\n")
if !reflect.DeepEqual(target, b) {
t.Fatalf("%v != %v", target, b)
}

// with timestamps
r, err = client.ExportField(field, OptExportTimestamps(true))
if err != nil {
t.Fatal(err)
}
b, err = ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
target = []byte("1,100,2019-01-01T00:00\n2,1000,2019-02-02T00:00\n")
if !reflect.DeepEqual(target, b) {
t.Fatalf("%v != %v", target, b)
}
}

func failingImportColumns(field *Field, shard uint64, records []Record, nodes []fragmentNode, options *ImportOptions, state *importState) error {
if len(records) > 0 {
return errors.New("some error")
Expand Down Expand Up @@ -886,7 +965,7 @@ func TestExportReaderFailure(t *testing.T) {
0: uri,
}
client, _ := NewClient(uri, OptClientRetries(0))
reader := newExportReader(client, shardURIs, field)
reader := newExportReader(client, shardURIs, field, false)
buf := make([]byte, 1000)
_, err = reader.Read(buf)
if err == nil {
Expand All @@ -904,7 +983,7 @@ func TestExportReaderReadBodyFailure(t *testing.T) {
field := index.Field("exportfield")
shardURIs := map[uint64]*URI{0: uri}
client, _ := NewClient(uri, OptClientRetries(0))
reader := newExportReader(client, shardURIs, field)
reader := newExportReader(client, shardURIs, field, false)
buf := make([]byte, 1000)
_, err = reader.Read(buf)
if err == nil {
Expand Down