Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
9d849fa
add list and improving warm buffers
sanderegg Aug 12, 2025
09291c6
make compatible again
sanderegg Aug 22, 2025
a3767fe
AI first step
sanderegg Aug 22, 2025
8e1e7f1
AI step 2
sanderegg Aug 22, 2025
2304986
change fixture to creation mode
sanderegg Aug 25, 2025
4e8c526
refactor tests
sanderegg Aug 25, 2025
97f3be5
minor
sanderegg Aug 25, 2025
900daae
minor
sanderegg Aug 25, 2025
911f61a
fix fixture
sanderegg Aug 25, 2025
b2b6f4a
removed stupid test
sanderegg Aug 25, 2025
c00e4fc
added test for partial capacity
sanderegg Aug 25, 2025
8c8e983
ruffing
sanderegg Aug 25, 2025
d05cfa5
fixing types
sanderegg Aug 25, 2025
211e4f5
changed ENV to plural
sanderegg Aug 25, 2025
23bed87
minor
sanderegg Aug 26, 2025
7b0de90
minor
sanderegg Aug 26, 2025
d7e16fc
removed unnecessary tests on multiple instance types
sanderegg Aug 26, 2025
396e3d3
minor
sanderegg Aug 26, 2025
807c044
simplify
sanderegg Aug 26, 2025
24f92d2
minor
sanderegg Aug 26, 2025
dec0b09
workaround moto issue
sanderegg Aug 26, 2025
f50fbfd
improve
sanderegg Aug 26, 2025
cd35926
added test for checking distribution
sanderegg Aug 26, 2025
7e4b1b6
do not support distribution among subnets
sanderegg Aug 26, 2025
6517756
revert useless changes
sanderegg Aug 26, 2025
ce8b59f
pyright
sanderegg Aug 28, 2025
c5a6530
types
sanderegg Aug 28, 2025
f0753b0
refactor
sanderegg Aug 28, 2025
0d3b8eb
update function call
sanderegg Aug 28, 2025
aa87e0d
add simple test for subnets
sanderegg Aug 28, 2025
eb2dbb4
Update packages/aws-library/src/aws_library/ec2/_models.py
sanderegg Aug 28, 2025
5247a91
ensure mock does the right thing
sanderegg Aug 28, 2025
735b77b
improve test and mimick what AWS really does
sanderegg Aug 28, 2025
4cda5b3
added 2 tests with regard to multiple zones
sanderegg Aug 28, 2025
76c4563
fixed error handler
sanderegg Aug 28, 2025
04b208a
fixed test
sanderegg Aug 28, 2025
31aa472
renamed function and properly catch error
sanderegg Aug 28, 2025
64afa37
fixed fixture
sanderegg Aug 28, 2025
3da3d58
@GitHK review: refactor
sanderegg Aug 29, 2025
1764ea4
@GitHK review: improve text
sanderegg Aug 29, 2025
db5f577
@pcrespov review: improve message and exception handling
sanderegg Aug 29, 2025
553d30b
adjust test
sanderegg Aug 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions packages/aws-library/src/aws_library/ec2/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from ._client import SimcoreEC2API
from ._errors import EC2AccessError, EC2NotConnectedError, EC2RuntimeError
from ._errors import (
EC2AccessError,
EC2InsufficientCapacityError,
EC2NotConnectedError,
EC2RuntimeError,
)
from ._models import (
AWS_TAG_KEY_MAX_LENGTH,
AWS_TAG_KEY_MIN_LENGTH,
Expand All @@ -16,22 +21,22 @@
)

__all__: tuple[str, ...] = (
"AWSTagKey",
"AWSTagValue",
"AWS_TAG_KEY_MIN_LENGTH",
"AWS_TAG_KEY_MAX_LENGTH",
"AWS_TAG_VALUE_MIN_LENGTH",
"AWS_TAG_KEY_MIN_LENGTH",
"AWS_TAG_VALUE_MAX_LENGTH",
"AWS_TAG_VALUE_MIN_LENGTH",
"AWSTagKey",
"AWSTagValue",
"EC2AccessError",
"EC2InstanceBootSpecific",
"EC2InstanceConfig",
"EC2InstanceData",
"EC2InstanceType",
"EC2InsufficientCapacityError",
"EC2NotConnectedError",
"EC2RuntimeError",
"EC2Tags",
"Resources",
"SimcoreEC2API",
)

# nopycln: file
193 changes: 131 additions & 62 deletions packages/aws-library/src/aws_library/ec2/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,17 @@
from settings_library.ec2 import EC2Settings
from types_aiobotocore_ec2 import EC2Client
from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType
from types_aiobotocore_ec2.type_defs import FilterTypeDef, TagTypeDef
from types_aiobotocore_ec2.type_defs import (
FilterTypeDef,
TagTypeDef,
)

from ._error_handler import ec2_exception_handler
from ._errors import EC2InstanceNotFoundError, EC2TooManyInstancesError
from ._errors import (
EC2InstanceNotFoundError,
EC2InsufficientCapacityError,
EC2SubnetsNotEnoughIPsError,
)
from ._models import (
AWSTagKey,
EC2InstanceConfig,
Expand All @@ -25,7 +32,13 @@
EC2Tags,
Resources,
)
from ._utils import compose_user_data, ec2_instance_data_from_aws_instance
from ._utils import (
check_max_number_of_instances_not_exceeded,
compose_user_data,
ec2_instance_data_from_aws_instance,
get_subnet_azs,
get_subnet_capacity,
)

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -92,6 +105,11 @@ async def get_ec2_instance_capabilities(
list_instances: list[EC2InstanceType] = []
for instance in instance_types.get("InstanceTypes", []):
with contextlib.suppress(KeyError):
assert "InstanceType" in instance # nosec
assert "VCpuInfo" in instance # nosec
assert "DefaultVCpus" in instance["VCpuInfo"] # nosec
assert "MemoryInfo" in instance # nosec
assert "SizeInMiB" in instance["MemoryInfo"] # nosec
list_instances.append(
EC2InstanceType(
name=instance["InstanceType"],
Expand All @@ -118,94 +136,145 @@ async def launch_instances(

Arguments:
instance_config -- The EC2 instance configuration
min_number_of_instances -- the minimal number of instances needed (fails if this amount cannot be reached)
min_number_of_instances -- the minimal number of instances required (fails if this amount cannot be reached)
number_of_instances -- the ideal number of instances needed (if it cannot be reached AWS will return a number >=min_number_of_instances)

Keyword Arguments:
max_total_number_of_instances -- The total maximum allowed number of instances for this given instance_config (default: {10})
max_total_number_of_instances -- The total maximum allowed number of instances for this given instance_config

Raises:
EC2TooManyInstancesError:
EC2TooManyInstancesError: max_total_number_of_instances would be exceeded
EC2SubnetsNotEnoughIPsError: not enough IPs in the subnets
EC2InsufficientCapacityError: not enough capacity in the subnets


Returns:
The created instance data infos
"""

with log_context(
_logger,
logging.INFO,
msg=f"launch {number_of_instances} AWS instance(s) {instance_config.type.name} with {instance_config.tags=}",
msg=f"launch {number_of_instances} AWS instance(s) {instance_config.type.name}"
f" with {instance_config.tags=} in {instance_config.subnet_ids=}",
):
# first check the max amount is not already reached
current_instances = await self.get_instances(
key_names=[instance_config.key_name], tags=instance_config.tags
await check_max_number_of_instances_not_exceeded(
self,
instance_config,
required_number_instances=number_of_instances,
max_total_number_of_instances=max_total_number_of_instances,
)
if (
len(current_instances) + number_of_instances
> max_total_number_of_instances
):
raise EC2TooManyInstancesError(
num_instances=max_total_number_of_instances

# NOTE: checking subnets capacity is not strictly needed as AWS will do it for us
# but it gives us a chance to give early feedback to the user
# and avoid trying to launch instances in subnets that are already full
# and it also works around a moto bug where
# InsufficientInstanceCapacity is not raised when a subnet is full
subnet_id_to_available_ips = await get_subnet_capacity(
self.client, subnet_ids=instance_config.subnet_ids
)

total_available_ips = sum(subnet_id_to_available_ips.values())
if total_available_ips < min_number_of_instances:
raise EC2SubnetsNotEnoughIPsError(
subnet_ids=instance_config.subnet_ids,
instance_type=instance_config.type.name,
available_ips=total_available_ips,
)

# now let's not try to run instances in subnets that do not have enough IPs
subnet_ids_with_capacity = [
subnet_id
for subnet_id, capacity in subnet_id_to_available_ips.items()
if capacity >= min_number_of_instances
]

resource_tags: list[TagTypeDef] = [
{"Key": tag_key, "Value": tag_value}
for tag_key, tag_value in instance_config.tags.items()
]

instances = await self.client.run_instances(
ImageId=instance_config.ami_id,
MinCount=min_number_of_instances,
MaxCount=number_of_instances,
IamInstanceProfile=(
{"Arn": instance_config.iam_instance_profile}
if instance_config.iam_instance_profile
else {}
),
InstanceType=instance_config.type.name,
InstanceInitiatedShutdownBehavior="terminate",
KeyName=instance_config.key_name,
TagSpecifications=[
{"ResourceType": "instance", "Tags": resource_tags},
{"ResourceType": "volume", "Tags": resource_tags},
{"ResourceType": "network-interface", "Tags": resource_tags},
],
UserData=compose_user_data(instance_config.startup_script),
NetworkInterfaces=[
{
"AssociatePublicIpAddress": True,
"DeviceIndex": 0,
"SubnetId": instance_config.subnet_id,
"Groups": instance_config.security_group_ids,
}
],
)
instance_ids = [i["InstanceId"] for i in instances["Instances"]]
_logger.info(
"%s New instances launched: %s, waiting for them to start now...",
len(instance_ids),
instance_ids,
)
# Try each subnet in order until one succeeds
for subnet_id in subnet_ids_with_capacity:
try:
_logger.debug(
"Attempting to launch instances in subnet %s", subnet_id
)

# wait for the instance to be in a pending state
# NOTE: reference to EC2 states https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html
waiter = self.client.get_waiter("instance_exists")
await waiter.wait(InstanceIds=instance_ids)
_logger.debug("instances %s exists now.", instance_ids)
instances = await self.client.run_instances(
ImageId=instance_config.ami_id,
MinCount=min_number_of_instances,
MaxCount=number_of_instances,
IamInstanceProfile=(
{"Arn": instance_config.iam_instance_profile}
if instance_config.iam_instance_profile
else {}
),
InstanceType=instance_config.type.name,
InstanceInitiatedShutdownBehavior="terminate",
KeyName=instance_config.key_name,
TagSpecifications=[
{"ResourceType": "instance", "Tags": resource_tags},
{"ResourceType": "volume", "Tags": resource_tags},
{
"ResourceType": "network-interface",
"Tags": resource_tags,
},
],
UserData=compose_user_data(instance_config.startup_script),
NetworkInterfaces=[
{
"AssociatePublicIpAddress": True,
"DeviceIndex": 0,
"SubnetId": subnet_id,
"Groups": instance_config.security_group_ids,
}
],
)
# If we get here, the launch succeeded
break
except botocore.exceptions.ClientError as exc:
error_code = exc.response.get("Error", {}).get("Code")
if error_code == "InsufficientInstanceCapacity":
_logger.warning(
"Insufficient capacity in subnet %s for instance type %s, trying next subnet",
subnet_id,
instance_config.type.name,
)
continue
# For any other ClientError, re-raise to let the decorator handle it
raise

else:
subnet_zones = await get_subnet_azs(
self.client, subnet_ids=subnet_ids_with_capacity
)
raise EC2InsufficientCapacityError(
availability_zones=subnet_zones,
instance_type=instance_config.type.name,
)
instance_ids = [
i["InstanceId"] # pyright: ignore[reportTypedDictNotRequiredAccess]
for i in instances["Instances"]
]
with log_context(
_logger,
logging.INFO,
msg=f"{len(instance_ids)} instances: {instance_ids=} launched. Wait to reach pending state",
):
# wait for the instance to be in a pending state
# NOTE: reference to EC2 states https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html
waiter = self.client.get_waiter("instance_exists")
await waiter.wait(InstanceIds=instance_ids)

# NOTE: waiting for pending ensure we get all the IPs back
# NOTE: waiting for pending ensures we get all the IPs back
described_instances = await self.client.describe_instances(
InstanceIds=instance_ids
)
assert "Instances" in described_instances["Reservations"][0] # nosec
instance_datas = [
return [
await ec2_instance_data_from_aws_instance(self, i)
for i in described_instances["Reservations"][0]["Instances"]
]
_logger.info(
"%s are pending now",
f"{instance_ids=}",
)
return instance_datas

@ec2_exception_handler(_logger)
async def get_instances(
Expand Down
52 changes: 39 additions & 13 deletions packages/aws-library/src/aws_library/ec2/_error_handler.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
import functools
import logging
import re
from collections.abc import Callable, Coroutine
from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar
from typing import (
TYPE_CHECKING,
Any,
Concatenate,
Final,
ParamSpec,
TypeVar,
cast,
)

from botocore import exceptions as botocore_exc

from ._errors import (
EC2AccessError,
EC2InstanceNotFoundError,
EC2InstanceTypeInvalidError,
EC2InsufficientCapacityError,
EC2NotConnectedError,
EC2RuntimeError,
EC2TimeoutError,
Expand All @@ -26,30 +36,46 @@
Self = TypeVar("Self", bound="SimcoreEC2API")


# Pulls two named groups out of an error message containing
# "sufficient <instance_type> capacity in the Availability Zone you requested (<failed_az>)":
# `instance_type` (first run of non-space characters) and `failed_az` (text inside the parentheses).
_INSUFFICIENT_CAPACITY_ERROR_MSG_PATTERN: Final[re.Pattern] = re.compile(
r"sufficient (?P<instance_type>\S+) capacity in the Availability Zone you requested "
r"\((?P<failed_az>\S+)\)"
)


# Translate a botocore ClientError into one of this package's EC2 error types, using the
# HTTP status code, the AWS error code/message and the operation name found on the error.
# NOTE(review): this span appears to be a rendered diff, so removed and added lines are
# shown together without +/- markers — confirm against the real file before applying anything.
def _map_botocore_client_exception(
botocore_error: botocore_exc.ClientError,
*args, # pylint: disable=unused-argument # noqa: ARG001
**kwargs, # pylint: disable=unused-argument # noqa: ARG001
) -> EC2AccessError:
status_code = int(
botocore_error.response.get("ResponseMetadata", {}).get("HTTPStatusCode")
or botocore_error.response.get("Error", {}).get("Code", -1)
# see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html#parsing-error-responses-and-catching-exceptions-from-aws-services
# NOTE(review): typing.cast performs no runtime conversion, and the fallback below is the
# string "-1"; when HTTPStatusCode is missing, status_code will be a str, not an int.
status_code = cast(
int,
botocore_error.response.get("ResponseMetadata", {}).get("HTTPStatusCode", "-1"),
)
# AWS error code and human-readable message; both fall back to "Unknown" when absent.
error_code = botocore_error.response.get("Error", {}).get("Code", "Unknown")
error_msg = botocore_error.response.get("Error", {}).get("Message", "Unknown")
operation_name = botocore_error.operation_name
match status_code, operation_name:
case 400, "StartInstances":
match error_code:
case "InvalidInstanceID.NotFound":
return EC2InstanceNotFoundError()
case 400, "StopInstances":
return EC2InstanceNotFoundError()
case 400, "TerminateInstances":
return EC2InstanceNotFoundError()
case 400, "DescribeInstanceTypes":
case "InvalidInstanceType":
return EC2InstanceTypeInvalidError()
# Capacity errors: try to recover the instance type and availability zone from the
# message text; both stay "unknown" when the message does not match the pattern.
case "InsufficientInstanceCapacity":
availability_zone = "unknown"
instance_type = "unknown"
if match := re.search(_INSUFFICIENT_CAPACITY_ERROR_MSG_PATTERN, error_msg):
instance_type = match.group("instance_type")
availability_zone = match.group("failed_az")

# NOTE(review): this branch raises, whereas every other branch of this function returns
# the mapped error (and the signature declares a return value) — confirm `raise` is intended.
raise EC2InsufficientCapacityError(
availability_zones=availability_zone, instance_type=instance_type
)
# Anything not matched above is reported as a generic access error carrying the raw details.
case _:
return EC2AccessError(
status_code=status_code,
operation_name=operation_name,
code=status_code,
error=f"{botocore_error}",
code=error_code,
error=error_msg,
)


Expand Down
Loading
Loading