diff --git a/.gitignore b/.gitignore index 1f4b930..66e02a1 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ dist MANIFEST bagit.egg-info .idea +.eggs +*.log diff --git a/bagit.py b/bagit.py index a821973..e8fbbbc 100755 --- a/bagit.py +++ b/bagit.py @@ -136,8 +136,13 @@ def find_locale_dir(): UNICODE_BYTE_ORDER_MARK = "\uFEFF" -def make_bag( - bag_dir, bag_info=None, processes=1, checksums=None, checksum=None, encoding="utf-8" +def make_bag(bag_dir, + bag_info=None, + processes=1, + checksums=None, + checksum=None, + encoding="utf-8", + follow_links=False, ): """ Convert a given directory into a bag. You can pass in arbitrary @@ -234,7 +239,11 @@ def make_bag( os.chmod("data", os.stat(cwd).st_mode) total_bytes, total_files = make_manifests( - "data", processes, algorithms=checksums, encoding=encoding + "data", + processes, + algorithms=checksums, + encoding=encoding, + follow_links=follow_links ) LOGGER.info(_("Creating bagit.txt")) @@ -266,7 +275,7 @@ def make_bag( finally: os.chdir(old_dir) - return Bag(bag_dir) + return Bag(bag_dir, follow_links=follow_links) class Bag(object): @@ -275,7 +284,7 @@ class Bag(object): valid_files = ["bagit.txt", "fetch.txt"] valid_directories = ["data"] - def __init__(self, path=None): + def __init__(self, path=None, follow_links=False): super(Bag, self).__init__() self.tags = {} self.info = {} @@ -297,6 +306,7 @@ def __init__(self, path=None): self.algorithms = [] self.tag_file_name = None + self.follow_links = follow_links self.path = abspath(path) if path: # if path ends in a path separator, strip it off @@ -428,7 +438,8 @@ def payload_files(self): """Returns a list of filenames which are present on the local filesystem""" payload_dir = os.path.join(self.path, "data") - for dirpath, _, filenames in os.walk(payload_dir): + for dirpath, _, filenames in os.walk(payload_dir, + followlinks=self.follow_links): for f in filenames: # Jump through some hoops here to make the payload files are # returned with the directory structure relative to the base @@ -507,7 +518,11 @@ def save(self, processes=1, manifests=False): # Generate new manifest files if manifests: total_bytes, total_files = make_manifests( - "data", processes, algorithms=self.algorithms, encoding=self.encoding + "data", + processes, + algorithms=self.algorithms, + encoding=self.encoding, + follow_links=self.follow_links ) # Update Payload-Oxum @@ -921,8 +936,6 @@ def _validate_bagittxt(self): def _path_is_dangerous(self, path): """ Return true if path looks dangerous, i.e. potentially operates - outside the bagging directory structure, e.g. ~/.bashrc, ../../../secrets.json, - \\?\c:\, D:\sys32\cmd.exe """ if os.path.isabs(path): return True @@ -930,12 +943,22 @@ def _path_is_dangerous(self, path): return True if os.path.expandvars(path) != path: return True - real_path = os.path.realpath(os.path.join(self.path, path)) - real_path = os.path.normpath(real_path) - bag_path = os.path.realpath(self.path) - bag_path = os.path.normpath(bag_path) - common = os.path.commonprefix((bag_path, real_path)) - return not (common == bag_path) + + # check for unsafe character sequences like + # ~/.bashrc, ../../../secrets.json, \\?\c:\, D:\sys32\cmd.exe + norm_path = os.path.normpath(os.path.join(self.path, path)) + norm_bag_path = os.path.normpath(self.path) + if os.path.commonprefix((norm_path, norm_bag_path)) != norm_bag_path: + return True + + # check for symbolic or hard links + real_path = os.path.realpath(norm_path) + real_bag_path = os.path.realpath(norm_bag_path) + if os.path.commonprefix((real_path, real_bag_path)) != real_bag_path \ + and not self.follow_links: + return True + + return False class BagError(Exception): @@ -1232,7 +1255,8 @@ def _make_tag_file(bag_info_path, bag_info): f.write("%s: %s\n" % (h, txt)) -def make_manifests(data_dir, processes, algorithms=DEFAULT_CHECKSUMS, encoding="utf-8"): +def make_manifests(data_dir, processes, algorithms=DEFAULT_CHECKSUMS, + encoding="utf-8", follow_links=False): LOGGER.info( _("Using %(process_count)d processes to generate manifests: %(algorithms)s"), {"process_count": processes, "algorithms": ", ".join(algorithms)}, @@ -1242,11 +1266,13 @@ def make_manifests(data_dir, processes, algorithms=DEFAULT_CHECKSUMS, encoding=" if processes > 1: pool = multiprocessing.Pool(processes=processes) - checksums = pool.map(manifest_line_generator, _walk(data_dir)) + checksums = pool.map(manifest_line_generator, _walk(data_dir, + follow_links=follow_links)) pool.close() pool.join() else: - checksums = [manifest_line_generator(i) for i in _walk(data_dir)] + checksums = [manifest_line_generator(i) for i in _walk(data_dir, + follow_links=follow_links)] # At this point we have a list of tuples which start with the algorithm name: manifest_data = {} @@ -1309,12 +1335,12 @@ def _make_tagmanifest_file(alg, bag_dir, encoding="utf-8"): tagmanifest.write("%s %s\n" % (digest, filename)) -def _find_tag_files(bag_dir): +def _find_tag_files(bag_dir, follow_links=False): for dir in os.listdir(bag_dir): if dir != "data": if os.path.isfile(dir) and not dir.startswith("tagmanifest-"): yield dir - for dir_name, _, filenames in os.walk(dir): + for dir_name, _, filenames in os.walk(dir, followlinks=follow_links): for filename in filenames: if filename.startswith("tagmanifest-"): continue @@ -1323,8 +1349,8 @@ def _find_tag_files(bag_dir): yield os.path.relpath(p, bag_dir) -def _walk(data_dir): - for dirpath, dirnames, filenames in os.walk(data_dir): +def _walk(data_dir, follow_links=False): + for dirpath, dirnames, filenames in os.walk(data_dir, followlinks=follow_links): # if we don't sort here the order of entries is non-deterministic # which makes it hard to test the fixity of tagmanifest-md5.txt filenames.sort() @@ -1338,7 +1364,7 @@ def _walk(data_dir): yield path -def _can_bag(test_dir): +def _can_bag(test_dir, follow_links=False): """Scan the provided directory for files which cannot be bagged due to insufficient permissions""" unbaggable = [] @@ -1350,7 +1376,8 @@ def _can_bag(test_dir): if not os.access(test_dir, os.W_OK): unbaggable.append(test_dir) - for dirpath, dirnames, filenames in os.walk(test_dir): + for dirpath, dirnames, filenames in os.walk(test_dir, + followlinks=follow_links): for directory in dirnames: full_path = os.path.join(dirpath, directory) if not os.access(full_path, os.W_OK): @@ -1359,7 +1386,7 @@ def _can_bag(test_dir): return unbaggable -def _can_read(test_dir): +def _can_read(test_dir, follow_links=False): """ returns ((unreadable_dirs), (unreadable_files)) """ @@ -1369,7 +1396,8 @@ def _can_read(test_dir): if not os.access(test_dir, os.R_OK): unreadable_dirs.append(test_dir) else: - for dirpath, dirnames, filenames in os.walk(test_dir): + for dirpath, dirnames, filenames in os.walk(test_dir, + followlinks=follow_links): for dn in dirnames: full_path = os.path.join(dirpath, dn) if not os.access(full_path, os.R_OK): @@ -1499,6 +1527,15 @@ def _make_parser(): " without performing checksum validation to detect corruption." ), ) + parser.add_argument( + "--follow-links", + action="store_true", + help=_( + "Allow bag payload directory to contain symbolic or hard links" + " on operating systems that support them." + ), + ) + checksum_args = parser.add_argument_group( _("Checksum Algorithms"), @@ -1508,7 +1545,6 @@ def _make_parser(): ) % ", ".join(DEFAULT_CHECKSUMS), ) - for i in CHECKSUM_ALGOS: alg_name = re.sub(r"^([A-Z]+)(\d+)$", r"\1-\2", i.upper()) checksum_args.add_argument( @@ -1571,12 +1607,12 @@ def main(): # validate the bag if args.validate: try: - bag = Bag(bag_dir) + bag = Bag(bag_dir, follow_links=args.follow_links) # validate throws a BagError or BagValidationError bag.validate( processes=args.processes, fast=args.fast, - completeness_only=args.completeness_only, + completeness_only=args.completeness_only ) if args.fast: LOGGER.info(_("%s valid according to Payload-Oxum"), bag_dir) @@ -1596,6 +1632,7 @@ def main(): bag_info=args.bag_info, processes=args.processes, checksums=args.checksums, + follow_links=args.follow_links ) except Exception as exc: LOGGER.error( diff --git a/test.py b/test.py index eab3d95..551b3af 100644 --- a/test.py +++ b/test.py @@ -1099,6 +1099,37 @@ def test_fetch_malformed_url(self): self.assertEqual(expected_msg, str(cm.exception)) + def test_bag_symlink_is_dangerous(self): + src = j(os.path.dirname(__file__), "README.rst") + dst = j(self.tmpdir, "README.rst") + os.symlink(src, dst) + self.assertRaisesRegexp( + bagit.BagError, + 'Path "data/README.rst" in manifest ".*?" is unsafe', + bagit.make_bag, + self.tmpdir + ) + + def test_bag_symlink_file(self): + src = j(os.path.dirname(__file__), "README.rst") + dst = j(self.tmpdir, "README.rst") + os.symlink(src, dst) + bag = bagit.make_bag(self.tmpdir, follow_links=True) + self.assertTrue(bag.validate()) + + def test_symlink_directory_ignored(self): + src = j(os.path.dirname(__file__), 'test-data', 'si') + dst = j(self.tmpdir, "si-again") + os.symlink(src, dst) + bag = bagit.make_bag(self.tmpdir) + self.assertEqual(len(bag.entries), 15) + + def test_symlink_directory_followed(self): + src = j(os.path.dirname(__file__), 'test-data', 'si') + dst = j(self.tmpdir, "si-again") + os.symlink(src, dst) + bag = bagit.make_bag(self.tmpdir, follow_links=True) + self.assertEqual(len(bag.entries), 17) class TestUtils(unittest.TestCase): def setUp(self):