Skip to content

Commit 026de6b

Browse files
committed
retrieve proper instance IDs in GCP
Signed-off-by: Dmitry Shmulevich <[email protected]>
1 parent c2dc4a2 commit 026de6b

File tree

4 files changed

+185
-66
lines changed

4 files changed

+185
-66
lines changed

pkg/providers/gcp/imds.go

+69
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package gcp
18+
19+
import (
20+
"bufio"
21+
"context"
22+
"fmt"
23+
"strings"
24+
25+
"k8s.io/klog/v2"
26+
27+
"github.com/NVIDIA/topograph/internal/exec"
28+
)
29+
30+
const (
31+
IMDSURL = "http://metadata.google.internal/computeMetadata/v1"
32+
)
33+
34+
func instanceToNodeMap(ctx context.Context, nodes []string) (map[string]string, error) {
35+
url := fmt.Sprintf("%s/instance/id", IMDSURL)
36+
args := []string{"-w", strings.Join(nodes, ","), fmt.Sprintf("echo $(curl -s -H \"Metadata-Flavor: Google\" %s)", url)}
37+
stdout, err := exec.Exec(ctx, "pdsh", args, nil)
38+
if err != nil {
39+
return nil, err
40+
}
41+
42+
i2n := map[string]string{}
43+
scanner := bufio.NewScanner(stdout)
44+
for scanner.Scan() {
45+
arr := strings.Split(scanner.Text(), ": ")
46+
if len(arr) == 2 {
47+
node, instance := arr[0], arr[1]
48+
klog.V(4).Infoln("Node name: ", node, "Instance ID: ", instance)
49+
i2n[instance] = node
50+
}
51+
}
52+
53+
if err := scanner.Err(); err != nil {
54+
return nil, err
55+
}
56+
57+
return i2n, nil
58+
}
59+
60+
func getRegion(ctx context.Context) (string, error) {
61+
url := fmt.Sprintf("%s/zone", IMDSURL)
62+
args := []string{"-s", "-H", "Metadata-Flavor: Google", url}
63+
stdout, err := exec.Exec(ctx, "curl", args, nil)
64+
if err != nil {
65+
return "", err
66+
}
67+
68+
return stdout.String(), nil
69+
}

pkg/providers/gcp/instance_topology.go

+87-46
Original file line number | Diff line number | Diff line change
@@ -19,12 +19,16 @@ package gcp
1919
import (
2020
"context"
2121
"fmt"
22+
"strconv"
2223
"strings"
2324
"time"
2425

26+
compute "cloud.google.com/go/compute/apiv1"
2527
"cloud.google.com/go/compute/apiv1/computepb"
2628
"cloud.google.com/go/compute/metadata"
29+
"github.com/agrea/ptr"
2730
"google.golang.org/api/iterator"
31+
"k8s.io/klog/v2"
2832

2933
"github.com/NVIDIA/topograph/pkg/topology"
3034
)
@@ -39,72 +43,63 @@ type InstanceInfo struct {
3943
name string
4044
}
4145

42-
func (p *Provider) generateInstanceTopology(ctx context.Context, instanceToNodeMap map[string]string) (*InstanceTopology, error) {
46+
func (p *Provider) generateInstanceTopology(ctx context.Context, pageSize *int, cis []topology.ComputeInstances) (*InstanceTopology, error) {
47+
insTop := &InstanceTopology{
48+
instances: []*InstanceInfo{},
49+
}
50+
51+
maxRes := castPageSize(pageSize)
52+
for _, ci := range cis {
53+
err := p.generateRegionInstanceTopology(ctx, insTop, maxRes, &ci)
54+
if err != nil {
55+
return nil, err
56+
}
57+
}
58+
59+
return insTop, nil
60+
}
61+
62+
func (p *Provider) generateRegionInstanceTopology(ctx context.Context, insTop *InstanceTopology, maxRes *uint32, ci *topology.ComputeInstances) error {
4363
client, err := p.clientFactory()
4464
if err != nil {
45-
return nil, err
65+
return fmt.Errorf("unable to get client: %v", err)
4666
}
4767

4868
projectID, err := metadata.ProjectIDWithContext(ctx)
4969
if err != nil {
50-
return nil, fmt.Errorf("unable to get project ID: %s", err.Error())
70+
return fmt.Errorf("unable to get project ID: %v", err)
5171
}
52-
listZoneRequest := computepb.ListZonesRequest{Project: projectID}
53-
zones := make([]string, 0)
5472

55-
timeNow := time.Now()
56-
res := client.Zones.List(ctx, &listZoneRequest)
57-
requestLatency.WithLabelValues("ListZones").Observe(time.Since(timeNow).Seconds())
73+
klog.InfoS("Getting instance topology", "region", ci.Region, "project", projectID)
5874

59-
for {
60-
zone, err := res.Next()
61-
if err == iterator.Done {
62-
break
63-
}
64-
zones = append(zones, *zone.Name)
75+
req := computepb.ListInstancesRequest{
76+
Project: projectID,
77+
Zone: ci.Region,
78+
MaxResults: maxRes,
79+
PageToken: nil,
6580
}
6681

67-
instanceTopology := &InstanceTopology{instances: make([]*InstanceInfo, 0)}
82+
var cycle int
83+
for {
84+
cycle++
85+
klog.V(4).Infof("Starting cycle %d", cycle)
6886

69-
for _, zone := range zones {
7087
timeNow := time.Now()
71-
listInstanceRequest := computepb.ListInstancesRequest{Project: projectID, Zone: zone}
88+
resp := client.Instances.List(ctx, &req)
7289
requestLatency.WithLabelValues("ListInstances").Observe(time.Since(timeNow).Seconds())
7390

74-
resInstance := client.Instances.List(ctx, &listInstanceRequest)
75-
for {
76-
instance, err := resInstance.Next()
77-
if err == iterator.Done {
78-
break
79-
}
80-
_, isNodeInCluster := instanceToNodeMap[*instance.Name]
91+
processInstanceList(insTop, resp, ci)
8192

82-
if instance.ResourceStatus == nil {
83-
resourceStatusNotFound.WithLabelValues(*instance.Name).Set(1)
84-
continue
85-
}
86-
resourceStatusNotFound.WithLabelValues(*instance.Name).Set(0)
93+
klog.V(4).Infof("Processed %d nodes", len(insTop.instances))
8794

88-
if instance.ResourceStatus.PhysicalHost == nil {
89-
physicalHostNotFound.WithLabelValues(*instance.Name).Set(1)
90-
continue
91-
}
92-
physicalHostNotFound.WithLabelValues(*instance.Name).Set(0)
93-
94-
if isNodeInCluster {
95-
tokens := strings.Split(*instance.ResourceStatus.PhysicalHost, "/")
96-
physicalHostIDChunks.WithLabelValues(*instance.Name).Set(float64(getTokenCount(tokens)))
97-
instanceObj := &InstanceInfo{
98-
name: *instance.Name,
99-
clusterID: tokens[1],
100-
rackID: tokens[2],
101-
}
102-
instanceTopology.instances = append(instanceTopology.instances, instanceObj)
103-
}
95+
if token := resp.PageInfo().Token; token == "" {
96+
break
97+
} else {
98+
req.PageToken = &token
10499
}
105100
}
106101

107-
return instanceTopology, nil
102+
return nil
108103
}
109104

110105
func (cfg *InstanceTopology) toGraph() (*topology.Vertex, error) {
@@ -156,6 +151,44 @@ func (cfg *InstanceTopology) toGraph() (*topology.Vertex, error) {
156151
return root, nil
157152
}
158153

154+
func processInstanceList(insTop *InstanceTopology, resp *compute.InstanceIterator, ci *topology.ComputeInstances) {
155+
for {
156+
instance, err := resp.Next()
157+
if err == iterator.Done {
158+
return
159+
}
160+
instanceId := strconv.FormatUint(*instance.Id, 10)
161+
klog.Infof("Checking INSTANCE %s", instanceId)
162+
if _, ok := ci.Instances[instanceId]; ok {
163+
klog.Infof("FOUND INSTANCE %s", instanceId)
164+
if instance.ResourceStatus == nil {
165+
klog.Infof("ResourceStatus is not set for INSTANCE %s", instanceId)
166+
resourceStatusNotFound.WithLabelValues(instanceId).Set(1)
167+
continue
168+
}
169+
resourceStatusNotFound.WithLabelValues(instanceId).Set(0)
170+
171+
if instance.ResourceStatus.PhysicalHost == nil {
172+
klog.Infof("PhysicalHost is not set for INSTANCE %s", instanceId)
173+
physicalHostNotFound.WithLabelValues(instanceId).Set(1)
174+
continue
175+
}
176+
physicalHostNotFound.WithLabelValues(instanceId).Set(0)
177+
178+
tokens := strings.Split(*instance.ResourceStatus.PhysicalHost, "/")
179+
physicalHostIDChunks.WithLabelValues(instanceId).Set(float64(getTokenCount(tokens)))
180+
instanceObj := &InstanceInfo{
181+
name: instanceId,
182+
clusterID: tokens[1],
183+
rackID: tokens[2],
184+
}
185+
insTop.instances = append(insTop.instances, instanceObj)
186+
} else {
187+
klog.Infof("INSTANCE %s NOT IN MAP", instanceId)
188+
}
189+
}
190+
}
191+
159192
func getTokenCount(tokens []string) int {
160193
c := 0
161194
for _, q := range tokens {
@@ -165,3 +198,11 @@ func getTokenCount(tokens []string) int {
165198
}
166199
return c
167200
}
201+
202+
func castPageSize(val *int) *uint32 {
203+
if val == nil {
204+
return nil
205+
}
206+
207+
return ptr.Uint32(uint32(*val))
208+
}

pkg/providers/gcp/instance_topology_test.go

+23
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ import (
2020
"strings"
2121
"testing"
2222

23+
"github.com/agrea/ptr"
2324
"github.com/stretchr/testify/require"
2425
)
2526

@@ -58,3 +59,25 @@ func TestGetTokenCount(t *testing.T) {
5859
})
5960
}
6061
}
62+
63+
func TestCastPageSize(t *testing.T) {
64+
testCases := []struct {
65+
name string
66+
input *int
67+
pageSize *uint32
68+
}{
69+
{
70+
name: "Case 1: nil",
71+
},
72+
{
73+
name: "Case 2: not nil",
74+
input: ptr.Int(5),
75+
pageSize: ptr.Uint32(5),
76+
},
77+
}
78+
for _, tc := range testCases {
79+
t.Run(tc.name, func(t *testing.T) {
80+
require.Equal(t, tc.pageSize, castPageSize(tc.input))
81+
})
82+
}
83+
}

pkg/providers/gcp/provider.go

+6-20
Original file line number | Diff line number | Diff line change
@@ -81,17 +81,8 @@ func New(clientFactory ClientFactory) (*Provider, error) {
8181
}, nil
8282
}
8383

84-
func (p *Provider) GenerateTopologyConfig(ctx context.Context, _ *int, instances []topology.ComputeInstances) (*topology.Vertex, error) {
85-
if len(instances) > 1 {
86-
return nil, fmt.Errorf("GCP does not support mult-region topology requests")
87-
}
88-
89-
var instanceToNode map[string]string
90-
if len(instances) == 1 {
91-
instanceToNode = instances[0].Instances
92-
}
93-
94-
cfg, err := p.generateInstanceTopology(ctx, instanceToNode)
84+
func (p *Provider) GenerateTopologyConfig(ctx context.Context, pageSize *int, instances []topology.ComputeInstances) (*topology.Vertex, error) {
85+
cfg, err := p.generateInstanceTopology(ctx, pageSize, instances)
9586
if err != nil {
9687
return nil, err
9788
}
@@ -103,17 +94,12 @@ func (p *Provider) GenerateTopologyConfig(ctx context.Context, _ *int, instances
10394

10495
// Instances2NodeMap implements slurm.instanceMapper
10596
func (p *Provider) Instances2NodeMap(ctx context.Context, nodes []string) (map[string]string, error) {
106-
i2n := make(map[string]string)
107-
for _, node := range nodes {
108-
i2n[node] = node
109-
}
110-
111-
return i2n, nil
97+
return instanceToNodeMap(ctx, nodes)
11298
}
11399

114100
// GetComputeInstancesRegion implements slurm.instanceMapper
115-
func (p *Provider) GetComputeInstancesRegion(_ context.Context) (string, error) {
116-
return "", nil
101+
func (p *Provider) GetComputeInstancesRegion(ctx context.Context) (string, error) {
102+
return getRegion(ctx)
117103
}
118104

119105
// GetNodeRegion implements k8s.k8sNodeInfo
@@ -123,5 +109,5 @@ func (p *Provider) GetNodeRegion(node *v1.Node) (string, error) {
123109

124110
// GetNodeInstance implements k8s.k8sNodeInfo
125111
func (p *Provider) GetNodeInstance(node *v1.Node) (string, error) {
126-
return node.Labels["kubernetes.io/hostname"], nil
112+
return node.Annotations["container.googleapis.com/instance_id"], nil
127113
}

0 commit comments

Comments
 (0)