Skip to content

Commit

Permalink
ios: zero-copy block stats
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Nov 24, 2023
1 parent 00057e0 commit d03ad79
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 126 deletions.
31 changes: 19 additions & 12 deletions ios/diskstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,23 @@
*/
package ios

type (
diskBlockStat interface {
Reads() int64
ReadBytes() int64
Writes() int64
WriteBytes() int64
IOMs() int64
WriteMs() int64
ReadMs() int64
}
// based on:
// - https://www.kernel.org/doc/Documentation/iostats.txt
// - https://www.kernel.org/doc/Documentation/block/stat.txt
type blockStats struct {
readComplete int64 // 1 - # of reads completed
readMerged int64 // 2 - # of reads merged
readSectors int64 // 3 - # of sectors read
readMs int64 // 4 - # ms spent reading
writeComplete int64 // 5 - # writes completed
writeMerged int64 // 6 - # writes merged
writeSectors int64 // 7 - # of sectors written
writeMs int64 // 8 - # of milliseconds spent writing
ioPending int64 // 9 - # of I/Os currently in progress
ioMs int64 // 10 - # of milliseconds spent doing I/Os
ioMsWeighted int64 // 11 - weighted # of milliseconds spent doing I/Os
// 12 - 15: discard I/Os, discard merges, discard sectors, discard ticks
// 16, 17: flash I/Os, flash ticks, as per https://github.com/sysstat/sysstat/blob/master/iostat.c
}

diskBlockStats map[string]diskBlockStat
)
type allBlockStats map[string]*blockStats
52 changes: 18 additions & 34 deletions ios/diskstats_darwin.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Package ios is a collection of interfaces to the local storage subsystem;
// the package includes OS-dependent implementations for those interfaces.
/*
* Copyright (c) 2018-2021, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
*/
package ios

Expand All @@ -10,46 +10,30 @@ import (
"github.com/lufia/iostat"
)

type dblockStat struct {
readBytes int64
readMs int64
writeBytes int64
writeMs int64
ioMs int64
}

// interface guard
var (
_ diskBlockStat = (*dblockStat)(nil)
)

// readStats returns disk stats
func readStats(disks, _ cos.StrKVs) diskBlockStats {
func readStats(disks, _ cos.StrKVs, all allBlockStats) {
driveStats, err := iostat.ReadDriveStats()
if err != nil {
return diskBlockStats{}
return
}

dblockStats := make(diskBlockStats, len(disks))
for _, driveStat := range driveStats {
dblockStats[driveStat.Name] = dblockStat{
readBytes: driveStat.BytesRead,
readMs: driveStat.TotalReadTime.Milliseconds(),
writeBytes: driveStat.BytesWritten,
writeMs: driveStat.TotalWriteTime.Milliseconds(),
ioMs: driveStat.TotalReadTime.Milliseconds() + driveStat.TotalWriteTime.Milliseconds(),
for _, stats := range driveStats {
ds := all[stats.Name]
*ds = blockStats{
readSectors: stats.BytesRead / 512, // HACK
readMs: stats.TotalReadTime.Milliseconds(),
writeSectors: stats.BytesWritten / 512, // ditto
writeMs: stats.TotalWriteTime.Milliseconds(),
ioMs: stats.TotalReadTime.Milliseconds() + stats.TotalWriteTime.Milliseconds(),
}
}
return dblockStats
}

func (dblockStat) Reads() int64 { return 0 } // TODO: not implemented
func (dbs dblockStat) ReadBytes() int64 { return dbs.readBytes }
func (dblockStat) Writes() int64 { return 0 } // TODO: not implemented
func (dbs dblockStat) WriteBytes() int64 { return dbs.writeBytes }
func (dbs dblockStat) IOMs() int64 { return dbs.ioMs }
func (dbs dblockStat) WriteMs() int64 { return dbs.writeMs }
func (dbs dblockStat) ReadMs() int64 { return dbs.readMs }
func (*blockStats) Reads() int64 { return 0 } // TODO: not implemented
func (ds *blockStats) ReadBytes() int64 { return ds.readSectors * 512 }
func (*blockStats) Writes() int64 { return 0 } // TODO: not implemented
func (ds *blockStats) WriteBytes() int64 { return ds.writeSectors * 512 }
func (ds *blockStats) IOMs() int64 { return ds.ioMs }
func (ds *blockStats) WriteMs() int64 { return ds.writeMs }
func (ds *blockStats) ReadMs() int64 { return ds.readMs }

// NVMe multipathing - Linux only
// * nvmeInN: instance I namespace N
Expand Down
107 changes: 39 additions & 68 deletions ios/diskstats_linux.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Package ios is a collection of interfaces to the local storage subsystem;
// the package includes OS-dependent implementations for those interfaces.
/*
* Copyright (c) 2018-2021, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
*/
package ios

Expand All @@ -13,83 +13,48 @@ import (

"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/nlog"
)

// The "sectors" in question are the standard UNIX 512-byte sectors, not any device- or filesystem-specific block size
// (from https://www.kernel.org/doc/Documentation/block/stat.txt)
const sectorSize = int64(512)

// based on https://www.kernel.org/doc/Documentation/iostats.txt
//
// and https://www.kernel.org/doc/Documentation/block/stat.txt
type dblockStat struct {
readComplete int64 // 1 - # of reads completed
readMerged int64 // 2 - # of reads merged
readSectors int64 // 3 - # of sectors read
readMs int64 // 4 - # ms spent reading
writeComplete int64 // 5 - # writes completed
writeMerged int64 // 6 - # writes merged
writeSectors int64 // 7 - # of sectors written
writeMs int64 // 8 - # of milliseconds spent writing
ioPending int64 // 9 - # of I/Os currently in progress
ioMs int64 // 10 - # of milliseconds spent doing I/Os
ioMsWeighted int64 // 11 - weighted # of milliseconds spent doing I/Os
// 12 - 15: discard I/Os, discard merges, discard sectors, discard ticks
// 16, 17: flash I/Os, flash ticks, as per https://github.com/sysstat/sysstat/blob/master/iostat.c
}

// interface guard
var (
_ diskBlockStat = (*dblockStat)(nil)
)

var blockStats = make(diskBlockStats, 10)

// readStats returns disk stats
func readStats(disks, sysfnames cos.StrKVs) diskBlockStats {
for d := range disks {
stat, ok := _read(sysfnames[d])
if !ok {
continue
}
blockStats[d] = stat
func readStats(disks, sysfnames cos.StrKVs, all allBlockStats) {
for disk := range disks {
ds, ok := all[disk]
debug.Assert(ok, disk)
_ = _read(sysfnames[disk], ds)
}
return blockStats
}

// https://www.kernel.org/doc/Documentation/block/stat.txt
func _read(sysfn string) (dblockStat, bool) {
func _read(sysfn string, ds *blockStats) bool {
file, err := os.Open(sysfn)
if err != nil {
nlog.Errorf("%s: %v", sysfn, err)
return dblockStat{}, false
return false
}
scanner := bufio.NewScanner(file)
scanner.Scan()
fields := strings.Fields(scanner.Text())

_ = file.Close()
if len(fields) < 11 {
return dblockStat{}, false
return false
}
return _extact(fields, 0), true
}

func _extact(fields []string, offset int) dblockStat {
return dblockStat{
_exI64(fields[offset]),
_exI64(fields[offset+1]),
_exI64(fields[offset+2]),
_exI64(fields[offset+3]),
_exI64(fields[offset+4]),
_exI64(fields[offset+5]),
_exI64(fields[offset+6]),
_exI64(fields[offset+7]),
_exI64(fields[offset+8]),
_exI64(fields[offset+9]),
_exI64(fields[offset+10]),
*ds = blockStats{
_exI64(fields[0]),
_exI64(fields[1]),
_exI64(fields[2]),
_exI64(fields[3]),
_exI64(fields[4]),
_exI64(fields[5]),
_exI64(fields[6]),
_exI64(fields[7]),
_exI64(fields[8]),
_exI64(fields[9]),
_exI64(fields[10]),
}
return true
}

func _exI64(field string) int64 {
Expand All @@ -98,13 +63,13 @@ func _exI64(field string) int64 {
return val
}

func (dbs dblockStat) Reads() int64 { return dbs.readComplete }
func (dbs dblockStat) ReadBytes() int64 { return dbs.readSectors * sectorSize }
func (dbs dblockStat) Writes() int64 { return dbs.writeComplete }
func (dbs dblockStat) WriteBytes() int64 { return dbs.writeSectors * sectorSize }
func (dbs dblockStat) IOMs() int64 { return dbs.ioMs }
func (dbs dblockStat) WriteMs() int64 { return dbs.writeMs }
func (dbs dblockStat) ReadMs() int64 { return dbs.readMs }
func (ds *blockStats) Reads() int64 { return ds.readComplete }
func (ds *blockStats) ReadBytes() int64 { return ds.readSectors * sectorSize }
func (ds *blockStats) Writes() int64 { return ds.writeComplete }
func (ds *blockStats) WriteBytes() int64 { return ds.writeSectors * sectorSize }
func (ds *blockStats) IOMs() int64 { return ds.ioMs }
func (ds *blockStats) WriteMs() int64 { return ds.writeMs }
func (ds *blockStats) ReadMs() int64 { return ds.readMs }

// NVMe multipathing
// * nvmeInN: instance I namespace N
Expand Down Expand Up @@ -143,8 +108,11 @@ func icn(disk, dir string) (cdisk string) {
}

func icnPath(dir, cdir, mountpath string) bool {
stats, ok := _read(dir)
cstats, cok := _read(cdir)
var (
stats, cstats blockStats
)
ok := _read(dir, &stats)
cok := _read(cdir, &cstats)
if !cok {
return false
}
Expand All @@ -167,8 +135,11 @@ func icnPath(dir, cdir, mountpath string) bool {
err = os.Remove(fqn)
debug.AssertNoErr(err)

stats2, ok := _read(dir)
cstats2, cok := _read(cdir)
var (
stats2, cstats2 blockStats
)
ok = _read(dir, &stats2)
cok = _read(cdir, &cstats2)
debug.Assert(ok && cok)
return stats.writeComplete == stats2.writeComplete && cstats.writeComplete < cstats2.writeComplete
}
25 changes: 13 additions & 12 deletions ios/iostat.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type (
mpath2disks map[string]FsDisks
disk2mpath cos.StrKVs
disk2sysfn cos.StrKVs
blockStats allBlockStats
lsblk ratomic.Pointer[LsBlk]
cache ratomic.Pointer[cache]
cacheHst [16]*cache
Expand Down Expand Up @@ -103,6 +104,7 @@ func New(num int) IOS {
mpath2disks: make(map[string]FsDisks, num),
disk2mpath: make(cos.StrKVs, num),
disk2sysfn: make(cos.StrKVs, num),
blockStats: make(allBlockStats, num),
}
for i := 0; i < len(ios.cacheHst); i++ {
ios.cacheHst[i] = newCache(num)
Expand Down Expand Up @@ -185,6 +187,7 @@ func (ios *ios) _add(mpath string, fsdisks FsDisks, testingEnv bool) (err error)
return
}
ios.disk2mpath[disk] = mpath
ios.blockStats[disk] = &blockStats{}
}
for disk, mountpath := range ios.disk2mpath {
if _, ok := ios.disk2sysfn[disk]; !ok {
Expand Down Expand Up @@ -265,6 +268,7 @@ func (ios *ios) _delDisk(mpath, disk string) {
}
debug.Assertf(mp == mpath, "(mpath %s => disk %s => mpath %s) violation", mp, disk, mpath)
delete(ios.disk2mpath, disk)
delete(ios.blockStats, disk)
}

//
Expand Down Expand Up @@ -365,24 +369,21 @@ func (ios *ios) _ref(config *cmn.Config) (ncache *cache, maxUtil int64, missingI
}
}

osDiskStats := readStats(ios.disk2mpath, ios.disk2sysfn)
readStats(ios.disk2mpath, ios.disk2sysfn, ios.blockStats)
for disk, mpath := range ios.disk2mpath {
ncache.rbps[disk] = 0
ncache.wbps[disk] = 0
ncache.util[disk] = 0
ncache.ravg[disk] = 0
ncache.wavg[disk] = 0
osDisk, ok := osDiskStats[disk]
if !ok {
continue
}
ncache.ioms[disk] = osDisk.IOMs()
ncache.rms[disk] = osDisk.ReadMs()
ncache.rbytes[disk] = osDisk.ReadBytes()
ncache.reads[disk] = osDisk.Reads()
ncache.wms[disk] = osDisk.WriteMs()
ncache.wbytes[disk] = osDisk.WriteBytes()
ncache.writes[disk] = osDisk.Writes()
ds := ios.blockStats[disk]
ncache.ioms[disk] = ds.IOMs()
ncache.rms[disk] = ds.ReadMs()
ncache.rbytes[disk] = ds.ReadBytes()
ncache.reads[disk] = ds.Reads()
ncache.wms[disk] = ds.WriteMs()
ncache.wbytes[disk] = ds.WriteBytes()
ncache.writes[disk] = ds.Writes()

if _, ok := statsCache.ioms[disk]; !ok {
missingInfo = true
Expand Down

0 comments on commit d03ad79

Please sign in to comment.