Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion docker_images/power_schedule/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ def setUp(self) -> None:
'stop_instance': 0,
'error': 0,
'not_active': 0,
'reason': None
'reason': None,
'resource_errors': []
}

def test_outdated_schedule(self):
Expand Down Expand Up @@ -445,6 +446,12 @@ def raise_exc(*args):
result = self.default_result.copy()
result['error'] = 1
result['reason'] = 'error'
result['resource_errors'] = [{
'cloud_resource_id': 'cloud_resource_id',
'resource_type': 'Instance',
'action': 'stop_instance',
'error': 'error',
}]
self.assertEqual(result, self.worker.result)
self.worker.rest_cl.power_schedule_update.assert_called_once()
self.assertIn(
Expand Down
242 changes: 202 additions & 40 deletions docker_images/power_schedule/worker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env python
import base64
import os
import urllib3
from collections import defaultdict
Expand Down Expand Up @@ -107,7 +108,8 @@ def default_result():
'stop_instance': 0,
'error': 0,
'not_active': 0,
'reason': None
'reason': None,
'resource_errors': [],
}

def is_executable_ps(self, power_schedule: dict):
Expand Down Expand Up @@ -142,9 +144,14 @@ def get_action(self, schedule):
now_dt = datetime.now(tz=pytz.utc)
last_eval_dt = datetime.fromtimestamp(
schedule['last_eval'], tz=pytz.utc)
local_now = datetime.now(tz=local_tz)
current_dow = local_now.weekday() # 0=Monday, 6=Sunday
times_today = []
time_action_map = {}
for trigger in schedule['triggers']:
days_of_week = trigger.get('days_of_week')
if days_of_week is not None and current_dow not in days_of_week:
continue
action = trigger['action']
time = self._local_time_to_utc(trigger['time'], local_tz)
if time in time_action_map:
Expand All @@ -154,6 +161,9 @@ def get_action(self, schedule):
times_today.append(time)
time_action_map[time] = action
# collect power on/off segments during day
if not times_today:
self.result['reason'] = PowerScheduleReasons.NO_CHANGES
return None
times_today = sorted(times_today)
time_periods = zip(times_today[:-1], times_today[1:])

Expand Down Expand Up @@ -221,13 +231,173 @@ def _cloud_action(cloud_adapter, resource_data, action):

def cloud_action(self, action, resource, cloud_type, cloud_adapter):
resource_data = self.get_resource_data(resource, cloud_type)
self._cloud_action(cloud_adapter, resource_data, action)
method = action
if cloud_type == 'aws_cnr':
rtype = resource.get('resource_type')
if rtype == 'RDS Instance':
cluster_id = resource.get('meta', {}).get('source_cluster_id')
if cluster_id:
aurora_map = {
'start_instance': 'start_aurora_cluster',
'stop_instance': 'stop_aurora_cluster',
}
method = aurora_map[action]
resource_data = ([cluster_id], resource.get('region'))
else:
rds_map = {
'start_instance': 'start_rds_instance',
'stop_instance': 'stop_rds_instance',
}
method = rds_map[action]
try:
self._cloud_action(cloud_adapter, resource_data, method)
except ResourceNotFound:
try:
self.rest_cl.cloud_resource_update(
str(resource['_id']), {'active': False})
except Exception as upd_exc:
LOG.warning('Failed to deactivate stale resource %s: %s',
resource.get('cloud_resource_id'), upd_exc)
raise
self.result[action] += 1

def _execute_parallel(self, cloud_adapter, action, resources, cloud_type):
errors = []
if not resources:
return errors
executor = ThreadPoolExecutor(max_workers=THREADS_NUM)
futures = {
executor.submit(
self.cloud_action, action=action, resource=r,
cloud_type=cloud_type, cloud_adapter=cloud_adapter): r
for r in resources
}
for future in as_completed(futures):
resource = futures[future]
try:
future.result()
except Exception as exc:
self.result['error'] += 1
if isinstance(exc, (ResourceNotFound, InvalidResourceStateException)):
LOG.warning('Action %s failed as resource '
'doesn\'t exist or is in a wrong '
'state: %s', action, str(exc))
else:
LOG.exception('Action %s failed', action)
errors.append(exc)
self.result['resource_errors'].append({
'cloud_resource_id': resource.get('cloud_resource_id'),
'resource_type': resource.get('resource_type'),
'action': action,
'error': str(exc),
})
return errors

def _execute_aws_ordered(self, cloud_adapter, action, resources):
ec2 = [r for r in resources if r.get('resource_type') == 'Instance']
rds = [r for r in resources
if r.get('resource_type') == 'RDS Instance'
and not r.get('meta', {}).get('source_cluster_id')]
# Deduplicate Aurora: one operation per cluster, not per instance
aurora_by_cluster = {}
for r in resources:
if r.get('resource_type') == 'RDS Instance':
cluster_id = r.get('meta', {}).get('source_cluster_id')
if cluster_id and cluster_id not in aurora_by_cluster:
aurora_by_cluster[cluster_id] = r
aurora = list(aurora_by_cluster.values())
# PaaS group: start before EC2, stop after EC2
paas_group = rds + aurora

if not ec2 or not paas_group:
return self._execute_parallel(
cloud_adapter, action, resources, 'aws_cnr')

errors = []
if action == 'stop_instance':
# Phase 1: stop EC2
errors += self._execute_parallel(
cloud_adapter, action, ec2, 'aws_cnr')
# Wait for EC2 to reach stopped before touching PaaS
by_region = defaultdict(list)
for r in ec2:
by_region[r.get('region')].append(r['cloud_resource_id'])
for region, ids in by_region.items():
try:
cloud_adapter.wait_instances_stopped(ids, region)
except Exception as exc:
LOG.warning('Timed out waiting for EC2 instances to stop '
'in %s: %s', region, exc)
# Phase 2: stop RDS + Aurora
errors += self._execute_parallel(
cloud_adapter, action, paas_group, 'aws_cnr')

elif action == 'start_instance':
# Phase 1: start RDS + Aurora
errors += self._execute_parallel(
cloud_adapter, action, paas_group, 'aws_cnr')
# Wait for RDS instances
for r in rds:
try:
cloud_adapter.wait_rds_available(
r['cloud_resource_id'], r.get('region'))
except Exception as exc:
LOG.warning('Timed out waiting for RDS instance %s: %s',
r['cloud_resource_id'], exc)
# Wait for Aurora clusters
for cluster_id, r in aurora_by_cluster.items():
try:
cloud_adapter.wait_aurora_cluster_available(
cluster_id, r.get('region'))
except Exception as exc:
LOG.warning('Timed out waiting for Aurora cluster %s: %s',
cluster_id, exc)
# Phase 2: start EC2
errors += self._execute_parallel(cloud_adapter, action, ec2, 'aws_cnr')

return errors

def _get_tag_selector_resources(self, power_schedule):
tag_selector = power_schedule.get('tag_selector')
if not tag_selector or not tag_selector.get('tags'):
return []
tags = tag_selector['tags']
resource_types = tag_selector.get(
'resource_types') or ['Instance', 'RDS Instance']
_, org_cloud_accounts = self.rest_cl.cloud_account_list(
power_schedule['organization_id'])
aws_accounts = [
ca for ca in org_cloud_accounts.get('cloud_accounts', [])
if ca.get('type') == 'aws_cnr'
]
aws_ca_ids = [ca['id'] for ca in aws_accounts]
if not aws_ca_ids:
return []
mongo_q = {
'active': True,
'resource_type': {'$in': resource_types},
'cloud_account_id': {'$in': aws_ca_ids},
}
for key, value in tags.items():
encoded_key = base64.b64encode(key.encode('utf-8')).decode('utf-8')
mongo_q[f'tags.{encoded_key}'] = value
resources = list(self.mongo_cl.restapi.resources.find(mongo_q))

return resources

def process_resources(self, power_schedule, action):
power_schedule_id = power_schedule['id']
resources = list(self.mongo_cl.restapi.resources.find(
explicit_resources = list(self.mongo_cl.restapi.resources.find(
{'power_schedule': power_schedule_id}))
tag_resources = self._get_tag_selector_resources(power_schedule)

explicit_ids = {r['_id'] for r in explicit_resources}
extra_tag_resources = [
r for r in tag_resources
if r.get('_id') not in explicit_ids]

resources = explicit_resources + extra_tag_resources

if not resources:
self.result['reason'] = PowerScheduleReasons.NO_RESOURCES
return
Expand All @@ -240,45 +410,37 @@ def process_resources(self, power_schedule, action):
cloud_account_id = resource.get('cloud_account_id')
cloud_acc_resources[cloud_account_id].append(resource)

all_errors = []
for cloud_account_id, resources in cloud_acc_resources.items():
_, cloud_acc = self.rest_cl.cloud_account_get(cloud_account_id)
cloud_type = cloud_acc['type']
config = cloud_acc['config']
config['type'] = cloud_acc['type']
config['type'] = cloud_type
cloud_adapter = Cloud.get_adapter(config)
executor = ThreadPoolExecutor(max_workers=THREADS_NUM)
futures = []
for resource in resources:
futures.append(executor.submit(
self.cloud_action, action=action, resource=resource,
cloud_type=cloud_acc['type'], cloud_adapter=cloud_adapter))
# handle all instances and raise exception in the end
errors = []
for result in as_completed(futures):
try:
result.result()
except Exception as exc:
self.result['error'] += 1
if (isinstance(exc, ResourceNotFound) or
isinstance(exc, InvalidResourceStateException)):
LOG.warning('Action %s failed as resource '
'doesn\'t exist or is in a wrong '
'state: %s', action, str(exc))
else:
LOG.exception('Action %s failed', action)
errors.append(exc)
meta = {
'success_count': (self.result['start_instance'] +
self.result['stop_instance']),
'error_count': self.result['error'],
'not_active_count': self.result['not_active'],
'vm_action': 'on' if action == 'start_instance' else 'off'
}
self.publish_activities_task(
power_schedule['organization_id'], power_schedule_id,
'power_schedule', power_schedule['name'],
'power_schedule_processed', meta)
if errors:
raise errors[0]

if cloud_type == 'aws_cnr':
errors = self._execute_aws_ordered(cloud_adapter, action, resources)
else:
errors = self._execute_parallel(
cloud_adapter, action, resources, cloud_type)

all_errors.extend(errors)

meta = {
'success_count': (self.result['start_instance'] +
self.result['stop_instance']),
'error_count': self.result['error'],
'not_active_count': self.result['not_active'],
'vm_action': 'on' if action == 'start_instance' else 'off'
}
self.publish_activities_task(
power_schedule['organization_id'], power_schedule_id,
'power_schedule', power_schedule['name'],
'power_schedule_processed', meta)
if all_errors:
for exc in all_errors:
LOG.error('Unhandled cloud action error: %s', exc)
raise all_errors[0]

def process_schedule(self, power_schedule_id):
LOG.info('Start processing for schedule %s', power_schedule_id)
Expand Down Expand Up @@ -317,9 +479,9 @@ def process_task(self, body, message):
PowerScheduleReasons.OUTDATED,
PowerScheduleReasons.DISABLED_ORG]:
updates['last_run'] = now_ts
# we should reset error only if instances have been powered on/off
# during this run
updates['last_run_error'] = None
resource_errors = self.result.get('resource_errors', [])
updates['last_run_details'] = resource_errors if resource_errors else None
if error:
updates['last_run_error'] = error
self.rest_cl.power_schedule_update(power_schedule_id, updates)
Expand Down
11 changes: 7 additions & 4 deletions docker_images/resource_discovery/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from tools.cloud_adapter.cloud import Cloud as CloudAdapter
from tools.cloud_adapter.exceptions import InvalidResourceTypeException
from tools.cloud_adapter.model import (
ResourceTypes, RES_MODEL_MAP, InstanceResource, RdsInstanceResource
ResourceTypes, RES_MODEL_MAP, InstanceResource, RdsInstanceResource,
RedshiftClusterResource, EmrApplicationResource
)
from tools.optscale_time import utcnow, utcnow_timestamp
from optscale_client.config_client.client import Client as ConfigClient
Expand Down Expand Up @@ -158,9 +159,11 @@ def process_resource_obj(self, resources):
flavors[flavor_name] = flavor
resource.cpu_count = flavor['cpu']
resource.ram = flavor['ram'] * BYTES_IN_MB
if not resource.architecture and resource.cloud_type in [
'aws_cnr', 'azure_cnr', 'alibaba_cnr'
]:
if (not resource.architecture and
isinstance(resource, InstanceResource) and
resource.cloud_type in [
'aws_cnr', 'azure_cnr', 'alibaba_cnr'
]):
flavor_arch = flavor_archs.get(flavor_name)
if not flavor_arch:
_, arch_info = self.insider_cl.get_architecture(
Expand Down
2 changes: 1 addition & 1 deletion ngui/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ FROM source AS builder

WORKDIR /usr/src/app

RUN pnpm run -r build
RUN NODE_OPTIONS="--max-old-space-size=3000" pnpm run -r build

# Prepare production dependencies for the server
RUN pnpm deploy --filter=server --prod ./prod/server && \
Expand Down
6 changes: 6 additions & 0 deletions ngui/ui/src/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ import {
updatePowerSchedule,
attachInstancesToSchedule,
removeInstancesFromSchedule,
powerScheduleTagAction,
getPowerScheduleResourcesLiveState,
runPowerSchedule,
createSurvey,
getMlTaskRunsBulk,
getMlLeaderboards,
Expand Down Expand Up @@ -400,6 +403,9 @@ export {
updatePowerSchedule,
attachInstancesToSchedule,
removeInstancesFromSchedule,
powerScheduleTagAction,
getPowerScheduleResourcesLiveState,
runPowerSchedule,
updateMlLeaderboardTemplate,
createSurvey,
getMlTaskRunsBulk,
Expand Down
Loading