Skip to content

Commit f6457a9

Browse files
committed
CLI - use library.storage
Remove S3Handler, replace with StorageS3.

Previously: `upload_file` caught errors, returned None on success, False on failure; `download_file` caught errors, returned True on success, False on failure. The new storage `get`/`put` doesn't catch exceptions and doesn't return anything.

Users of `upload_file`:
* `PackageS3` — doesn't catch, ignores the return value; gather will now fail if it fails to save the result.
* `ReportSaverS3` — catches right outside, ignores the return value.

Users of `download_file`:
* `ExtractorS3` — catches right outside, ignores the return value; we no longer try to `process_tarballs` on download failure.

Update `ExtractorS3`, `PackageS3`, and `ReportSaverS3` to use StorageS3 instead of S3Handler. `ExtractorS3` now does: try -> get -> tempdir -> unpack.

Extractor: align `ExtractorDirectory` and `ExtractorS3` so both use storage. Directory uses `ship_path` as the base path for storage, so the path prefix relative to the base path is `data/Y/m/d/`. S3 uses `ship_path` as part of the path prefix, e.g. `ship_path/data/Y/m/d/`.

Issue: AAP-54345
1 parent eaf3748 commit f6457a9

File tree

9 files changed

+90
-153
lines changed

9 files changed

+90
-153
lines changed

metrics_utility/automation_controller_billing/base/s3_handler.py

Lines changed: 0 additions & 88 deletions
This file was deleted.

metrics_utility/automation_controller_billing/extract/base.py

Lines changed: 11 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -56,7 +56,7 @@ def load_config(self, file_path):
5656
with open(file_path) as f:
5757
return json.loads(f.read())
5858
except FileNotFoundError:
59-
logger.warning(f'{self.LOG_PREFIX} missing required file under path: {file_path} and date: {self.date}')
59+
logger.warning(f'{self.LOG_PREFIX} missing required file under path {file_path}')
6060

6161
def process_tarballs(self, path, temp_dir, enabled_set=None):
6262
_safe_extract(path, temp_dir, enabled_set=enabled_set)
@@ -115,16 +115,6 @@ def csv_enabled(self, name):
115115

116116
return name in self.enabled_set
117117

118-
def get_path_prefix(self, date):
119-
"""Return the data/Y/m/d path"""
120-
ship_path = self.extra_params['ship_path']
121-
122-
year = date.strftime('%Y')
123-
month = date.strftime('%m')
124-
day = date.strftime('%d')
125-
126-
return f'{ship_path}/data/{year}/{month}/{day}'
127-
128118
def sheet_enabled(self, sheets_required):
129119
"""
130120
Checks if any sheets_required item is in METRICS_UTILITY_OPTIONAL_CCSP_REPORT_SHEETS
@@ -143,17 +133,23 @@ def filter_tarball_paths(self, paths, collections):
143133
if collections is None:
144134
return paths
145135

136+
# these are in *every* tarball, thus not a valid thing to filter by
146137
if 'data_collection_status' in collections:
147138
raise MetricsException('data_collection_status is not a valid tarball name filter')
148-
149139
if 'config' in collections:
150140
raise MetricsException('config is not a valid tarball name filter')
151141

152142
def match(s):
153-
# include all files produced by 0.6.0 and lower, and anything with an unexpected name
154-
if re.search(r'-\d+.tar.gz$', s):
143+
# require at least .tar.gz
144+
if not re.search(r'\.tar\.gz$', s):
145+
return False
146+
147+
# include all files produced by 0.6.0 and lower
148+
if re.search(r'-\d+\.tar\.gz$', s):
155149
return True
156-
if re.search(r'-\d+-\w+.tar.gz$', s) is None:
150+
151+
# include anything with an unexpected name
152+
if not re.search(r'-\d+-\w+\.tar\.gz$', s):
157153
return True
158154

159155
# should not happen, but make sure we're not ignoring data if it does
Lines changed: 21 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -1,35 +1,38 @@
1-
import os
21
import tempfile
32

43
from metrics_utility.automation_controller_billing.extract.base import Base
4+
from metrics_utility.library.storage import StorageDirectory
55
from metrics_utility.logger import logger
66

77

88
class ExtractorDirectory(Base):
99
LOG_PREFIX = '[ExtractorDirectory]'
1010

11+
def __init__(self, extra_params):
12+
super().__init__(extra_params)
13+
14+
self.storage = StorageDirectory(base_path=extra_params.get('ship_path'))
15+
1116
def iter_batches(self, date, collections, optional):
1217
# Read tarball in memory in batches
1318
logger.debug(f'{self.LOG_PREFIX} Processing {date}')
14-
paths = self.fetch_partition_paths(date, collections)
15-
16-
for path in paths:
17-
if not path.endswith('.tar.gz'):
18-
continue
19-
20-
with tempfile.TemporaryDirectory(prefix='automation_controller_billing_data_') as temp_dir:
21-
try:
22-
yield self.process_tarballs(path, temp_dir, enabled_set=(collections or []) + (optional or []))
19+
enabled_set = (collections or []) + (optional or [])
2320

24-
except Exception as e:
25-
logger.exception(f'{self.LOG_PREFIX} ERROR: Extracting {path} failed with {e}')
21+
for path in self._fetch_partition_paths(date, collections):
22+
try:
23+
with self.storage.get(path) as local_path:
24+
with tempfile.TemporaryDirectory(prefix='automation_controller_billing_data_') as temp_dir:
25+
yield self.process_tarballs(local_path, temp_dir, enabled_set=enabled_set)
26+
except Exception as e:
27+
logger.exception(f'{self.LOG_PREFIX} ERROR: Extracting {path} failed with {e}')
2628

27-
def fetch_partition_paths(self, date, collections):
28-
prefix = self.get_path_prefix(date)
29+
def _fetch_partition_paths(self, date, collections):
30+
year = date.strftime('%Y')
31+
month = date.strftime('%m')
32+
day = date.strftime('%d')
2933

30-
try:
31-
paths = [os.path.join(prefix, f) for f in os.listdir(prefix) if os.path.isfile(os.path.join(prefix, f))]
32-
except FileNotFoundError:
33-
paths = []
34+
# relative to storage base_path (=ship_path)
35+
prefix = f'data/{year}/{month}/{day}'
3436

37+
paths = self.storage.list_files(prefix)
3538
return self.filter_tarball_paths(paths, collections)
Lines changed: 23 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,7 @@
1-
import os
21
import tempfile
32

4-
from metrics_utility.automation_controller_billing.base.s3_handler import S3Handler
53
from metrics_utility.automation_controller_billing.extract.base import Base
4+
from metrics_utility.library.storage import StorageS3
65
from metrics_utility.logger import logger
76

87

@@ -12,27 +11,34 @@ class ExtractorS3(Base):
1211
def __init__(self, extra_params):
1312
super().__init__(extra_params)
1413

15-
self.s3_handler = S3Handler(params=self.extra_params)
14+
self.storage = StorageS3(
15+
bucket=extra_params.get('bucket_name'),
16+
endpoint=extra_params.get('bucket_endpoint'),
17+
region=extra_params.get('bucket_region'),
18+
access_key=extra_params.get('bucket_access_key'),
19+
secret_key=extra_params.get('bucket_secret_key'),
20+
)
1621

1722
def iter_batches(self, date, collections, optional):
1823
# Read tarball in memory in batches
1924
logger.debug(f'{self.LOG_PREFIX} Processing {date}')
20-
s3_paths = self.fetch_partition_paths(date, collections)
25+
enabled_set = (collections or []) + (optional or [])
2126

22-
for s3_path in s3_paths:
23-
with tempfile.TemporaryDirectory(prefix='automation_controller_billing_data_') as temp_dir:
24-
try:
25-
local_path = os.path.join(temp_dir, 'source_tarball')
26-
self.s3_handler.download_file(s3_path, local_path)
27+
for s3_path in self._fetch_partition_paths(date, collections):
28+
try:
29+
with self.storage.get(s3_path) as local_path:
30+
with tempfile.TemporaryDirectory(prefix='automation_controller_billing_data_') as temp_dir:
31+
yield self.process_tarballs(local_path, temp_dir, enabled_set=enabled_set)
32+
except Exception as e:
33+
logger.exception(f'{self.LOG_PREFIX} ERROR: Extracting {s3_path} failed with {e}')
2734

28-
yield self.process_tarballs(local_path, temp_dir, enabled_set=(collections or []) + (optional or []))
35+
def _fetch_partition_paths(self, date, collections):
36+
year = date.strftime('%Y')
37+
month = date.strftime('%m')
38+
day = date.strftime('%d')
2939

30-
except Exception as e:
31-
logger.exception(f'{self.LOG_PREFIX} ERROR: Extracting {s3_path} failed with {e}')
32-
33-
def fetch_partition_paths(self, date, collections):
34-
# FIXME: apply collections= filtering, so we don't download files from S3 if we know they don't have the right thing
35-
prefix = self.get_path_prefix(date)
36-
paths = self.s3_handler.list_files(prefix)
40+
ship_path = self.extra_params.get('ship_path')
41+
prefix = f'{ship_path}/data/{year}/{month}/{day}'
3742

43+
paths = self.storage.list_files(prefix)
3844
return self.filter_tarball_paths(paths, collections)

metrics_utility/automation_controller_billing/package/package_directory.py

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,10 @@
11
import os
2-
import shutil
32

43
from django.conf import settings
54

65
import metrics_utility.base as base
76

7+
from metrics_utility.library.storage import StorageDirectory
88
from metrics_utility.logger import logger
99

1010

@@ -58,8 +58,11 @@ def ship(self):
5858
since, _ = self._batch_since_and_until()
5959
destination_path = self._destination_path(self.collector.billing_provider_params['ship_path'], since, os.path.basename(self.tar_path))
6060

61-
os.makedirs(os.path.dirname(destination_path), exist_ok=True)
62-
shutil.copyfile(self.tar_path, destination_path)
61+
params = self.collector.billing_provider_params
62+
storage = StorageDirectory(
63+
base_path=params.get('ship_path'),
64+
)
65+
storage.put(destination_path, filename=self.tar_path)
6366

6467
logger.debug(f'tarball saved to: {destination_path}')
6568

metrics_utility/automation_controller_billing/package/package_s3.py

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@
44

55
import metrics_utility.base as base
66

7-
from metrics_utility.automation_controller_billing.base.s3_handler import S3Handler
7+
from metrics_utility.library.storage import StorageS3
88
from metrics_utility.logger import logger
99

1010

@@ -58,8 +58,15 @@ def ship(self):
5858
since, _ = self._batch_since_and_until()
5959
destination_path = self._destination_path(self.collector.billing_provider_params['ship_path'], since, os.path.basename(self.tar_path))
6060

61-
s3_handler = S3Handler(params=self.collector.billing_provider_params)
62-
s3_handler.upload_file(self.tar_path, object_name=destination_path)
61+
params = self.collector.billing_provider_params
62+
storage = StorageS3(
63+
bucket=params.get('bucket_name'),
64+
endpoint=params.get('bucket_endpoint'),
65+
region=params.get('bucket_region'),
66+
access_key=params.get('bucket_access_key'),
67+
secret_key=params.get('bucket_secret_key'),
68+
)
69+
storage.put(destination_path, filename=self.tar_path)
6370

6471
logger.debug(f'tarball saved to: {destination_path}')
6572

metrics_utility/automation_controller_billing/report_saver/report_saver_directory.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,10 @@
11
import os
22

33

4+
# Not wrapping StorageDirectory just because .. why, but:
5+
# FIXME: replace ReportSaver directly with storage
6+
7+
48
class ReportSaverDirectory:
59
def __init__(self, extra_params):
610
self.extra_params = extra_params
Lines changed: 14 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
11
import os
22
import tempfile
33

4-
from metrics_utility.automation_controller_billing.base.s3_handler import S3Handler
4+
from metrics_utility.library.storage import StorageS3
55
from metrics_utility.logger import logger
66

77

@@ -11,22 +11,28 @@ class ReportSaverS3:
1111
def __init__(self, extra_params):
1212
self.extra_params = extra_params
1313

14+
# FIXME: remove once build_report no longer uses it
1415
self.report_spreadsheet_destination_path = self.extra_params['report_spreadsheet_destination_path']
1516

16-
self.s3_handler = S3Handler(params=self.extra_params)
17+
self.dest_path = extra_params['report_spreadsheet_destination_path']
18+
self.storage = StorageS3(
19+
bucket=extra_params.get('bucket_name'),
20+
endpoint=extra_params.get('bucket_endpoint'),
21+
region=extra_params.get('bucket_region'),
22+
access_key=extra_params.get('bucket_access_key'),
23+
secret_key=extra_params.get('bucket_secret_key'),
24+
)
1725

1826
def report_exist(self):
19-
return len([file for file in self.s3_handler.list_files(self.report_spreadsheet_destination_path)]) > 0
27+
return self.storage.exists(self.dest_path)
2028

2129
def save(self, report_spreadsheet):
2230
with tempfile.TemporaryDirectory(prefix='report_saver_billing_data_') as temp_dir:
2331
try:
2432
local_report_path = os.path.join(temp_dir, 'report')
2533
report_spreadsheet.save(local_report_path)
26-
27-
self.s3_handler.upload_file(local_report_path, self.report_spreadsheet_destination_path)
28-
34+
self.storage.put(self.dest_path, filename=local_report_path)
2935
except Exception as e:
30-
logger.exception(f'{self.LOG_PREFIX} ERROR: Saving report to S3 into path {self.report_spreadsheet_destination_path} failed with {e}')
36+
logger.exception(f'{self.LOG_PREFIX} ERROR: Saving report to S3 into path {self.dest_path} failed with {e}')
3137

32-
logger.info(f'Report sent into S3 bucket into path: {self.report_spreadsheet_destination_path}')
38+
logger.info(f'Report sent into S3 bucket into path: {self.dest_path}')

metrics_utility/management/validation.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -101,7 +101,7 @@ def handle_s3_ship_target():
101101
if missing:
102102
raise MissingRequiredEnvVar(f'Missing some required env variables for S3 configuration, namely: {", ".join(missing)}.')
103103

104-
# S3Handler params
104+
# S3Handler params - passed to StorageS3 as bucket, endpoint, region, access_key, secret_key (no ship_path)
105105
return {
106106
'ship_path': ship_path,
107107
'bucket_name': bucket_name,

0 commit comments

Comments (0)