diff --git a/src/XProtocol/XProtocol.hh b/src/XProtocol/XProtocol.hh index 0bcb8df6399..d7681d30176 100644 --- a/src/XProtocol/XProtocol.hh +++ b/src/XProtocol/XProtocol.hh @@ -620,6 +620,7 @@ enum XQueryType { kXR_Qckscan= 6, kXR_Qconfig= 7, kXR_Qvisa = 8, + kXR_Qhead = 9, kXR_Qopaque=16, kXR_Qopaquf=32, kXR_Qopaqug=64 diff --git a/src/XrdCl/XrdClFileSystem.hh b/src/XrdCl/XrdClFileSystem.hh index 8229a234c10..1792c0e3f59 100644 --- a/src/XrdCl/XrdClFileSystem.hh +++ b/src/XrdCl/XrdClFileSystem.hh @@ -60,6 +60,7 @@ namespace XrdCl Space = kXR_Qspace, //!< Query logical space stats Stats = kXR_QStats, //!< Query server stats Visa = kXR_Qvisa, //!< Query file visa attributes + Head = kXR_Qhead, //!< Query http header response XAttr = kXR_Qxattr //!< Query file extended attributes }; }; diff --git a/src/XrdOuc/XrdOucCache.hh b/src/XrdOuc/XrdOucCache.hh index ad582bcbbaf..87ad34a20fb 100644 --- a/src/XrdOuc/XrdOucCache.hh +++ b/src/XrdOuc/XrdOucCache.hh @@ -36,6 +36,7 @@ #include "XrdOuc/XrdOucCacheStats.hh" #include "XrdOuc/XrdOucIOVec.hh" +#include "XrdCl/XrdClBuffer.hh" struct stat; class XrdOucEnv; @@ -147,6 +148,20 @@ long long FSize() = 0; virtual int Fstat(struct stat &sbuff) {(void)sbuff; return 1;} + +//------------------------------------------------------------------------------ +//! Perform an fcntl() operation (defaults to passthrough). +//! +//! @param AMT, for the moment XrdCl::Buffer to pass query code value and +//! XrdCl::Buffer to pass the string response. The XrdCL::Buffers is +//! interpreted as std::string +//! +//! @return <0 - fstat failed, value is -errno. +//! =0 - fstat succeeded, sbuff holds stat information. +//! >0 - fstat could not be done, forward operation to next level. +//------------------------------------------------------------------------------ +virtual int Fcntl(const XrdCl::Buffer& args, XrdCl::Buffer*& res) { return -1; } + //----------------------------------------------------------------------------- //! Get the file's location (i.e. endpoint hostname and port) //! diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index 33ea646ef4f..94b328a5214 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -22,10 +22,13 @@ #include #include "XrdCl/XrdClURL.hh" +#include "XrdCl/XrdClFileSystem.hh" +#include "XrdCl/XrdClFileStateHandler.hh" #include "XrdOuc/XrdOucEnv.hh" #include "XrdOuc/XrdOucUtils.hh" #include "XrdOuc/XrdOucPrivateUtils.hh" +#include "XrdOuc/XrdOucJson.hh" #include "XrdSys/XrdSysTimer.hh" #include "XrdSys/XrdSysTrace.hh" @@ -425,6 +428,7 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f } // This is always true, now that IOFileBlock is unsupported. + if (filesize == 0) { struct stat st; @@ -444,7 +448,7 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f if (filesize >= 0) { - file = File::FileOpen(path, off, filesize); + file = File::FileOpen(path, off, filesize, io->GetInput()); } { @@ -904,6 +908,20 @@ int Cache::LocalFilePath(const char *curl, char *buff, int blen, return -ENOENT; } +//______________________________________________________________________________ +// If supported, write Cache-Control as xattr to cinfo file. +// One can use file descriptor or full path interchangeably +//------------------------------------------------------------------------------ +void Cache::WriteCacheControlXAttr(int cinfo_fd, const char* path, const std::string& cc) +{ + if (m_metaXattr) { + int res = XrdSysXAttrActive->Set("pfc.cache-control", cc.c_str(), cc.size(), path, cinfo_fd, 0); + if (res != 0) { + TRACE(Error, "WritecacheControlXAttr error setting xattr " << res); + } + } +} + //______________________________________________________________________________ // If supported, write file_size as xattr to cinfo file. //------------------------------------------------------------------------------ @@ -958,6 +976,47 @@ long long Cache::DetermineFullFileSize(const std::string &cinfo_fname) return ret; } +//______________________________________________________________________________ +// Get cache control attributes from the corresponding cinfo-file name. +// Returns -error on failure. +//------------------------------------------------------------------------------ +int Cache::GetCacheControlXAttr(const std::string &cinfo_fname, std::string& ival) +{ + if (m_metaXattr) { + + char pfn[4096]; + m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096); + + char cc[512]; + int res = XrdSysXAttrActive->Get("pfc.cache-control", &cc, 512, pfn, -1); + if (res > 0) + { + std::string tmp(cc, res); + ival = tmp; + } + return res; + } + return 0; +} + +//______________________________________________________________________________ +// Get cache control attributes from the corresponding cinfo-file name. +// Returns -error on failure. +//------------------------------------------------------------------------------ +int Cache::GetCacheControlXAttr(int fd, std::string& ival) +{ + if (m_metaXattr) { + char cc[512]; + int res = XrdSysXAttrActive->Get("pfc.cache-control", &cc, 512, nullptr, fd); + if (res > 0) + { + ival = std::string(cc, res); + return res; + } + } + return 0; +} + //______________________________________________________________________________ // Calculate if the file is to be considered cached for the purposes of // only-if-cached and setting of atime of the Stat() calls. @@ -1097,6 +1156,64 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode) if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK) { TRACE(Dump, "Prepare defer open " << f_name); + + std::string icc; + if (GetCacheControlXAttr(i_name, icc) > 0) { + using namespace nlohmann; + json cc_json = json::parse(icc); + + bool mustRevalidate = cc_json.contains("revalidate") && (cc_json["revalidate"] == true); + bool hasExpired = false; + if (cc_json.contains("expire")) + { + time_t current_time; + current_time = time(NULL); + if (current_time > cc_json["expire"]) + hasExpired = true; + } + + bool ccIsValid = true; + + if (cc_json.contains("ETag") && (mustRevalidate || hasExpired)) { + // Compare cinfo xattr etag and the etag from file system query response + // Note: qeury returns only etag value, not a json string + XrdCl::FileSystem fs(url); + XrdCl::Buffer queryArgs(1024); // pass file path throug args: reserve bytes to store path + queryArgs.FromString(curl); + XrdCl::Buffer *response = nullptr; + XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::Head, queryArgs, response); + + if (st.IsOK()) + { + std::string etag = response->ToString(); + ccIsValid = (etag == cc_json["ETag"]); + TRACE(Info, "Prepare " << f_name << ", ETag valid res: " << ccIsValid); + + // update expiration time if Etag is valid + if (cc_json.contains("max-age")) + { + time_t ma = cc_json["max-age"]; + cc_json["expire"] = ma + time(NULL); + char pfn[4096]; + m_oss->Lfn2Pfn(i_name.c_str(), pfn, 4096); + WriteCacheControlXAttr(-1, pfn, cc_json.dump()); + } + } + else + { + // Message has a status beacuse we are in the block condition for cache-contol xattr + TRACE(Error, "Prepare() XrdCl::FileSystem::Query failed " << f_name.c_str()); + ccIsValid = false; + } + } + + if (!ccIsValid) + { + // invalidate cinfo on ETag mismatch + UnlinkFile(f_name, false); + } + } // end chekcing cache control xattr in cinfo file + return 1; } else diff --git a/src/XrdPfc/XrdPfc.hh b/src/XrdPfc/XrdPfc.hh index 45c9448b61c..b8a5002bf08 100644 --- a/src/XrdPfc/XrdPfc.hh +++ b/src/XrdPfc/XrdPfc.hh @@ -186,8 +186,12 @@ public: virtual int ConsiderCached(const char *url); bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk); + void WriteCacheControlXAttr(int cinfo_fd, const char* path, const std::string& cc); void WriteFileSizeXAttr(int cinfo_fd, long long file_size); long long DetermineFullFileSize(const std::string &cinfo_fname); + int GetCacheControlXAttr(const std::string &cinfo_fname, std::string& res); + int GetCacheControlXAttr(int fd, std::string& res); + //-------------------------------------------------------------------- //! \brief Makes decision if the original XrdOucCacheIO should be cached. diff --git a/src/XrdPfc/XrdPfcFile.cc b/src/XrdPfc/XrdPfcFile.cc index f55f881aa04..61ab1d5528c 100644 --- a/src/XrdPfc/XrdPfcFile.cc +++ b/src/XrdPfc/XrdPfcFile.cc @@ -27,8 +27,11 @@ #include "XrdSys/XrdSysTimer.hh" #include "XrdOss/XrdOss.hh" #include "XrdOuc/XrdOucEnv.hh" +#include "XrdOuc/XrdOucJson.hh" #include "XrdSfs/XrdSfsInterface.hh" +#include "XrdCl/XrdClFileStateHandler.hh" + #include #include #include @@ -135,10 +138,10 @@ void File::Close() //------------------------------------------------------------------------------ -File* File::FileOpen(const std::string &path, long long offset, long long fileSize) +File* File::FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO* inputIO) { File *file = new File(path, offset, fileSize); - if ( ! file->Open()) + if ( ! file->Open(inputIO)) { delete file; file = 0; @@ -420,7 +423,7 @@ void File::RemoveIO(IO *io) //------------------------------------------------------------------------------ -bool File::Open() +bool File::Open(XrdOucCacheIO* inputIO) { // Sets errno accordingly. @@ -531,6 +534,34 @@ bool File::Open() m_cfi.Write(m_info_file, ifn.c_str()); m_info_file->Fsync(); cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size); + + // access and write cache-control attributes + XrdCl::QueryCode::Code queryCode = XrdCl::QueryCode::Head; + XrdCl::Buffer queryArgs(5); + std::string qs = std::to_string(queryCode); + queryArgs.FromString(qs); + XrdCl::Buffer *responseFctl = nullptr; + int resFctl = inputIO->Fcntl(queryArgs, responseFctl); + if (resFctl == 0) + { + std::string cc_str = responseFctl->ToString(); + nlohmann::json cc_json = nlohmann::json::parse(cc_str); + if (cc_json.contains("max-age")) + { + time_t ma = cc_json["max-age"]; + ma += time(NULL); + cc_json["expire"] = ma; + cc_str = cc_json.dump(); + } + TRACE(Error, "GetFile() XrdCl::File::Fcntl value " << cc_str); + cache()->WriteCacheControlXAttr(m_info_file->getFD(), nullptr, cc_str); + } + else if (resFctl != kXR_Unsupported) + { + // Query XrdCl::QueryCode::Head is optional, print error only if informatin is supported + TRACE(Error, "GetFile() XrdCl::File::Fcntl query XrdCl::QueryCode::Head failed " << inputIO->Path()); + } + TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks()); } else diff --git a/src/XrdPfc/XrdPfcFile.hh b/src/XrdPfc/XrdPfcFile.hh index 0d103ad4cc7..314fad2177a 100644 --- a/src/XrdPfc/XrdPfcFile.hh +++ b/src/XrdPfc/XrdPfcFile.hh @@ -208,7 +208,7 @@ public: // Constructor, destructor, Open() and Close() are private. //! Static constructor that also does Open. Returns null ptr if Open fails. - static File* FileOpen(const std::string &path, long long offset, long long fileSize); + static File* FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO*); //! Handle removal of a block from Cache's write queue. void BlockRemovedFromWriteQ(Block*); @@ -301,7 +301,7 @@ private: void Close(); //! Open file handle for data file and info file on local disk. - bool Open(); + bool Open(XrdOucCacheIO* inputOrigin); static const char *m_traceID; diff --git a/src/XrdPosix/XrdPosixFile.cc b/src/XrdPosix/XrdPosixFile.cc index 8863475843a..71cb8eff07d 100644 --- a/src/XrdPosix/XrdPosixFile.cc +++ b/src/XrdPosix/XrdPosixFile.cc @@ -386,6 +386,13 @@ int XrdPosixFile::Fstat(struct stat &buf) buf.st_mode = myMode; return 0; } + +int XrdPosixFile::Fcntl(const XrdCl::Buffer &arg, XrdCl::Buffer *&response) +{ + // AMT: temporary solution to handle unsuported operations in XrdPfc::File::Open() + XrdCl::XRootDStatus status = clFile.Fcntl(arg, response); + return status.IsOK() ? 0 : status.errNo; +} /******************************************************************************/ /* H a n d l e R e s p o n s e */ diff --git a/src/XrdPosix/XrdPosixFile.hh b/src/XrdPosix/XrdPosixFile.hh index 6d99cf46489..5335384f969 100644 --- a/src/XrdPosix/XrdPosixFile.hh +++ b/src/XrdPosix/XrdPosixFile.hh @@ -100,6 +100,8 @@ static void DelayedDestroy(XrdPosixFile *fp); int Fstat(struct stat &buf) override; + int Fcntl(const XrdCl::Buffer& args, XrdCl::Buffer*& res) override; + const char *Location(bool refresh=false) override; void HandleResponse(XrdCl::XRootDStatus *status, diff --git a/src/XrdPosix/XrdPosixPrepIO.hh b/src/XrdPosix/XrdPosixPrepIO.hh index d7eef1c250c..2cbada0879a 100644 --- a/src/XrdPosix/XrdPosixPrepIO.hh +++ b/src/XrdPosix/XrdPosixPrepIO.hh @@ -48,6 +48,8 @@ long long FSize() {return (Init() ? fileP->FSize() : openRC);} int Fstat(struct stat &buf) {return (Init() ? fileP->Fstat(buf) : openRC);} +int Fcntl(const XrdCl::Buffer& args, XrdCl::Buffer*& res) { return (Init() ? fileP->Fcntl(args, res) : openRC); } + int Open() {Init(); return openRC;} const char *Path() {return fileP->Path();}