
Commit 8893182

Federico Ceratto authored and committed

FP 0.27 Stream cans: parallel stream + process

Add flags to prevent writing to db and to disk
Generate ETA metric
Fix generator type hints
Add unit test

1 parent bdd326c commit 8893182

File tree

5 files changed: +130 -90 lines changed

af/fastpath/debian/changelog

Lines changed: 6 additions & 0 deletions

@@ -1,3 +1,9 @@
+fastpath (0.27) unstable; urgency=medium
+
+  * Stream cans
+
+ -- Federico Ceratto <[email protected]>  Fri, 01 May 2020 13:36:08 +0100
+
 fastpath (0.26) unstable; urgency=medium
 
   * Improve file trimming

af/fastpath/fastpath/core.py

Lines changed: 29 additions & 62 deletions

@@ -106,6 +106,10 @@ def setup():
     ap.add_argument(
         "--stop-after", type=int, help="Stop after feeding N measurements", default=None
     )
+    ap.add_argument("--no-write-msmt", action="store_true",
+        help="Do not write measurement on disk")
+    ap.add_argument("--no-write-to-db", action="store_true",
+        help="Do not insert measurement in database")
     conf = ap.parse_args()
 
     if conf.devel or conf.stdout or no_journal_handler:

@@ -241,39 +245,6 @@ def clean_caches():
     }
 
 
-@metrics.timer("load_s3_measurements")
-def load_s3_measurements(day) -> Iterator[MsmtTup]:
-    # TODO: move this into s3feeder
-    t0 = time.time()
-    path = conf.s3cachedir / str(day)
-    log.info("Scanning %s", path.absolute())
-    files = []
-    with os.scandir(path) as d:
-        for e in d:
-            if not e.is_file():
-                continue
-            if e.name == "index.json.gz":
-                continue
-            files.append(e)
-
-    fcnt = 0
-    for e in sorted(files, key=lambda f: f.name):
-        fcnt += 1
-        log.info(f"Ingesting [{fcnt}/{len(files)}] {e.name}")
-        fn = os.path.join(path, e.name)
-        try:
-            for measurement_tup in s3feeder.load_multiple(fn):
-                yield measurement_tup
-        except:
-            log.error(f"Ingesting [{fcnt}/{len(files)}] {e.name}", exc_info=True)
-
-        remaining = (time.time() - t0) * (len(files) - fcnt) / fcnt
-        metrics.gauge("load_s3_measurements_eta", remaining)
-        metrics.gauge("load_s3_measurements_remaining_files", len(files) - fcnt)
-        remaining_td = timedelta(seconds=remaining)
-        log.info("load_s3_measurements remaining time: %s", remaining_td)
-
-
 def prepare_for_json_normalize(report):
     try:
         d = report["test_keys"]["control"]["tcp_connect"]

@@ -296,22 +267,9 @@ def fetch_measurements(start_day, end_day) -> Iterator[MsmtTup]:
     # --start-day -> Run on old cans, then over SSH
     # --start-day and --end-day -> Run on old cans
     # --start-day and --end-day with the same date -> NOP
-    today = date.today()
-    if start_day and start_day < today:
-        # TODO: fetch day N+1 while processing day N
-        log.info("Fetching older cans from S3")
-        day = start_day
-        while day < (end_day or today):
-            log.info("Processing %s", day)
-            s3feeder.fetch_cans_for_a_day_with_cache(conf, day)
-            for measurement_tup in load_s3_measurements(day):
-                yield measurement_tup
-
-            day += timedelta(days=1)
-
-        if end_day:
-            log.info("Reached {end_day}, exiting")
-            return
+
+    for measurement_tup in s3feeder.stream_cans(conf, start_day, end_day):
+        yield measurement_tup
 
     if conf.nossh:
         log.info("Not fetching over SSH")

@@ -1295,7 +1253,11 @@ def file_trimmer(conf):
 def msm_processor(queue):
     """Measurement processor worker
     """
-    db.setup(conf)
+    if conf.no_write_to_db:
+        log.warning("not writing to database")
+    else:
+        db.setup(conf)
+
     while True:
         msm_tup = queue.get()
         if msm_tup is None:

@@ -1314,7 +1276,8 @@ def msm_processor(queue):
             sshfeeder.log_ingestion_delay(measurement)
             log.debug(f"Processing {tid} {rid} {inp}")
             fn = generate_filename(tid)
-            writeout_measurement(msm_jstr, fn, conf.update, tid)
+            if not conf.no_write_msmt:
+                writeout_measurement(msm_jstr, fn, conf.update, tid)
             if measurement.get("test_name", None) == "web_connectivity":
                 matches = match_fingerprints(measurement)
             else:

@@ -1330,16 +1293,17 @@ def msm_processor(queue):
             log.debug(
                 f"Storing {tid} {rid} {inp} A{int(anomaly)} F{int(failure)} C{int(confirmed)}"
             )
-            db.upsert_summary(
-                measurement,
-                scores,
-                anomaly,
-                confirmed,
-                failure,
-                tid,
-                fn,
-                conf.update,
-            )
+            if not conf.no_write_to_db:
+                db.upsert_summary(
+                    measurement,
+                    scores,
+                    anomaly,
+                    confirmed,
+                    failure,
+                    tid,
+                    fn,
+                    conf.update,
+                )
         except Exception as e:
             log.exception(e)
             metrics.incr("unhandled_exception")

@@ -1366,7 +1330,10 @@ def core():
     scores = None
 
     # Spawn file trimmer process
-    mp.Process(target=file_trimmer, args=(conf,)).start()
+    if conf.no_write_msmt:
+        log.warning("Not trimming old measurement on disk")
+    else:
+        mp.Process(target=file_trimmer, args=(conf,)).start()
 
     # Spawn worker processes
     # 'queue' is a singleton from the portable_queue module
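
Taken together, the two new flags allow a dry run that exercises fetching and scoring without persisting anything. A minimal, runnable sketch of the gating logic; the stub functions and simplified signatures below are illustrative stand-ins, only the flag handling mirrors the diff:

from argparse import Namespace
import logging

log = logging.getLogger("dry_run_sketch")

# Stubs standing in for fastpath's writeout_measurement() and
# db.upsert_summary(); signatures simplified for illustration.
def writeout_measurement(msm_jstr, fn):
    print(f"writing measurement to {fn}")

def upsert_summary(summary):
    print(f"upserting summary {summary}")

def process_one(conf, msm_jstr, fn, summary):
    if not conf.no_write_msmt:
        writeout_measurement(msm_jstr, fn)
    if conf.no_write_to_db:
        log.warning("not writing to database")
    else:
        upsert_summary(summary)

# With both flags set, processing a measurement has no side effects:
conf = Namespace(no_write_msmt=True, no_write_to_db=True)
process_one(conf, "{}", "/tmp/msmt.json", {"anomaly": False})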

af/fastpath/fastpath/db.py

Lines changed: 0 additions & 2 deletions

@@ -9,8 +9,6 @@
 
 from textwrap import dedent
 import logging
-import os
-import time
 
 import psycopg2  # debdeps: python3-psycopg2
 from psycopg2.extras import Json

af/fastpath/fastpath/s3feeder.py

Lines changed: 71 additions & 26 deletions

@@ -15,7 +15,9 @@
 
 """
 
-from typing import Iterator
+from datetime import date, timedelta
+from typing import Generator
+from pathlib import Path
 import logging
 import os
 import time

@@ -42,14 +44,11 @@
     logging.getLogger(l).setLevel(logging.INFO)
 
 
-def load_multiple(fn, touch=True) -> Iterator[MsmtTup]:
+def load_multiple(fn: str) -> Generator[MsmtTup, None, None]:
     """Load contents of cans. Decompress tar archives if found.
     Yields measurements one by one as:
     (string of JSON, None) or (None, msmt dict)
     """
-    if touch:
-        os.utime(fn)  # update access time - used for cache cleanup
-
     # TODO: handle:
     # RuntimeError: LZ4F_decompress failed with code: ERROR_decompressionFailed
     if fn.endswith(".tar.lz4"):

@@ -81,10 +80,10 @@ def load_multiple(fn, touch=True) -> Iterator[MsmtTup]:
     elif fn.endswith(".yaml.lz4"):
         with lz4frame.open(fn) as f:
             raise Exception("Unsupported format: YAML")
-            bucket_tstamp = "FIXME"
-            for msm in iter_yaml_msmt_normalized(f, bucket_tstamp):
-                metrics.incr("yaml_normalization")
-                yield (None, msm)
+            # bucket_tstamp = "FIXME"
+            # for msm in iter_yaml_msmt_normalized(f, bucket_tstamp):
+            #     metrics.incr("yaml_normalization")
+            #     yield (None, msm)
 
     else:
         raise RuntimeError(fn)

@@ -94,16 +93,6 @@ def create_s3_client():
     return boto3.Session(profile_name=AWS_PROFILE).client("s3")
 
 
-def list_cans_on_s3(prefix):
-    # TODO list files based on date and return them
-    s3 = create_s3_client()
-    r = s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix="canned/" + prefix)
-    for filedesc in r["Contents"]:
-        fn = filedesc["Key"]
-        size = filedesc["Size"]
-        print("%s size %d MB" % (fn, size / 1024 / 1024))
-
-
 def list_cans_on_s3_for_a_day(s3, day):
     prefix = f"{day}/"
     r = s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix="canned/" + prefix)

@@ -116,10 +105,11 @@ def list_cans_on_s3_for_a_day(s3, day):
 
 
 @metrics.timer("fetch_cans")
-def fetch_cans(s3, conf, files):
+def fetch_cans(s3, conf, files) -> Generator[Path, None, None]:
     """
     Download cans to a local directory
     fnames = [("2013-09-12/20130912T150305Z-MD-AS1547-http_", size), ... ]
+    Yield each can file Path
     """
     # fn: can filename without path
     # diskf: File in the s3cachedir directory

@@ -144,7 +134,7 @@ def _cb(bytes_count):
         _cb.count += bytes_count
         _cb.total_count += bytes_count
         metrics.gauge("s3_download_percentage", _cb.total_count / _cb.total_size * 100)
-        log.debug("s3_download_percentage %d", _cb.total_count / _cb.total_size * 100)
+        # log.debug("s3_download_percentage %d", _cb.total_count / _cb.total_size * 100)
         try:
             speed = _cb.count / 131_072 / (time.time() - _cb.start_time)
             metrics.gauge("s3_download_speed_avg_Mbps", speed)

@@ -169,16 +159,71 @@ def _cb(bytes_count):
         metrics.gauge("fetching", 0)
         tmpf.rename(diskf)
         assert size == diskf.stat().st_size
+        yield diskf
 
     metrics.gauge("s3_download_speed_avg_Mbps", 0)
 
 
+# TODO: merge with stream_daily_cans, add caching to the latter to be used
+# during functional tests
 @metrics.timer("fetch_cans_for_a_day_with_cache")
 def fetch_cans_for_a_day_with_cache(conf, day):
     s3 = create_s3_client()
     fns = list_cans_on_s3_for_a_day(s3, day)
-    #can_names = [
-    #    "2020-03-03/web_connectivity.14.tar.lz4",
-    #]
-    #fns = [(name, size) for name, size in fns if name in can_names]
-    fetch_cans(s3, conf, fns)
+    list(fetch_cans(s3, conf, fns))
+
+
+def _calculate_etr(t0, now, start_day, day, stop_day, can_num, can_tot_count) -> float:
+    """Estimate total runtime in seconds.
+    stop_day is not included, can_num starts from 0
+    """
+    tot_days_count = (stop_day - start_day).days
+    elapsed = now - t0
+    days_done = (day - start_day).days
+    fraction_of_day_done = (can_num + 1) / float(can_tot_count)
+    etr = elapsed * tot_days_count / (days_done + fraction_of_day_done)
+    return etr
+
+
+def _update_eta(t0, start_day, day, stop_day, can_num, can_tot_count):
+    """Generate the process_s3_measurements_eta metric, expressed as epoch time
+    """
+    try:
+        now = time.time()
+        etr = _calculate_etr(t0, now, start_day, day, stop_day, can_num, can_tot_count)
+        eta = t0 + etr
+        metrics.gauge("process_s3_measurements_eta", eta)
+    except:
+        pass
+
+
+def stream_cans(conf, start_day: date, end_day: date) -> Generator[MsmtTup, None, None]:
+    """Stream cans from S3
+    """
+    today = date.today()
+    if not start_day or start_day >= today:
+        return
+
+    log.info("Fetching older cans from S3")
+    t0 = time.time()
+    day = start_day
+    s3 = create_s3_client()
+    # the last day is not included
+    stop_day = end_day if end_day and end_day < today else today
+    while day < stop_day:
+        log.info("Processing day %s", day)
+        cans_fns = list_cans_on_s3_for_a_day(s3, day)
+        for cn, can_f in enumerate(fetch_cans(s3, conf, cans_fns)):
+            try:
+                _update_eta(t0, start_day, day, stop_day, cn, len(cans_fns))
+                log.info("can %s ready", can_f.name)
+                for msmt_tup in load_multiple(can_f.as_posix()):
+                    yield msmt_tup
+            except Exception as e:
+                log.error(str(e), exc_info=True)
+
+        day += timedelta(days=1)
+
+    if end_day:
+        log.info(f"Reached {end_day}, stream_cans finished")
+        return
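
Two details are worth noting. First, fetch_cans is now a generator that yields each can's Path as soon as its download completes, so stream_cans can process a can while later ones are still pending, while fetch_cans_for_a_day_with_cache keeps its old eager behaviour by draining the generator with list(). Second, the ETR estimator assumes uniform throughput across days and cans: progress is (days_done + fraction_of_current_day) / total_days, and the estimated total runtime is simply elapsed / progress. A standalone, runnable copy of the arithmetic for experimentation outside fastpath:

# Standalone copy of _calculate_etr; mirrors the function added above.
from datetime import date

def calculate_etr(t0, now, start_day, day, stop_day, can_num, can_tot_count):
    tot_days_count = (stop_day - start_day).days  # days in the whole run
    elapsed = now - t0                            # seconds spent so far
    days_done = (day - start_day).days            # fully processed days
    fraction_of_day_done = (can_num + 1) / float(can_tot_count)
    # progress = (days_done + fraction_of_day_done) / tot_days_count
    # estimated total runtime = elapsed / progress
    return elapsed * tot_days_count / (days_done + fraction_of_day_done)

# After 1 hour, with 1 of 4 cans done on the only day, progress is 25%,
# so the whole run is estimated at 4 hours:
etr = calculate_etr(0, 3600, date(2020, 1, 1), date(2020, 1, 1),
                    date(2020, 1, 2), 0, 4)
assert etr == 4 * 3600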

af/fastpath/fastpath/tests/test_unit.py

Lines changed: 24 additions & 0 deletions

@@ -2,7 +2,10 @@
 # Fastpath - unit tests
 #
 
+from datetime import date
+
 import fastpath.core as fp
+import fastpath.s3feeder as s3feeder
 import ujson
 
 

@@ -165,3 +168,24 @@ def test_bug_backend352():
         "blocking_isp": 0.0,
         "blocking_local": 0.0,
     }
+
+
+def test_s3feeder_eta():
+    t0 = 1588200000
+    now = t0 + 3600
+    start_day = date(2020, 1, 1)
+    day = date(2020, 1, 1)
+    stop_day = date(2020, 1, 2)
+
+    etr = s3feeder._calculate_etr(t0, now, start_day, day, stop_day, 0, 4)
+    assert etr / 3600 == 4
+    etr = s3feeder._calculate_etr(t0, now, start_day, day, stop_day, 3, 4)
+    assert etr / 3600 == 1
+    etr = s3feeder._calculate_etr(
+        t0, now, start_day, date(2020, 1, 2), date(2020, 1, 5), -1, 9
+    )
+    assert etr / 3600 == 4.0
+    etr = s3feeder._calculate_etr(
+        t0, now, start_day, date(2020, 1, 4), date(2020, 1, 5), 9, 10
+    )
+    assert etr / 3600 == 1.0
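
For reference on the expected values: in the first case one hour has elapsed and 1 of 4 cans of the only day is done, so progress is 25% and the estimated total runtime is 4 h; in the second, 4 of 4 cans are done (100%, hence 1 h); in the third, one of four days is complete and can_num=-1 zeroes out the in-day fraction (25%, 4 h); in the fourth, 3 days plus all 10 cans of the fourth and last day are done (100%, 1 h).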
