From f043c0ef248a4d7d5cdf23f8dc430470380cbf47 Mon Sep 17 00:00:00 2001 From: Sawood Alam Date: Mon, 9 Jul 2018 00:52:37 -0400 Subject: [PATCH 1/2] Enable concurrent read from IPFS in replay, #379 --- ipwb/replay.py | 56 ++++++++++++++++++++++++++++---------------------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/ipwb/replay.py b/ipwb/replay.py index 4988232a..94121cef 100755 --- a/ipwb/replay.py +++ b/ipwb/replay.py @@ -20,7 +20,6 @@ import surt import re import traceback -import signal from pywb.utils.binsearch import iter_exact from pywb.utils.canonicalize import unsurt # from pywb.utils.canonicalize import canonicalize as surt @@ -37,6 +36,9 @@ import requests +from threading import Thread +import time + import util as ipwbUtils from util import IPFSAPI_IP, IPFSAPI_PORT, IPWBREPLAY_IP, IPWBREPLAY_PORT from util import INDEX_FILE @@ -525,6 +527,13 @@ def getRequestedSetting(requestedSetting): return Response(ipwbUtils.getIPFSAPIHostAndPort() + '/webui') +# Lookup digest in IPFS and populate the 'message' dic using specified key. +# The key here could either be 'header' or 'payload'. +# Using the mutable 'message' dict instead of returning a value due to the +# asynchronous nature of threads which is being utilized to call this function. +def load_from_ipfs(digest, message, key): + message[key] = IPFS_API.cat(digest) + @app.route('/', defaults={'path': ''}) @app.route('/') def show_uri(path, datetime=None): @@ -578,24 +587,29 @@ def show_uri(path, datetime=None): digests = jObj['locator'].split('/') - class HashNotFoundError(Exception): - pass - - payload = None header = None + payload = None try: - def handler(signum, frame): - raise HashNotFoundError() - - if os.name != 'nt': # Bug #310 - signal.signal(signal.SIGALRM, handler) - signal.alarm(10) - - payload = IPFS_API.cat(digests[-1]) - header = IPFS_API.cat(digests[-2]) - - if os.name != 'nt': # Bug #310 - signal.alarm(0) + message = {'header': None, 'payload': None} + fetchHeader = Thread(target=load_from_ipfs, + args=(digests[-2], message, 'header')) + fetchPayload = Thread(target=load_from_ipfs, + args=(digests[-1], message, 'payload')) + IPFSTIMEOUT = 10 + fetch_start = time.time() + fetchHeader.start() + fetchPayload.start() + fetchHeader.join(IPFSTIMEOUT) + fetchPayload.join(IPFSTIMEOUT - (time.time() - fetch_start)) + header = message['header'] + payload = message['payload'] + if (time.time() - fetch_start) >= IPFSTIMEOUT: + if payload is None: + print("Hashes not found") + return '', 404 + else: # payload found but not header, fabricate header + print("HTTP header not found, fabricating for resp replay") + header = '' except ipfsapi.exceptions.TimeoutError: print("{0} not found at {1}".format(cdxjParts[0], digests[-1])) @@ -607,14 +621,8 @@ def handler(signum, frame): print('A type error occurred') print(traceback.format_exc()) print(sys.exc_info()[0]) - except HashNotFoundError: - if payload is None: - print("Hashes not found") - return '', 404 - else: # payload found but not header, fabricate header - print("HTTP header not found, fabricating for resp replay") - header = '' except Exception as e: + print(e) print('Unknown exception occurred while fetching from ipfs.') print(sys.exc_info()[0]) sys.exit() From b0f7654cfa1f24909ba36bcef5d4a1b6e59cad92 Mon Sep 17 00:00:00 2001 From: Sawood Alam Date: Mon, 9 Jul 2018 13:46:27 -0400 Subject: [PATCH 2/2] Minor changes to comply with pycodestyle --- ipwb/replay.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/ipwb/replay.py b/ipwb/replay.py index 94121cef..5ef01eb5 100755 --- a/ipwb/replay.py +++ b/ipwb/replay.py @@ -531,9 +531,10 @@ def getRequestedSetting(requestedSetting): # The key here could either be 'header' or 'payload'. # Using the mutable 'message' dict instead of returning a value due to the # asynchronous nature of threads which is being utilized to call this function. -def load_from_ipfs(digest, message, key): +def loadFromIPFS(digest, message, key): message[key] = IPFS_API.cat(digest) + @app.route('/', defaults={'path': ''}) @app.route('/') def show_uri(path, datetime=None): @@ -591,9 +592,9 @@ def show_uri(path, datetime=None): payload = None try: message = {'header': None, 'payload': None} - fetchHeader = Thread(target=load_from_ipfs, - args=(digests[-2], message, 'header')) - fetchPayload = Thread(target=load_from_ipfs, + fetchHeader = Thread(target=loadFromIPFS, + args=(digests[-2], message, 'header')) + fetchPayload = Thread(target=loadFromIPFS, args=(digests[-1], message, 'payload')) IPFSTIMEOUT = 10 fetch_start = time.time() @@ -601,7 +602,7 @@ def show_uri(path, datetime=None): fetchPayload.start() fetchHeader.join(IPFSTIMEOUT) fetchPayload.join(IPFSTIMEOUT - (time.time() - fetch_start)) - header = message['header'] + header = message['header'] payload = message['payload'] if (time.time() - fetch_start) >= IPFSTIMEOUT: if payload is None: