From 6e4414b2a7ea5a92edfb925868ea21f653e2a7fd Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Tue, 2 Jan 2024 19:27:18 -0500 Subject: [PATCH] prefetch v2; cold-get stats; put size * cold-get counters: semantics; add missing * add scripted test: prefetch latest - requires s3cmd * free put params back to pool * consistently provide put size - all scenarios except one dsort shard * up cli * part three, prev. commit: 3699b60ddad59212 Signed-off-by: Alex Aizman --- ais/backend/aws.go | 1 + ais/backend/azure.go | 1 + ais/backend/common.go | 1 + ais/backend/gcp.go | 1 + ...te_script_test.go => scripted_cli_test.go} | 24 ++++++++++++++++++- ais/test/scripts/s3-prefetch-latest.sh | 22 ++++------------- ais/tgtimpl.go | 16 +++++++++++-- ais/tgtobj.go | 14 +++++------ cmd/cli/cli/utils.go | 17 ++++++++++--- cmd/cli/go.mod | 2 +- cmd/cli/go.sum | 4 ++-- core/ldp.go | 3 ++- core/target.go | 1 + ec/ec.go | 1 + ext/dload/task.go | 1 + ext/dsort/dsort.go | 2 ++ ext/dsort/manager.go | 1 + stats/target_stats.go | 4 ++++ xact/xs/tcb.go | 1 + xact/xs/tcobjs.go | 1 + 20 files changed, 83 insertions(+), 35 deletions(-) rename ais/test/{get_validate_script_test.go => scripted_cli_test.go} (70%) diff --git a/ais/backend/aws.go b/ais/backend/aws.go index f3655e565f5..de3ef5414d1 100644 --- a/ais/backend/aws.go +++ b/ais/backend/aws.go @@ -351,6 +351,7 @@ func (awsp *awsProvider) GetObj(ctx context.Context, lom *core.LOM, owt cmn.OWT) } params := allocPutObjParams(res, owt) err := awsp.t.PutObject(lom, params) + core.FreePutObjParams(params) if superVerbose { nlog.Infoln("[get_object]", lom.String(), err) } diff --git a/ais/backend/azure.go b/ais/backend/azure.go index 9f3e54d5eaa..aa998ef43c8 100644 --- a/ais/backend/azure.go +++ b/ais/backend/azure.go @@ -359,6 +359,7 @@ func (ap *azureProvider) GetObj(ctx context.Context, lom *core.LOM, owt cmn.OWT) } params := allocPutObjParams(res, owt) err := ap.t.PutObject(lom, params) + core.FreePutObjParams(params) if superVerbose { nlog.Infoln("[get_object]", lom.String(), err) } diff --git a/ais/backend/common.go b/ais/backend/common.go index f81ff570272..e0c1cbf1197 100644 --- a/ais/backend/common.go +++ b/ais/backend/common.go @@ -42,6 +42,7 @@ func allocPutObjParams(res core.GetReaderResult, owt cmn.OWT) *core.PutObjectPar params.Reader = res.R params.OWT = owt params.Cksum = res.ExpCksum + params.Size = res.Size params.Atime = time.Now() params.ColdGET = true } diff --git a/ais/backend/gcp.go b/ais/backend/gcp.go index 6179f045ac3..4b1646f3e07 100644 --- a/ais/backend/gcp.go +++ b/ais/backend/gcp.go @@ -309,6 +309,7 @@ func (gcpp *gcpProvider) GetObj(ctx context.Context, lom *core.LOM, owt cmn.OWT) } params := allocPutObjParams(res, owt) err := gcpp.t.PutObject(lom, params) + core.FreePutObjParams(params) if superVerbose { nlog.Infoln("[get_object]", lom.String(), err) } diff --git a/ais/test/get_validate_script_test.go b/ais/test/scripted_cli_test.go similarity index 70% rename from ais/test/get_validate_script_test.go rename to ais/test/scripted_cli_test.go index 66703fc2997..52845457237 100644 --- a/ais/test/get_validate_script_test.go +++ b/ais/test/scripted_cli_test.go @@ -1,6 +1,6 @@ // Package integration_test. /* - * Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. All rights reserved. */ package integration_test @@ -63,3 +63,25 @@ func TestGetWarmValidateRemaisUsingScript(t *testing.T) { } tassert.CheckFatal(t, err) } + +func TestPrefetchLatestUsingScript(t *testing.T) { + tools.CheckSkip(t, &tools.SkipTestArgs{ + CloudBck: true, + Bck: cliBck, + }) + // note additional limitation + normp, _ := cmn.NormalizeProvider(cliBck.Provider) + if normp != apc.AWS { + t.Skipf("skipping %s - the test uses s3cmd (command line tool) and requires s3 bucket (see \"prerequisites\")", t.Name()) + } + + var ( + bucketName = cliBck.Cname("") + cmd = exec.Command("./scripts/s3-prefetch-latest.sh", "--bucket", bucketName) + ) + out, err := cmd.CombinedOutput() + if len(out) > 0 { + tlog.Logln(string(out)) + } + tassert.CheckFatal(t, err) +} diff --git a/ais/test/scripts/s3-prefetch-latest.sh b/ais/test/scripts/s3-prefetch-latest.sh index c8900e5b9f5..0c87b05a29b 100755 --- a/ais/test/scripts/s3-prefetch-latest.sh +++ b/ais/test/scripts/s3-prefetch-latest.sh @@ -29,7 +29,7 @@ while (( "$#" )); do done ## uncomment for verbose output -set -x ## DEBUG +## set -x ## establish existence ais show bucket $bucket -c 1>/dev/null || exit $? @@ -50,8 +50,7 @@ echo "1. out-of-band PUT: first version" echo $lorem | s3cmd put - "$bucket/lorem-duis" $host 1>/dev/null || exit $? echo "2. prefetch, and check" -ais prefetch "$bucket/lorem-duis" -sleep 5 ############################# DEBUG +ais prefetch "$bucket/lorem-duis" --wait checksum=$(ais ls "$bucket/lorem-duis" --cached -H -props checksum | awk '{print $2}') [[ "$checksum" == "$sum1" ]] || { echo "FAIL: $checksum != $sum1"; exit 1; } @@ -59,8 +58,7 @@ echo "3. out-of-band PUT: 2nd version (overwrite)" echo $duis | s3cmd put - "$bucket/lorem-duis" $host 1>/dev/null || exit $? echo "4. prefetch and check (expecting the first version's checksum)" -ais prefetch "$bucket/lorem-duis" -sleep 5 ############################# DEBUG +ais prefetch "$bucket/lorem-duis" --wait checksum=$(ais ls "$bucket/lorem-duis" --cached -H -props checksum | awk '{print $2}') [[ "$checksum" != "$sum2" ]] || { echo "FAIL: $checksum == $sum2"; exit 1; } @@ -68,8 +66,7 @@ echo "5. query cold-get count (statistics)" cnt1=$(ais show performance counters --regex GET-COLD -H | awk '{sum+=$2;}END{print sum;}') echo "6. prefetch latest: detect version change and trigger cold GET" -ais prefetch "$bucket/lorem-duis" --latest -sleep 5 ############################# DEBUG +ais prefetch "$bucket/lorem-duis" --latest --wait checksum=$(ais ls "$bucket/lorem-duis" --cached -H -props checksum | awk '{print $2}') [[ "$checksum" == "$sum2" ]] || { echo "FAIL: $checksum != $sum2"; exit 1; } @@ -78,21 +75,12 @@ cnt2=$(ais show performance counters --regex GET-COLD -H | awk '{sum+=$2;}END{pr [[ $cnt2 == $(($cnt1+1)) ]] || { echo "FAIL: $cnt2 != $(($cnt1+1))"; exit 1; } echo "8. warm GET must remain \"warm\" and cold-get-count must not increment" -ais get "$bucket/lorem-duis" /dev/null 1>/dev/null +ais get "$bucket/lorem-duis" /dev/null --latest 1>/dev/null checksum=$(ais ls "$bucket/lorem-duis" --cached -H -props checksum | awk '{print $2}') [[ "$checksum" == "$sum2" ]] || { echo "FAIL: $checksum != $sum2"; exit 1; } cnt3=$(ais show performance counters --regex GET-COLD -H | awk '{sum+=$2;}END{print sum;}') [[ $cnt3 == $cnt2 ]] || { echo "FAIL: $cnt3 != $cnt2"; exit 1; } -echo "9. out-of-band DELETE" -s3cmd del "$bucket/lorem-duis" $host 1>/dev/null || exit $? - -echo "10. warm GET must (silently) trigger deletion" -ais get "$bucket/lorem-duis" /dev/null --silent 1>/dev/null 2>&1 -[[ $? != 0 ]] || { echo "FAIL: expecting GET error, got $?"; exit 1; } -ais ls "$bucket/lorem-duis" --cached --silent -H 2>/dev/null -[[ $? != 0 ]] || { echo "FAIL: expecting 'show object' error, got $?"; exit 1; } - echo -e ais show performance counters --regex "(GET-COLD$|VERSION-CHANGE$|DELETE)" diff --git a/ais/tgtimpl.go b/ais/tgtimpl.go index b8535b6f0a6..ee4c85f8f01 100644 --- a/ais/tgtimpl.go +++ b/ais/tgtimpl.go @@ -15,11 +15,13 @@ import ( "github.com/NVIDIA/aistore/cmn" "github.com/NVIDIA/aistore/cmn/cos" "github.com/NVIDIA/aistore/cmn/debug" + "github.com/NVIDIA/aistore/cmn/mono" "github.com/NVIDIA/aistore/cmn/nlog" "github.com/NVIDIA/aistore/core" "github.com/NVIDIA/aistore/core/meta" "github.com/NVIDIA/aistore/fs" "github.com/NVIDIA/aistore/memsys" + "github.com/NVIDIA/aistore/stats" "github.com/NVIDIA/aistore/transport/bundle" "github.com/NVIDIA/aistore/xact/xreg" ) @@ -75,6 +77,7 @@ func (t *target) PutObject(lom *core.LOM, params *core.PutObjectParams) error { poi.workFQN = workFQN poi.atime = params.Atime.UnixNano() poi.xctn = params.Xact + poi.size = params.Size poi.owt = params.OWT poi.skipEC = params.SkipEC poi.coldGET = params.ColdGET @@ -84,6 +87,7 @@ func (t *target) PutObject(lom *core.LOM, params *core.PutObjectParams) error { } _, err := poi.putObject() freePOI(poi) + debug.Assert(err != nil || params.Size <= 0 || params.Size == lom.SizeBytes(true), lom.String(), params.Size, lom.SizeBytes(true)) return err } @@ -155,7 +159,7 @@ func (t *target) CopyObject(lom *core.LOM, dm core.DM, dp core.DP, xact core.Xac return size, err } -// compare with goi.getCold +// use `backend.GetObj` (compare w/ other instances calling `backend.GetObjReader`) func (t *target) GetCold(ctx context.Context, lom *core.LOM, owt cmn.OWT) (errCode int, err error) { // 1. lock switch owt { @@ -178,7 +182,8 @@ func (t *target) GetCold(ctx context.Context, lom *core.LOM, owt cmn.OWT) (errCo return } - // 2. get from the remote B + // 2. GET remote object and store it + now := mono.NanoTime() if errCode, err = t.Backend(lom.Bck()).GetObj(ctx, lom, owt); err != nil { if owt != cmn.OwtGetPrefetchLock { lom.Unlock(true) @@ -194,6 +199,13 @@ func (t *target) GetCold(ctx context.Context, lom *core.LOM, owt cmn.OWT) (errCo case cmn.OwtGetTryLock, cmn.OwtGetLock: lom.Unlock(true) } + + // 4. stats + t.statsT.AddMany( + cos.NamedVal64{Name: stats.GetColdCount, Value: 1}, + cos.NamedVal64{Name: stats.GetColdSize, Value: lom.SizeBytes()}, + cos.NamedVal64{Name: stats.GetColdRwLatency, Value: mono.SinceNano(now)}, + ) return } diff --git a/ais/tgtobj.go b/ais/tgtobj.go index b5472bf3648..dcb1edb47a8 100644 --- a/ais/tgtobj.go +++ b/ais/tgtobj.go @@ -392,7 +392,7 @@ func (poi *putOI) write() (buf []byte, slab *memsys.Slab, lmfh *os.File, err err if lmfh, err = poi.lom.CreateFile(poi.workFQN); err != nil { return } - if poi.size == 0 { + if poi.size <= 0 { buf, slab = poi.t.gmm.Alloc() } else { buf, slab = poi.t.gmm.AllocSize(poi.size) @@ -619,7 +619,7 @@ do: } goi.lom.SetAtimeUnix(goi.atime) - if loaded, err = goi.coldLock(); err != nil { + if loaded, err = goi._coldLock(); err != nil { return } if loaded { @@ -629,7 +629,7 @@ do: // zero-out prev. version custom metadata, if any goi.lom.SetCustomMD(nil) - // backend: read remote + // get remote reader (compare w/ t.GetCold) res = goi.t.Backend(goi.lom.Bck()).GetObjReader(goi.ctx, goi.lom) if res.Err != nil { goi.lom.Unlock(true) @@ -653,7 +653,7 @@ do: } // regular path - errCode, err = goi.coldPut(&res) + errCode, err = goi._coldPut(&res) if err != nil { goi.unlocked = true return @@ -687,7 +687,7 @@ fin: // upgrade rlock => wlock // done early to prevent multiple cold-readers duplicating network/disk operation and overwriting each other -func (goi *getOI) coldLock() (loaded bool, err error) { +func (goi *getOI) _coldLock() (loaded bool, err error) { var ( t, lom = goi.t, goi.lom now int64 @@ -713,8 +713,7 @@ outer: return } -// see also: t.GetCold() and goi.coldMem() -func (goi *getOI) coldPut(res *core.GetReaderResult) (int, error) { +func (goi *getOI) _coldPut(res *core.GetReaderResult) (int, error) { var ( t, lom = goi.t, goi.lom poi = allocPOI() @@ -739,7 +738,6 @@ func (goi *getOI) coldPut(res *core.GetReaderResult) (int, error) { nlog.Infoln(ftcg+"(put)", lom.Cname(), err) return code, err } - goi.t.statsT.Add(stats.GetColdRwLatency, mono.SinceNano(goi.ltime)) // load, downgrade lock, inc stats if err = lom.Load(true /*cache it*/, true /*locked*/); err != nil { diff --git a/cmd/cli/cli/utils.go b/cmd/cli/cli/utils.go index 28bbf2af249..4dc3c01353c 100644 --- a/cmd/cli/cli/utils.go +++ b/cmd/cli/cli/utils.go @@ -617,10 +617,21 @@ func ensureRemoteProvider(bck cmn.Bck) error { if !apc.IsProvider(bck.Provider) { return fmt.Errorf("invalid bucket %q: missing backend provider", bck) } - if !bck.IsRemote() { - return fmt.Errorf("invalid bucket %q: expecting remote backend", bck) + if bck.IsRemote() { + return nil } - return nil + if bck.Props == nil { + // double-take: ais:// bucket with remote backend? + p, err := headBucket(bck, true) + if err != nil { + return err + } + bck.Props = p + if bck.IsRemote() { + return nil // yes it is + } + } + return fmt.Errorf("invalid bucket %q: expecting remote backend", bck) } func parseURLtoBck(strURL string) (bck cmn.Bck) { diff --git a/cmd/cli/go.mod b/cmd/cli/go.mod index 7e786d3fda4..ebc927b61bd 100644 --- a/cmd/cli/go.mod +++ b/cmd/cli/go.mod @@ -4,7 +4,7 @@ go 1.21 // direct require ( - github.com/NVIDIA/aistore v1.3.22-0.20240102182613-70b89290d39e + github.com/NVIDIA/aistore v1.3.22-0.20240102182946-3699b60ddad5 github.com/fatih/color v1.16.0 github.com/json-iterator/go v1.1.12 github.com/onsi/ginkgo v1.16.5 diff --git a/cmd/cli/go.sum b/cmd/cli/go.sum index 2b5caf7de0b..43dc7b70c7c 100644 --- a/cmd/cli/go.sum +++ b/cmd/cli/go.sum @@ -1,7 +1,7 @@ code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= -github.com/NVIDIA/aistore v1.3.22-0.20240102182613-70b89290d39e h1:6PgUhYxH7iDCQR97KAnvw1l0TkMvTkA3qdBQ/i72j5Y= -github.com/NVIDIA/aistore v1.3.22-0.20240102182613-70b89290d39e/go.mod h1:jpWmGuqxnY+akx81S5eqHhGdgSENm0mVYRrVbpCW4/I= +github.com/NVIDIA/aistore v1.3.22-0.20240102182946-3699b60ddad5 h1:H0MwsbtH+5oZczV1LGPVCBCLk3/uX8S5y8nDJZnyA+U= +github.com/NVIDIA/aistore v1.3.22-0.20240102182946-3699b60ddad5/go.mod h1:jpWmGuqxnY+akx81S5eqHhGdgSENm0mVYRrVbpCW4/I= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= diff --git a/core/ldp.go b/core/ldp.go index 88386c3fe03..64dbc36e5a4 100644 --- a/core/ldp.go +++ b/core/ldp.go @@ -90,7 +90,8 @@ func (*LDP) Reader(lom *LOM, latestVer bool) (cos.ReadOpenCloser, cos.OAH, error } remote: - // cold GetObjReader and return oah (holder) to represent non-existing object + // GetObjReader and return remote (object) reader and oah for object metadata + // (compare w/ T.GetCold) lom.SetAtimeUnix(time.Now().UnixNano()) oah := &cmn.ObjAttrs{ Ver: "", // TODO: differentiate between copying (same version) vs. transforming diff --git a/core/target.go b/core/target.go index 023e45822d7..a9705162258 100644 --- a/core/target.go +++ b/core/target.go @@ -95,6 +95,7 @@ type ( Atime time.Time Xact Xact WorkTag string // (=> work fqn) + Size int64 OWT cmn.OWT SkipEC bool // don't erasure-code when finalizing ColdGET bool // this PUT is in fact a cold-GET diff --git a/ec/ec.go b/ec/ec.go index 9730abe275b..c223792fa43 100644 --- a/ec/ec.go +++ b/ec/ec.go @@ -404,6 +404,7 @@ func writeObject(lom *core.LOM, reader io.Reader, size int64, xctn core.Xact) er params.Reader = readCloser params.SkipEC = true params.Atime = time.Now() + params.Size = size params.Xact = xctn // to avoid changing version; TODO: introduce cmn.OwtEC params.OWT = cmn.OwtMigrateRepl diff --git a/ext/dload/task.go b/ext/dload/task.go index 9d78312f3c0..d7df39e4873 100644 --- a/ext/dload/task.go +++ b/ext/dload/task.go @@ -150,6 +150,7 @@ func (task *singleTask) _dput(lom *core.LOM, req *http.Request, resp *http.Respo params.Reader = r params.OWT = cmn.OwtPut params.Atime = task.started.Load() + params.Size = size params.Xact = task.xdl } erp := core.T.PutObject(lom, params) diff --git a/ext/dsort/dsort.go b/ext/dsort/dsort.go index 737ba882798..b8f1a96fcc8 100644 --- a/ext/dsort/dsort.go +++ b/ext/dsort/dsort.go @@ -299,6 +299,8 @@ func (m *Manager) createShard(s *shard.Shard, lom *core.LOM) (err error) { // TODO: params.Xact - in part, to count PUTs and bytes in a generic fashion // (vs metrics.ShardCreationStats.updateThroughput - see below) + + // TODO: add params.Size = (size resulting from shardRW.Create below) } err = core.T.PutObject(lom, params) core.FreePutObjParams(params) diff --git a/ext/dsort/manager.go b/ext/dsort/manager.go index 11c56379bca..804e2acd614 100644 --- a/ext/dsort/manager.go +++ b/ext/dsort/manager.go @@ -635,6 +635,7 @@ func (m *Manager) recvShard(hdr *transport.ObjHdr, objReader io.Reader, err erro params.Reader = rc params.Cksum = nil params.Atime = started + params.Size = hdr.ObjAttrs.Size } erp := core.T.PutObject(lom, params) core.FreePutObjParams(params) diff --git a/stats/target_stats.go b/stats/target_stats.go index 45910afe901..93603ca8e1b 100644 --- a/stats/target_stats.go +++ b/stats/target_stats.go @@ -35,6 +35,10 @@ import ( // -> "*.id" - ID const ( // KindCounter & KindSize - always incremented + + // NOTE semantics: + // - counts all instances when remote GET is followed by storing of the new object (version) locally + // - does _not_ count assorted calls to `GetObjReader` (e.g., via tcb/tco -> LDP.Reader) GetColdCount = "get.cold.n" GetColdSize = "get.cold.size" diff --git a/xact/xs/tcb.go b/xact/xs/tcb.go index 3c7ce143aef..1516825dff0 100644 --- a/xact/xs/tcb.go +++ b/xact/xs/tcb.go @@ -285,6 +285,7 @@ func (r *XactTCB) _recv(hdr *transport.ObjHdr, objReader io.Reader, lom *core.LO params.Reader = io.NopCloser(objReader) params.Cksum = hdr.ObjAttrs.Cksum params.Xact = r + params.Size = hdr.ObjAttrs.Size // Transaction is used only by CopyBucket and ETL. In both cases new objects // are created at the destination. Setting `OwtPut` type informs `t.PutObject()` diff --git a/xact/xs/tcobjs.go b/xact/xs/tcobjs.go index ea4334f84ac..1de8e9062ad 100644 --- a/xact/xs/tcobjs.go +++ b/xact/xs/tcobjs.go @@ -267,6 +267,7 @@ func (r *XactTCObjs) _put(hdr *transport.ObjHdr, objReader io.Reader, lom *core. params.Reader = io.NopCloser(objReader) params.Cksum = hdr.ObjAttrs.Cksum params.Xact = r + params.Size = hdr.ObjAttrs.Size // Transaction is used only by CopyBucket and ETL. In both cases, new objects // are created at the destination. Setting `OwtPut` type informs `t.PutObject()`