15
15
package benchi
16
16
17
17
import (
18
+ "bufio"
18
19
"bytes"
19
20
"context"
20
21
"encoding/csv"
@@ -37,6 +38,7 @@ import (
37
38
"github.com/conduitio/benchi/metrics/docker"
38
39
"github.com/conduitio/benchi/metrics/kafka"
39
40
"github.com/conduitio/benchi/metrics/prometheus"
41
+ "github.com/docker/docker/api/types/image"
40
42
"github.com/docker/docker/client"
41
43
"github.com/docker/docker/errdefs"
42
44
"github.com/sourcegraph/conc/pool"
@@ -372,7 +374,36 @@ func (r *TestRunner) runPreInfrastructure(ctx context.Context) (err error) {
372
374
return fmt .Errorf ("failed to create output folder %q: %w" , r .resultsDir , err )
373
375
}
374
376
375
- // TODO pull all images
377
+ // Pull infrastructure images
378
+ logger .Info ("Pulling docker images for infrastructure containers" , "containers" , r .infrastructureContainers )
379
+ err = dockerutil .ComposePull (
380
+ ctx ,
381
+ dockerutil.ComposeOptions {
382
+ File : collectDockerComposeFiles (r .infrastructure ),
383
+ },
384
+ dockerutil.ComposePullOptions {},
385
+ )
386
+ if err != nil {
387
+ return fmt .Errorf ("failed to pull infrastructure images: %w" , err )
388
+ }
389
+
390
+ // Pull tool images
391
+ logger .Info ("Pulling docker images for tool containers" , "containers" , r .toolContainers )
392
+ err = dockerutil .ComposePull (
393
+ ctx ,
394
+ dockerutil.ComposeOptions {
395
+ File : collectDockerComposeFiles (r .tools ),
396
+ },
397
+ dockerutil.ComposePullOptions {},
398
+ )
399
+ if err != nil {
400
+ return fmt .Errorf ("failed to pull tool images: %w" , err )
401
+ }
402
+
403
+ err = r .pullHookImages (ctx , logger , r .hooks .All ())
404
+ if err != nil {
405
+ return fmt .Errorf ("failed to pull docker images for hooks: %w" , err )
406
+ }
376
407
377
408
return r .runHooks (ctx , logger , r .hooks .PreInfrastructure )
378
409
}
@@ -783,6 +814,52 @@ func (r *TestRunner) runHook(ctx context.Context, logger *slog.Logger, hook conf
783
814
return dockerutil .RunInDockerNetwork (ctx , logger , image , NetworkName , hook .Run )
784
815
}
785
816
817
+ func (r * TestRunner ) pullHookImages (ctx context.Context , logger * slog.Logger , hooks []config.TestHook ) error {
818
+ var imgs []string
819
+ for _ , hook := range hooks {
820
+ img := hook .Image
821
+ if img == "" {
822
+ img = DefaultHookImage
823
+ }
824
+ if ! slices .Contains (imgs , img ) {
825
+ imgs = append (imgs , img )
826
+ }
827
+ }
828
+
829
+ logger .Info ("Pulling docker images for hooks" , "images" , imgs )
830
+ for _ , img := range imgs {
831
+ resp , err := r .dockerClient .ImagePull (ctx , img , image.PullOptions {})
832
+ if err != nil {
833
+ return fmt .Errorf ("failed to pull docker image %q: %w" , img , err )
834
+ }
835
+ scanner := bufio .NewScanner (resp )
836
+ tmp := make (map [string ]any )
837
+ logAttrs := make ([]any , 0 )
838
+ for scanner .Scan () {
839
+ clear (tmp )
840
+ logAttrs = logAttrs [:0 ]
841
+
842
+ body := scanner .Bytes ()
843
+ if err := json .Unmarshal (body , & tmp ); err != nil {
844
+ logger .Warn ("Failed to unmarshal docker image pull response" , "image" , img , "error" , err )
845
+ tmp ["response" ] = string (body )
846
+ }
847
+
848
+ for k , v := range tmp {
849
+ logAttrs = append (logAttrs , slog .Any (k , v ))
850
+ }
851
+
852
+ logger .Info ("image pull response" , logAttrs ... )
853
+ }
854
+ if err := scanner .Err (); err != nil {
855
+ return fmt .Errorf ("failed to read image pull response: %w" , err )
856
+ }
857
+ _ = resp .Close ()
858
+ }
859
+
860
+ return nil
861
+ }
862
+
786
863
func collectDockerComposeFiles (cfgs []config.ServiceConfig ) []string {
787
864
paths := make ([]string , len (cfgs ))
788
865
for i , cfg := range cfgs {
0 commit comments