1818 FILESYSTEM_DISCOVERY_PARAM_MAPPING ,
1919 IGNORED_LNET_GROUPS ,
2020 IGNORED_STATS ,
21+ JOBID_TAG_PARAMS ,
2122 JOBSTATS_PARAMS ,
23+ TAGS_WITH_FILESYSTEM ,
2224 LustreParam ,
2325)
2426
2527RATE_UNITS : Set [str ] = {'locks/s' }
2628
2729
30+ class IgnoredFilesystemName (Exception ):
31+ pass
32+
33+
2834def _get_stat_type (suffix : str , unit : str ) -> str :
2935 """
3036 Returns the metric type for a given stat suffix and unit.
@@ -115,13 +121,13 @@ def check(self, _: Any) -> None:
115121 self .submit_changelogs (self .changelog_lines_per_check )
116122
117123 self .submit_device_health (self .devices )
118- self .submit_param_data (self .params , self . filesystems )
124+ self .submit_param_data (self .params )
119125 self .submit_lnet_stats_metrics ()
120126 self .submit_lnet_local_ni_metrics ()
121127 self .submit_lnet_peer_ni_metrics ()
122128
123129 if self .node_type in ('mds' , 'oss' ):
124- self .submit_jobstats_metrics (self . filesystems )
130+ self .submit_jobstats_metrics ()
125131
126132 def update (self ) -> None :
127133 '''
@@ -173,7 +179,7 @@ def _update_filesystems(self) -> None:
173179
174180 def _update_changelog_targets (self , devices : List [Dict [str , Any ]], filesystems : List [str ]) -> None :
175181 self .log .debug ('Determining changelog targets...' )
176- target_regex = [filesystem + r'-MDT\d\d\d\d' for filesystem in filesystems ]
182+ target_regex = [re . escape ( filesystem ) + r'-MDT\d\d\d\d' for filesystem in filesystems ]
177183 targets = []
178184 for device in devices :
179185 for regex in target_regex :
@@ -218,24 +224,41 @@ def _run_command(self, bin: str, *args: str, sudo: bool = False) -> str:
218224 self .log .error ('Failed to run command %s: %s' , cmd , e )
219225 return ''
220226
221- def submit_jobstats_metrics (self , filesystems : List [ str ] ) -> None :
227+ def submit_jobstats_metrics (self ) -> None :
222228 '''
223229 Submit the jobstats metrics to Datadog.
224230
225231 For more information, see: https://doc.lustre.org/lustre_manual.xhtml#jobstats
226232 '''
227- jobstats_params = self ._get_jobstats_params_list ()
228- for jobstats_param in jobstats_params :
229- device_name = jobstats_param .split ('.' )[1 ] # For example: lustre-MDT0000
230- if not any (device_name .startswith (fs ) for fs in filesystems ):
233+ jobstats_param : LustreParam | None = None
234+ for param in JOBSTATS_PARAMS :
235+ if self .node_type in param .node_types :
236+ jobstats_param = param
237+ break
238+ if jobstats_param is None :
239+ self .log .debug ('Invalid jobstats device_type: %s' , self .node_type )
240+ return
241+ param_names = self ._get_jobstats_params_list (jobstats_param )
242+ jobid_config_tags = [
243+ f'{ param .regex } :{ self ._run_command ("lctl" , "get_param" , "-ny" , param .regex , sudo = True ).strip ()} '
244+ for param in JOBID_TAG_PARAMS
245+ ]
246+ for param_name in param_names :
247+ try :
248+ tags = (
249+ self .tags
250+ + self ._extract_tags_from_param (jobstats_param .regex , param_name , jobstats_param .wildcards )
251+ + jobid_config_tags
252+ )
253+ except IgnoredFilesystemName :
231254 continue
232- jobstats_metrics = self ._get_jobstats_metrics (jobstats_param ).get ('job_stats' )
255+ jobstats_metrics = self ._get_jobstats_metrics (param_name ).get ('job_stats' )
233256 if jobstats_metrics is None :
234- self .log .debug ('No jobstats metrics found for %s' , jobstats_param )
257+ self .log .debug ('No jobstats metrics found for %s' , param_name )
235258 continue
236259 for job in jobstats_metrics :
237260 job_id = job .get ('job_id' , "unknown" )
238- tags = self . tags + [ f'device_name: { device_name } ' , f' job_id:{ job_id } ']
261+ tags . append ( f' job_id:{ job_id } ')
239262 for metric_name , metric_values in job .items ():
240263 if not isinstance (metric_values , dict ):
241264 continue
@@ -254,18 +277,10 @@ def _submit_jobstat(self, name: str, values: Dict[str, Any], tags: List[str]) ->
254277 metric_type = _get_stat_type (suffix , values ['unit' ])
255278 self ._submit (f'job_stats.{ name } .{ suffix } ' , value , metric_type , tags = tags )
256279
257- def _get_jobstats_params_list (self ) -> List [str ]:
280+ def _get_jobstats_params_list (self , param ) -> List [str ]:
258281 '''
259282 Get the jobstats params from the command line.
260283 '''
261- param = None
262- for jobstat_param in JOBSTATS_PARAMS :
263- if self .node_type in jobstat_param .node_types :
264- param = jobstat_param
265- break
266- if param is None :
267- self .log .debug ('Invalid jobstats device_type: %s' , self .node_type )
268- return []
269284 raw_params = self ._run_command ('lctl' , 'list_param' , param .regex , sudo = True )
270285 return [line .strip () for line in raw_params .splitlines () if line .strip ()]
271286
@@ -374,7 +389,7 @@ def _get_lnet_metrics(self, stats_type: str = 'stats') -> Dict[str, Any]:
374389 self .log .debug ('Could not get lnet %s, caught exception: %s' , stats_type , e )
375390 return {}
376391
377- def submit_param_data (self , params : Set [LustreParam ], filesystems : List [ str ] ) -> None :
392+ def submit_param_data (self , params : Set [LustreParam ]) -> None :
378393 '''
379394 Submit general stats and metrics from Lustre parameters.
380395 '''
@@ -384,11 +399,10 @@ def submit_param_data(self, params: Set[LustreParam], filesystems: List[str]) ->
384399 continue
385400 matched_params = self ._run_command ('lctl' , 'list_param' , param .regex , sudo = True )
386401 for param_name in matched_params .splitlines ():
387- tags = self .tags + self ._extract_tags_from_param (param .regex , param_name , param .wildcards )
388- if any (fs_tag in param .wildcards for fs_tag in ('device_name' , 'device_uuid' )):
389- if not any (fs in param_name for fs in filesystems ):
390- self .log .debug ('Skipping param %s as it did not match any filesystem' , param_name )
391- continue
402+ try :
403+ tags = self .tags + self ._extract_tags_from_param (param .regex , param_name , param .wildcards )
404+ except IgnoredFilesystemName :
405+ continue
392406 raw_stats = self ._run_command ('lctl' , 'get_param' , '-ny' , param_name , sudo = True )
393407 if not param .regex .endswith ('.stats' ):
394408 self ._submit_param (param .prefix , param_name , tags )
@@ -432,7 +446,8 @@ def _extract_tags_from_param(self, param_regex: str, param_name: str, wildcards:
432446 tags = []
433447 regex_parts = param_regex .split ('.' )
434448 param_parts = param_name .split ('.' )
435- wildcard_number = 0
449+ wildcard_generator = (wildcard for wildcard in wildcards )
450+ filesystem = None
436451 if not len (regex_parts ) == len (param_parts ):
437452438453 if len (regex_parts ) + 3 == len (param_parts ):
@@ -446,13 +461,20 @@ def _extract_tags_from_param(self, param_regex: str, param_name: str, wildcards:
446461 return tags
447462 for part_number , part in enumerate (regex_parts ):
448463 if part == '*' :
449- if wildcard_number >= len (wildcards ):
450- self .log .debug (
451- 'Found %s wildcards, which exceeds available wildcard tags %s' , wildcard_number , wildcards
452- )
464+ try :
465+ current_wildcard = next (wildcard_generator )
466+ current_part = param_parts [part_number ]
467+ tags .append (f'{ current_wildcard } :{ current_part } ' )
468+ if current_wildcard in TAGS_WITH_FILESYSTEM and filesystem is None :
469+ filesystem = current_part .split ('-' )[0 ]
470+ tags .append (f'filesystem:{ filesystem } ' )
471+ self .log .debug ('Determined filesystem as %s from parameter %s' , filesystem , param_name )
472+ if filesystem not in self .filesystems :
473+ self .log .debug ('Skipping param %s as it did not match any filesystem' , param_name )
474+ raise IgnoredFilesystemName
475+ except StopIteration :
476+ self .log .debug ('Number of found wildcards exceeds available wildcard tags %s' , wildcards )
453477 return tags
454- tags .append (f'{ wildcards [wildcard_number ]} :{ param_parts [part_number ]} ' )
455- wildcard_number += 1
456478 return tags
457479
458480 def _parse_stats (self , raw_stats : str ) -> Dict [str , Dict [str , Union [int , str ]]]:
0 commit comments