@@ -14,8 +14,11 @@ import (
1414 "github.com/tinylib/msgp/msgp"
1515)
1616
17- // Stream name for retina networkflow logs
18- const RetinaNetworkFlowLogsStreamName = "RETINA_NETWORK_FLOW_LOGS"
17+ // Stream names for network flow logs
18+ const (
19+ ContainerNetworkLogsStreamName = "CONTAINER_NETWORK_LOGS" // New stream name
20+ RetinaNetworkFlowLogsStreamName = "RETINA_NETWORK_FLOW_LOGS" // Legacy stream name
21+ )
1922
2023var (
2124 // retina networkflow logs stream tag name
@@ -75,10 +78,15 @@ func PostNetworkFlowRecords(tailPluginRecords []map[interface{}]interface{}) int
7578 }
7679
7780 if len (networkFlowLogsMsgPackEntries ) > 0 {
78- MdsdNetworkFlowLogsStreamTagName = getOutputStreamIdTag (RetinaNetworkFlowLogsStreamName , MdsdNetworkFlowLogsStreamTagName , & NetworkFlowTagRefreshTracker )
81+ // Try getting the stream tag with the new name first
82+ MdsdNetworkFlowLogsStreamTagName = getOutputStreamIdTag (ContainerNetworkLogsStreamName , MdsdNetworkFlowLogsStreamTagName , & NetworkFlowTagRefreshTracker )
7983 if MdsdNetworkFlowLogsStreamTagName == "" {
80- Log ("Error::mdsd::Failed to get stream tag for networkflow logs. Will retry ..." )
81- return output .FLB_RETRY
84+ // If new stream name fails, try the legacy stream name
85+ MdsdNetworkFlowLogsStreamTagName = getOutputStreamIdTag (RetinaNetworkFlowLogsStreamName , MdsdNetworkFlowLogsStreamTagName , & NetworkFlowTagRefreshTracker )
86+ if MdsdNetworkFlowLogsStreamTagName == "" {
87+ Log ("Error::mdsd::Failed to get stream tag for networkflow logs. Will retry ..." )
88+ return output .FLB_RETRY
89+ }
8290 }
8391 if MdsdNetworkFlowClient == nil {
8492 Log ("Error::mdsd::mdsd connection does not exist for networkflow mdsd client. re-connecting ..." )
@@ -286,12 +294,17 @@ func mapNetworkFlowLogsToDataMap(dataMap map[string]interface{}, record map[stri
286294 if traceObservationPoint := extractString (flow , "trace_observation_point" ); traceObservationPoint != "" {
287295 dataMap ["TraceObservationPoint" ] = traceObservationPoint
288296 }
289- // Packets and Bytes
290- if packetsSent , ok := flow ["packets_sent" ]; ok {
291- dataMap ["PacketsSent" ] = safeToInt (packetsSent )
292- }
293- if packetsReceived , ok := flow ["packets_received" ]; ok {
294- dataMap ["PacketsReceived" ] = safeToInt (packetsReceived )
297+ // Flow counts from extensions
298+ if extensions , ok := flow ["extensions" ].(map [string ]interface {}); ok {
299+ if ingressCount , ok := extensions ["ingress_flow_count" ]; ok {
300+ dataMap ["IngressFlowCount" ] = safeToInt (ingressCount )
301+ }
302+ if egressCount , ok := extensions ["egress_flow_count" ]; ok {
303+ dataMap ["EgressFlowCount" ] = safeToInt (egressCount )
304+ }
305+ if unknownCount , ok := extensions ["unknown_direction_flow_count" ]; ok {
306+ dataMap ["UnknownDirectionFlowCount" ] = safeToInt (unknownCount )
307+ }
295308 }
296309 // Policies
297310 policiesData := map [string ]interface {}{}
@@ -329,8 +342,17 @@ func mapNetworkFlowLogsToDataMap(dataMap map[string]interface{}, record map[stri
329342 if summary , ok := flow ["Summary" ]; ok {
330343 additionalData ["Summary" ] = summary
331344 }
332- if extensions , ok := flow ["extensions" ]; ok {
333- additionalData ["Extensions" ] = extensions
345+ if extensions , ok := flow ["extensions" ].(map [string ]interface {}); ok {
346+ // Create a new map without the flow count fields
347+ filteredExtensions := make (map [string ]interface {})
348+ for k , v := range extensions {
349+ if k != "ingress_flow_count" && k != "egress_flow_count" && k != "unknown_direction_flow_count" {
350+ filteredExtensions [k ] = v
351+ }
352+ }
353+ if len (filteredExtensions ) > 0 {
354+ additionalData ["Extensions" ] = filteredExtensions
355+ }
334356 }
335357 if len (additionalData ) > 0 {
336358 dataMap ["AdditionalFlowData" ] = serializeToJSON (additionalData )
0 commit comments