Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new resource hint to all sdks for number of cpus per worker machine #28848

Merged
merged 10 commits into from
Oct 12, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -1982,5 +1982,9 @@ message StandardResourceHints {
// SDKs should convert the size to bytes, but can allow users to specify human-friendly units (e.g. GiB).
// Payload: ASCII encoded string of the base 10 representation of an integer number of bytes.
MIN_RAM_BYTES = 1 [(beam_urn) = "beam:resources:min_ram_bytes:v1"];
// Describes desired number of CPUs available in transform's execution environment.
Abacn marked this conversation as resolved.
Show resolved Hide resolved
// SDKs should accept and validate a positive integer count.
// Payload: ASCII encoded string of the base 10 representation of an integer number of CPUs.
CPU_COUNT = 2 [(beam_urn) = "beam:resources:cpu_count:v1"];
}
}
37 changes: 37 additions & 0 deletions sdks/go/pkg/beam/options/resource/hint.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,40 @@ func (h acceleratorHint) MergeWithOuter(outer Hint) Hint {
func (h acceleratorHint) String() string {
return fmt.Sprintf("accelerator=%v", h.value)
}

// CPUCount builds a resource hint asking that this scope be placed on a machine
// offering at least v CPUs (or vCPUs).
//
// Resource hints are advisory only: a runner is free to ignore them.
//
// For more about resource hints, see
// https://beam.apache.org/documentation/runtime/resource-hints/.
func CPUCount(v uint64) Hint {
	var hint CPUCountHint
	hint.value = v
	return hint
}

// CPUCountHint is the Hint carrying a requested CPU (or vCPU) count.
// Use CPUCount to construct one.
type CPUCountHint struct {
	value uint64 // requested number of CPUs
}

// URN returns the URN that identifies the CPU count resource hint.
func (CPUCountHint) URN() string {
	return "beam:resources:cpu_count:v1"
}

// Payload returns the CPU count encoded as the ASCII bytes of its base 10
// representation.
func (h CPUCountHint) Payload() []byte {
	// Go strings are utf8, and if the string is ascii,
	// byte conversion handles that directly.
	return []byte(strconv.FormatUint(h.value, 10))
}

// MergeWithOuter keeps the maximum of the two CPU counts.
//
// outer must hold a CPUCountHint; any other dynamic type makes the type
// assertion panic.
func (h CPUCountHint) MergeWithOuter(outer Hint) Hint {
	// Intentional runtime panic from type assertion to catch hint merge errors.
	if outer.(CPUCountHint).value > h.value {
		return outer
	}
	return h
}

// String renders the hint as "cpu_count=<n>", with n the plain CPU count.
func (h CPUCountHint) String() string {
	// A CPU count is a unitless integer, so it is printed as is. Routing it
	// through a byte-size formatter would attach a byte unit and round large
	// values (e.g. 2003 CPUs shown as kilobytes).
	return fmt.Sprintf("cpu_count=%v", h.value)
}
45 changes: 41 additions & 4 deletions sdks/go/pkg/beam/options/resource/hint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,38 @@ func TestParseMinRAMHint_panic(t *testing.T) {
ParseMinRAM("a bad byte string")
}

// TestCPUCountHint_MergeWith checks that merging two CPU count hints yields the
// larger one, whichever side of the merge it is on.
func TestCPUCountHint_MergeWith(t *testing.T) {
	small, large := CPUCountHint{value: 2}, CPUCountHint{value: 128}

	cases := []struct {
		inner, outer CPUCountHint
	}{
		{inner: small, outer: large},
		{inner: large, outer: small},
	}

	for _, c := range cases {
		merged := c.inner.MergeWithOuter(c.outer)
		if merged == large {
			continue
		}
		t.Errorf("%v.MergeWith(%v) = %v, want %v", c.inner, c.outer, merged, large)
	}
}

// TestCPUCountHint_Payload checks that a CPU count is serialized as the bytes
// of its base 10 string.
func TestCPUCountHint_Payload(t *testing.T) {
	cases := []struct {
		value   uint64
		payload string
	}{
		{value: 0, payload: "0"},
		{value: 2, payload: "2"},
		{value: 11, payload: "11"},
		{value: 2003, payload: "2003"},
		{value: 12000000, payload: "12000000"},
	}

	for i := range cases {
		hint := CPUCountHint{value: cases[i].value}
		got := hint.Payload()
		want := []byte(cases[i].payload)
		if bytes.Equal(got, want) {
			continue
		}
		t.Errorf("%v.Payload() = %v, want %v", hint, got, want)
	}
}

// We copy the URN from the proto for use as a constant rather than perform a direct look up
// each time, or increase initialization time. However we do need to validate that they are
// correct, and match the standard hint urns, so that's done here.
Expand All @@ -130,7 +162,11 @@ func TestStandardHintUrns(t *testing.T) {
}, {
h: MinRAMBytes(2e9),
urn: getStandardURN(pipepb.StandardResourceHints_MIN_RAM_BYTES),
}, {
h: CPUCount(4),
urn: getStandardURN(pipepb.StandardResourceHints_CPU_COUNT),
}}

for _, test := range tests {
if got, want := test.h.URN(), test.urn; got != want {
t.Errorf("Checked urn for %T, got %q, want %q", test.h, got, want)
Expand All @@ -154,12 +190,12 @@ func (h customHint) MergeWithOuter(outer Hint) Hint {
}

func TestHints_Equal(t *testing.T) {
hs := NewHints(MinRAMBytes(2e9), Accelerator("type:pants;count1;install-pajamas"))
hs := NewHints(MinRAMBytes(2e9), Accelerator("type:pants;count1;install-pajamas"), CPUCount(4))

if got, want := hs.Equal(hs), true; got != want {
t.Errorf("Self equal test: hs.Equal(hs) = %v, want %v", got, want)
}
eq := NewHints(MinRAMBytes(2e9), Accelerator("type:pants;count1;install-pajamas"))
eq := NewHints(MinRAMBytes(2e9), Accelerator("type:pants;count1;install-pajamas"), CPUCount(4))
if got, want := hs.Equal(eq), true; got != want {
t.Errorf("identical equal test: hs.Equal(eq) = %v, want %v", got, want)
}
Expand Down Expand Up @@ -223,12 +259,13 @@ func TestHints_MergeWithOuter(t *testing.T) {

func TestHints_Payloads(t *testing.T) {
{
hs := NewHints(MinRAMBytes(2e9), Accelerator("type:jeans;count1;"))
hs := NewHints(MinRAMBytes(2e9), Accelerator("type:jeans;count1;"), CPUCount(4))

got := hs.Payloads()
want := map[string][]byte{
"beam:resources:min_ram_bytes:v1": []byte("2000000000"),
"beam:resources:accelerator:v1": []byte("type:jeans;count1;"),
"beam:resources:cpu_count:v1": []byte("4"),
}
if !reflect.DeepEqual(got, want) {
t.Errorf("hs.Payloads() = %v, want %v", got, want)
Expand All @@ -248,7 +285,7 @@ func TestHints_Payloads(t *testing.T) {
func TestHints_NilHints(t *testing.T) {
var hs1, hs2 Hints

hs := NewHints(MinRAMBytes(2e9), Accelerator("type:pants;count1;install-pajamas"))
hs := NewHints(MinRAMBytes(2e9), Accelerator("type:pants;count1;install-pajamas"), CPUCount(4))

if got, want := hs1.Equal(hs2), true; got != want {
t.Errorf("nils equal test: (nil).Equal(nil) = %v, want %v", got, want)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class ResourceHints {
private static final String MIN_RAM_URN = "beam:resources:min_ram_bytes:v1";
private static final String ACCELERATOR_URN = "beam:resources:accelerator:v1";

private static final String CPU_COUNT_URN = "beam:resources:cpu_count:v1";

// TODO: reference this from a common location in all packages that use this.
private static String getUrn(ProtocolMessageEnum value) {
return value.getValueDescriptor().getOptions().getExtension(RunnerApi.beamUrn);
Expand All @@ -57,19 +59,23 @@ private static String getUrn(ProtocolMessageEnum value) {
// Class-initialization sanity check: each hard-coded URN constant must equal the URN
// attached to the matching StandardResourceHints proto enum value (read via getUrn).
static {
checkState(MIN_RAM_URN.equals(getUrn(StandardResourceHints.Enum.MIN_RAM_BYTES)));
checkState(ACCELERATOR_URN.equals(getUrn(StandardResourceHints.Enum.ACCELERATOR)));
checkState(CPU_COUNT_URN.equals(getUrn(StandardResourceHints.Enum.CPU_COUNT)));
}

// Maps hint names to hint URNs. Multi-word hints are listed under both a camelCase
// name and a snake_case alias that resolve to the same URN.
private static ImmutableMap<String, String> hintNameToUrn =
ImmutableMap.<String, String>builder()
.put("minRam", MIN_RAM_URN)
.put("min_ram", MIN_RAM_URN) // Courtesy alias.
.put("accelerator", ACCELERATOR_URN)
.put("cpuCount", CPU_COUNT_URN)
.put("cpu_count", CPU_COUNT_URN) // Courtesy alias.
.build();

// Maps a hint URN to the function that builds a ResourceHint from the hint's string value:
// min RAM goes through BytesHint.parse, accelerator keeps the string as is, and CPU count
// goes through IntHint.parse.
private static ImmutableMap<String, Function<String, ResourceHint>> parsers =
ImmutableMap.<String, Function<String, ResourceHint>>builder()
.put(MIN_RAM_URN, s -> new BytesHint(BytesHint.parse(s)))
.put(ACCELERATOR_URN, s -> new StringHint(s))
.put(CPU_COUNT_URN, s -> new IntHint(IntHint.parse(s)))
.build();

private static final ResourceHints EMPTY = new ResourceHints(ImmutableMap.of());
Expand Down Expand Up @@ -212,6 +218,46 @@ public int hashCode() {
}
}

/**
 * A {@link ResourceHint} holding a single {@code int} value. Merging keeps the larger of the two
 * values, and the value is serialized as the US-ASCII bytes of its decimal string.
 */
/*package*/ static class IntHint extends ResourceHint {
  private final int value;

  public IntHint(int value) {
    this.value = value;
  }

  /** Parses {@code s} as a base 10 {@code int} using {@link Integer#parseInt(String, int)}. */
  public static int parse(String s) {
    return Integer.parseInt(s, 10);
  }

  @Override
  public ResourceHint mergeWithOuter(ResourceHint outer) {
    // The cast is unchecked on purpose: outer is expected to be an IntHint as well.
    int outerValue = ((IntHint) outer).value;
    return new IntHint(outerValue > value ? outerValue : value);
  }

  @Override
  public byte[] toBytes() {
    return Integer.toString(value).getBytes(Charsets.US_ASCII);
  }

  @Override
  public boolean equals(@Nullable Object other) {
    // instanceof is false for null, and an IntHint always equals itself by value.
    if (!(other instanceof IntHint)) {
      return false;
    }
    return value == ((IntHint) other).value;
  }

  @Override
  public int hashCode() {
    return Integer.hashCode(value);
  }
}

/**
* Sets desired minimal available RAM size to have in transform's execution environment.
*
Expand Down Expand Up @@ -264,6 +310,23 @@ public ResourceHints withHint(String urn, ResourceHint hint) {
return new ResourceHints(newHints.build());
}

/**
 * Requests a minimal CPU or vCPU count for the transform's execution environment.
 *
 * <p>A non-positive count is logged as an error and ignored, leaving these hints unchanged.
 *
 * @param cpuCount the desired CPU count; only positive values take effect.
 * @return hints that include the CPU count, or {@code this} when {@code cpuCount} is not
 *     positive.
 */
public ResourceHints withCPUCount(int cpuCount) {
  if (cpuCount > 0) {
    return withHint(CPU_COUNT_URN, new IntHint(cpuCount));
  }
  LOG.error(
      "Encountered invalid non-positive cpu count hint value {}.\n"
          + "The value is ignored. In the future, The method will require an object Long type "
          + "and throw an IllegalArgumentException for invalid values.",
      cpuCount);
  return this;
}

public Map<String, ResourceHint> hints() {
return hints;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,14 @@ public void testFromOptions() {
.withHint("beam:resources:bar", new ResourceHints.StringHint("foo")));
options =
PipelineOptionsFactory.fromArgs(
"--resourceHints=min_ram=1KB", "--resourceHints=accelerator=foo")
"--resourceHints=min_ram=1KB", "--resourceHints=accelerator=foo",
"--resourceHints=cpu_count=4")
.as(ResourceHintsOptions.class);
ResourceHints fromOptions = ResourceHints.fromOptions(options);
ResourceHints expect = ResourceHints.create().withMinRam(1000).withAccelerator("foo")
.withCPUCount(4);
assertEquals(
ResourceHints.fromOptions(options),
ResourceHints.create().withMinRam(1000).withAccelerator("foo"));
fromOptions,
expect);
}
}
16 changes: 16 additions & 0 deletions sdks/python/apache_beam/transforms/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
'ResourceHint',
'AcceleratorHint',
'MinRamHint',
'CpuCountHint',
'merge_resource_hints',
'parse_resource_hints',
'resource_hints_from_options',
Expand Down Expand Up @@ -177,6 +178,21 @@ def get_merged_value(
ResourceHint.register_resource_hint('minRam', MinRamHint)


class CpuCountHint(ResourceHint):
  """Resource hint for the number of CPUs available in a transform's
  execution environment."""
  urn = resource_hints.CPU_COUNT.urn

  @classmethod
  def get_merged_value(
      cls, outer_value, inner_value):  # type: (bytes, bytes) -> bytes
    # Merging is delegated to the shared helper on ResourceHint
    # (presumably keeps the numerically larger value -- confirm in _use_max).
    merged = ResourceHint._use_max(outer_value, inner_value)
    return merged


# Make the hint available under its snake_case name.
ResourceHint.register_resource_hint('cpu_count', CpuCountHint)
# Alias for interoperability with SDKs preferring camelCase.
ResourceHint.register_resource_hint('cpuCount', CpuCountHint)
kerrydc marked this conversation as resolved.
Show resolved Hide resolved


def parse_resource_hints(hints): # type: (Dict[Any, Any]) -> Dict[str, bytes]
parsed_hints = {}
for hint, value in hints.items():
Expand Down
6 changes: 6 additions & 0 deletions sdks/python/apache_beam/transforms/resources_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ class ResourcesTest(unittest.TestCase):
val='gpu',
urn='beam:resources:accelerator:v1',
bytestr=b'gpu'),
param(
name='cpu_count',
val='4',
urn='beam:resources:cpu_count:v1',
bytestr=b'4'),
])
def test_known_resource_hints(self, name, val, urn, bytestr):
t = PTransform()
Expand All @@ -56,6 +61,7 @@ def test_known_resource_hints(self, name, val, urn, bytestr):
@parameterized.expand([
param(name='min_ram', val='3,500G'),
param(name='accelerator', val=1),
param(name='cpu_count', val=1),
param(name='unknown_hint', val=1)
])
def test_resource_hint_parsing_fails_early(self, name, val):
Expand Down