Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Enable concurrent read from IPFS in replay #425

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 32 additions & 24 deletions ipwb/replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import surt
import re
import traceback
import signal
from pywb.utils.binsearch import iter_exact
from pywb.utils.canonicalize import unsurt
# from pywb.utils.canonicalize import canonicalize as surt
Expand All @@ -37,6 +36,9 @@

import requests

from threading import Thread
import time

import util as ipwbUtils
from util import IPFSAPI_IP, IPFSAPI_PORT, IPWBREPLAY_IP, IPWBREPLAY_PORT
from util import INDEX_FILE
Expand Down Expand Up @@ -525,6 +527,13 @@ def getRequestedSetting(requestedSetting):
return Response(ipwbUtils.getIPFSAPIHostAndPort() + '/webui')


# Lookup digest in IPFS and populate the 'message' dic using specified key.
# The key here could either be 'header' or 'payload'.
# Using the mutable 'message' dict instead of returning a value due to the
# asynchronous nature of threads which is being utilized to call this function.
def load_from_ipfs(digest, message, key):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have been using camelCaseFunctionNames whereas here you used an_underscore_delimited_function_name. Please make this consistent to match the style of the rest of the package.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not consistent about camelCasing. I initially named that way, but found some functions defined with underscores, so changed it just before committing the code. Changing it back to camelCase now.

message[key] = IPFS_API.cat(digest)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra CRLF needed here to comply with pycodestyle.

@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
def show_uri(path, datetime=None):
Expand Down Expand Up @@ -578,24 +587,29 @@ def show_uri(path, datetime=None):

digests = jObj['locator'].split('/')

class HashNotFoundError(Exception):
pass

payload = None
header = None
payload = None
try:
def handler(signum, frame):
raise HashNotFoundError()

if os.name != 'nt': # Bug #310
signal.signal(signal.SIGALRM, handler)
signal.alarm(10)

payload = IPFS_API.cat(digests[-1])
header = IPFS_API.cat(digests[-2])

if os.name != 'nt': # Bug #310
signal.alarm(0)
message = {'header': None, 'payload': None}
fetchHeader = Thread(target=load_from_ipfs,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove superfluous space to comply with pycodestyle.

args=(digests[-2], message, 'header'))
fetchPayload = Thread(target=load_from_ipfs,
args=(digests[-1], message, 'payload'))
IPFSTIMEOUT = 10
fetch_start = time.time()
fetchHeader.start()
fetchPayload.start()
fetchHeader.join(IPFSTIMEOUT)
fetchPayload.join(IPFSTIMEOUT - (time.time() - fetch_start))
header = message['header']
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove superfluous space to comply with pycodestyle.

payload = message['payload']
if (time.time() - fetch_start) >= IPFSTIMEOUT:
if payload is None:
print("Hashes not found")
return '', 404
else: # payload found but not header, fabricate header
print("HTTP header not found, fabricating for resp replay")
header = ''

except ipfsapi.exceptions.TimeoutError:
print("{0} not found at {1}".format(cdxjParts[0], digests[-1]))
Expand All @@ -607,14 +621,8 @@ def handler(signum, frame):
print('A type error occurred')
print(traceback.format_exc())
print(sys.exc_info()[0])
except HashNotFoundError:
if payload is None:
print("Hashes not found")
return '', 404
else: # payload found but not header, fabricate header
print("HTTP header not found, fabricating for resp replay")
header = ''
except Exception as e:
print(e)
print('Unknown exception occurred while fetching from ipfs.')
print(sys.exc_info()[0])
sys.exit()
Expand Down