diff --git a/README.md b/README.md index 1f140137f7..554b5879d0 100644 --- a/README.md +++ b/README.md @@ -236,6 +236,7 @@ Human Name Extractor|Attempt to identify human names in fetched content.|Interna IBAN Number Extractor|Identify International Bank Account Numbers (IBANs) in any data.|Internal [Iknowwhatyoudownload.com](https://iknowwhatyoudownload.com/en/peer/)|Check iknowwhatyoudownload.com for IP addresses that have been using torrents.|Tiered API [IntelligenceX](https://intelx.io/)|Obtain information from IntelligenceX about identified IP addresses, domains, e-mail addresses and phone numbers.|Tiered API +[InternetDB](https://internetdb.shodan.io/)|Obtain information from InternetDB about identified IP addresses.|Free API Interesting File Finder|Identifies potential files of interest, e.g. office documents, zip files.|Internal [Internet Storm Center](https://isc.sans.edu)|Check if an IP address is malicious according to SANS ISC.|Free API [ipapi.co](https://ipapi.co/)|Queries ipapi.co to identify geolocation of IP Addresses using ipapi.co API|Tiered API diff --git a/modules/sfp_internetdb.py b/modules/sfp_internetdb.py new file mode 100644 index 0000000000..351eff885f --- /dev/null +++ b/modules/sfp_internetdb.py @@ -0,0 +1,185 @@ +# -*- coding: utf-8 -*- +# ------------------------------------------------------------------------------- +# Name: sfp_internetdb +# Purpose: Search the InternetDB API for information about IP addresses. +# +# Author: Justin Sohl +# +# Created: 2/12/2023 +# Copyright: (c) Steve Micallef +# Licence: MIT +# ------------------------------------------------------------------------------- + +import json +import time + +from netaddr import IPNetwork + +from spiderfoot import SpiderFootEvent, SpiderFootPlugin + + +class sfp_internetdb(SpiderFootPlugin): + + meta = { + 'name': "InternetDB", + 'summary': "Obtain information from the InternetDB API about identified IP addresses.", + 'useCases': ["Footprint", "Investigate", "Passive"], + 'categories': ["Search Engines"], + 'dataSource': { + 'website': "https://internetdb.shodan.io/", + 'model': "FREE_NOAUTH_UNLIMITED", + 'references': [ + "https://internetdb.shodan.io/", + "https://internetdb.shodan.io/docs" + ], + 'favIcon': "https://static.shodan.io/shodan/img/favicon.png", + 'logo': "https://static.shodan.io/developer/img/logo.png", + 'description': "The InternetDB API provides a fast way to see the open ports for an IP address." + "It gives a quick, at-a-glance view of the type of device that is running behind an IP address to help you make decisions based on the open ports.", + } + } + + # Default options + opts = { + 'netblocklookup': True, + 'maxnetblock': 24 + } + + # Option descriptions + optdescs = { + 'netblocklookup': "Look up all IPs on netblocks deemed to be owned by your target for possible hosts on the same target subdomain/domain?", + 'maxnetblock': "If looking up owned netblocks, the maximum netblock size to look up all IPs within (CIDR value, 24 = /24, 16 = /16, etc.)" + } + + results = None + errorState = False + + def setup(self, sfc, userOpts=dict()): + self.sf = sfc + self.results = self.tempStorage() + + for opt in list(userOpts.keys()): + self.opts[opt] = userOpts[opt] + + # What events is this module interested in for input + def watchedEvents(self): + return ["IP_ADDRESS", "NETBLOCK_OWNER"] + + # What events this module produces + def producedEvents(self): + return ["INTERNET_NAME", + 'IP_ADDRESS', + 'RAW_RIR_DATA', + "TCP_PORT_OPEN", + 'VULNERABILITY_CVE_CRITICAL', + 'VULNERABILITY_CVE_HIGH', + 'VULNERABILITY_CVE_MEDIUM', + 'VULNERABILITY_CVE_LOW', + 'VULNERABILITY_GENERAL'] + + def queryHost(self, qry): + res = self.sf.fetchUrl( + f"https://internetdb.shodan.io/{qry}", + timeout=self.opts['_fetchtimeout'], + useragent="SpiderFoot" + ) + time.sleep(1) + + # InternetDB does not document a rate limit, but they might implement one + if res['code'] in ["403", "401"]: + self.error("You have exceeded usage limits.") + self.errorState = True + return None + + if res['code'] == "404" or res['content'] is None: + self.info(f"No InternetDB info found for {qry}") + return None + + if res['code'] == "422": + self.error(f"Validation error for {qry}") + + try: + r = json.loads(res['content']) + if "error" in r: + self.error(f"Error returned from InternetDB: {r['error']}") + return None + return r + except Exception as e: + self.error(f"Error processing JSON response from InternetDB: {e}") + return None + + return None + + # Handle events sent to this module + def handleEvent(self, event): + eventName = event.eventType + srcModuleName = event.module + eventData = event.data + + if self.errorState: + return + + self.debug(f"Received event, {eventName}, from {srcModuleName}") + + if eventData in self.results: + self.debug(f"Skipping {eventData}, already checked.") + return + + self.results[eventData] = True + + if eventName == 'NETBLOCK_OWNER': + if not self.opts['netblocklookup']: + return + max_netblock = self.opts['maxnetblock'] + if IPNetwork(eventData).prefixlen < max_netblock: + self.debug(f"Network size bigger than permitted: {IPNetwork(eventData).prefixlen} > {max_netblock}") + return + + qrylist = list() + if eventName.startswith("NETBLOCK_"): + for ipaddr in IPNetwork(eventData): + qrylist.append(str(ipaddr)) + self.results[str(ipaddr)] = True + else: + qrylist.append(eventData) + + for addr in qrylist: + rec = self.queryHost(addr) + if rec is None: + continue + + # For netblocks, we need to create the IP address event so that + # the threat intel event is more meaningful. + if eventName == 'NETBLOCK_OWNER': + pevent = SpiderFootEvent("IP_ADDRESS", addr, self.__name__, event) + self.notifyListeners(pevent) + else: + pevent = event + + evt = SpiderFootEvent("RAW_RIR_DATA", str(rec), self.__name__, pevent) + self.notifyListeners(evt) + + if self.checkForStop(): + return + + self.info(f"Found InternetDB data for {eventData}") + ports = rec.get('ports') + vulns = rec.get('vulns') + hostnames = rec.get('hostnames') + + for port in ports: + cp = addr + ":" + str(port) + # InternetDB does not specify TCP or UDP, however, there is no generic PORT_OPEN event. + evt = SpiderFootEvent("TCP_PORT_OPEN", cp, self.__name__, pevent) + self.notifyListeners(evt) + + for vuln in vulns: + etype, cvetext = self.sf.cveInfo(vuln) + evt = SpiderFootEvent(etype, cvetext, self.__name__, pevent) + self.notifyListeners(evt) + + for hostname in hostnames: + evt = SpiderFootEvent("INTERNET_NAME", hostname, self.__name__, pevent) + self.notifyListeners(evt) + +# End of sfp_internetdb class diff --git a/test/integration/modules/test_sfp_internetdb.py b/test/integration/modules/test_sfp_internetdb.py new file mode 100644 index 0000000000..68c6e2e55b --- /dev/null +++ b/test/integration/modules/test_sfp_internetdb.py @@ -0,0 +1,31 @@ +import pytest +import unittest + +from modules.sfp_internetdb import sfp_internetdb +from sflib import SpiderFoot +from spiderfoot import SpiderFootEvent, SpiderFootTarget + + +@pytest.mark.usefixtures +class TestModuleIntegrationInternetDB(unittest.TestCase): + + def test_handleEvent(self): + sf = SpiderFoot(self.default_options) + + module = sfp_internetdb() + module.setup(sf, dict()) + + target_value = 'example target value' + target_type = 'IP_ADDRESS' + target = SpiderFootTarget(target_value, target_type) + module.setTarget(target) + + event_type = 'ROOT' + event_data = 'example data' + event_module = '' + source_event = '' + evt = SpiderFootEvent(event_type, event_data, event_module, source_event) + + result = module.handleEvent(evt) + + self.assertIsNone(result) diff --git a/test/unit/modules/test_sfp_internetdb.py b/test/unit/modules/test_sfp_internetdb.py new file mode 100644 index 0000000000..1df7723b12 --- /dev/null +++ b/test/unit/modules/test_sfp_internetdb.py @@ -0,0 +1,26 @@ +import pytest +import unittest + +from modules.sfp_internetdb import sfp_internetdb +from sflib import SpiderFoot + + +@pytest.mark.usefixtures +class TestModuleInternetDB(unittest.TestCase): + + def test_opts(self): + module = sfp_internetdb() + self.assertEqual(len(module.opts), len(module.optdescs)) + + def test_setup(self): + sf = SpiderFoot(self.default_options) + module = sfp_internetdb() + module.setup(sf, dict()) + + def test_watchedEvents_should_return_list(self): + module = sfp_internetdb() + self.assertIsInstance(module.watchedEvents(), list) + + def test_producedEvents_should_return_list(self): + module = sfp_internetdb() + self.assertIsInstance(module.producedEvents(), list)