Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: collect files in PV when another container restart or stop #2010

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/checkpoint/CheckPointManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
string realFilePath;
int32_t fileOpenFlag = 0; // default, we close file ptr
int32_t containerStopped = 0;
string containerID;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个场景E2E是否可以构造,是否有对应的用例?

int32_t lastForceRead = 0;
int32_t idxInReaderArray = LogFileReader::CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UT、E2E重点找迅飞review下。

if (meta.isMember("real_file_name")) {
Expand Down Expand Up @@ -242,6 +243,9 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
if (meta.isMember("container_stopped")) {
containerStopped = meta["container_stopped"].asInt();
}
if (meta.isMember("container_id")) {
containerID = meta["container_id"].asString();
}
if (meta.isMember("last_force_read")) {
lastForceRead = meta["last_force_read"].asInt();
}
Expand All @@ -267,6 +271,7 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
realFilePath,
fileOpenFlag != 0,
containerStopped != 0,
containerID,
lastForceRead != 0);
ptr->mLastUpdateTime = update_time;
ptr->mIdxInReaderArray = idxInReaderArray;
Expand Down Expand Up @@ -300,6 +305,7 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
realFilePath,
fileOpenFlag != 0,
containerStopped != 0,
containerID,
lastForceRead != 0);
ptr->mLastUpdateTime = update_time;
ptr->mIdxInReaderArray = idxInReaderArray;
Expand Down Expand Up @@ -345,6 +351,7 @@ bool CheckPointManager::DumpCheckPointToLocal() {
leaf["dev"] = Json::Value(Json::UInt64(checkPointPtr->mDevInode.dev));
leaf["file_open"] = Json::Value(checkPointPtr->mFileOpenFlag ? 1 : 0);
leaf["container_stopped"] = Json::Value(checkPointPtr->mContainerStopped ? 1 : 0);
leaf["container_id"] = Json::Value(checkPointPtr->mContainerID);
leaf["last_force_read"] = Json::Value(checkPointPtr->mLastForceRead ? 1 : 0);
leaf["config_name"] = Json::Value(checkPointPtr->mConfigName);
// forward compatible
Expand Down Expand Up @@ -375,6 +382,7 @@ bool CheckPointManager::DumpCheckPointToLocal() {
leaf["dev"] = Json::Value(Json::UInt64(checkPointPtr->mDevInode.dev));
leaf["file_open"] = Json::Value(checkPointPtr->mFileOpenFlag ? 1 : 0);
leaf["container_stopped"] = Json::Value(checkPointPtr->mContainerStopped ? 1 : 0);
leaf["container_id"] = Json::Value(checkPointPtr->mContainerID);
leaf["last_force_read"] = Json::Value(checkPointPtr->mLastForceRead ? 1 : 0);
leaf["config_name"] = Json::Value(checkPointPtr->mConfigName);
// forward compatible
Expand Down
3 changes: 3 additions & 0 deletions core/checkpoint/CheckPointManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class CheckPoint {
int32_t mLastUpdateTime = 0;
bool mFileOpenFlag = false;
bool mContainerStopped = false;
std::string mContainerID;
bool mLastForceRead = false;
std::string mCache;
std::string mConfigName;
Expand All @@ -64,6 +65,7 @@ class CheckPoint {
const std::string& realFileName,
bool fileOpenFlag,
bool containerStopped,
std::string containerID,
bool lastForceRead)
: mDevInode(devInode),
mOffset(offset),
Expand All @@ -72,6 +74,7 @@ class CheckPoint {
mLastUpdateTime(0),
mFileOpenFlag(fileOpenFlag),
mContainerStopped(containerStopped),
mContainerID(containerID),
mLastForceRead(lastForceRead),
mConfigName(configName),
mFileName(filename),
Expand Down
3 changes: 3 additions & 0 deletions core/file_server/ConfigManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,9 @@ void ConfigManager::GetContainerStoppedEvents(std::vector<Event*>& eventVec) {
continue;
}
Event* pStoppedEvent = new Event(iter->mRealBaseDir, "", EVENT_ISDIR | EVENT_CONTAINER_STOPPED, -1, 0);
pStoppedEvent->SetConfigName(cmd->mConfigName);
pStoppedEvent->SetContainerID(containerInfo.mID);
iter->mStopped = true;
LOG_DEBUG(
sLogger,
("GetContainerStoppedEvent Type", pStoppedEvent->GetType())("Source", pStoppedEvent->GetSource())(
Expand Down
1 change: 1 addition & 0 deletions core/file_server/ContainerInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ struct ContainerInfo {
std::vector<sls_logs::LogTag> mTags; // ContainerNameTag
std::vector<sls_logs::LogTag> mMetadatas; // ExternalEnvTag and ExternalK8sLabelTag
Json::Value mJson; // this obj's json, for saving to local file
bool mStopped = false; // whether this container is stopped

static bool ParseByJSONObj(const Json::Value&, ContainerInfo&, std::string&);
static bool ParseAllByJSONObj(const Json::Value&, std::unordered_map<std::string, ContainerInfo>&, std::string&);
Expand Down
4 changes: 3 additions & 1 deletion core/file_server/EventDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ void EventDispatcher::AddExistedCheckPointFileEvents() {
cpt.real_path(),
1,
0,
"",
0);
const auto result = validateCheckpoint(v1Cpt, cachePathDevInodeMap, eventVec);
switch (result) {
Expand Down Expand Up @@ -834,11 +835,12 @@ void EventDispatcher::UnregisterEventHandler(const string& path) {
LOG_INFO(sLogger, ("remove the watcher for dir", path)("wd", wd));
}

void EventDispatcher::StopAllDir(const string& baseDir) {
void EventDispatcher::StopAllDir(const string& baseDir, const string& containerID) {
LOG_DEBUG(sLogger, ("Stop all sub dir", baseDir));
auto subDirAndHandlers = FindAllSubDirAndHandler(baseDir);
for (auto& subDirAndHandler : subDirAndHandlers) {
Event e(subDirAndHandler.first, "", EVENT_ISDIR | EVENT_CONTAINER_STOPPED, -1, 0);
e.SetContainerID(containerID);
subDirAndHandler.second->Handle(e);
}
}
Expand Down
3 changes: 2 additions & 1 deletion core/file_server/EventDispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class EventDispatcher {
*
* @param path for whom event handler will be removed.
*/
void StopAllDir(const std::string& baseDir);
void StopAllDir(const std::string& baseDir, const std::string& containerID);

EventHandler* GetHandler(const char* path) {
MapType<std::string, int>::Type::iterator itr = mPathWdMap.find(path);
Expand Down Expand Up @@ -287,6 +287,7 @@ class EventDispatcher {
friend class FuseFileUnittest;
friend class MultiServerConfigUpdatorUnitest;
friend class EventDispatcherDirUnittest;
friend class ModifyHandlerUnittest;

void CleanEnviroments();
int32_t GetInotifyWatcherCount();
Expand Down
3 changes: 2 additions & 1 deletion core/file_server/FileDiscoveryOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,8 @@ ContainerInfo* FileDiscoveryOptions::GetContainerPathByLogPath(const string& log
if (!mContainerInfos) {
return NULL;
}
for (size_t i = 0; i < mContainerInfos->size(); ++i) {
// reverse order to find the latest container
for (int i = mContainerInfos->size() - 1; i >= 0; --i) {
if (_IsSubPath((*mContainerInfos)[i].mRealBaseDir, logPath)) {
return &(*mContainerInfos)[i];
}
Expand Down
5 changes: 5 additions & 0 deletions core/file_server/event/Event.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class Event {
uint64_t mInode;
int64_t mHashKey;
std::string mConfigName;
std::string mContainerID;

// for read timeout
int64_t mLastReadPos;
Expand Down Expand Up @@ -108,6 +109,8 @@ class Event {

const std::string& GetConfigName() const { return mConfigName; }

const std::string& GetContainerID() const { return mContainerID; }

int64_t GetLastReadPos() const { return mLastReadPos; }

int64_t GetLastFilePos() const { return mLastFilePos; }
Expand All @@ -122,6 +125,8 @@ class Event {

void SetConfigName(const std::string& configName) { mConfigName = configName; }

void SetContainerID(const std::string& containerID) { mContainerID = containerID; }

void SetLastReadPos(int64_t lastReadPos) { mLastReadPos = lastReadPos; }

void SetLastFilePos(int64_t lastFilePos) { mLastFilePos = lastFilePos; }
Expand Down
35 changes: 24 additions & 11 deletions core/file_server/event_handler/EventHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,9 @@ void ModifyHandler::Handle(const Event& event) {
for (auto& pair : mNameReaderMap) {
LogFileReaderPtrArray& readerArray = pair.second;
for (auto& reader : readerArray) {
if (reader->GetContainerID() != event.GetContainerID()) {
continue;
}
reader->SetContainerStopped();
if (reader->IsReadToEnd() || reader->ShouldForceReleaseDeletedFileFd()) {
if (reader->IsFileOpened()) {
Expand All @@ -539,7 +542,7 @@ void ModifyHandler::Handle(const Event& event) {
"project", reader->GetProject())("logstore", reader->GetLogstore())(
"config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
"file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
"file size", reader->GetFileSize()));
"file size", reader->GetFileSize())("container id", event.GetContainerID()));
if (!readerArray[0]->ShouldForceReleaseDeletedFileFd() && reader->HasDataInCache()) {
ForceReadLogAndPush(readerArray[0]);
}
Expand Down Expand Up @@ -785,16 +788,26 @@ void ModifyHandler::Handle(const Event& event) {
"file size", reader->GetFileSize()));
reader->CloseFilePtr();
} else if (reader->IsContainerStopped()) {
// release fd as quick as possible
LOG_INFO(
sLogger,
("close the file", "current file has been read, and the relative container has been stopped")(
"project", reader->GetProject())("logstore", reader->GetLogstore())("config", mConfigName)(
"log reader queue name", reader->GetHostLogPath())("file device",
reader->GetDevInode().dev)(
"file inode", reader->GetDevInode().inode)("file size", reader->GetFileSize()));
ForceReadLogAndPush(reader);
reader->CloseFilePtr();
// update container info one more time, ensure file is hold by same cotnainer
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cotnainer -> container

if (reader->UpdateContainerInfo() && !reader->IsContainerStopped()) {
LOG_INFO(sLogger,
("file is reused by a new container", reader->GetContainerID())(
"project", reader->GetProject())("logstore", reader->GetLogstore())(
"config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
"file device", reader->GetDevInode().dev)(
"file inode", reader->GetDevInode().inode)("file size", reader->GetFileSize()));
} else {
// release fd as quick as possible
LOG_INFO(sLogger,
("close the file",
"current file has been read, and the relative container has been stopped")(
"project", reader->GetProject())("logstore", reader->GetLogstore())(
"config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
"file device", reader->GetDevInode().dev)(
"file inode", reader->GetDevInode().inode)("file size", reader->GetFileSize()));
ForceReadLogAndPush(reader);
reader->CloseFilePtr();
}
}
break;
}
Expand Down
3 changes: 2 additions & 1 deletion core/file_server/event_handler/EventHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ class ModifyHandler : public EventHandler {
void DeleteRollbackReader();
void MakeSpaceForNewReader();


static bool CompareReaderByUpdateTime(const LogFileReader* left, const LogFileReader* right) {
return left->GetLastUpdateTime() < right->GetLastUpdateTime();
}
Expand Down Expand Up @@ -113,6 +112,7 @@ class ModifyHandler : public EventHandler {
virtual void HandleTimeOut();
virtual bool DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag);
bool IsAllFileRead() override;
const std::string& GetConfigName() const { return mConfigName; }

#ifdef APSARA_UNIT_TEST_MAIN
friend class ConfigUpdatorUnittest;
Expand Down Expand Up @@ -176,6 +176,7 @@ class CreateModifyHandler : public EventHandler {
friend class ConfigUpdatorUnittest;
friend class EventDispatcherTest;
friend class SenderUnittest;
friend class EventDispatcherContainerUnittest;
#endif
};

Expand Down
2 changes: 1 addition & 1 deletion core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) {
string path = source;
if (object.size() > 0)
path += PATH_SEPARATOR + object;
dispatcher->StopAllDir(path);
dispatcher->StopAllDir(path, ev->GetContainerID());
} else {
EventHandler* handler = dispatcher->GetHandler(source.c_str());
if (handler) {
Expand Down
46 changes: 40 additions & 6 deletions core/file_server/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ LogFileReader* LogFileReader::CreateLogFileReader(const string& hostLogPathDir,
? discoveryConfig.first->GetWildcardPaths()[0]
: discoveryConfig.first->GetBasePath(),
containerPath->mRealBaseDir.size());
reader->SetContainerID(containerPath->mID);
reader->AddExtraTags(containerPath->mMetadatas);
reader->AddExtraTags(containerPath->mTags);
}
Expand Down Expand Up @@ -265,6 +266,7 @@ void LogFileReader::DumpMetaToMem(bool checkConfigFlag, int32_t idxInReaderArray
mRealLogPath,
mLogFileOp.IsOpen(),
mContainerStopped,
mContainerID,
mLastForceRead);
// use last event time as checkpoint's last update time
checkPointPtr->mLastUpdateTime = mLastEventTime;
Expand Down Expand Up @@ -312,16 +314,23 @@ void LogFileReader::InitReader(bool tailExisted, FileReadPolicy policy, uint32_t
mLastFileSignatureSize = checkPointPtr->mSignatureSize;
mRealLogPath = checkPointPtr->mRealFileName;
mLastEventTime = checkPointPtr->mLastUpdateTime;
mContainerStopped = checkPointPtr->mContainerStopped;
if (checkPointPtr->mContainerID == mContainerID) {
mContainerStopped = checkPointPtr->mContainerStopped;
} else {
LOG_INFO(
sLogger,
("container id is different between container discovery and checkpoint",
checkPointPtr->mRealFileName)("checkpoint", checkPointPtr->mContainerID)("current", mContainerID));
}
// new property to recover reader exactly from checkpoint
mIdxInReaderArrayFromLastCpt = checkPointPtr->mIdxInReaderArray;
LOG_INFO(sLogger,
("recover log reader status from checkpoint, project", GetProject())("logstore", GetLogstore())(
"config", GetConfigName())("log reader queue name", mHostLogPath)("file device",
ToString(mDevInode.dev))(
"file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)(
"file signature size", mLastFileSignatureSize)("real file path", mRealLogPath)(
"last file position", mLastFilePos)("index in reader array", mIdxInReaderArrayFromLastCpt));
"config", GetConfigName())("log reader queue name", mHostLogPath)(
"file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))(
"file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)(
"real file path", mRealLogPath)("last file position", mLastFilePos)(
"index in reader array", mIdxInReaderArrayFromLastCpt)("container id", mContainerID));
// if file is open or
// last update time is new and the file's container is not stopped we
// we should use first modify
Expand Down Expand Up @@ -2521,6 +2530,31 @@ const std::string& LogFileReader::GetConvertedPath() const {
#endif
}

bool LogFileReader::UpdateContainerInfo() {
FileDiscoveryConfig discoveryConfig = FileServer::GetInstance()->GetFileDiscoveryConfig(mConfigName);
if (discoveryConfig.first == nullptr) {
return false;
}
ContainerInfo* containerInfo = discoveryConfig.first->GetContainerPathByLogPath(mHostLogPathDir);
Copy link
Collaborator

@yyuuttaaoo yyuuttaaoo Jan 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可能有问题,如果一个是主机上采集pv,一个是容器内,那么主机的是不会去更新container信息的,此时还能否将stop的container重置为空?
所以config信息是不是还是有用,不能只靠containerid

if (containerInfo && containerInfo->mID != mContainerID) {
LOG_INFO(sLogger,
("container info of file reader changed", "may be because container restart")(
"old container id", mContainerID)("new container id", containerInfo->mID)(
"container status", containerInfo->mStopped ? "stopped" : "running"));
// if config have wildcard path, use mWildcardPaths[0] as base path
SetDockerPath(!discoveryConfig.first->GetWildcardPaths().empty() ? discoveryConfig.first->GetWildcardPaths()[0]
: discoveryConfig.first->GetBasePath(),
containerInfo->mRealBaseDir.size());
SetContainerID(containerInfo->mID);
mContainerStopped = containerInfo->mStopped;
mExtraTags.clear();
AddExtraTags(containerInfo->mMetadatas);
AddExtraTags(containerInfo->mTags);
return true;
}
return false;
}

#ifdef APSARA_UNIT_TEST_MAIN
void LogFileReader::UpdateReaderManual() {
if (mLogFileOp.IsOpen()) {
Expand Down
7 changes: 7 additions & 0 deletions core/file_server/reader/LogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ class LogFileReader {

time_t GetContainerStoppedTime() const { return mContainerStoppedTime; }

std::string GetContainerID() const { return mContainerID; }

void SetContainerID(const std::string& containerID) { mContainerID = containerID; }

bool UpdateContainerInfo();

bool IsFileOpened() const { return mLogFileOp.IsOpen(); }

bool ShouldForceReleaseDeletedFileFd();
Expand Down Expand Up @@ -506,6 +512,7 @@ class LogFileReader {
bool mFileDeleted = false;
time_t mDeletedTime = 0;
bool mContainerStopped = false;
std::string mContainerID;
time_t mContainerStoppedTime = 0;
time_t mReadStoppedContainerAlarmTime = 0;
int32_t mReadDelayTime = 0;
Expand Down
2 changes: 1 addition & 1 deletion core/unittest/controller/EventDispatcherDirUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class EventDispatcherDirUnittest : public ::testing::Test {
void TestStopAllDir() {
LOG_INFO(sLogger, ("TestStopAllDir() begin", time(NULL)));
std::string baseDir = "/basepath0";
EventDispatcher::GetInstance()->StopAllDir(baseDir);
EventDispatcher::GetInstance()->StopAllDir(baseDir, "");
for (size_t i = 0; i < 10; ++i) {
if (i < 4) {
APSARA_TEST_EQUAL_FATAL(mHandlers[i].handle_count, 1);
Expand Down
Loading
Loading