diff --git a/README.md b/README.md index 5db4cff..35a8a26 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,16 @@ Release notes ============= -**Glacier command line utility was renamed from `glacier` to `glacier-cmd`, because of inconsistencies with boto.** +**15/10/2012: Bookkeeping database layout has changed a bit, the old format would cause problems if multiple files with the same name were uploaded to a vault. If you are upgrading this tool, run `glacier-cmd updatedb` once to update your bookkeeping database to the new layout. If you do not use bookkeeping, no need for this.** -**Renamed configuration file from `.glacier` to `.glacier-cmd` to reflect new name of this utility.** +**WARNING: make sure this SimpleDB domain is only used for glacier-cmd bookkeeping data storage, as all items that are not recognised will be cleaned up in the process.** **Command line parameter description changed from positional argument to optional argument. This means from now on you must add `--description ` on the command line to give a description. This to allow for multiple file names and wild cards to be used in conjunction with the `upload` subcommand.** +**Renamed configuration file from `.glacier` to `.glacier-cmd` to reflect new name of this utility.** + +**Glacier command line utility was renamed from `glacier` to `glacier-cmd`, because of inconsistencies with boto.** + **For everybody having problems with install, don't forget to install git.** Amazon Glacier CLI diff --git a/doc/Usage.rst b/doc/Usage.rst index 11c565b..c5c0c2b 100644 --- a/doc/Usage.rst +++ b/doc/Usage.rst @@ -292,17 +292,27 @@ Set a description of your archive. This may be up to 1024 characters long, and w Resume an interrupted job with the specified uploadid. If this option is present, ``glacier-cmd`` will check wether this uploadid exists, and if so check the hashes of the already uploaded parts to the local file. If all parts match, the upload will be resumed. If there is any problem, an error message will be shown. -* ``--resume`` +The uploadid of an upload can be found using the ``--listmultiparts`` command as described above. -Not implemented yet. +* ``--resume`` Attempt to automatically resume an upload using information stored in the bookkeeping database. This option requires :doc:`Bookkeeping` to be enabled. +This checks first if the file name and file size match the information in the database, then whether there is a multipart job active for this upload, and finally whether the already uploaded data matches the local data. + +This option only works for uploads of local files; in case of resumption of stdin jobs you must use the ``--uploadid`` option as described above. + * ``--bacula`` The file name is a bacula-style list of multiple files. This is useful if this script is used in conjunction with the Bacula backup software. Bacula separates files with the `|` character; see :doc:`Scripting` for more details. The file list should look like ``/path/to/backups/vol001|vol002|vol003``, with the path given by the user script. +* ``--sessions`` + +Number of parallel upload sessions to use. Default is 1. This may be used to speed up uploads of archives to Glacier. + +The script has no upper limit on number of sessions, you may experiment a bit to find out what your system can handle and what gives you the best results. Note that with multiple sessions the upload rate indicated is inaccurate at the beginning, and becomes progressively more accurate over time. + Downloading an archive. ^^^^^^^^^^^^^^^^^^^^^^^ @@ -358,6 +368,10 @@ The name of the file to write the downloaded data to. If omitted, stdout is used Overwrite a local file with the same name. If not given, an error will be shown if `` exists already. +* ``--resume`` + +Attempt to resume an aborted download session. This requires the ``--outfile`` option to be given with the name of the file containing the partially downloaded archive. When invoked, the remote data will be compared to the already downloaded data, and if it matches the download will continue. + Deleting an archive. ^^^^^^^^^^^^^^^^^^^^ @@ -458,3 +472,13 @@ To calculate the tree hash from a local file, to compare with the hash Amazon pr +--------------+------------------------------------------------------------------+ +Update database. +---------------- +``$glacier-cmd updatedb`` + +.. program-output:: glacier-cmd updatedb -h + +This anyone upgrading from a pre-late-oct-2012 version has to run once. That time changes were made to the bookkeeping database, and this command converts your current database to the new format. + +Running it more than once has no effect, and this command is subject to removal from future versions. + diff --git a/glacier/GlacierWrapper.py b/glacier/GlacierWrapper.py index 2af4cf7..aca602a 100644 --- a/glacier/GlacierWrapper.py +++ b/glacier/GlacierWrapper.py @@ -22,16 +22,22 @@ import termios import struct import mmap +import multiprocessing +import math from functools import wraps from dateutil.parser import parse as dtparse from datetime import datetime from pprint import pformat - from glaciercorecalls import GlacierConnection, GlacierWriter - from glacierexception import * + +def _call_back(data): + global counter + counter += 1 + print 'counter: %s, data: %s'% (counter, data) + class log_class_call(object): """ Decorator that logs class calls to specific functions. @@ -719,6 +725,7 @@ def list_jobs(self, vault_name, completed=None, code=e.code) job_list += response.copy()['JobList'] marker = response.copy()['Marker'] + response.read() if limit and len(job_list) >= limit: job_list = job_list[:limit] break @@ -850,6 +857,7 @@ def listmultiparts(self, vault_name, limit=None): uploads += response.copy()['UploadsList'] marker = response.copy()['Marker'] + response.read() if limit and len(uploads) >= limit: uploads = uploads[:limit] break @@ -864,7 +872,7 @@ def listmultiparts(self, vault_name, limit=None): @log_class_call("Uploading archive.", "Upload of archive finished.") def upload(self, vault_name, file_name, description, region, - stdin, alternative_name, part_size, uploadid, resume): + stdin, alternative_name, part_size, uploadid, resume, sessions): """ Uploads a file to Amazon Glacier. @@ -913,7 +921,6 @@ def upload(self, vault_name, file_name, description, region, # Otherwise try to read data from stdin. total_size = 0 reader = None - mmapped_file = None if not stdin: if not file_name: raise InputException( @@ -922,7 +929,6 @@ def upload(self, vault_name, file_name, description, region, try: f = open(file_name, 'rb') - mmapped_file = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) total_size = os.path.getsize(file_name) except IOError as e: raise InputException( @@ -930,9 +936,12 @@ def upload(self, vault_name, file_name, description, region, cause=e, code='FileError') + self.logger.debug('Successfully opened %s for reading.'% file_name) + elif select.select([sys.stdin,],[],[],0.0)[0]: reader = sys.stdin total_size = 0 + self.logger.debug('Connected to stdin for reading data to upload.') else: raise InputException( "There is nothing to upload.", @@ -950,11 +959,48 @@ def upload(self, vault_name, file_name, description, region, # value to stay within the self.MAX_PARTS (10,000) block limit). part_size = self._check_part_size(part_size, total_size) part_size_in_bytes = part_size * 1024 * 1024 + self.logger.debug('Using a part size of %s MB for upload.'% part_size) + + # If the key resume is True, we have to check whether we can find this + # file in the SimpleDB database. So search for a match on the file + # name, check for exact match file name and size, and whether there + # is an uploadid linked to it. Raise exceptions on the way in case + # of mismatches. + if resume: + items = self.search(vault=vault_name, + file_name=file_name, + uploads=True) + for item in items: + if item['filename'] == file_name: + if item.has_key('upload_id'): + if int(item['size']) == os.path.getsize(file_name): + + # We get it as unicode string which gives problems + # down the line (in writer.write_part). Converting + # to normal string solves this issue. + uploadid = str(item['upload_id']) + break + else: + raise InputException( + 'Can not resume the upload of %s.'% file_name, + code='FileError', + cause='File size mismatch. This file: %s, expected: %s.'% (item['size'], os.path.getsize(file_name))) + + else: + raise InputException( + 'Can not resume the upload of %s.'% file_name, + code='CommandError', + cause='No upload in progress for a file with this name.') + + self.logger.debug('Found uploadid for resume request; attempting to resume this upload.') # If we have an UploadId, check whether it is linked to a current # job. If so, check whether uploaded data matches the input data and # try to resume uploading. upload = None + + # If we have an upload id, try to find a matching active session, + # if any. if uploadid: uploads = self.listmultiparts(vault_name) for upload in uploads: @@ -964,21 +1010,46 @@ def upload(self, vault_name, file_name, description, region, part_size_in_bytes = upload['PartSizeInBytes'] break else: + if resume: + item = self.sdb_domain.get_item(uploadid) + self.sdb_domain.delete_item(item) + raise InputException( + 'Can not resume upload of this data as the original job has expired.', + code='CommandError') + raise InputException( 'Can not resume upload of this data as no existing job with this uploadid could be found.', - code='IdError') + code='CommandError') # Initialise the writer task. - writer = GlacierWriter(self.glacierconn, vault_name, description=description, - part_size_in_bytes=part_size_in_bytes, uploadid=uploadid, logger=self.logger) + writer = GlacierWriter(self.glacierconn, vault_name, + description=description, + part_size_in_bytes=part_size_in_bytes, + uploadid=uploadid, logger=self.logger) + + # The parts_map contains a list marking parts that have been uploaded + # successfully with their list SHA chunk hashes, and those that still + # need uploading as None. This list is later used to determine which + # parts need uploading, and to calculate the final tree hash. + # Stdin jobs are purely sequential and we don't have the size so it's + # not applicable in that case. + if total_size: + parts_map = [None for i in range(int(math.ceil(float(total_size)/part_size_in_bytes)))] + else: + parts_map = None + # We have an existing upload job; try to resume this. if upload: marker = None + start = stop = uploaded_size = 0 while True: # Fetch a list of already uploaded parts and their SHA hashes. try: - response = self.glacierconn.list_parts(vault_name, uploadid, marker=marker) + response = self.glacierconn.list_parts( + vault_name, + uploadid, + marker=marker) except boto.glacier.exceptions.UnexpectedHTTPResponseError as e: raise ResponseException( 'Failed to get a list already uploaded parts for interrupted upload %s.'% uploadid, @@ -986,7 +1057,7 @@ def upload(self, vault_name, file_name, description, region, code=e.code) list_parts_response = response.copy() - current_position = 0 + response.read() # Process the parts list. # For each part of data, take the matching data range from @@ -996,32 +1067,41 @@ def upload(self, vault_name, file_name, description, region, # function to handle non-sequential parts. for part in list_parts_response['Parts']: start, stop = (int(p) for p in part['RangeInBytes'].split('-')) - print 'start: %s, current position: %s'% (start, current_position) - if not start == current_position: - if stdin: - raise InputException( - 'Cannot verify non-sequential upload data from stdin.', - code='ResumeError') - if reader: - reader.seek(start) - - if mmapped_file and stop > len(mmapped_file): + if not start == uploaded_size and stdin: raise InputException( - 'File does not match uploaded data; please check your uploadid and try again.', - cause='File is smaller than uploaded data.', + 'Cannot verify non-sequential upload data from stdin.', code='ResumeError') - - # Try to read the chunk of data, and take the hash if we - # have received anything. - # If no data or hash mismatch, stop checking raise an + + # Try to read the chunk of data, take the hash if we have + # received anything, and compare this to the hash received + # from Glacier. + # If no data or hash mismatch, stop checking and raise an # exception. data = None - data = reader.read(stop-start) if reader else mmapped_file[start:stop] + if stdin: + data = reader.read(stop-start+1) + else: + if stop > total_size: + raise InputException( + 'File does not match uploaded data; please check your uploadid and try again.', + cause='File is smaller than uploaded data.', + code='ResumeError') + + data = mmap.mmap(f.fileno(), + length=stop-start+1, + offset=start, + access=mmap.ACCESS_READ) + if data: - data_hash = glaciercorecalls.tree_hash(glaciercorecalls.chunk_hashes(data)) + data_hash = glaciercorecalls.tree_hash( + glaciercorecalls.chunk_hashes(data) + ) if glaciercorecalls.bytes_to_hex(data_hash) == part['SHA256TreeHash']: self.logger.debug('Part %s hash matches.'% part['RangeInBytes']) writer.tree_hashes.append(data_hash) + if parts_map: + parts_map[start/part_size_in_bytes] = data_hash + else: raise InputException( 'Received data does not match uploaded data; please check your uploadid and try again.', @@ -1033,13 +1113,13 @@ def upload(self, vault_name, file_name, description, region, 'Received data does not match uploaded data; please check your uploadid and try again.', cause='No or not enough data to match.', code='ResumeError') - - current_position += stop - start + + uploaded_size += stop - start + 1 # If a marker is present, this means there are more pages # of parts available. If no marker, we have the last page. marker = list_parts_response['Marker'] - writer.uploaded_size = stop + writer.uploaded_size = uploaded_size if not marker: break @@ -1067,54 +1147,158 @@ def upload(self, vault_name, file_name, description, region, % (self._size_fmt(writer.uploaded_size)) self._progress(msg) - - # Read file in parts so we don't fill the whole memory. + start_time = current_time = previous_time = time.time() start_bytes = writer.uploaded_size - while True: - if reader: - part = reader.read(part_size_in_bytes) - else: - if len(mmapped_file) > writer.uploaded_size+part_size_in_bytes: - part = mmapped_file[writer.uploaded_size:writer.uploaded_size+part_size_in_bytes] - else: - part = mmapped_file[writer.uploaded_size:] - if not part: - break - - writer.write(part) - current_time = time.time() - overall_rate = int((writer.uploaded_size-start_bytes)/(current_time - start_time)) - if total_size > 0: + # Store the upload session in the bookkeeping database for future + # resumption. + if self.bookkeeping: + self.logger.info('Writing in-progress upload information into the bookkeeping database.') - # Calculate transfer rates in bytes per second. - current_rate = int(part_size_in_bytes/(current_time - previous_time)) + # Use the alternative name as given by --name if we have it. + file_name = alternative_name if alternative_name else file_name - # Estimate finish time, based on overall transfer rate. - if overall_rate > 0: - time_left = (total_size - writer.uploaded_size)/overall_rate - eta = time.strftime("%H:%M:%S", time.localtime(current_time + time_left)) - else: - time_left = "Unknown" - eta = "Unknown" + # If still no name this is an stdin job, so set name accordingly. + file_name = file_name if file_name else 'Data from stdin.' - msg = 'Wrote %s of %s (%s%%). Rate %s/s, average %s/s, eta %s.' \ - % (self._size_fmt(writer.uploaded_size), - self._size_fmt(total_size), - self._bold(str(int(100 * writer.uploaded_size/total_size))), - self._size_fmt(current_rate, 2), - self._size_fmt(overall_rate, 2), - eta) + # Set all the info we have for this upload, and store it in the + # bookkeeping db. + file_attrs = { + 'region': region, + 'vault': vault_name, + 'filename': file_name, + 'size': total_size, + 'upload_id': writer.uploadid, + 'location': None, + 'description': description, + 'date':'%s' % datetime.utcnow().replace(tzinfo=pytz.utc), + 'hash': None + } + self.sdb_domain.put_attributes(writer.uploadid, file_attrs) + + # As upload from file allows for multiple sessions to run in parallel, + # while stdin jobs are sequential, the upload code for the two types + # is split up. + # First up: upload from stdin. Read the file from stdin one part at a + # time, store this part in memory, and write it out to Glacier. + if reader: + while True: + part = reader.read(part_size_in_bytes) + if not part: + break + + writer.write(part) - else: + # Log the progress. + current_time = time.time() + overall_rate = int((writer.uploaded_size-start_bytes)/(current_time - start_time)) msg = 'Wrote %s. Rate %s/s.' \ % (self._size_fmt(writer.uploaded_size), self._size_fmt(overall_rate, 2)) + self._progress(msg) + previous_time = current_time + self.logger.debug(msg) + + # Second method: upload from file. + # Iterate over the parts_map, uploading all the parts where the + # parts_map[parts_nr] is False. + # The byte ranges of the respective parts are put in a work queue, + # the worker processes will one by one upload these parts in parallel. + # This is not guaranteed to happen in order. + else: + try: + sessions = int(sessions) + if sessions < 1: + raise ValueError + + except ValueError: + raise InputException( + 'Number of sessions must be a postive integer, larger than 0.', + code='CommandError', + cause='Invalid number of sessions: %s.'% sessions) + + f.close() + q = multiprocessing.JoinableQueue() + parent_conn, child_conn = multiprocessing.Pipe() + procs = [] + uploaded_size = start_bytes + + # Put items in the queue; must do this before starting the + # processes as otherwise they will quit instantly for not having + # anything in the queue to work on. + for part_nr in range(len(parts_map)): + if parts_map[part_nr]: + continue + + start = part_nr * part_size_in_bytes + stop = (start + part_size_in_bytes) if (start + part_size_in_bytes) < total_size else total_size + q.put((start, stop, part_nr)) + + + # Create the upload processes. + for i in range(sessions): + p = multiprocessing.Process( + target=glaciercorecalls.upload_part_process, + args=(q, child_conn, self.aws_access_key, + self.aws_secret_key, self.region, file_name, + vault_name, description, part_size_in_bytes, + writer.uploadid, self.logger)) + p.start() + procs.append(p) + + # wait for all processes to finish: when a process is finished + # (or crashed) it's not alive any more. + while True: + while len([p for p in procs if p.is_alive()]): + time.sleep(1) + update = False + while parent_conn.poll(): + part_tree_hash, part_nr, size = parent_conn.recv() + parts_map[part_nr] = part_tree_hash + uploaded_size += size + update = True + + if update: + + # Calculate transfer rates in bytes per second. + current_time = time.time() + overall_rate = int((uploaded_size-start_bytes)/(current_time - start_time)) + + # Estimate finish time, based on overall transfer rate. + if overall_rate > 0: + time_left = (total_size - uploaded_size)/overall_rate + eta = time.strftime("%H:%M:%S", time.localtime(current_time + time_left)) + else: + time_left = "Unknown" + eta = "Unknown" + + msg = 'Wrote %s of %s (%s%%). Average rate %s/s, eta %s.' \ + % (self._size_fmt(uploaded_size), + self._size_fmt(total_size), + self._bold(str(int(100 * uploaded_size/total_size))), + self._size_fmt(overall_rate, 2), + eta) + self._progress(msg) + previous_time = current_time + self.logger.debug(msg) + + if q.empty(): + break - self._progress(msg) - previous_time = current_time - self.logger.debug(msg) + # All processes crash; create new process to finish up the + # work. + p = multiprocessing.Process( + target=glaciercorecalls.upload_part_process, + args=(q, child_conn, self.aws_access_key, + self.aws_secret_key, self.region, file_name, + vault_name, description, part_size_in_bytes, + writer.uploadid, self.logger)) + p.start() + procs.append(p) + + writer.tree_hashes = parts_map + writer.uploaded_size = uploaded_size writer.close() current_time = time.time() @@ -1148,15 +1332,56 @@ def upload(self, vault_name, file_name, description, region, 'size': writer.uploaded_size } -## if file_name: -## file_attrs['filename'] = file_name -## elif stdin: -## file_attrs['filename'] = 'data from stdin' - self.sdb_domain.put_attributes(file_attrs['filename'], file_attrs) + item = self.sdb_domain.get_item(writer.uploadid) + if item: + self.sdb_domain.delete_item(item) return (archive_id, sha256hash) + @sdb_connect + def updatedb(self): + """ + Updates the SimpleDB to use the archive id as item name instead + of the file name. + """ + query = 'select * from `%s`'% self.bookkeeping_domain_name + items = self.sdb_domain.select(query) + old_items = new_items = {} + print 'Reading items from the database...' + for item in items: + try: + item_key = item['archive_id'] if item.has_key('archive_id') else item['upload_id'] + except KeyError: + print '''Deleting item. Doesn't seem to be from glacier-cmd.''' + self.sdb_domain.delete_item(item) + continue + + self.sdb_domain.delete_item(item) + new_item = {} + for key in item.keys(): + new_item[key] = item[key] + + new_items[item_key] = new_item + print 'Read %s items.\r'% len(new_items), + sys.stdout.flush() + + data = {} + total_items = 0 + print '\n' + for key in new_items.keys(): + + data[key] = new_items[key] + if len(data) == 25: + total_items += 25 + self.sdb_domain.batch_put_attributes(data) + data = {} + print 'Updated %s items.\r'% total_items, + sys.stdout.flush() + + if data: + self.sdb_domain.batch_put_attributes(data) + print 'Updated %s items.'% (total_items + len(data)) @glacier_connect @log_class_call("Processing archive retrieval job.", @@ -1223,7 +1448,7 @@ def getarchive(self, vault_name, archive_id): @log_class_call("Download an archive.", "Download archive done.") def download(self, vault_name, archive_id, part_size, - out_file_name=None, overwrite=False): + resume=False, out_file_name=None, overwrite=False): """ Download a file from Glacier, and store it in out_file. If no out_file is given, the file will be dumped on stdout. @@ -1248,31 +1473,145 @@ def download(self, vault_name, archive_id, part_size, else: raise InputException( - "Requested archive not available. Please make sure \ -your archive ID is correct, and start a retrieval job using \ -'getarchive' if necessary.", + '''Requested archive not available. Please make sure the archive ID +is correct, and start a retrieval job using 'getarchive' if necessary.''', code='IdError') - # Check whether we can access the file the archive has to be written to. + # Check whether we can access the file the archive has to be + # written to, and if we have an existing file whether to resume + # writing to it. + # If resumption is requested, try to compare the local data + # to the remote archive data, and if it compares continue the + # download where we were. out_file = None - if out_file_name: + hash_list = [] + downloaded_size = 0 + if resume: + if not out_file_name: + raise InputException( + 'Must provide outfile with existing data to resume download.', + code='CommandError') + + if not os.path.isfile(out_file_name): + raise InputException( + "Cannot access the ouput file for resumption of downloading: %s."% out_file_name, + cause='File not found.', + code='FileError') + + out_file_size = os.path.getsize(out_file_name) + if out_file_size == job['ArchiveSizeInBytes']: + + # It appears the archive has been downloaded completely + # already. Double check this. + file_hash = self.get_tree_hash(out_file_name) + if file_hash == job['SHA256TreeHash']: + raise InputException ( + 'Download of archive %s to local file %s is completed already.'% (archive_id, out_file_name), + code='ResumeError') + + raise InputException( + 'Archive data does not match local data.', + cause='SHA256 tree hash mismatch.', + code='ResumeError') + + if out_file_size > 0: + self.logger.debug('Attempting to resume download of this archive to file %s.'% out_file_name) + self._progress('Comparing data with Glacier for download resumption.') + + # Get tree hash and hash list of the partially downloaded data. + # Use mmap to reduce memory overhead while handling the local + # data. + try: + f = open(out_file_name, 'rb') + mmapped_out_file = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) + except IOError as e: + raise InputException( + "Could not read the output file %s for hash checking."% file_name, + cause=e, + code='FileError') + + # Ask Amazon for a hash on this data by opening a read + # connection; the hash of the data is in the response. + # Note: this must be done by normal block size; + # so 1,2,4,8 etc. MB of data at a time. Calculate + # biggest block that fits in the data, and work down from + # there until all data checked. + MB = 1024*1024 + check_part_size = 4096 * MB + checked_size = 0 + while checked_size < out_file_size: + + # Make sure our check_part_size is smaller than the + # amount of data left to check. + while check_part_size > (out_file_size - checked_size): + check_part_size = check_part_size / 2 + + # Get chunk hashes and the tree hash of the part that + # has to be checked, and store these hashes in the + # hash_list of this download. + start_part = checked_size/MB + stop_part = start_part + check_part_size/MB + check_hash_list = [hashlib.sha256(part).digest() for part in iter([mmapped_out_file[i*MB:(i+1)*MB] for i in range(start_part, stop_part)])] + hash_list += check_hash_list + local_hash = glaciercorecalls.bytes_to_hex(glaciercorecalls.tree_hash(hash_list)) + + # Get the hash of the data stored in Glacier, and check + # whether it matches out local data. + response = self.glacierconn.get_job_output(vault_name, + download_job['JobId'], + byte_range=(checked_size, checked_size+check_part_size-1)) + if response['TreeHash'] != local_hash: + raise InputException( + 'Archive data does not match local data.', + cause='SHA256 tree hash mismatch.', + code='ResumeError') + + self.logger.debug('Tree-hash match on %s MB part, range %s-%s.'% (check_part_size/MB, checked_size, checked_size+check_part_size)) + checked_size += check_part_size + + self.logger.debug('Hash check OK; continuing download resumption.') + + # Try to open the out_file in write mode, for appending data. + try: + out_file = open(out_file_name, 'ab') + except IOError as e: + raise InputException( + "Cannot access the ouput file for writing: %s."% out_file_name, + cause=e, + code='FileError') + + downloaded_size = os.path.getsize(out_file_name) + self.logger.debug('All checks passed; resuming download of data.') + self._progress('Resuming download now.') + + else: + out_file = open(out_file_name, 'wb') + + # If we have a file name, check whether it exists already, whether + # we may overwrite it, and finally whether we can actually write + # to this file. + elif out_file_name: if os.path.isfile(out_file_name) and not overwrite: raise InputException( - "File exists already, aborting. Use the overwrite flag to overwrite existing file.", + '''\ +File %s exists already, aborting. +Use the --resume flag to resume downloading to this file, \ +or the --overwrite flag to overwrite the existing file.'''% out_file_name, code="FileError") try: - out_file = open(out_file_name, 'w') + out_file = open(out_file_name, 'wb') except IOError as e: raise InputException( - "Cannot access the ouput file.", + "Cannot access the ouput file for writing: %s."% out_file_name, cause=e, code='FileError') + self.logger.debug('Downloading archive, writing data to file %s.'% out_file_name) # Sanity checking done; start downloading the file, part by part. total_size = download_job['ArchiveSizeInBytes'] - part_size_in_bytes = self._check_part_size(part_size, total_size) * 1024 * 1024 - start_bytes = downloaded_size = 0 - hash_list = [] + part_size_in_bytes = self._check_part_size(part_size, total_size) * MB + self.logger.debug('Using part size of %s bytes.'% part_size_in_bytes) + start_bytes = downloaded_size start_time = current_time = previous_time = time.time() # Log our pending action. @@ -1281,7 +1620,7 @@ def download(self, vault_name, archive_id, part_size, else: self.logger.debug('Starting download of archive to stdout.') - # Download the data, one part at a time. + # Start the actual download, one part at a time. while downloaded_size < total_size: # Read a part of data. @@ -1289,8 +1628,8 @@ def download(self, vault_name, archive_id, part_size, to_bytes = min(downloaded_size + part_size_in_bytes, total_size) try: response = self.glacierconn.get_job_output(vault_name, - download_job['JobId'], - byte_range=(from_bytes, to_bytes-1)) + download_job['JobId'], + byte_range=(from_bytes, to_bytes-1)) data = response.read() except boto.glacier.exceptions.UnexpectedHTTPResponseError as e: raise ResponseException( @@ -1298,18 +1637,27 @@ def download(self, vault_name, archive_id, part_size, cause=self._decode_error_message(e.body), code=e.code) - hash_list.append(glaciercorecalls.chunk_hashes(data)) + # Get the chunk hashes for this part, add them to + # the hash_list, and compare the tree_hash of the + # downloaded data to what we expect to receive. + chunk_hashes = glaciercorecalls.chunk_hashes(data) + hash_list += chunk_hashes + if glaciercorecalls.tree_hash(chunk_hashes) != response['TreeHash']: + raise CommunicationException( + 'Hash check of downloaded data failed, aborting download of archive.', + code='DownloadError') + downloaded_size = to_bytes if out_file: try: - out_file.write(response.read()) + out_file.write(data) except IOError as e: raise InputException( "Cannot write data to the specified file.", cause=e, code='FileError') else: - sys.stdout.write(response.read()) + sys.stdout.write(data) sys.stdout.flush() # Calculate progress statistics. @@ -1352,7 +1700,7 @@ def download(self, vault_name, archive_id, part_size, @sdb_connect @log_class_call("Searching for archive.", "Search done.") - def search(self, vault=None, region=None, file_name=None, search_term=None): + def search(self, vault=None, region=None, file_name=None, search_term=None, uploads=False): """ Searches for archives using SimpleDB @@ -1387,17 +1735,6 @@ def search(self, vault=None, region=None, file_name=None, search_term=None): if region: self._check_region(region) -## if file_name and ('"' in file_name or "'" in file_name): -## raise InputException( -## 'Quotes like \' and \" are not allowed in search terms.', -## cause='Invalid search term %s: contains quotes.'% file_name) -## -## -## if search_term and ('"' in search_term or "'" in search_term): -## raise InputException( -## 'Quotes like \' and \" are not allowed in search terms.', -## cause='Invalid search term %s: contains quotes.'% search_term) - self.logger.debug('Search terms: vault %s, region %s, file name %s, search term %s'% (vault, region, file_name, search_term)) search_params = [] @@ -1427,8 +1764,8 @@ def search(self, vault=None, region=None, file_name=None, search_term=None): # an archive_id attribute). try: for item in result: - self.logger.debug('Next search result:\n%s'% item) - if item.has_key('archive_id'): + if item.has_key('upload_id' if uploads else 'archive_id'): + self.logger.debug('Next search result:\n%s'% item) items.append(item) except boto.exception.SDBResponseError as e: raise ResponseException( @@ -1646,6 +1983,10 @@ def get_tree_hash(self, file_name): "Could not access the file given: %s."% file_name, cause=e, code='FileError') + if os.path.getsize(file_name) == 0: + raise InputException( + "Can not caclulate the hash of an empty file: %s."% file_name, + code='FileError') hashes = [hashlib.sha256(part).digest() for part in iter((lambda:reader.read(1024*1024)), '')] return glaciercorecalls.bytes_to_hex(glaciercorecalls.tree_hash(hashes)) @@ -1685,6 +2026,11 @@ def __init__(self, aws_access_key, aws_secret_key, region, self._check_region(region) + + global counter + counter = 0 + + self.logger.debug("""\ Creating GlacierWrapper instance with aws_access_key=%s, diff --git a/glacier/glacier.py b/glacier/glacier.py index 6540435..5341690 100755 --- a/glacier/glacier.py +++ b/glacier/glacier.py @@ -254,7 +254,7 @@ def download(args): Download an archive. """ glacier = default_glacier_wrapper(args) - response = glacier.download(args.vault, args.archive, args.partsize, + response = glacier.download(args.vault, args.archive, args.partsize, resume=args.resume, out_file_name=args.outfile, overwrite=args.overwrite) if args.outfile: output_msg(response, args.output, success=True) @@ -296,8 +296,10 @@ def upload(args): globbed = glob.glob(f) if globbed: for g in globbed: - response = glacier.upload(args.vault, g, args.description, args.region, args.stdin, - args.name, args.partsize, args.uploadid, args.resume) + response = glacier.upload( + args.vault, g, args.description, args.region, + args.stdin, args.name, args.partsize, args.uploadid, + args.resume, args.sessions) results.append({"Uploaded file": g, "Created archive with ID": response[0], "Archive SHA256 tree hash": response[1]}) @@ -309,8 +311,9 @@ def upload(args): elif args.stdin: # No file name; using stdin. - response = glacier.upload(args.vault, None, args.description, args.region, args.stdin, - args.name, args.partsize, args.uploadid, args.resume) + response = glacier.upload( + args.vault, None, args.description, args.region, args.stdin, + args.name, args.partsize, args.uploadid, args.resume, args.sessions) results = [{"Created archive with ID": response[0], "Archive SHA256 tree hash": response[1]}] @@ -419,6 +422,49 @@ def treehash(args): output_table(hash_results, args.output) +@handle_errors +def updatedb(args): + glacier = default_glacier_wrapper(args) + glacier.updatedb() + +@handle_errors +def backupdb(args): + """ + Create a copy of the current bookkeeping db, and put it on Glacier. + """ + + # If args.outfile: save to that file. --outfile + + # if args.zip: compresse data before saving to file. --compress + + # if args.stdout: dump to stdout (json code, never compressed). --stdout + + # if no special requests: + # check for vault 'glacier-cmd_bookkeeping', create if necessary. + + # compress data to zip file; upload this file to glacier with + # description glacier-cmd_bookkeeping_yyyy_mm_dd_hh_ss + + +@handle_errors +def restoredb(args): + """ + Restore database from glacier. + """ + + # If args.infile: use it. --infile + + # If args.zip: infile is zipped, otherwise plain json. --zip + # can we check for this? Try to unzip, see what happens? + + # If nothing given, restore from Glacier: + # Check whether we have a vault glacier-cmd_bookkeeping. + # Check inventory of vault glacier-cmd_bookkeeping; + # notify user of progress. + # Check which is latest backup archive; retrieve it; notify + # user of progress. + # When available, download it and return the data into the database. + def main(): program_description = u""" Command line interface for Amazon Glacier @@ -604,13 +650,16 @@ def main(): help='''\ Attempt to resume an interrupted multi-part upload. Does not work in combination with --stdin, and -requires bookkeeping to be enabled. -(not implemented yet)''') +requires bookkeeping to be enabled.''') parser_upload.add_argument('--bacula', action='store_true', help='''\ The (single!) file name will be parsed using Bacula's style of providing multiple names on the command line. E.g.: /path/to/backup/vol001|vol002|vol003''') + parser_upload.add_argument('--sessions', default=1, + help='''\ +The number of parallel upload sessions to use when uploading +data to Amazon Glacier.''') parser_upload.set_defaults(func=upload) # glacier-cmd listmultiparts @@ -692,6 +741,11 @@ def main(): If not given, the smallest possible part size will be used depending on the size of the job at hand.''') + parser_download.add_argument('--resume', action='store_true', + help='''\ +Attempt to resume an interrupted download. You must provide --outfile + that contains already downloaded data if using this option.''') + parser_download.set_defaults(func=download) # glacier-cmd rmarchive @@ -738,7 +792,13 @@ def main(): help='The filename to calculate the treehash of.') parser_describejob.set_defaults(func=treehash) - # TODO args.logtostdout becomes false when parsing the remaining_argv + # glacier-cmd updatedb + parser_describejob = subparsers.add_parser('updatedb', + help='Update the db to match change in item key. You need to run \ + this once if you have bookkeeping data from before mid Oct 2012.') + parser_describejob.set_defaults(func=updatedb) + + # FIXME args.logtostdout becomes false when parsing the remaining_argv # so here we bridge this. An ugly hack but it works. logtostdout = args.logtostdout diff --git a/glacier/glaciercorecalls.py b/glacier/glaciercorecalls.py index 21369f1..badbf13 100644 --- a/glacier/glaciercorecalls.py +++ b/glacier/glaciercorecalls.py @@ -21,6 +21,8 @@ import json import sys import time +import mmap +import Queue import boto.glacier.layer1 @@ -67,6 +69,58 @@ def tree_hash(fo): def bytes_to_hex(str): return ''.join( [ "%02x" % ord( x ) for x in str] ).strip() +def upload_part_process(q, conn, aws_access_key, aws_secret_key, region, + file_name, vault_name, description, + part_size_in_bytes, uploadid, logger): + """ + Starts the upload process of a chunk of data. + """ + f = open(file_name, 'rb') + try: + logger.debug("""\ +Connecting to Amazon Glacier for worker process with + aws_access_key %s + aws_secret_key %s + region %s""", + aws_access_key, + aws_secret_key, + region) + glacierconn = GlacierConnection(aws_access_key, + aws_secret_key, + region_name=region) + except boto.exception.AWSConnectionError as e: + raise ConnectionException( + "Cannot connect to Amazon Glacier.", + cause=e.cause, + code="GlacierConnectionError") + writer = GlacierWriter(glacierconn, vault_name, description=description, + part_size_in_bytes=part_size_in_bytes, + uploadid=uploadid, logger=logger) + while True: + try: + item = q.get(False) + except Queue.Empty: + break + + start, stop, part_nr = item + part = mmap.mmap(fileno=f.fileno(), + length=stop-start, + offset=start, + access=mmap.ACCESS_READ) + logger.debug('Got to work on range %s-%s'% (start, stop)) + try: + writer.write(part, start=start) + conn.send( (writer.part_tree_hash, + part_nr, + stop-start) ) + q.task_done() + except: + q.put(item) + conn.close() + raise + + conn.close() + class GlacierWriter(object): """ Presents a file-like object for writing to a Amazon Glacier @@ -81,7 +135,7 @@ def __init__(self, connection, vault_name, self.part_size = part_size_in_bytes self.vault_name = vault_name self.connection = connection -## self.location = None + self.location = None self.logger = logger if uploadid: @@ -91,14 +145,13 @@ def __init__(self, connection, vault_name, self.part_size, description) self.uploadid = response['UploadId'] + response.read() self.uploaded_size = 0 self.tree_hashes = [] self.closed = False -## self.upload_url = response.getheader("location") - def write(self, data): - + def write(self, data, start=None): if self.closed: raise CommunicationError( "Tried to write to a GlacierWriter that is already closed.", @@ -109,63 +162,55 @@ def write(self, data): 'Block of data provided must be equal to or smaller than the set block size.', code='InternalError') - part_tree_hash = tree_hash(chunk_hashes(data)) - self.tree_hashes.append(part_tree_hash) - headers = { - "x-amz-glacier-version": "2012-06-01", - "Content-Range": "bytes %d-%d/*" % (self.uploaded_size, - (self.uploaded_size+len(data))-1), - "Content-Length": str(len(data)), - "Content-Type": "application/octet-stream", - "x-amz-sha256-tree-hash": bytes_to_hex(part_tree_hash), - "x-amz-content-sha256": hashlib.sha256(data).hexdigest() - } - - self.connection.upload_part(self.vault_name, - self.uploadid, - hashlib.sha256(data).hexdigest(), - bytes_to_hex(part_tree_hash), - (self.uploaded_size, self.uploaded_size+len(data)-1), - data) - -## retries = 0 -## while True: -## response = self.connection.make_request( -## "PUT", -## self.upload_url, -## headers, -## data) -## -## # Success. -## if response.status == 204: -## break -## -## # Time-out recieved: sleep for 5 minutes and try again. -## # Do not try more than five times; after that it's over. -## elif response.status == 408: -## if retries >= 5: -## resp = json.loads(response.read()) -## raise ResonseException( -## resp['message'], -## cause='Timeout', -## code=resp['code']) -## -## if self.logger: -## logger.warning(resp['message']) -## logger.warning('sleeping 300 seconds (5 minutes) before retrying.') -## -## retries += 1 -## time.sleep(300) -## -## else: -## raise ResponseException( -## "Multipart upload part expected response status 204 (got %s):\n%s"\ -## % (response.status, response.read()), -## cause=resp['message'], -## code=resp['code']) - -## response.read() + self.part_tree_hash = tree_hash(chunk_hashes(data)) + self.tree_hashes.append(self.part_tree_hash) + start = start if start else self.uploaded_size + stop = start+len(data)-1 + if self.logger: + self.logger.debug('Starting upload of part %s-%s.'% (start, stop)) + + # Catch time-outs: if time-out received, wait a bit and + # try again. + # Uses exponential wait: 2 sec, 8 sec, 32 sec, 128 sec, 256 sec. + # If still failure after five times retrying, give up and raise + # an exception. + retries = 0 + delay = 2 + while True: + try: + response = self.connection.upload_part(self.vault_name, + self.uploadid, + hashlib.sha256(data).hexdigest(), + bytes_to_hex(self.part_tree_hash), + (start, stop), + data) + break + + except boto.glacier.exceptions.UnexpectedHTTPResponseError as e: + if e.code != 408: + raise ResponseException( + "Error while uploading data to Amazon Glacier.", + cause=e, + code=e.code) + + if retries >= 5: + raise ResonseException( + "Timeout while uploading data to Amazon Glacier. Retried five times; giving up.", + cause=e, + code=e.code) + + if self.logger: + logger.warning(e.message) + logger.warning('sleeping %s seconds before retrying.'% delay) + + time.sleep(delay) + retries += 1 + delay = delay * 4 + + response.read() self.uploaded_size += len(data) + if self.logger: + self.logger.debug('Finished uploading part %s-%s.'% (start, stop)) def close(self): @@ -173,14 +218,24 @@ def close(self): return # Complete the multiplart glacier upload - response = self.connection.complete_multipart_upload(self.vault_name, - self.uploadid, - bytes_to_hex(tree_hash(self.tree_hashes)), - self.uploaded_size) + try: + response = self.connection.complete_multipart_upload(self.vault_name, + self.uploadid, + bytes_to_hex(tree_hash(self.tree_hashes)), + self.uploaded_size) + except boto.glacier.exceptions.UnexpectedHTTPResponseError as e: + raise ResponseException( + "Error while closing a multipart upload to Amazon Glacier.", + cause=e, + code=e.code) + + response.read() self.archive_id = response['ArchiveId'] self.location = response['Location'] self.hash_sha256 = bytes_to_hex(tree_hash(self.tree_hashes)) self.closed = True + if self.logger: + self.logger.debug('Closed multipart upload.') def get_archive_id(self): self.close()