diff --git a/contrib/imhiredis/imhiredis.c b/contrib/imhiredis/imhiredis.c index 2681891e61..ce3893e26b 100644 --- a/contrib/imhiredis/imhiredis.c +++ b/contrib/imhiredis/imhiredis.c @@ -116,6 +116,8 @@ struct instanceConf_s { ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */ uchar *pszBindRuleset; /* default name of Ruleset to bind to */ + prop_t *pInputName; /* the input name property */ + uchar *pszInputName; /* value of the intput name property */ redisContext *conn; redisAsyncContext *aconn; @@ -157,7 +159,7 @@ static struct imhiredisWrkrInfo_s { pthread_attr_t wrkrThrdAttr; /* Attribute for worker threads ; read only after startup */ static int activeHiredisworkers = 0; -static char *REDIS_REPLIES[] = { +static char const *REDIS_REPLIES[] = { "unknown", // 0 "string", // 1 "array", // 2 @@ -179,7 +181,6 @@ static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current l static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ static prop_t *pInputName = NULL; -/* there is only one global inputName for all messages generated by this input */ /* module-global parameters */ @@ -201,6 +202,7 @@ static struct cnfparamdescr inppdescr[] = { { "key", eCmdHdlrGetWord, CNFPARAM_REQUIRED }, { "uselpop", eCmdHdlrBinary, 0 }, { "ruleset", eCmdHdlrString, 0 }, + { "name", eCmdHdlrString, 0 }, { "stream.consumerGroup", eCmdHdlrGetWord, 0 }, { "stream.consumerName", eCmdHdlrGetWord, 0 }, { "stream.readFrom", eCmdHdlrGetWord, 0 }, @@ -233,6 +235,7 @@ struct timeval glblRedisCommandTimeout = { 5, 0 }; /* 5 seconds */ static void redisAsyncRecvCallback (redisAsyncContext __attribute__((unused)) *c, void *reply, void *inst_obj); static void redisAsyncConnectCallback (const redisAsyncContext *c, int status); static void redisAsyncDisconnectCallback (const redisAsyncContext *c, int status); +redisReply *getRole(redisContext *c); static struct json_object* _redisParseIntegerReply(const redisReply *reply); static struct json_object* _redisParseStringReply(const redisReply *reply); static struct json_object* _redisParseArrayReply(const redisReply *reply); @@ -249,10 +252,10 @@ rsRetVal redisAuthenticate(instanceConf_t *inst); #ifdef HIREDIS_SSL rsRetVal redisInitSSLContext(redisContext *conn, redisSSLContext *ssl_context); #endif -rsRetVal redisConnectSync(redisContext **conn, redisNode *node); +rsRetVal redisConnectSync(redisContext **conn, redisNode *node, instanceConf_t *inst); rsRetVal connectMasterSync(instanceConf_t *inst); static sbool isConnectedSync(instanceConf_t *inst); -rsRetVal redisConnectAsync(redisAsyncContext **aconn, redisNode *node); +rsRetVal redisConnectAsync(redisAsyncContext **aconn, redisNode *node, instanceConf_t *inst); rsRetVal connectMasterAsync(instanceConf_t *inst); static sbool isConnectedAsync(instanceConf_t *inst); rsRetVal redisDequeue(instanceConf_t *inst); @@ -296,6 +299,8 @@ createInstance(instanceConf_t **pinst) inst->streamConsumerACK = 1; inst->pszBindRuleset = NULL; inst->pBindRuleset = NULL; + inst->pszInputName = NULL; + inst->pInputName = NULL; inst->fieldList.nmemb = 0; #ifdef HIREDIS_SSL inst->use_tls = 0; @@ -344,28 +349,32 @@ checkInstance(instanceConf_t *const inst) /* check and print redis connection settings */ if (inst->redisNodesList->server != NULL && inst->redisNodesList->socketPath != NULL) { - LogMsg(0, RS_RET_CONFIG_ERROR, LOG_WARNING,"imhiredis: both 'server' and 'socketPath' are given, " - "ignoring 'socketPath'."); + LogMsg(0, RS_RET_CONFIG_ERROR, LOG_WARNING,"imhiredis[%s]: both 'server' and 'socketPath' are given, " + "ignoring 'socketPath'.", inst->pszInputName); free(inst->redisNodesList->socketPath); inst->redisNodesList->socketPath = NULL; } if(inst->redisNodesList->server != NULL && inst->redisNodesList->server[0] != '\0') { if (inst->redisNodesList->port == 0) { - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis: port not set, setting default 6379"); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: port not set, setting default 6379", + inst->pszInputName); inst->redisNodesList->port = 6379; } - DBGPRINTF("imhiredis: preferred server is %s (%d)\n", + LogMsg(0, RS_RET_OK, LOG_INFO, "imhiredis[%s]: will use %s (%d) to connect", + inst->pszInputName, inst->redisNodesList->server, inst->redisNodesList->port); inst->redisNodesList->usesSocket = 0; } else if(inst->redisNodesList->socketPath != NULL && inst->redisNodesList->socketPath[0] != '\0') { - DBGPRINTF("imhiredis: preferred server is %s\n", + LogMsg(0, RS_RET_OK, LOG_INFO, "imhiredis[%s]: will use %s to connect", + inst->pszInputName, inst->redisNodesList->socketPath); inst->redisNodesList->usesSocket = 1; } else { - LogError(0, RS_RET_CONFIG_ERROR, "imhiredis: neither 'server' nor 'socketPath' are defined!"); + LogError(0, RS_RET_CONFIG_ERROR, "imhiredis[%s]: neither 'server' nor 'socketPath' are defined!", + inst->pszInputName); ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } @@ -375,14 +384,23 @@ checkInstance(instanceConf_t *const inst) // Check and initialize SSL context if (inst->use_tls) { if((inst->client_cert == NULL) ^ (inst->client_key == NULL)){ - LogMsg(0, RS_RET_CONFIG_ERROR, LOG_ERR, "imhiredis: \"client_cert\" and \"client_key\" must be specified together!"); + LogMsg(0, RS_RET_CONFIG_ERROR, LOG_ERR, "imhiredis[%s]: \"client_cert\" and \"client_key\"" + " must be specified together!", + inst->pszInputName); ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } - inst->ssl_conn = redisCreateSSLContext(inst->ca_cert_bundle, inst->ca_cert_dir, inst->client_cert, inst->client_key, inst->sni, &ssl_error); + inst->ssl_conn = redisCreateSSLContext( + inst->ca_cert_bundle, + inst->ca_cert_dir, + inst->client_cert, + inst->client_key, + inst->sni, &ssl_error); if (!inst->ssl_conn || ssl_error != REDIS_SSL_CTX_NONE) { - LogError(0, RS_RET_REDIS_ERROR, "imhiredis: TLS configuration Error: %s", redisSSLContextGetError(ssl_error)); + LogError(0, RS_RET_REDIS_ERROR, "imhiredis[%s]: TLS configuration Error: %s", + inst->pszInputName, + redisSSLContextGetError(ssl_error)); if (inst->ssl_conn != NULL) { redisFreeSSLContext(inst->ssl_conn); inst->ssl_conn = NULL; @@ -393,57 +411,67 @@ checkInstance(instanceConf_t *const inst) #endif if (inst->mode < IMHIREDIS_MODE_QUEUE || inst->mode > IMHIREDIS_MODE_STREAM) { - LogError(0, RS_RET_CONFIG_ERROR, "imhiredis: invalid mode, please choose 'subscribe', " - "'queue' or 'stream' mode."); + LogError(0, RS_RET_CONFIG_ERROR, "imhiredis[%s]: invalid mode, please choose 'subscribe', " + "'queue' or 'stream' mode.", + inst->pszInputName); ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } if (inst->mode != IMHIREDIS_MODE_QUEUE && inst->useLPop) { - LogMsg(0, RS_RET_CONFIG_ERROR, LOG_WARNING,"imhiredis: 'uselpop' set with mode != queue : ignored."); + LogMsg(0, RS_RET_CONFIG_ERROR, LOG_WARNING,"imhiredis[%s]: 'uselpop' set with mode != queue : ignored.", + inst->pszInputName); } if (inst->mode == IMHIREDIS_MODE_STREAM) { if(inst->streamConsumerGroup != NULL && inst->streamConsumerName == NULL) { - LogError(0, RS_RET_CONFIG_ERROR, "imhiredis: invalid configuration, " - "please set a consumer name when mode is 'stream' and a consumer group is set"); + LogError(0, RS_RET_CONFIG_ERROR, "imhiredis[%s]: invalid configuration, " + "please set a consumer name when mode is 'stream' and a consumer group is set", + inst->pszInputName); ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } if(inst->streamAutoclaimIdleTime != 0 && inst->streamConsumerGroup == NULL) { - LogMsg(0, RS_RET_CONFIG_ERROR, LOG_WARNING,"imhiredis: 'stream.autoclaimIdleTime' " - "set with no consumer group set : ignored."); + LogMsg(0, RS_RET_CONFIG_ERROR, LOG_WARNING,"imhiredis[%s]: 'stream.autoclaimIdleTime' " + "set with no consumer group set : ignored.", + inst->pszInputName); } if(inst->streamReadFrom[0] == '\0') { inst->streamReadFrom[0] = '$'; } } else { if (inst->streamConsumerGroup != NULL) { - LogMsg(0, RS_RET_CONFIG_ERROR, LOG_WARNING,"imhiredis: 'stream.consumerGroup' " - "set with mode != stream : ignored."); + LogMsg(0, RS_RET_CONFIG_ERROR, LOG_WARNING,"imhiredis[%s]: 'stream.consumerGroup' " + "set with mode != stream : ignored.", + inst->pszInputName); } if (inst->streamConsumerName != NULL) { - LogMsg(0, RS_RET_CONFIG_ERROR, LOG_WARNING,"imhiredis: 'stream.consumerName' " - "set with mode != stream : ignored."); + LogMsg(0, RS_RET_CONFIG_ERROR, LOG_WARNING,"imhiredis[%s]: 'stream.consumerName' " + "set with mode != stream : ignored.", + inst->pszInputName); } if (inst->streamAutoclaimIdleTime != 0) { - LogMsg(0, RS_RET_CONFIG_ERROR, LOG_WARNING,"imhiredis: 'stream.autoclaimIdleTime' " - "set with mode != stream : ignored."); + LogMsg(0, RS_RET_CONFIG_ERROR, LOG_WARNING,"imhiredis[%s]: 'stream.autoclaimIdleTime' " + "set with mode != stream : ignored.", + inst->pszInputName); } if (inst->streamConsumerACK == 0) { - LogMsg(0, RS_RET_CONFIG_ERROR, LOG_WARNING,"imhiredis: 'stream.consumerACK' " - "disabled with mode != stream : ignored."); + LogMsg(0, RS_RET_CONFIG_ERROR, LOG_WARNING,"imhiredis[%s]: 'stream.consumerACK' " + "disabled with mode != stream : ignored.", + inst->pszInputName); } if (inst->fieldList.nmemb > 0) { - LogMsg(0, RS_RET_CONFIG_ERROR, LOG_WARNING,"imhiredis: 'fields' " - "unused for mode != stream : ignored."); + LogMsg(0, RS_RET_CONFIG_ERROR, LOG_WARNING,"imhiredis[%s]: 'fields' " + "unused for mode != stream : ignored.", + inst->pszInputName); } } if (inst->batchsize !=0 ) { - DBGPRINTF("imhiredis: batchsize is '%d'\n", inst->batchsize); + LogMsg(0, RS_RET_OK, LOG_INFO, + "imhiredis[%s]: batchsize is '%d'", inst->pszInputName, inst->batchsize); } else { LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, - "imhiredis: batchsize not set, setting to default (%d)",BATCH_SIZE); + "imhiredis[%s]: batchsize not set, setting to default (%d)",inst->pszInputName, BATCH_SIZE); inst->batchsize=BATCH_SIZE; } @@ -455,7 +483,8 @@ checkInstance(instanceConf_t *const inst) inst->currentNode = inst->redisNodesList; // search master node (should be either preferred node or its master) if (RS_RET_OK != redisActualizeCurrentNode(inst) || inst->currentNode == NULL) { - LogMsg(0, RS_RET_REDIS_ERROR, LOG_WARNING, "imhiredis: could not connect to a valid master!"); + LogMsg(0, RS_RET_REDIS_ERROR, LOG_WARNING, "imhiredis[%s]: could not find a valid master!", + inst->pszInputName); } finalize_it: @@ -466,8 +495,9 @@ checkInstance(instanceConf_t *const inst) static inline void std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, instanceConf_t *inst) { - LogError(0, NO_ERRCODE, "imhiredis: ruleset '%s' not found - " + LogError(0, NO_ERRCODE, "imhiredis[%s]: ruleset '%s' not found - " "using default ruleset instead", + inst->pszInputName, inst->pszBindRuleset); } @@ -499,6 +529,8 @@ CODESTARTnewInpInst inst->redisNodesList->socketPath = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "ruleset")) { inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "name")) { + inst->pszInputName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "port")) { inst->redisNodesList->port = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "password")) { @@ -587,29 +619,39 @@ CODESTARTnewInpInst } } + if (inst->pszInputName) { + CHKiRet(prop.Construct(&inst->pInputName)); + CHKiRet(prop.SetString(inst->pInputName, inst->pszInputName, ustrlen(inst->pszInputName))); + CHKiRet(prop.ConstructFinalize(inst->pInputName)); + } + DBGPRINTF("imhiredis: checking config sanity\n"); if (inst->modeDescription == NULL) { CHKmalloc(inst->modeDescription = (uchar*)strdup("subscribe")); inst->mode = IMHIREDIS_MODE_SUBSCRIBE; - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis: \"mode\" parameter not specified " - "using default redis 'subscribe' mode -- this may not be what you want!"); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: \"mode\" parameter not specified " + "using default redis 'subscribe' mode -- this may not be what you want!", + inst->pszInputName); } if (inst->key == NULL) { - LogMsg(0, RS_RET_PARAM_ERROR, LOG_ERR, "imhiredis: \"key\" required parameter not specified!"); + LogMsg(0, RS_RET_PARAM_ERROR, LOG_ERR, "imhiredis[%s]: \"key\" required parameter not specified!", + inst->pszInputName); ABORT_FINALIZE(RS_RET_PARAM_ERROR); } if(inst->redisNodesList->server == NULL && inst->redisNodesList->socketPath == NULL) { CHKmalloc(inst->redisNodesList->server = (uchar *)strdup("127.0.0.1")); inst->redisNodesList->port = 6379; - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis: no server parameter specified " - "using default 127.0.0.1:6379 -- this may not be what you want!"); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: no server parameter specified " + "using default 127.0.0.1:6379 -- this may not be what you want!", + inst->pszInputName); } if (inst->password == NULL) { - LogMsg(0, RS_RET_OK, LOG_INFO, "imhiredis: no password specified"); + LogMsg(0, RS_RET_OK, LOG_INFO, "imhiredis[%s]: no password specified", + inst->pszInputName); } - DBGPRINTF("imhiredis: newInpInst key=%s, mode=%s, uselpop=%d\n", - inst->key, inst->modeDescription, inst->useLPop); + DBGPRINTF("imhiredis[%s]: newInpInst key=%s, mode=%s, uselpop=%d\n", + inst->pszInputName, inst->key, inst->modeDescription, inst->useLPop); finalize_it: CODE_STD_FINALIZERnewInpInst @@ -697,7 +739,11 @@ CODESTARTfreeCnf if(inst->streamConsumerName != NULL) free(inst->streamConsumerName); free(inst->streamReadFrom); + if (inst->pInputName) { + prop.Destruct(&inst->pInputName); + } free(inst->pszBindRuleset); + free(inst->pszInputName); if(inst->fieldList.name != NULL) { for(int i = 0 ; i < inst->fieldList.nmemb ; ++i) { free(inst->fieldList.name[i]); @@ -789,7 +835,7 @@ CODESTARTrunInput } - DBGPRINTF("imhiredis: Starting %d imhiredis workerthreads\n", activeHiredisworkers); + LogMsg(0, RS_RET_OK, LOG_DEBUG, "imhiredis: Starting %d imhiredis threads", activeHiredisworkers); imhiredisWrkrInfo = calloc(activeHiredisworkers, sizeof(struct imhiredisWrkrInfo_s)); if (imhiredisWrkrInfo == NULL) { LogError(errno, RS_RET_OUT_OF_MEMORY, "imhiredis: worker-info array allocation failed."); @@ -814,7 +860,7 @@ CODESTARTrunInput */ srSleep(0, 500000); } - DBGPRINTF("imhiredis: terminating upon request of rsyslog core\n"); + LogMsg(0, RS_RET_OK, LOG_DEBUG, "imhiredis: terminating upon request of rsyslog core"); shutdownImhiredisWorkers(); finalize_it: @@ -917,7 +963,7 @@ static void redisAsyncRecvCallback (redisAsyncContext *aconn, void *reply, void The reply is cleaned up by hiredis after the callback returns. TODO We may have to change this function in the future to free replies. */ - instanceConf_t *const inst = (instanceConf_t *) aconn->data; + instanceConf_t *const inst = (instanceConf_t *)aconn->c.privdata; redisReply * r = (redisReply *) reply; if (r == NULL) return; @@ -933,17 +979,18 @@ static void redisAsyncRecvCallback (redisAsyncContext *aconn, void *reply, void /** * Asynchronous connection callback handler */ -static void redisAsyncConnectCallback (const redisAsyncContext *c, int status) { +static void redisAsyncConnectCallback (const redisAsyncContext *ac, int status) { + instanceConf_t *inst = (instanceConf_t *)ac->c.privdata; if (status != REDIS_OK) { - LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "imhiredis (async): could not connect to redis: " - "%s", c->errstr); + LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "imhiredis[%s]: could not connect to redis (async): %s", + inst->pszInputName, + ac->errstr); // remove async context from instance config object, still contained in context's 'data' field - instanceConf_t *inst = (instanceConf_t *) c->data; assert(inst != NULL); inst->aconn = NULL; return; } - DBGPRINTF("imhiredis (async): successfully connected!\n"); + LogMsg(0, RS_RET_OK, LOG_DEBUG, "imhiredis[%s]: successfully connected (async)!", inst->pszInputName); return; } @@ -952,21 +999,23 @@ static void redisAsyncConnectCallback (const redisAsyncContext *c, int status) { /** * Asynchronous disconnection callback handler */ -static void redisAsyncDisconnectCallback (const redisAsyncContext *c, int status) { +static void redisAsyncDisconnectCallback (const redisAsyncContext *ac, int status) { // remove context from instance config object (which is stored in the context 'data' field by us) // context will be freed by the library, so it's only set to NULL here - instanceConf_t *inst = (instanceConf_t *) c->data; + instanceConf_t *inst = (instanceConf_t *)ac->c.privdata; assert(inst != NULL); inst->aconn = NULL; inst->currentNode = NULL; if (status != REDIS_OK) { - LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "imhiredis (async): got disconnected from redis: " - "%s", c->errstr); + LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "imhiredis[%s]: got disconnected from redis (async): %s", + inst->pszInputName, + ac->errstr); return; } - DBGPRINTF("imhiredis (async): successfully disconnected!\n"); + LogMsg(0, RS_RET_OK, LOG_DEBUG, "imhiredis[%s]: successfully disconnected (async)!", + inst->pszInputName); return; } @@ -980,22 +1029,27 @@ static void redisAsyncDisconnectCallback (const redisAsyncContext *c, int status * returns a valid redisReply pointer if an array reply was received, NULL otherwise */ redisReply *getRole(redisContext *c) { + instanceConf_t *inst = (instanceConf_t *)c->privdata; redisReply *reply; assert(c != NULL); reply = redisCommand(c, "ROLE"); if (reply == NULL) { - DBGPRINTF("imhiredis: could not get reply from ROLE command\n"); + LogMsg(0, RS_RET_REDIS_ERROR, LOG_WARNING, "imhiredis[%s]: could not get reply from ROLE command: %s", + inst->pszInputName, + c->errstr); } else if (reply->type == REDIS_REPLY_ERROR) { - LogMsg(0, RS_RET_REDIS_ERROR, LOG_WARNING, "imhiredis got an error while querying role -> " - "%s\n", reply->str); + LogMsg(0, RS_RET_REDIS_ERROR, LOG_WARNING, "imhiredis[%s]: got an error while querying role -> %s", + inst->pszInputName, + reply->str); freeReplyObject(reply); reply = NULL; } else if (reply->type != REDIS_REPLY_ARRAY) { - LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "imhiredis: did not get an array from ROLE command"); + LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "imhiredis[%s]: did not get an array from ROLE command", + inst->pszInputName); freeReplyObject(reply); reply = NULL; } @@ -1079,7 +1133,12 @@ static rsRetVal enqMsg(instanceConf_t *const inst, const char *message, size_t m DBGPRINTF("imhiredis: enqMsg: Msg -> '%s'\n", message); CHKiRet(msgConstruct(&pMsg)); - MsgSetInputName(pMsg, pInputName); + if (inst->pInputName) { + MsgSetInputName(pMsg, inst->pInputName); + } + else { + MsgSetInputName(pMsg, pInputName); + } MsgSetRawMsg(pMsg, message, msgLen); MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); MsgSetRuleset(pMsg, inst->pBindRuleset); @@ -1104,7 +1163,7 @@ static rsRetVal enqMsgJson(instanceConf_t *const inst, struct json_object *json, MsgSetMSGoffs(pMsg, 0); /* we do not have a header... */ if(RS_RET_OK != MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY)) LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, - "Could not set Flow Control on message."); + "imhiredis[%s]: Could not set Flow Control on message.", inst->pszInputName); if(inst->fieldList.nmemb != 0) { for (int i = 0; i < inst->fieldList.nmemb; i++) { @@ -1136,7 +1195,9 @@ static rsRetVal enqMsgJson(instanceConf_t *const inst, struct json_object *json, if(RS_RET_OK != msgAddJSON(pMsg, (uchar *)inst->fieldList.varname[i], tempJson, 0, 0)) { LogMsg(0, RS_RET_OBJ_CREATION_FAILED, LOG_ERR, - "Failed to add value to '%s'", inst->fieldList.varname[i]); + "imhiredis[%s]: Failed to add value to '%s'", + inst->pszInputName, + inst->fieldList.varname[i]); } tempJson = NULL; @@ -1144,18 +1205,18 @@ static rsRetVal enqMsgJson(instanceConf_t *const inst, struct json_object *json, } else { if(RS_RET_OK != msgAddJSON(pMsg, (uchar*)"!", json, 0, 0)) { LogMsg(0, RS_RET_OBJ_CREATION_FAILED, LOG_ERR, - "Failed to add json info to message!"); + "imhiredis[%s]: Failed to add json info to message!", inst->pszInputName); ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED); } } if (metadata != NULL && RS_RET_OK != msgAddJSON(pMsg, (uchar*)".", metadata, 0, 0)) { LogMsg(0, RS_RET_OBJ_CREATION_FAILED, LOG_ERR, - "Failed to add metadata to message!"); + "imhiredis[%s]: Failed to add metadata to message!", inst->pszInputName); ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED); } if(RS_RET_OK != submitMsg2(pMsg)) { LogMsg(0, RS_RET_OBJ_CREATION_FAILED, LOG_ERR, - "Failed to submit message to main queue!"); + "imhiredis[%s]: Failed to submit message to main queue!", inst->pszInputName); ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED); } DBGPRINTF("enqMsgJson: message enqueued!\n"); @@ -1178,12 +1239,16 @@ rsRetVal redisAuthentSynchronous(redisContext *conn, uchar *password) { assert(password != NULL); assert(password[0] != '\0'); + instanceConf_t *inst = (instanceConf_t *)conn->privdata; + reply = (redisReply *) redisCommand(conn, "AUTH %s", password); if (reply == NULL) { - LogError(0, RS_RET_REDIS_ERROR, "imhiredis: Could not authenticate!\n"); + LogError(0, RS_RET_REDIS_ERROR, "imhiredis[%s]: Could not authenticate!", inst->pszInputName); ABORT_FINALIZE(RS_RET_REDIS_ERROR); } else if (strncmp(reply->str, "OK", 2)) { - LogError(0, RS_RET_REDIS_AUTH_FAILED, "imhiredis: Authentication failure -> %s\n", reply->str); + LogError(0, RS_RET_REDIS_AUTH_FAILED, "imhiredis[%s]: Authentication failure -> %s", + inst->pszInputName, + reply->str); ABORT_FINALIZE(RS_RET_REDIS_AUTH_FAILED); } @@ -1206,8 +1271,11 @@ rsRetVal redisAuthentAsynchronous(redisAsyncContext *aconn, uchar *password) { assert(password != NULL); assert(password[0] != '\0'); + instanceConf_t *inst = (instanceConf_t *)aconn->c.privdata; + if (REDIS_OK != redisAsyncCommand(aconn, NULL, NULL, "AUTH %s", password)) { - LogError(0, RS_RET_REDIS_ERROR, "imhiredis: error while authenticating asynchronously -> %s\n", + LogError(0, RS_RET_REDIS_ERROR, "imhiredis[%s]: error while authenticating asynchronously -> %s", + inst->pszInputName, aconn->errstr); ABORT_FINALIZE(RS_RET_REDIS_ERROR); } @@ -1234,7 +1302,7 @@ rsRetVal redisGetServersList(redisNode *node, instanceConf_t *inst, redisNode ** assert(node != NULL); - CHKiRet(redisConnectSync(&context, node)); + CHKiRet(redisConnectSync(&context, node, inst)); #ifdef HIREDIS_SSL if(inst->use_tls && inst->ssl_conn){ @@ -1249,7 +1317,8 @@ rsRetVal redisGetServersList(redisNode *node, instanceConf_t *inst, redisNode ** reply = getRole(context); if(reply == NULL) { - LogMsg(0, RS_RET_REDIS_ERROR, LOG_WARNING, "imhiredis: did not get the role of the server"); + LogMsg(0, RS_RET_REDIS_ERROR, LOG_WARNING, "imhiredis[%s]: did not get the role of the server", + inst->pszInputName); ABORT_FINALIZE(RS_RET_REDIS_ERROR); } @@ -1415,9 +1484,10 @@ rsRetVal redisAuthenticate(instanceConf_t *inst) { // Create a temporary context for synchronous connection, used to validate AUTH command in asynchronous contexts if (inst->mode == IMHIREDIS_MODE_SUBSCRIBE) { - if (RS_RET_OK != redisConnectSync(&usedContext, inst->currentNode)) { - LogMsg(0, RS_RET_REDIS_ERROR, LOG_WARNING, "imhiredis: could not connect to current " - "active node synchronously to validate authentication"); + if (RS_RET_OK != redisConnectSync(&usedContext, inst->currentNode, inst)) { + LogMsg(0, RS_RET_REDIS_ERROR, LOG_WARNING, "imhiredis[%s]: could not connect to current " + "active node synchronously to validate authentication", + inst->pszInputName); ABORT_FINALIZE(RS_RET_REDIS_ERROR); } #ifdef HIREDIS_SSL @@ -1457,8 +1527,12 @@ rsRetVal redisInitSSLContext(redisContext *conn, redisSSLContext *ssl_context) { assert(conn != NULL); assert(ssl_context != NULL); + instanceConf_t *inst = (instanceConf_t *)conn->privdata; + if (redisInitiateSSLWithContext(conn, ssl_context) != REDIS_OK) { - LogError(0, RS_RET_REDIS_ERROR, "imhiredis error while initializing TLs context: %s", conn->errstr); + LogError(0, RS_RET_REDIS_ERROR, "imhiredis[%s]: error while initializing TLs context: %s", + inst->pszInputName, + conn->errstr); ABORT_FINALIZE(RS_RET_REDIS_ERROR); } finalize_it: @@ -1471,14 +1545,16 @@ rsRetVal redisInitSSLContext(redisContext *conn, redisSSLContext *ssl_context) { * connection function for synchronous (queue) mode * node should not be NULL */ -rsRetVal redisConnectSync(redisContext **conn, redisNode *node) { +rsRetVal redisConnectSync(redisContext **conn, redisNode *node, instanceConf_t *inst) { DEFiRet; redisOptions options = {0}; + assert(node != NULL); + options.connect_timeout = &glblRedisConnectTimeout; options.command_timeout = &glblRedisCommandTimeout; - assert(node != NULL); + REDIS_OPTIONS_SET_PRIVDATA(&options, (void *)inst, NULL); if (node->usesSocket) REDIS_OPTIONS_SET_UNIX(&options, (const char *)node->socketPath); @@ -1489,21 +1565,33 @@ rsRetVal redisConnectSync(redisContext **conn, redisNode *node) { if (*conn == NULL) { if (node->usesSocket) { - LogError(0, RS_RET_REDIS_ERROR, "imhiredis: can not connect to redis server '%s' " - "-> could not allocate context!\n", node->socketPath); + LogError(0, RS_RET_REDIS_ERROR, "imhiredis[%s]: can not connect to redis server '%s' " + "-> could not allocate context!", + inst->pszInputName, + node->socketPath); } else { - LogError(0, RS_RET_REDIS_ERROR, "imhiredis: can not connect to redis server '%s', " - "port %d -> could not allocate context!\n", node->server, node->port); + LogError(0, RS_RET_REDIS_ERROR, "imhiredis[%s]: can not connect to redis server '%s', " + "port %d -> could not allocate context!", + inst->pszInputName, + node->server, + node->port); } ABORT_FINALIZE(RS_RET_REDIS_ERROR); } else if ((*conn)->err) { if (node->usesSocket) { - LogError(0, RS_RET_REDIS_ERROR, "imhiredis: can not connect to redis server '%s' " - "-> %s\n", node->socketPath, (*conn)->errstr); + LogError(0, RS_RET_REDIS_ERROR, "imhiredis[%s]: can not connect to redis server '%s' " + "-> %s", + inst->pszInputName, + node->socketPath, + (*conn)->errstr); } else { - LogError(0, RS_RET_REDIS_ERROR, "imhiredis: can not connect to redis server '%s', " - "port %d -> %s\n", node->server, node->port, (*conn)->errstr); + LogError(0, RS_RET_REDIS_ERROR, "imhiredis[%s]: can not connect to redis server '%s', " + "port %d -> %s", + inst->pszInputName, + node->server, + node->port, + (*conn)->errstr); } ABORT_FINALIZE(RS_RET_REDIS_ERROR); } @@ -1522,13 +1610,15 @@ rsRetVal redisConnectSync(redisContext **conn, redisNode *node) { * connection function for asynchronous (subscribe) mode * node should not be NULL */ -rsRetVal redisConnectAsync(redisAsyncContext **aconn, redisNode *node) { +rsRetVal redisConnectAsync(redisAsyncContext **aconn, redisNode *node, instanceConf_t *inst) { DEFiRet; redisOptions options = {0}; options.connect_timeout = &glblRedisConnectTimeout; options.command_timeout = &glblRedisCommandTimeout; + REDIS_OPTIONS_SET_PRIVDATA(&options, (void *)inst, NULL); + assert(node != NULL); if (node->usesSocket) @@ -1539,15 +1629,22 @@ rsRetVal redisConnectAsync(redisAsyncContext **aconn, redisNode *node) { *aconn = redisAsyncConnectWithOptions(&options); if(*aconn == NULL) { - LogError(0, RS_RET_REDIS_ERROR, "imhiredis (async): could not allocate context!\n"); + LogError(0, RS_RET_REDIS_ERROR, "imhiredis[%s]: could not allocate context (async)!", + inst->pszInputName); ABORT_FINALIZE(RS_RET_REDIS_ERROR); } else if ((*aconn)->err) { if (node->usesSocket) { - LogError(0, RS_RET_REDIS_ERROR, "imhiredis (async): cannot connect to server '%s' " - "-> %s\n", node->socketPath, (*aconn)->errstr); + LogError(0, RS_RET_REDIS_ERROR, "imhiredis[%s]: cannot connect to server (async) '%s' -> %s", + inst->pszInputName, + node->socketPath, + (*aconn)->errstr); } else { - LogError(0, RS_RET_REDIS_ERROR, "imhiredis (async): cannot connect to server '%s', port '%d' " - "-> %s\n", node->server, node->port, (*aconn)->errstr); + LogError(0, RS_RET_REDIS_ERROR, "imhiredis[%s]: cannot connect to server '%s', port '%d'" + " (async) -> %s", + inst->pszInputName, + node->server, + node->port, + (*aconn)->errstr); } ABORT_FINALIZE(RS_RET_REDIS_ERROR); } @@ -1568,7 +1665,7 @@ rsRetVal redisConnectAsync(redisAsyncContext **aconn, redisNode *node) { rsRetVal connectMasterAsync(instanceConf_t *inst) { DEFiRet; - if(RS_RET_OK != redisConnectAsync(&(inst->aconn), inst->currentNode)) { + if(RS_RET_OK != redisConnectAsync(&(inst->aconn), inst->currentNode, inst)) { inst->currentNode = NULL; ABORT_FINALIZE(RS_RET_REDIS_ERROR); } @@ -1595,8 +1692,7 @@ rsRetVal connectMasterAsync(instanceConf_t *inst) { ABORT_FINALIZE(RS_RET_REDIS_AUTH_FAILED); } - // finalize context creation - inst->aconn->data = (void *)inst; + // finalize async context creation redisAsyncSetConnectCallback(inst->aconn, redisAsyncConnectCallback); redisAsyncSetDisconnectCallback(inst->aconn, redisAsyncDisconnectCallback); redisLibeventAttach(inst->aconn, inst->evtBase); @@ -1621,7 +1717,7 @@ static sbool isConnectedAsync(instanceConf_t *inst) { rsRetVal connectMasterSync(instanceConf_t *inst) { DEFiRet; - if(RS_RET_OK != redisConnectSync(&(inst->conn), inst->currentNode)) { + if(RS_RET_OK != redisConnectSync(&(inst->conn), inst->currentNode, inst)) { inst->currentNode = NULL; ABORT_FINALIZE(RS_RET_REDIS_ERROR); } @@ -1696,8 +1792,12 @@ rsRetVal redisDequeue(instanceConf_t *inst) { do { if (REDIS_OK != redisGetReply(inst->conn, (void **) &reply)) { // error getting reply, must stop - LogError(0, RS_RET_REDIS_ERROR, "redisDequeue: Error reading reply after POP #%d " - "on key '%s'", (inst->batchsize - i), inst->key); + LogError(0, RS_RET_REDIS_ERROR, "imhiredis[%s]: Error reading reply after POP #%d " + "on key '%s' -> %s", + inst->pszInputName, + (inst->batchsize - i), + inst->key, + inst->conn->errstr); // close connection redisFree(inst->conn); inst->currentNode = NULL; @@ -1715,18 +1815,24 @@ rsRetVal redisDequeue(instanceConf_t *inst) { break; case REDIS_REPLY_ERROR: // There is a problem with the key or the Redis instance - LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "redisDequeue: error " - "while POP'ing key '%s' -> %s", inst->key, reply->str); + LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "imhiredis[%s]: error " + "while POP'ing key '%s' -> %s", + inst->pszInputName, + inst->key, + reply->str); ABORT_FINALIZE(RS_RET_REDIS_ERROR); default: - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "redisDequeue: " - "unexpected reply type: %s", REDIS_REPLIES[replyType%15]); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: " + "unexpected reply type: %s", + inst->pszInputName, + REDIS_REPLIES[replyType%15]); } freeReplyObject(reply); reply = NULL; } else { /* reply == NULL */ - LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "redisDequeue: unexpected empty reply " - "for successful return"); + LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "imhiredis[%s]: unexpected empty reply " + "for successful return", + inst->pszInputName); ABORT_FINALIZE(RS_RET_REDIS_ERROR); } } @@ -1786,8 +1892,9 @@ rsRetVal ensureConsumerGroupCreated(instanceConf_t *inst) { inst->streamConsumerGroup, inst->key); } else { - LogError(0, RS_RET_ERR, "ensureConsumerGroupCreated: An unknown error " + LogError(0, RS_RET_ERR, "imhiredis[%s]: An unknown error " "occurred while creating a Consumer group %s on stream %s -> %s", + inst->pszInputName, inst->streamConsumerGroup, inst->key, reply->str); @@ -1795,14 +1902,17 @@ rsRetVal ensureConsumerGroupCreated(instanceConf_t *inst) { } break; default: - LogError(0, RS_RET_ERR, "ensureConsumerGroupCreated: An unknown reply was received " - "-> %s", REDIS_REPLIES[(reply->type)%15]); + LogError(0, RS_RET_ERR, "imhiredis[%s]: An unknown reply was received " + "while creating a consumer group -> %s", + inst->pszInputName, + REDIS_REPLIES[(reply->type)%15]); ABORT_FINALIZE(RS_RET_ERR); } } else { - LogError(0, RS_RET_REDIS_ERROR, "ensureConsumerGroupCreated: Could not create group %s on stream %s!", - inst->streamConsumerGroup, - inst->key); + LogError(0, RS_RET_REDIS_ERROR, "imhiredis[%s]: Could not create group %s on stream %s!", + inst->pszInputName, + inst->streamConsumerGroup, + inst->key); ABORT_FINALIZE(RS_RET_REDIS_ERROR); } @@ -1836,24 +1946,28 @@ rsRetVal ackStreamIndex(instanceConf_t *inst, uchar *stream, uchar *group, uchar } break; case REDIS_REPLY_ERROR: - LogError(0, RS_RET_ERR, "ackStreamIndex: An error occurred " + LogError(0, RS_RET_ERR, "imhiredis[%s]: An error occurred " "while trying to ACK message %s on %s[%s] -> %s", + inst->pszInputName, index, stream, group, reply->str); ABORT_FINALIZE(RS_RET_ERR); default: - LogError(0, RS_RET_ERR, "ackStreamIndex: unexpected reply type: %s", - REDIS_REPLIES[(reply->type)%15]); + LogError(0, RS_RET_ERR, "imhiredis[%s]: unexpected reply type " + "while aknowledging message: %s", + inst->pszInputName, + REDIS_REPLIES[(reply->type)%15]); ABORT_FINALIZE(RS_RET_ERR); } } else { - LogError(0, RS_RET_REDIS_ERROR, "ackStreamIndex: Could not ACK message with index %s for %s[%s]!", - index, - stream, - group); + LogError(0, RS_RET_REDIS_ERROR, "imhiredis[%s]: Could not ACK message with index %s for %s[%s]!", + inst->pszInputName, + index, + stream, + group); ABORT_FINALIZE(RS_RET_REDIS_ERROR); } @@ -1946,7 +2060,8 @@ static rsRetVal handleRedisXREADReply(instanceConf_t *const inst, const redisRep if(reply == NULL || reply->type != REDIS_REPLY_ARRAY) { /* we do not process empty or non-ARRAY lines */ DBGPRINTF("handleRedisXREADReply: object is not an array, ignoring\n"); - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "handleRedisXREADReply: object is not an array, ignoring"); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: Reply is not an array, ignoring", + inst->pszInputName); ABORT_FINALIZE(RS_RET_OK_WARN); } else { // iterating on streams @@ -1954,21 +2069,24 @@ static rsRetVal handleRedisXREADReply(instanceConf_t *const inst, const redisRep streamObj = reply->element[i]; // object should contain the name of the stream, and an array containing the messages if(streamObj->type != REDIS_REPLY_ARRAY || streamObj->elements != 2) { - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "handleRedisXREADReply: wrong object format, " - "object should contain the name of the stream and an array of messages"); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: wrong object format, " + "object should contain the name of the stream and an array of messages", + inst->pszInputName); ABORT_FINALIZE(RS_RET_OK_WARN); } if(streamObj->element[0]->type != REDIS_REPLY_STRING) { - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "handleRedisXREADReply: wrong field format, " - "first entry is not a string (supposed to be stream name)"); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: wrong field format, " + "first entry is not a string (supposed to be stream name)", + inst->pszInputName); ABORT_FINALIZE(RS_RET_OK_WARN); } msgList = streamObj->element[1]; if(msgList->type != REDIS_REPLY_ARRAY) { - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "handleRedisXREADReply: wrong field format, " - "second entry is not an array (supposed to be list of messages for stream)"); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: wrong field format, " + "second entry is not an array (supposed to be list of messages for stream)", + inst->pszInputName); ABORT_FINALIZE(RS_RET_OK_WARN); } @@ -1979,19 +2097,22 @@ static rsRetVal handleRedisXREADReply(instanceConf_t *const inst, const redisRep msgObj = msgList->element[j]; // Object should contain the name of the index, and its content(s) if(msgObj->type != REDIS_REPLY_ARRAY || msgObj->elements != 2) { - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "handleRedisXREADReply: wrong object " - "format, object should contain the index and its content(s)"); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: wrong object " + "format, object should contain the index and its content(s)", + inst->pszInputName); ABORT_FINALIZE(RS_RET_OK_WARN); } if(msgObj->element[0]->type != REDIS_REPLY_STRING) { - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "handleRedisXREADReply: wrong field " - "format, first entry should be a string (index name)"); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: wrong field " + "format, first entry should be a string (index name)", + inst->pszInputName); ABORT_FINALIZE(RS_RET_OK_WARN); } if(msgObj->type != REDIS_REPLY_ARRAY) { - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "handleRedisXREADReply: wrong field " - "format, second entry should be an array (index content(s))"); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: wrong field " + "format, second entry should be an array (index content(s))", + inst->pszInputName); ABORT_FINALIZE(RS_RET_OK_WARN); } @@ -2051,21 +2172,24 @@ static rsRetVal handleRedisXAUTOCLAIMReply( } else { // Object should contain between 2 and 3 elements (depends on Redis server version) if(reply->elements < 2 || reply->elements > 3) { - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "handleRedisXAUTOCLAIMReply: wrong number of fields, " - "cannot process entry"); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: wrong number of fields, " + "cannot process entry", + inst->pszInputName); ABORT_FINALIZE(RS_RET_OK_WARN); } if(reply->element[0]->type != REDIS_REPLY_STRING) { - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "handleRedisXAUTOCLAIMReply: the first element " - "is not a string, cannot process entry"); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: the first element " + "is not a string, cannot process entry", + inst->pszInputName); ABORT_FINALIZE(RS_RET_OK_WARN); } msgList = reply->element[1]; if(msgList->type != REDIS_REPLY_ARRAY) { - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "handleRedisXAUTOCLAIMReply: the second element " - "is not an array, cannot process entry"); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: the second element " + "is not an array, cannot process entry", + inst->pszInputName); ABORT_FINALIZE(RS_RET_OK_WARN); } @@ -2075,19 +2199,22 @@ static rsRetVal handleRedisXAUTOCLAIMReply( msgObj = msgList->element[j]; // Object should contain the name of the index, and its content(s) if(msgObj->type != REDIS_REPLY_ARRAY || msgObj->elements != 2) { - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "handleRedisXAUTOCLAIMReply: wrong message " - "format, cannot process"); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: wrong message " + "format, cannot process", + inst->pszInputName); ABORT_FINALIZE(RS_RET_OK_WARN); } if(msgObj->element[0]->type != REDIS_REPLY_STRING) { - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "handleRedisXAUTOCLAIMReply: first message " - "element not a string, cannot process"); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: first message " + "element not a string, cannot process", + inst->pszInputName); ABORT_FINALIZE(RS_RET_OK_WARN); } if(msgObj->type != REDIS_REPLY_ARRAY) { - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "handleRedisXAUTOCLAIMReply: second message " - "element not an array, cannot process"); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: second message " + "element not an array, cannot process", + inst->pszInputName); ABORT_FINALIZE(RS_RET_OK_WARN); } @@ -2131,8 +2258,9 @@ rsRetVal redisStreamRead(instanceConf_t *inst) { CHKmalloc(autoclaimIndex = calloc(1, STREAM_INDEX_STR_MAXLEN)); // Cannot claim from '$', will have to claim from the beginning of the stream if(inst->streamReadFrom[0] == '$') { - LogMsg(0, RS_RET_OK, LOG_WARNING, "Cannot claim pending entries from '$', " - "will have to claim from the beginning of the stream"); + LogMsg(0, RS_RET_OK, LOG_WARNING, "imhiredis[%s]: Cannot claim pending entries from '$', " + "will have to claim from the beginning of the stream", + inst->pszInputName); memcpy(autoclaimIndex, "0-0", 4); } else { memcpy(autoclaimIndex, inst->streamReadFrom, STREAM_INDEX_STR_MAXLEN); @@ -2172,8 +2300,9 @@ rsRetVal redisStreamRead(instanceConf_t *inst) { } } if(reply == NULL) { - LogError(0, RS_RET_REDIS_ERROR, "redisStreamRead: Error while trying to read stream '%s'", - inst->key); + LogError(0, RS_RET_REDIS_ERROR, "imhiredis[%s]: Error while trying to read stream '%s'", + inst->pszInputName, + inst->key); ABORT_FINALIZE(RS_RET_REDIS_ERROR); } @@ -2198,13 +2327,17 @@ rsRetVal redisStreamRead(instanceConf_t *inst) { break; case REDIS_REPLY_ERROR: // There is a problem with the key or the Redis instance - LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "redisStreamRead: error " - "while reading stream(s) -> %s", reply->str); + LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "imhiredis[%s]: error " + "while reading stream(s) -> %s", + inst->pszInputName, + reply->str); srSleep(1, 0); ABORT_FINALIZE(RS_RET_REDIS_ERROR); default: - LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "redisStreamRead: unexpected " - "reply type: %s", REDIS_REPLIES[replyType%15]); + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imhiredis[%s]: unexpected " + "reply type: %s", + inst->pszInputName, + REDIS_REPLIES[replyType%15]); } freeReplyObject(reply); reply = NULL; @@ -2243,7 +2376,7 @@ rsRetVal redisSubscribe(instanceConf_t *inst) { inst->key); if (ret != REDIS_OK) { - LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "redisSubscribe: Could not subscribe"); + LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "imhiredis[%s]: Could not subscribe", inst->pszInputName); ABORT_FINALIZE(RS_RET_REDIS_ERROR); } @@ -2267,7 +2400,8 @@ void workerLoop(struct imhiredisWrkrInfo_s *me) { if (me->inst->currentNode != NULL) { rsRetVal ret = me->fnConnectMaster(me->inst); if(ret != RS_RET_OK) { - LogMsg(0, ret, LOG_WARNING, "workerLoop: Could not connect successfully to master"); + LogMsg(0, ret, LOG_WARNING, "imhiredis[%s]: Could not connect successfully to master", + me->inst->pszInputName); } } @@ -2277,8 +2411,9 @@ void workerLoop(struct imhiredisWrkrInfo_s *me) { * Sleep 10 seconds before attempting to resume a broken connexion * (sleep small amounts to avoid missing termination status) */ - LogMsg(0, RS_RET_OK, LOG_INFO, "workerLoop: " - "no valid connection, sleeping 10 seconds before retrying..."); + LogMsg(0, RS_RET_OK, LOG_INFO, "imhiredis[%s]: " + "no valid connection, sleeping 10 seconds before retrying...", + me->inst->pszInputName); for(i = 0; i < 100; i++) { // Rsyslog asked for shutdown, thread should be stopped if (glbl.GetGlobalInputTermState() != 0) @@ -2297,8 +2432,9 @@ void workerLoop(struct imhiredisWrkrInfo_s *me) { if (me->inst->currentNode != NULL) { rsRetVal ret = me->fnConnectMaster(me->inst); if(ret != RS_RET_OK) { - LogMsg(0, ret, LOG_WARNING, "workerLoop: " - "Could not connect successfully to master"); + LogMsg(0, ret, LOG_WARNING, "imhiredis[%s]: " + "Could not connect successfully to master", + me->inst->pszInputName); } } } diff --git a/tests/imhiredis-logs-name.sh b/tests/imhiredis-logs-name.sh new file mode 100755 index 0000000000..97a42c3bde --- /dev/null +++ b/tests/imhiredis-logs-name.sh @@ -0,0 +1,47 @@ +#!/usr/bin/env bash +# added 2025-11-20 by Théo Bertin, released under ASL 2.0 +## Uncomment for debugging +#export RS_REDIR=-d + +. ${srcdir:=.}/diag.sh init + +start_redis + +generate_conf +add_conf ' +global(localhostname="server") +module(load="../contrib/imhiredis/.libs/imhiredis") + +template(name="outfmt" type="string" string="%$/num% %msg%\n") + +input(type="imhiredis" + server="127.0.0.1" + port="'$REDIS_RANDOM_PORT'" + key="mykey" + mode="queue" + name="test-imhiredis-queue" + ruleset="redis") + +ruleset(name="redis") { + set $/num = cnum($/num + 1); + action(type="omfile" + file="'$RSYSLOG_OUT_LOG'" + template="outfmt") +} + +action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt") +' +startup +shutdown_when_empty +wait_shutdown + +stop_redis + +content_check "imhiredis[test-imhiredis-queue]: " +check_not_present "imhiredis: " +check_not_present "imhiredis[(null)]: " + +# Removes generated configuration file, log and pid files +cleanup_redis + +exit_test diff --git a/tests/imhiredis-logs-no-name.sh b/tests/imhiredis-logs-no-name.sh new file mode 100755 index 0000000000..494a4ea7be --- /dev/null +++ b/tests/imhiredis-logs-no-name.sh @@ -0,0 +1,45 @@ +#!/usr/bin/env bash +# added 2025-11-20 by Théo Bertin, released under ASL 2.0 +## Uncomment for debugging +#export RS_REDIR=-d + +. ${srcdir:=.}/diag.sh init + +start_redis + +generate_conf +add_conf ' +global(localhostname="server") +module(load="../contrib/imhiredis/.libs/imhiredis") + +template(name="outfmt" type="string" string="%$/num% %msg%\n") + +input(type="imhiredis" + server="127.0.0.1" + port="'$REDIS_RANDOM_PORT'" + key="mykey" + mode="queue" + ruleset="redis") + +ruleset(name="redis") { + set $/num = cnum($/num + 1); + action(type="omfile" + file="'$RSYSLOG_OUT_LOG'" + template="outfmt") +} + +action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt") +' +startup +shutdown_when_empty +wait_shutdown + +stop_redis + +content_check "imhiredis[(null)]: " +check_not_present "imhiredis: " + +# Removes generated configuration file, log and pid files +cleanup_redis + +exit_test