|
| 1 | +package main |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "fmt" |
| 6 | + "io" |
| 7 | + "log/slog" |
| 8 | + "math/rand" |
| 9 | + "os" |
| 10 | + "strconv" |
| 11 | + "sync" |
| 12 | + "time" |
| 13 | + |
| 14 | + "github.com/hashicorp/go-cleanhttp" |
| 15 | + "github.com/jedib0t/go-pretty/v6/table" |
| 16 | + "github.com/minio/minio-go/v7" |
| 17 | + "github.com/minio/minio-go/v7/pkg/credentials" |
| 18 | +) |
| 19 | + |
| 20 | +func main() { |
| 21 | + accessKeyID := os.Getenv("S3_ACCESS_KEY_ID") |
| 22 | + secretKey := os.Getenv("S3_ACCESS_KEY_SECRET_KEY") |
| 23 | + region := os.Getenv("S3_REGION") |
| 24 | + endpoint := os.Getenv("S3_ENDPOINT") |
| 25 | + gomaxprocs := os.Getenv("GOMAXPROCS") |
| 26 | + bucket := os.Getenv("S3_BUCKET") |
| 27 | + prefix := os.Getenv("S3_PREFIX") |
| 28 | + objectCount := os.Getenv("MAX_OBJECTS") |
| 29 | + testDuration := os.Getenv("TEST_DURATION") |
| 30 | + useSSL := os.Getenv("USE_SSL") |
| 31 | + |
| 32 | + if accessKeyID == "" || secretKey == "" { |
| 33 | + slog.Error("S3_ACCESS_KEY_ID and S3_ACCESS_KEY_SECRET_KEY are required to run") |
| 34 | + |
| 35 | + os.Exit(1) |
| 36 | + } |
| 37 | + |
| 38 | + if region == "" || endpoint == "" { |
| 39 | + slog.Error("S3_REGION and S3_ENDPOINT are required to run") |
| 40 | + |
| 41 | + os.Exit(1) |
| 42 | + } |
| 43 | + |
| 44 | + if bucket == "" { |
| 45 | + slog.Error("S3_BUCKET is required to run") |
| 46 | + |
| 47 | + os.Exit(1) |
| 48 | + } |
| 49 | + |
| 50 | + threads := 1 |
| 51 | + maxObjects := 1000 |
| 52 | + var err error |
| 53 | + |
| 54 | + if gomaxprocs != "" { |
| 55 | + threads, err = strconv.Atoi(gomaxprocs) |
| 56 | + if err != nil { |
| 57 | + slog.Error("failed to convert GOMAXPROCS to int", "error", err) |
| 58 | + |
| 59 | + os.Exit(1) |
| 60 | + } |
| 61 | + } |
| 62 | + |
| 63 | + if objectCount != "" { |
| 64 | + maxObjects, err = strconv.Atoi(objectCount) |
| 65 | + if err != nil { |
| 66 | + slog.Error("failed to convert MAX_OBJECTS to int", "error", err) |
| 67 | + |
| 68 | + os.Exit(1) |
| 69 | + } |
| 70 | + } |
| 71 | + |
| 72 | + duration := time.Minute * 10 |
| 73 | + |
| 74 | + if testDuration != "" { |
| 75 | + duration, err = time.ParseDuration(testDuration) |
| 76 | + if err != nil { |
| 77 | + slog.Error("failed to parse TEST_DURATION", "error", err) |
| 78 | + |
| 79 | + os.Exit(1) |
| 80 | + } |
| 81 | + } |
| 82 | + |
| 83 | + ssl := true |
| 84 | + if useSSL != "" { |
| 85 | + ssl, err = strconv.ParseBool(useSSL) |
| 86 | + if err != nil { |
| 87 | + slog.Error("failed to parse USE_SSL", "error", err) |
| 88 | + |
| 89 | + os.Exit(1) |
| 90 | + } |
| 91 | + } |
| 92 | + |
| 93 | + ctx, cancel := context.WithCancel(context.Background()) |
| 94 | + transport := cleanhttp.DefaultPooledTransport() |
| 95 | + |
| 96 | + s3Client, err := minio.New(endpoint, &minio.Options{ |
| 97 | + Creds: credentials.NewStaticV4(accessKeyID, secretKey, ""), |
| 98 | + Secure: ssl, |
| 99 | + Transport: transport, |
| 100 | + Region: region, |
| 101 | + BucketLookup: minio.BucketLookupDNS, |
| 102 | + }) |
| 103 | + if err != nil { |
| 104 | + slog.Error("failed to create s3 client", "error", err) |
| 105 | + |
| 106 | + os.Exit(1) |
| 107 | + } |
| 108 | + |
| 109 | + var objects []string |
| 110 | + var sizes []int64 |
| 111 | + |
| 112 | + objChan := s3Client.ListObjects(ctx, bucket, minio.ListObjectsOptions{ |
| 113 | + WithMetadata: true, |
| 114 | + Prefix: prefix, |
| 115 | + MaxKeys: maxObjects, |
| 116 | + }) |
| 117 | + |
| 118 | + for obj := range objChan { |
| 119 | + if obj.Err != nil { |
| 120 | + slog.Error("error listing objects", "error", err) |
| 121 | + |
| 122 | + os.Exit(1) |
| 123 | + } |
| 124 | + |
| 125 | + objects = append(objects, obj.Key) |
| 126 | + sizes = append(sizes, obj.Size) |
| 127 | + } |
| 128 | + |
| 129 | + rng := rand.New(rand.NewSource(time.Now().Unix())) |
| 130 | + timer := time.NewTimer(duration) |
| 131 | + resultChan := make(chan *testResult, threads) |
| 132 | + wg := &sync.WaitGroup{} |
| 133 | + |
| 134 | + params := &testParams{ |
| 135 | + Bucket: bucket, |
| 136 | + RNG: rng, |
| 137 | + Objects: objects, |
| 138 | + ObjectSizes: sizes, |
| 139 | + S3Client: s3Client, |
| 140 | + } |
| 141 | + |
| 142 | + start := time.Now().UTC() |
| 143 | + |
| 144 | + for i := range threads { |
| 145 | + wg.Add(1) |
| 146 | + go func() { |
| 147 | + result := runTest(ctx, params, i) |
| 148 | + resultChan <- result |
| 149 | + wg.Done() |
| 150 | + }() |
| 151 | + } |
| 152 | + |
| 153 | + <-timer.C |
| 154 | + |
| 155 | + slog.Info("test done") |
| 156 | + |
| 157 | + // Tell our tests that we're done and wait |
| 158 | + cancel() |
| 159 | + wg.Wait() |
| 160 | + |
| 161 | + aggregatedBytesRead := 0 |
| 162 | + aggregatedRequestsSent := 0 |
| 163 | + var averageTTLB int64 |
| 164 | + |
| 165 | + for range threads { |
| 166 | + result := <-resultChan |
| 167 | + |
| 168 | + aggregatedBytesRead += int(result.TotalBytesRead) |
| 169 | + aggregatedRequestsSent += int(result.TotalRequestsSent) |
| 170 | + averageTTLB += result.TotalTTLBMS |
| 171 | + } |
| 172 | + |
| 173 | + averageTTLB = averageTTLB / int64(aggregatedRequestsSent) |
| 174 | + |
| 175 | + timeSpent := time.Since(start) |
| 176 | + |
| 177 | + t := table.NewWriter() |
| 178 | + t.SetOutputMirror(os.Stdout) |
| 179 | + t.AppendHeader(table.Row{"", "RESULTS"}) |
| 180 | + t.AppendRow(table.Row{"Time Spent", timeSpent.String()}) |
| 181 | + t.AppendRow(table.Row{"Total Bytes Read", fmt.Sprintf("%d", aggregatedBytesRead)}) |
| 182 | + t.AppendRow(table.Row{"Total Requests Sent", fmt.Sprintf("%d", aggregatedRequestsSent)}) |
| 183 | + t.AppendRow(table.Row{"Average TTLB", fmt.Sprintf("%d", averageTTLB)}) |
| 184 | + |
| 185 | + t.Render() |
| 186 | +} |
| 187 | + |
| 188 | +type testParams struct { |
| 189 | + Bucket string |
| 190 | + RNG *rand.Rand |
| 191 | + Objects []string |
| 192 | + ObjectSizes []int64 |
| 193 | + S3Client *minio.Client |
| 194 | +} |
| 195 | + |
| 196 | +type testResult struct { |
| 197 | + TotalBytesRead int64 |
| 198 | + TotalRequestsSent int64 |
| 199 | + TotalTTLBMS int64 |
| 200 | +} |
| 201 | + |
| 202 | +func runTest(ctx context.Context, params *testParams, id int) *testResult { |
| 203 | + ll := slog.With("ID", id) |
| 204 | + |
| 205 | + ll.Info("starting test") |
| 206 | + |
| 207 | + result := &testResult{} |
| 208 | + |
| 209 | + testCtx := context.Background() |
| 210 | + |
| 211 | + for { |
| 212 | + select { |
| 213 | + case <-ctx.Done(): |
| 214 | + ll.Info("context cancelled, stopping test") |
| 215 | + |
| 216 | + return result |
| 217 | + default: |
| 218 | + // Get our random object |
| 219 | + randObjIndex := params.RNG.Int() % len(params.Objects) |
| 220 | + obj := params.Objects[randObjIndex] |
| 221 | + size := params.ObjectSizes[randObjIndex] |
| 222 | + |
| 223 | + // Get a random 16KiB offset to read |
| 224 | + maxOffset := size / (16 * 1024) |
| 225 | + randObjOffset := params.RNG.Int() % int(maxOffset) |
| 226 | + |
| 227 | + rangeStart := int64(randObjOffset * (16 * 1024)) |
| 228 | + rangeEnd := min(int64((rangeStart + (16 * 1024))), size) |
| 229 | + |
| 230 | + result.TotalRequestsSent++ |
| 231 | + |
| 232 | + start := time.Now().UTC() |
| 233 | + |
| 234 | + reqOpts := minio.GetObjectOptions{} |
| 235 | + reqOpts.SetRange(rangeStart, rangeEnd) |
| 236 | + |
| 237 | + resp, err := params.S3Client.GetObject(testCtx, params.Bucket, obj, reqOpts) |
| 238 | + |
| 239 | + if err != nil { |
| 240 | + ll.Error("failed to fetch range", "error", err) |
| 241 | + |
| 242 | + continue |
| 243 | + } |
| 244 | + |
| 245 | + amount, err := io.Copy(io.Discard, resp) |
| 246 | + resp.Close() |
| 247 | + |
| 248 | + result.TotalBytesRead += amount |
| 249 | + result.TotalTTLBMS += time.Since(start).Milliseconds() |
| 250 | + |
| 251 | + if err != nil { |
| 252 | + ll.Error("failed to discard response body", "error", err) |
| 253 | + |
| 254 | + continue |
| 255 | + } |
| 256 | + } |
| 257 | + } |
| 258 | +} |
0 commit comments