Skip to content

Observable Extractor improvement #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cortexutils/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def build_taxonomy(self, level, namespace, predicate, value):
'namespace': namespace,
'predicate': predicate,
'value': value
}
}

def summary(self, raw):
"""Returns a summary, needed for 'short.html' template. Overwrite it for your needs!
Expand Down
107 changes: 107 additions & 0 deletions cortexutils/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class Extractor:
def __init__(self, ignore=None):
    """Create an extractor and prepare its regex collections.

    :param ignore: Value stored unchanged as ``self.ignore``
                   (NOTE(review): how it is consumed is not visible here;
                   presumably a value to leave out of the results - confirm)
    """
    # Both builders are static and take no arguments, so each pattern set
    # is produced once per instance and reused for every later lookup.
    self.ftregex = self.__init_ft_regex()
    self.regex = self.__init_regex()
    self.ignore = ignore

@staticmethod
def __init_regex():
Expand Down Expand Up @@ -117,6 +118,87 @@ def __init_regex():

return regex

@staticmethod
def __init_ft_regex():
    """
    Returns the compiled full text regex list.

    Every entry maps the capture groups of its pattern, in order, onto the
    observable types listed under ``types`` (first group -> first type, ...).

    :return: List of {types, regex} dicts
    :rtype: list
    """

    #### Generic regexes

    # IPv4
    # One octet: 0-255 without accepting values such as 256 or 999.
    octet = r'(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)'
    # The address must not touch another digit on either side. Zero-width
    # lookarounds are used instead of consuming a delimiter character:
    # a consumed delimiter cannot also open the next match, which made
    # findall skip every second address in lists like "1.1.1.1,2.2.2.2".
    ftregex = [{
        'types': ['ip'],
        'regex': re.compile(r'(?<!\d)(' + r'\.'.join([octet] * 4) + r')(?!\d)')
    }]

    # URL: groups are the full url, the host name, the domain part of the
    # host and the (optional) path
    ftregex.append({
        'types': ['url', 'fqdn', 'domain', 'uri_path'],
        'regex': re.compile(r'((?:http|https):\/\/((?:(?:.*?)\.)?(.*?(?:\.\w+)+))\/?([a-zA-Z0-9\/\-\_\.\~\=\?]+\??)?)', re.MULTILINE)
    })

    # mail: groups are the full address and the domain after the '@'
    ftregex.append({
        'types': ['mail', 'domain'],
        'regex': re.compile(r'((?:[a-zA-Z0-9\/\-\_\.\+]+)@{1}([a-zA-Z0-9\-\_]+\.[a-zA-Z0-9\-\_\.]+)+)', re.MULTILINE)
    })

    ### Mail Specific regexes

    return ftregex

@staticmethod
def __init_analyzer_regex():
    """
    Returns the analyzer specific regexes; there are none here, so the
    result is a new empty list on every call.

    :return: Empty list
    :rtype: list
    """

    return []

def __findftmatch(self, value):
    """Searches a text value with every full text regex and collects the hits

    Each entry of ``self.ftregex`` carries a compiled pattern plus a list of
    observable types; the n-th capture group of a hit is reported with the
    n-th type of that list.

    :param value: The value to search; non-string values produce no hits
    :type value: str or number
    :return: List of {type, value} dicts, empty if nothing was found
    :rtype: list
    """
    found_observables = []
    if not isinstance(value, (str, unicode)):
        return found_observables

    for entry in self.ftregex:
        types = entry.get('types')
        for hit in entry.get('regex').findall(value):
            # findall yields a plain string for single-group patterns and a
            # tuple for multi-group patterns; treat both the same way.
            groups = hit if isinstance(hit, tuple) else (hit,)
            # zip stops at the shorter sequence, so a pattern with more
            # groups than declared types cannot run past the type list.
            for observable_type, observable_value in zip(types, groups):
                # An optional group that took no part in the match is
                # returned as '' (e.g. the path of "http://example.com");
                # an empty value is not an observable, so skip it.
                if not observable_value:
                    continue
                found_observables.append({
                    'type': observable_type,
                    'value': observable_value
                })
    return found_observables

def __checktype(self, value):
"""Checks if the given value is a known datatype

Expand Down Expand Up @@ -167,6 +249,10 @@ def check_iterable(self, iterable):
'type': dt,
'value': iterable
})
#Check full text for regex matches
matches = self.__findftmatch(iterable)
if len(matches) > 0:
results.extend(matches)
elif isinstance(iterable, list):
for item in iterable:
if isinstance(item, list) or isinstance(item, dict):
Expand All @@ -178,6 +264,10 @@ def check_iterable(self, iterable):
'type': dt,
'value': item
})
#Check full text for regex matches
matches = self.__findftmatch(item)
if len(matches) > 0:
results.extend(matches)
elif isinstance(iterable, dict):
for _, item in iterable.items():
if isinstance(item, list) or isinstance(item, dict):
Expand All @@ -189,7 +279,24 @@ def check_iterable(self, iterable):
'type': dt,
'value': item
})
#Check full text for regex matches
matches = self.__findftmatch(item)
if len(matches) > 0:
results.extend(matches)
else:
raise TypeError('Not supported type.')

#Deduplicate results for a cleaner result
results = self.deduplicate(results)
return results

def deduplicate(self, list_of_objects):
    """Drops repeated observables while keeping the first occurrence of each

    Two entries count as the same observable when both their ``type`` and
    their ``value`` compare equal.

    :param list_of_objects: Dicts that each provide 'type' and 'value'
    :type list_of_objects: list
    :return: New list with the unique entries in their original order
    :rtype: list
    """
    unique = []
    for candidate in list_of_objects:
        already_kept = any(
            candidate['type'] == kept['type'] and candidate['value'] == kept['value']
            for kept in unique
        )
        if already_kept:
            continue
        unique.append(candidate)
    return unique