Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,6 @@ testing = [
addopts = [
"--import-mode=importlib",
]
markers = [
"integration: marks tests as integration tests (require live OSDF infrastructure)",
]
101 changes: 78 additions & 23 deletions src/pelicanfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ def __init__(

# Overwrite the HTTP filesystem's _ls_real and _ls calls with ours
self.http_file_system._ls_real = self._ls_real
self.http_file_system._ls = self._ls_from_http

# Note this is a class method because it's overwriting a class method for the AbstractFileSystem
@classmethod
Expand Down Expand Up @@ -517,10 +518,15 @@ async def get_working_cache(self, fileloc: str) -> Tuple[str, DirectorResponse]:
logger.debug(f"Choosing a cache for {fileloc}...")
fparsed = urllib.parse.urlparse(fileloc)
# Removing the query if need be
cache_url, director_response = self._match_namespace(fparsed.path)
if cache_url:
logger.debug(f"Found previously working cache: {cache_url}")
return cache_url, director_response
try:
cache_url, director_response = self._match_namespace(fparsed.path)
if cache_url:
logger.debug(f"Found previously working cache: {cache_url}")
return cache_url, director_response
except NoAvailableSource:
# Namespace exists but cache list is empty (e.g., from get_dirlist_url caching)
# Fall through to discover caches
logger.debug("Namespace found but no caches available, discovering caches")

# Calculate the list of applicable caches; this takes into account the
# preferredCaches for the filesystem. If '+' is a preferred cache, we
Expand Down Expand Up @@ -639,29 +645,48 @@ async def get_dirlist_url(self, fileloc: str) -> Tuple[str, DirectorResponse]:
Returns a tuple of (dirlist url, director_response) for the given namespace location
"""
logger.debug(f"Finding the collections endpoint for {fileloc}...")
await self._set_director_url()

url = urllib.parse.urljoin(self.director_url, fileloc)
# Check for cached namespace info (similar to get_working_cache)
fparsed = urllib.parse.urlparse(fileloc)
namespace_info = self._get_prefix_info(fparsed.path)

# Timeout response in seconds - the default response is 5 minutes
timeout = aiohttp.ClientTimeout(total=5)
session = await self.http_file_system.set_session()
async with session.request("PROPFIND", url, timeout=timeout, allow_redirects=False) as resp:
if "Link" not in resp.headers:
raise BadDirectorResponse()
collections_url = get_collections_url(resp.headers)
if namespace_info and namespace_info.cache_manager.director_response:
# We have cached director response, extract collections URL from it
director_response = namespace_info.cache_manager.director_response
collections_url = director_response.x_pel_ns_hdr.collections_url if director_response.x_pel_ns_hdr else None
else:
# No cache, query the director
await self._set_director_url()
url = urllib.parse.urljoin(self.director_url, fileloc)

if not collections_url:
logger.error(f"No collections endpoint found for {fileloc}")
raise NoCollectionsUrl()
# Timeout response in seconds - the default response is 5 minutes
timeout = aiohttp.ClientTimeout(total=5)
session = await self.http_file_system.set_session()
async with session.request("PROPFIND", url, timeout=timeout, allow_redirects=False) as resp:
if "Link" not in resp.headers:
raise BadDirectorResponse()
collections_url = get_collections_url(resp.headers)

dirlist_url = urllib.parse.urljoin(collections_url, fileloc)
# Parse the headers to get the full director response
director_response = parse_director_response(resp.headers)

# Parse the headers to get the full director response
director_response = parse_director_response(resp.headers)
director_response.location = dirlist_url
# Cache the director response for future use
namespace = director_response.x_pel_ns_hdr.namespace if director_response.x_pel_ns_hdr else ""
if namespace:
with self._namespace_lock:
existing = self._namespace_cache.get(namespace)
# Cache if namespace doesn't exist or existing entry has no director_response
if not existing or not existing.director_response:
self._namespace_cache[namespace] = _CacheManager(existing.cache_list if existing else [], director_response)

return dirlist_url, director_response
if not collections_url:
logger.error(f"No collections endpoint found for {fileloc}")
raise NoCollectionsUrl()

dirlist_url = urllib.parse.urljoin(collections_url, fparsed.path)
director_response.location = dirlist_url

return dirlist_url, director_response

def _get_prefix_info(self, path: str) -> Optional[NamespaceInfo]:
"""
Expand Down Expand Up @@ -745,6 +770,32 @@ async def _ls(self, path, detail=True, **kwargs):
self.dircache[path] = out
return self._remove_host_from_paths(out)

async def _ls_from_http(self, url, detail=True, **kwargs):
    """
    Listing entry point installed on the wrapped HTTPFileSystem as its ``_ls``.

    The incoming ``url`` is reduced to its path component, which is resolved to a
    collections (directory listing) URL via ``get_dirlist_url``. The listing is then
    fetched with ``_ls_real`` and stored in ``dircache`` under the collections URL.

    Hosts are intentionally left in the returned entries: HTTPFileSystem needs
    full URLs in order to download files.
    """
    # Only the path portion of the URL identifies the namespace object
    namespace_path = urllib.parse.urlparse(url).path

    # Resolve the path to the collections endpoint
    listing_url, director_resp = await self.get_dirlist_url(namespace_path)

    # Generate a token first if one is required for this operation
    self._handle_token_generation(listing_url, director_resp, self._get_token_operation("_ls"))

    # Serve from the listings cache when it is enabled and already populated
    if self.use_listings_cache and listing_url in self.dircache:
        return self.dircache[listing_url]

    listing = await self._ls_real(listing_url, detail=detail)
    self.dircache[listing_url] = listing
    return listing

async def _ls_real(self, url, detail=True, client=None):
"""
This _ls_real uses a webdavclient listing rather than an https call. This lets pelicanfs identify whether an object
Expand Down Expand Up @@ -815,8 +866,10 @@ def get_item_detail(item):
return [get_item_detail(item) for item in items]
return sorted(set(items))

async def _isdir(self, path):
    """
    Report whether ``path`` is a directory.

    The path is passed through ``_check_fspath`` and the answer is delegated to
    the wrapped HTTP filesystem's ``_isdir``.
    """
    # Deliberately NOT wrapped in @_dirlist_dec: http_file_system._isdir will call
    # _ls_from_http, which handles the collections URL conversion
    path = self._check_fspath(path)
    return await self.http_file_system._isdir(path)

@_dirlist_dec
Expand Down Expand Up @@ -886,8 +939,10 @@ async def _glob(self, path, maxdepth=None, **kwargs):

return list(out)

async def _du(self, path, total=True, maxdepth=None, **kwargs):
    """
    Compute disk usage for ``path`` by delegating to the wrapped HTTP filesystem.

    ``total``, ``maxdepth`` and any extra keyword arguments are forwarded
    unchanged to ``http_file_system._du`` after ``path`` has been passed through
    ``_check_fspath``.
    """
    # Deliberately NOT wrapped in @_dirlist_dec: http_file_system._du will call
    # _walk, which calls _ls_from_http, which handles the collections URL conversion
    path = self._check_fspath(path)
    return await self.http_file_system._du(path, total, maxdepth, **kwargs)

@_dirlist_dec
Expand Down
123 changes: 123 additions & 0 deletions test/test_get.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""
Copyright (C) 2025, Pelican Project, Morgridge Institute for Research

Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You may
obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from pytest_httpserver import HTTPServer

import pelicanfs.core


def test_get_directory_recursive(httpserver: HTTPServer, get_client, get_webdav_client, top_listing_response):
    """
    Check that a directory path can be listed, the step .get() with recursive=True
    performs internally.

    The test server plays the director: the first request for /foo/bar is answered
    with a 307 whose headers advertise the collections URL, and the PROPFIND for
    /foo/bar/ (trailing slash) is answered with a 207 listing.
    """
    collections_endpoint = httpserver.url_for("foo/bar")
    director_root = httpserver.url_for("/")
    redirect_headers = {
        "Link": f'<{collections_endpoint}>; rel="duplicate"; pri=1; depth=1',
        "X-Pelican-Namespace": f"namespace=/foo, collections-url={collections_endpoint}",
    }

    # Federation discovery points back at the test server
    httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": director_root})
    # One-shot director redirect carrying the namespace / collections headers
    httpserver.expect_oneshot_request("/foo/bar").respond_with_data("", status=307, headers=redirect_headers)
    # Directory listing served for the trailing-slash PROPFIND
    httpserver.expect_request("/foo/bar/", method="PROPFIND").respond_with_data(top_listing_response, status=207)

    pelfs = pelicanfs.core.PelicanFileSystem(
        director_root,
        get_client=get_client,
        skip_instance_cache=True,
        get_webdav_client=get_webdav_client,
    )

    listing = pelfs.ls("/foo/bar", detail=False)
    assert isinstance(listing, (list, set))


def test_ls_directory_with_trailing_slash_conflict(httpserver: HTTPServer, get_client, get_webdav_client, top_listing_response):
    """
    Check that ls() lists a directory through a PROPFIND against the collections URL.

    The request for /foo/bar without a trailing slash receives a one-shot 307 that
    advertises the collections URL; the PROPFIND for /foo/bar/ then returns a 207
    listing.
    """
    collections_endpoint = httpserver.url_for("foo/bar")
    director_root = httpserver.url_for("/")
    redirect_headers = {
        "Link": f'<{collections_endpoint}>; rel="duplicate"; pri=1; depth=1',
        "X-Pelican-Namespace": f"namespace=/foo, collections-url={collections_endpoint}",
    }

    httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": director_root})

    # Request without trailing slash: redirect that carries collections-url
    httpserver.expect_oneshot_request("/foo/bar").respond_with_data("", status=307, headers=redirect_headers)

    # PROPFIND with trailing slash: successful multi-status listing
    httpserver.expect_request("/foo/bar/", method="PROPFIND").respond_with_data(top_listing_response, status=207)

    pelfs = pelicanfs.core.PelicanFileSystem(
        director_root,
        get_client=get_client,
        skip_instance_cache=True,
        get_webdav_client=get_webdav_client,
    )

    listing = pelfs.ls("/foo/bar", detail=False)
    assert isinstance(listing, (list, set))


def test_ls_directory_not_found(httpserver: HTTPServer, get_client, get_webdav_client):
    """
    Test that when a directory doesn't exist, we get a FileNotFoundError.

    The first request for /foo/bar gets a one-shot 307 advertising the collections
    URL; every PROPFIND for /foo/bar (with or without trailing slash) returns 404.
    """
    foo_bar_url = httpserver.url_for("foo/bar")

    httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")})
    httpserver.expect_oneshot_request("/foo/bar").respond_with_data(
        "",
        status=307,
        headers={
            "Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1',
            "X-Pelican-Namespace": f"namespace=/foo, collections-url={foo_bar_url}",
        },
    )

    # PROPFIND returns 404 for non-existent directory
    httpserver.expect_request("/foo/bar/", method="PROPFIND").respond_with_data("Not found", status=404)
    httpserver.expect_request("/foo/bar", method="PROPFIND").respond_with_data("Not found", status=404)

    pelfs = pelicanfs.core.PelicanFileSystem(
        httpserver.url_for("/"),
        get_client=get_client,
        skip_instance_cache=True,
        get_webdav_client=get_webdav_client,
    )

    # Should raise FileNotFoundError for non-existent directory.
    # An explicit raise is used instead of `assert False`, which is stripped
    # under `python -O` and would let the test pass without any exception.
    try:
        pelfs.ls("/foo/bar", detail=False)
    except FileNotFoundError:
        pass  # Expected
    else:
        raise AssertionError("Expected FileNotFoundError")
10 changes: 5 additions & 5 deletions test/test_namespace_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def test_cache_manager_without_token_requirement():

def test_get_prefix_info_returns_namespace_info():
"""Test that _get_prefix_info returns namespace_info with namespace-level token requirements"""
pelfs = PelicanFileSystem()
pelfs = PelicanFileSystem(skip_instance_cache=True)

# Mock the namespace cache to return a _CacheManager with token requirement
cache_list = ["https://cache1.example.com", "https://cache2.example.com"]
Expand Down Expand Up @@ -134,7 +134,7 @@ def test_get_origin_url_parses_token_requirements(monkeypatch):

from pelicanfs.core import PelicanFileSystem

pelfs = PelicanFileSystem()
pelfs = PelicanFileSystem(skip_instance_cache=True)

# Mock the get_director_headers method to return headers with token requirement
async def mock_get_director_headers(fileloc, origin=False):
Expand Down Expand Up @@ -174,7 +174,7 @@ def test_get_origin_url_no_token_requirement(monkeypatch):

from pelicanfs.core import PelicanFileSystem

pelfs = PelicanFileSystem()
pelfs = PelicanFileSystem(skip_instance_cache=True)

# Mock the get_director_headers method to return headers without token requirement
async def mock_get_director_headers(fileloc, origin=False):
Expand Down Expand Up @@ -215,7 +215,7 @@ def test_get_dirlist_url_parses_token_requirements(monkeypatch):

from pelicanfs.core import PelicanFileSystem

pelfs = PelicanFileSystem()
pelfs = PelicanFileSystem(skip_instance_cache=True)

# Mock the _set_director_url method
async def mock_set_director_url():
Expand Down Expand Up @@ -287,7 +287,7 @@ def test_get_dirlist_url_no_token_requirement(monkeypatch):

from pelicanfs.core import PelicanFileSystem

pelfs = PelicanFileSystem()
pelfs = PelicanFileSystem(skip_instance_cache=True)

# Mock the _set_director_url method
async def mock_set_director_url():
Expand Down
Loading