Skip to content

Commit

Permalink
feat: go pipeline support file tag and host id (#2050)
Browse files Browse the repository at this point in the history
* feat: go pipeline support file tag and host id

* fix lint

* add a new param to pass data dir to go

* fix

* fix

* fix

* fix
  • Loading branch information
Abingcbc authored Jan 23, 2025
1 parent 5e562cb commit 183af3a
Show file tree
Hide file tree
Showing 27 changed files with 410 additions and 92 deletions.
2 changes: 1 addition & 1 deletion core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ std::string GetAgentDataDir() {
dir = GetProcessExecutionDir();
#else
if (BOOL_FLAG(logtail_mode)) {
dir = AppConfig::GetInstance()->GetLoongcollectorConfDir() + PATH_SEPARATOR + "checkpoint";
dir = AppConfig::GetInstance()->GetLoongcollectorConfDir() + PATH_SEPARATOR + "checkpoint" + PATH_SEPARATOR;
} else {
dir = STRING_FLAG(data_dir) + PATH_SEPARATOR;
}
Expand Down
1 change: 1 addition & 0 deletions core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ std::string GetAgentName();
std::string GetMonitorInfoFileName();
std::string GetSymLinkName();
std::string GetAgentPrefix();
std::string GetFileTagsDir();

template <class T>
class DoubleBuffer {
Expand Down
2 changes: 1 addition & 1 deletion core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
#endif

DEFINE_FLAG_BOOL(ilogtail_disable_core, "disable core in worker process", true);
DEFINE_FLAG_INT32(file_tags_update_interval, "second", 1);
DEFINE_FLAG_INT32(file_tags_update_interval, "second", 60);
DEFINE_FLAG_INT32(config_scan_interval, "seconds", 10);
DEFINE_FLAG_INT32(tcmalloc_release_memory_interval, "force release memory held by tcmalloc, seconds", 300);
DEFINE_FLAG_INT32(exit_flushout_duration, "exit process flushout duration", 20 * 1000);
Expand Down
27 changes: 17 additions & 10 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@

DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false);
DECLARE_FLAG_STRING(ALIYUN_LOG_FILE_TAGS);
DECLARE_FLAG_INT32(file_tags_update_interval);
DECLARE_FLAG_STRING(agent_host_id);

using namespace std;
using namespace logtail;
Expand All @@ -63,19 +65,24 @@ LogtailPlugin::LogtailPlugin() {
mPluginContainerConfig.mAliuid = STRING_FLAG(logtail_profile_aliuid);
mPluginContainerConfig.mCompressor = CompressorFactory::GetInstance()->Create(CompressType::ZSTD);

mPluginCfg["LoongcollectorConfDir"] = AppConfig::GetInstance()->GetLoongcollectorConfDir();
mPluginCfg["LoongcollectorLogDir"] = GetAgentLogDir();
mPluginCfg["LoongcollectorDataDir"] = GetAgentGoCheckpointDir();
mPluginCfg["LoongcollectorLogConfDir"] = GetAgentGoLogConfDir();
mPluginCfg["LoongcollectorPluginLogName"] = GetPluginLogName();
mPluginCfg["LoongcollectorVersionTag"] = GetVersionTag();
mPluginCfg["LoongcollectorCheckPointFile"] = GetGoPluginCheckpoint();
mPluginCfg["LoongcollectorThirdPartyDir"] = GetAgentThirdPartyDir();
mPluginCfg["LoongcollectorPrometheusAuthorizationPath"] = GetAgentPrometheusAuthorizationPath();
mPluginCfg["LoongCollectorConfDir"] = AppConfig::GetInstance()->GetLoongcollectorConfDir();
mPluginCfg["LoongCollectorLogDir"] = GetAgentLogDir();
mPluginCfg["LoongCollectorDataDir"] = GetAgentDataDir();
mPluginCfg["LoongCollectorLogConfDir"] = GetAgentGoLogConfDir();
mPluginCfg["LoongCollectorPluginLogName"] = GetPluginLogName();
mPluginCfg["LoongCollectorVersionTag"] = GetVersionTag();
mPluginCfg["LoongCollectorGoCheckPointDir"] = GetAgentGoCheckpointDir();
mPluginCfg["LoongCollectorGoCheckPointFile"] = GetGoPluginCheckpoint();
mPluginCfg["LoongCollectorThirdPartyDir"] = GetAgentThirdPartyDir();
mPluginCfg["LoongCollectorPrometheusAuthorizationPath"] = GetAgentPrometheusAuthorizationPath();
mPluginCfg["HostIP"] = LoongCollectorMonitor::mIpAddr;
mPluginCfg["Hostname"] = LoongCollectorMonitor::mHostname;
mPluginCfg["EnableSlsMetricsFormat"] = BOOL_FLAG(enable_sls_metrics_format);
mPluginCfg["FileTagsPath"] = STRING_FLAG(ALIYUN_LOG_FILE_TAGS);
if (!STRING_FLAG(ALIYUN_LOG_FILE_TAGS).empty()) {
mPluginCfg["FileTagsPath"] = GetFileTagsDir();
mPluginCfg["FileTagsInterval"] = INT32_FLAG(file_tags_update_interval);
}
mPluginCfg["AgentHostID"] = STRING_FLAG(agent_host_id);
}

LogtailPlugin::~LogtailPlugin() {
Expand Down
1 change: 1 addition & 0 deletions core/plugin/processor/inner/ProcessorTagNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ bool ProcessorTagNative::Init(const Json::Value& config) {
return true;
}

// should keep same with Go addAllConfigurableTags
void ProcessorTagNative::Process(PipelineEventGroup& logGroup) {
AddTag(logGroup, TagKey::HOST_NAME_TAG_KEY, LoongCollectorMonitor::GetInstance()->mHostname);
auto entity = InstanceIdentity::Instance()->GetEntity();
Expand Down
6 changes: 3 additions & 3 deletions core/unittest/common/http/CurlUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ void CurlUnittest::TestFollowRedirect() {
APSARA_TEST_EQUAL(404, res.GetStatusCode());
}

// UNIT_TEST_CASE(CurlUnittest, TestSendHttpRequest)
// UNIT_TEST_CASE(CurlUnittest, TestCurlTLS)
// UNIT_TEST_CASE(CurlUnittest, TestFollowRedirect)
UNIT_TEST_CASE(CurlUnittest, TestSendHttpRequest)
UNIT_TEST_CASE(CurlUnittest, TestCurlTLS)
UNIT_TEST_CASE(CurlUnittest, TestFollowRedirect)

} // namespace logtail

Expand Down
53 changes: 29 additions & 24 deletions pkg/config/global_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,34 @@ type GlobalConfig struct {
DefaultLogQueueSize int
DefaultLogGroupQueueSize int
// Directory to store prometheus configuration file.
LoongcollectorPrometheusAuthorizationPath string
LoongCollectorPrometheusAuthorizationPath string
// Directory to store loongcollector data, such as checkpoint, etc.
LoongcollectorConfDir string
LoongCollectorConfDir string
// Directory to store loongcollector log config.
LoongcollectorLogConfDir string
LoongCollectorLogConfDir string
// Directory to store loongcollector log.
LoongcollectorLogDir string
LoongCollectorLogDir string
// Directory to store loongcollector data.
LoongcollectorDataDir string
LoongCollectorDataDir string
// Directory to store loongcollector debug data.
LoongcollectorDebugDir string
LoongCollectorDebugDir string
// Directory to store loongcollector third party data.
LoongcollectorThirdPartyDir string
LoongCollectorThirdPartyDir string
// Log name of loongcollector plugin.
LoongcollectorPluginLogName string
LoongCollectorPluginLogName string
// Tag of loongcollector version.
LoongcollectorVersionTag string
LoongCollectorVersionTag string
// Checkpoint dir name of loongcollector plugin.
LoongCollectorGoCheckPointDir string
// Checkpoint file name of loongcollector plugin.
LoongcollectorCheckPointFile string
LoongCollectorGoCheckPointFile string
// Network identification from loongcollector.
HostIP string
Hostname string
DelayStopSec int
FileTagsPath string
HostIP string
Hostname string
DelayStopSec int
FileTagsPath string
FileTagsInterval int
AgentHostID string

EnableTimestampNanosecond bool
UsingOldContentTag bool
Expand All @@ -78,16 +82,17 @@ func newGlobalConfig() (cfg GlobalConfig) {
FlushIntervalMs: 3000,
DefaultLogQueueSize: 1000,
DefaultLogGroupQueueSize: 4,
LoongcollectorConfDir: "./conf/",
LoongcollectorLogConfDir: "./conf/",
LoongcollectorLogDir: "./log/",
LoongcollectorPluginLogName: "go_plugin.LOG",
LoongcollectorVersionTag: "loongcollector_version",
LoongcollectorCheckPointFile: "go_plugin_checkpoint",
LoongcollectorDataDir: "./data/",
LoongcollectorDebugDir: "./debug/",
LoongcollectorThirdPartyDir: "./thirdparty/",
LoongcollectorPrometheusAuthorizationPath: "./conf/",
LoongCollectorConfDir: "./conf/",
LoongCollectorLogConfDir: "./conf/",
LoongCollectorLogDir: "./log/",
LoongCollectorPluginLogName: "go_plugin.LOG",
LoongCollectorVersionTag: "loongcollector_version",
LoongCollectorGoCheckPointDir: "./data/",
LoongCollectorGoCheckPointFile: "go_plugin_checkpoint",
LoongCollectorDataDir: "./data/",
LoongCollectorDebugDir: "./debug/",
LoongCollectorThirdPartyDir: "./thirdparty/",
LoongCollectorPrometheusAuthorizationPath: "./conf/",
DelayStopSec: 300,
}
return
Expand Down
6 changes: 3 additions & 3 deletions pkg/helper/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ type Dumper struct {

func (d *Dumper) Init() {
// 只有 service_http_server 插件会使用这个模块
_ = os.MkdirAll(path.Join(config.LoongcollectorGlobalConfig.LoongcollectorDebugDir, "dump"), 0750)
_ = os.MkdirAll(path.Join(config.LoongcollectorGlobalConfig.LoongCollectorDebugDir, "dump"), 0750)
d.input = make(chan *DumpData, 10)
d.stop = make(chan struct{})
files, err := GetFileListByPrefix(path.Join(config.LoongcollectorGlobalConfig.LoongcollectorDebugDir, "dump"), d.prefix, true, 0)
files, err := GetFileListByPrefix(path.Join(config.LoongcollectorGlobalConfig.LoongCollectorDebugDir, "dump"), d.prefix, true, 0)
if err != nil {
logger.Warning(context.Background(), "LIST_HISTORY_DUMP_ALARM", "err", err)
} else {
Expand Down Expand Up @@ -98,7 +98,7 @@ func (d *Dumper) doDumpFile() {
}
}
cutFile := func() (f *os.File, err error) {
nFile := path.Join(path.Join(config.LoongcollectorGlobalConfig.LoongcollectorDebugDir, "dump"), fileName+"_"+time.Now().Format("2006-01-02_15"))
nFile := path.Join(path.Join(config.LoongcollectorGlobalConfig.LoongCollectorDebugDir, "dump"), fileName+"_"+time.Now().Format("2006-01-02_15"))
if len(d.dumpDataKeepFiles) == 0 || d.dumpDataKeepFiles[len(d.dumpDataKeepFiles)-1] != nFile {
d.dumpDataKeepFiles = append(d.dumpDataKeepFiles, nFile)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/helper/dumper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import (
)

func TestServiceHTTP_doDumpFile(t *testing.T) {
_, err := os.Stat(path.Join(config.LoongcollectorGlobalConfig.LoongcollectorDebugDir, "dump"))
_, err := os.Stat(path.Join(config.LoongcollectorGlobalConfig.LoongCollectorDebugDir, "dump"))
if err == nil {
files, findErr := GetFileListByPrefix(path.Join(config.LoongcollectorGlobalConfig.LoongcollectorDebugDir, "dump"), "custom", true, 0)
files, findErr := GetFileListByPrefix(path.Join(config.LoongcollectorGlobalConfig.LoongCollectorDebugDir, "dump"), "custom", true, 0)
require.NoError(t, findErr)
for _, file := range files {
_ = os.Remove(file)
Expand Down
14 changes: 7 additions & 7 deletions pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ func InitLogger() {

func InitTestLogger(options ...ConfigOption) {
once.Do(func() {
config.LoongcollectorGlobalConfig.LoongcollectorLogDir = "./"
config.LoongcollectorGlobalConfig.LoongcollectorConfDir = "./"
config.LoongcollectorGlobalConfig.LoongcollectorLogConfDir = "./"
config.LoongcollectorGlobalConfig.LoongCollectorLogDir = "./"
config.LoongcollectorGlobalConfig.LoongCollectorConfDir = "./"
config.LoongcollectorGlobalConfig.LoongCollectorLogConfDir = "./"
initTestLogger(options...)
catchStandardOutput()
})
Expand All @@ -124,7 +124,7 @@ func initNormalLogger() {
for _, option := range defaultProductionOptions {
option()
}
confDir := config.LoongcollectorGlobalConfig.LoongcollectorLogConfDir
confDir := config.LoongcollectorGlobalConfig.LoongCollectorLogConfDir
if _, err := os.Stat(confDir); os.IsNotExist(err) {
_ = os.MkdirAll(confDir, os.ModePerm)
}
Expand All @@ -141,7 +141,7 @@ func initTestLogger(options ...ConfigOption) {
for _, option := range options {
option()
}
setLogConf(path.Join(config.LoongcollectorGlobalConfig.LoongcollectorLogConfDir, "plugin_logger.xml"))
setLogConf(path.Join(config.LoongcollectorGlobalConfig.LoongCollectorLogConfDir, "plugin_logger.xml"))
}

func Debug(ctx context.Context, kvPairs ...interface{}) {
Expand Down Expand Up @@ -271,7 +271,7 @@ func Flush() {

func setLogConf(logConfig string) {
if !retainFlag {
_ = os.Remove(path.Join(config.LoongcollectorGlobalConfig.LoongcollectorLogConfDir, "plugin_logger.xml"))
_ = os.Remove(path.Join(config.LoongcollectorGlobalConfig.LoongCollectorLogConfDir, "plugin_logger.xml"))
}
debugFlag = 0
logtailLogger = seelog.Disabled
Expand Down Expand Up @@ -330,7 +330,7 @@ func generateDefaultConfig() string {
if memoryReceiverFlag {
memoryReceiverFlagStr = "<custom name=\"memory\" />"
}
return fmt.Sprintf(template, levelFlag, config.LoongcollectorGlobalConfig.LoongcollectorLogDir, config.LoongcollectorGlobalConfig.LoongcollectorPluginLogName, consoleStr, memoryReceiverFlagStr)
return fmt.Sprintf(template, levelFlag, config.LoongcollectorGlobalConfig.LoongCollectorLogDir, config.LoongcollectorGlobalConfig.LoongCollectorPluginLogName, consoleStr, memoryReceiverFlagStr)
}

// Close the logger and recover the stdout and stderr
Expand Down
14 changes: 7 additions & 7 deletions pkg/logger/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ func init() {
}

func clean() {
_ = os.Remove(path.Join(config.LoongcollectorGlobalConfig.LoongcollectorLogConfDir, "plugin_logger.xml"))
_ = os.Remove(path.Join(config.LoongcollectorGlobalConfig.LoongcollectorLogDir, config.LoongcollectorGlobalConfig.LoongcollectorPluginLogName))
_ = os.Remove(path.Join(config.LoongcollectorGlobalConfig.LoongCollectorLogConfDir, "plugin_logger.xml"))
_ = os.Remove(path.Join(config.LoongcollectorGlobalConfig.LoongCollectorLogDir, config.LoongcollectorGlobalConfig.LoongCollectorPluginLogName))
}

func readLog(index int) string {
bytes, _ := os.ReadFile(path.Join(config.LoongcollectorGlobalConfig.LoongcollectorLogDir, config.LoongcollectorGlobalConfig.LoongcollectorPluginLogName))
bytes, _ := os.ReadFile(path.Join(config.LoongcollectorGlobalConfig.LoongCollectorLogDir, config.LoongcollectorGlobalConfig.LoongCollectorPluginLogName))
logs := strings.Split(string(bytes), "\n")
if index > len(logs)-1 {
return ""
Expand Down Expand Up @@ -108,26 +108,26 @@ func Test_generateDefaultConfig(t *testing.T) {
}{
{
name: "production",
want: fmt.Sprintf(template, "info", config.LoongcollectorGlobalConfig.LoongcollectorLogDir, "", ""),
want: fmt.Sprintf(template, "info", config.LoongcollectorGlobalConfig.LoongCollectorLogDir, "", ""),
flagSetter: func() {},
},
{
name: "test-debug-level",
want: fmt.Sprintf(template, "debug", config.LoongcollectorGlobalConfig.LoongcollectorLogDir, "", ""),
want: fmt.Sprintf(template, "debug", config.LoongcollectorGlobalConfig.LoongCollectorLogDir, "", ""),
flagSetter: func() {
flag.Set(FlagLevelName, "debug")
},
},
{
name: "test-wrong-level",
want: fmt.Sprintf(template, "info", config.LoongcollectorGlobalConfig.LoongcollectorLogDir, "", ""),
want: fmt.Sprintf(template, "info", config.LoongcollectorGlobalConfig.LoongCollectorLogDir, "", ""),
flagSetter: func() {
flag.Set(FlagLevelName, "debug111")
},
},
{
name: "test-open-console",
want: fmt.Sprintf(template, "info", config.LoongcollectorGlobalConfig.LoongcollectorLogDir, "<console/>", ""),
want: fmt.Sprintf(template, "info", config.LoongcollectorGlobalConfig.LoongCollectorLogDir, "<console/>", ""),
flagSetter: func() {
flag.Set(FlagConsoleName, "true")
},
Expand Down
1 change: 1 addition & 0 deletions plugin_main/plugin_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ func initPluginBase(cfgStr string) int {
initOnce.Do(func() {
LoadGlobalConfig(cfgStr)
InitHTTPServer()
pluginmanager.InitFileConfig(&config.LoongcollectorGlobalConfig)
setGCPercentForSlowStart()
logger.Info(context.Background(), "init plugin base, version", config.BaseVersion)
if *flags.DeployMode == flags.DeploySingleton && *flags.EnableKubernetesMeta {
Expand Down
4 changes: 2 additions & 2 deletions pluginmanager/checkpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ func (p *checkPointManager) Init() error {
p.shutdown = make(chan struct{}, 1)
p.configCounter = make(map[string]int)
p.cleanThreshold = DefaultCleanThreshold
logtailDataDir := config.LoongcollectorGlobalConfig.LoongcollectorDataDir
logtailDataDir := config.LoongcollectorGlobalConfig.LoongCollectorGoCheckPointDir
pathExist, err := util.PathExists(logtailDataDir)
var dbPath string
if err == nil && pathExist {
if *CheckPointFile != "" {
dbPath = filepath.Join(logtailDataDir, *CheckPointFile)
} else {
dbPath = filepath.Join(logtailDataDir, config.LoongcollectorGlobalConfig.LoongcollectorCheckPointFile)
dbPath = filepath.Join(logtailDataDir, config.LoongcollectorGlobalConfig.LoongCollectorGoCheckPointFile)
}
} else {
// c++程序如果这个目录创建失败会直接exit,所以这里一般应该不会走进来
Expand Down
2 changes: 1 addition & 1 deletion pluginmanager/checkpoint_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

func MkdirDataDir() {
os.MkdirAll(config.LoongcollectorGlobalConfig.LoongcollectorDataDir, 0750)
os.MkdirAll(config.LoongcollectorGlobalConfig.LoongCollectorGoCheckPointDir, 0750)
}

func Test_checkPointManager_SaveGetCheckpoint(t *testing.T) {
Expand Down
Loading

0 comments on commit 183af3a

Please sign in to comment.