diff --git a/kmip/core/config_helper.py b/kmip/core/config_helper.py index a204666d..87def84a 100644 --- a/kmip/core/config_helper.py +++ b/kmip/core/config_helper.py @@ -38,7 +38,7 @@ class ConfigHelper(object): FILE_PATH, '../demos/certs/server.key')) DEFAULT_CA_CERTS = os.path.normpath(os.path.join( FILE_PATH, '../demos/certs/server.crt')) - DEFAULT_SSL_VERSION = 'PROTOCOL_SSLv23' + DEFAULT_TLS_CLIENT = 'PROTOCOL_TLS_CLIENT' DEFAULT_USERNAME = None DEFAULT_PASSWORD = None diff --git a/kmip/services/auth.py b/kmip/services/auth.py index a53af297..7cad6687 100644 --- a/kmip/services/auth.py +++ b/kmip/services/auth.py @@ -232,3 +232,61 @@ def __init__(self, cipher_suites=None): """ super(TLS12AuthenticationSuite, self).__init__(cipher_suites) self._protocol = ssl.PROTOCOL_TLSv1_2 + + +class ClientAuthenticationSuite(AuthenticationSuite): + """ + An authentication suite used to establish secure network connections. + + Supports TLS 1.3. + https://docs.openssl.org/3.3/man1/openssl-ciphers/#tls-v13-cipher-suites. + """ + + _default_cipher_suites = [ + 'TLS_AES_128_GCM_SHA256', + 'TLS_AES_256_GCM_SHA384', + 'TLS_CHACHA20_POLY1305_SHA256', + 'TLS_AES_128_CCM_SHA256', + 'TLS_AES_128_CCM_8_SHA256' + ] + + def __init__(self, cipher_suites=None): + """ + Create a ClientAuthenticationSuite object. + + Args: + cipher_suites (list): A list of strings representing the names of + cipher suites to use. Overrides the default set of cipher + suites. Optional, defaults to None. + """ + super(ClientAuthenticationSuite, self).__init__(cipher_suites) + self._protocol = ssl.PROTOCOL_TLS_CLIENT + + +class ServerAuthenticationSuite(AuthenticationSuite): + """ + An authentication suite used to establish secure network connections. + + Supports TLS 1.3. + https://docs.openssl.org/3.3/man1/openssl-ciphers/#tls-v13-cipher-suites. + """ + + _default_cipher_suites = [ + 'TLS_AES_128_GCM_SHA256', + 'TLS_AES_256_GCM_SHA384', + 'TLS_CHACHA20_POLY1305_SHA256', + 'TLS_AES_128_CCM_SHA256', + 'TLS_AES_128_CCM_8_SHA256' + ] + + def __init__(self, cipher_suites=None): + """ + Create a ServerAuthenticationSuite object. + + Args: + cipher_suites (list): A list of strings representing the names of + cipher suites to use. Overrides the default set of cipher + suites. Optional, defaults to None. + """ + super(ServerAuthenticationSuite, self).__init__(cipher_suites) + self._protocol = ssl.PROTOCOL_TLS_SERVER diff --git a/kmip/services/kmip_client.py b/kmip/services/kmip_client.py index 7f72adf7..4ab8ec81 100644 --- a/kmip/services/kmip_client.py +++ b/kmip/services/kmip_client.py @@ -285,13 +285,17 @@ def open(self): six.reraise(*last_error) def _create_socket(self, sock): - self.socket = ssl.wrap_socket( + context = ssl.SSLContext(self.ssl_version) + context.verify_mode = self.cert_reqs + if self.ca_certs: + context.load_verify_locations(self.ca_certs) + if self.keyfile and not self.certfile: + raise ValueError("certfile must be specified") + if self.certfile: + context.load_cert_chain(self.certfile, self.keyfile) + self.socket = context.wrap_socket( sock, - keyfile=self.keyfile, - certfile=self.certfile, - cert_reqs=self.cert_reqs, - ssl_version=self.ssl_version, - ca_certs=self.ca_certs, + server_side=False, do_handshake_on_connect=self.do_handshake_on_connect, suppress_ragged_eofs=self.suppress_ragged_eofs) self.socket.settimeout(self.timeout) @@ -1762,7 +1766,7 @@ def _set_variables(self, host, port, keyfile, certfile, cert_reqs, self.config, 'cert_reqs', 'CERT_REQUIRED')) self.ssl_version = getattr(ssl, conf.get_valid_value( - ssl_version, self.config, 'ssl_version', conf.DEFAULT_SSL_VERSION)) + ssl_version, self.config, 'ssl_version', conf.DEFAULT_TLS_CLIENT)) self.ca_certs = conf.get_valid_value( ca_certs, self.config, 'ca_certs', conf.DEFAULT_CA_CERTS) diff --git a/kmip/services/server/config.py b/kmip/services/server/config.py index 912d980d..8e23c8e4 100644 --- a/kmip/services/server/config.py +++ b/kmip/services/server/config.py @@ -38,21 +38,22 @@ def __init__(self): self.settings['tls_cipher_suites'] = [] self.settings['logging_level'] = logging.INFO self.settings['auth_plugins'] = [] + self.settings['auth_suite'] = 'TLS_SERVER' self._expected_settings = [ 'hostname', 'port', 'certificate_path', 'key_path', - 'ca_path', - 'auth_suite' + 'ca_path' ] self._optional_settings = [ 'policy_path', 'enable_tls_client_auth', 'tls_cipher_suites', 'logging_level', - 'database_path' + 'database_path', + 'auth_suite' ] def set_setting(self, setting, value): diff --git a/kmip/services/server/server.py b/kmip/services/server/server.py index 534ab61d..6f82ccc6 100644 --- a/kmip/services/server/server.py +++ b/kmip/services/server/server.py @@ -154,8 +154,10 @@ def __init__( cipher_suites = self.config.settings.get('tls_cipher_suites') if self.config.settings.get('auth_suite') == 'TLS1.2': self.auth_suite = auth.TLS12AuthenticationSuite(cipher_suites) - else: + elif self.config.settings.get('auth_suite') == 'Basic': self.auth_suite = auth.BasicAuthenticationSuite(cipher_suites) + else: + self.auth_suite = auth.ServerAuthenticationSuite(cipher_suites) self._session_id = 1 self._is_serving = False @@ -287,17 +289,27 @@ def interrupt_handler(trigger, frame): for cipher in auth_suite_ciphers: self._logger.debug(cipher) - self._socket = ssl.wrap_socket( + cafile = self.config.settings.get('ca_path') + context = ssl.SSLContext(self.auth_suite.protocol) + context.verify_mode = ssl.CERT_REQUIRED + if (self.auth_suite.ciphers and + self.auth_suite.protocol != ssl.PROTOCOL_TLS_SERVER): + context.set_ciphers(self.auth_suite.ciphers) + if cafile: + context.load_verify_locations(cafile) + certfile = self.config.settings.get('certificate_path') + + if certfile: + keyfile = self.config.settings.get('key_path') + context.load_cert_chain(certfile, keyfile=keyfile) + else: + raise ValueError("certfile must be specified for server-side operations") + + self._socket = context.wrap_socket( self._socket, - keyfile=self.config.settings.get('key_path'), - certfile=self.config.settings.get('certificate_path'), server_side=True, - cert_reqs=ssl.CERT_REQUIRED, - ssl_version=self.auth_suite.protocol, - ca_certs=self.config.settings.get('ca_path'), do_handshake_on_connect=False, - suppress_ragged_eofs=True, - ciphers=self.auth_suite.ciphers + suppress_ragged_eofs=True ) try: diff --git a/kmip/tests/unit/services/server/test_server.py b/kmip/tests/unit/services/server/test_server.py index a9e9f194..423208d9 100644 --- a/kmip/tests/unit/services/server/test_server.py +++ b/kmip/tests/unit/services/server/test_server.py @@ -50,7 +50,7 @@ def test_init(self, config_mock, logging_mock): self.assertTrue(config_mock.called) self.assertTrue(logging_mock.called) - self.assertIsInstance(s.auth_suite, auth.BasicAuthenticationSuite) + self.assertIsInstance(s.auth_suite, auth.ServerAuthenticationSuite) self.assertEqual(1, s._session_id) self.assertFalse(s._is_serving) @@ -210,9 +210,9 @@ def test_start(self, # Test that in ideal cases no errors are generated and the right # log messages are. with mock.patch('socket.socket') as socket_mock: - with mock.patch('ssl.wrap_socket') as ssl_mock: + with mock.patch('ssl.SSLContext') as ssl_mock: socket_mock.return_value = a_mock - ssl_mock.return_value = b_mock + ssl_mock.return_value.wrap_socket.return_value = b_mock manager_mock.assert_not_called() monitor_mock.assert_not_called() @@ -271,7 +271,7 @@ def test_start(self, # Test that a NetworkingError is generated if the socket bind fails. with mock.patch('socket.socket') as socket_mock: - with mock.patch('ssl.wrap_socket') as ssl_mock: + with mock.patch('ssl.SSLContext.wrap_socket') as ssl_mock: socket_mock.return_value = a_mock ssl_mock.return_value = b_mock diff --git a/kmip/tests/unit/services/test_auth.py b/kmip/tests/unit/services/test_auth.py index 92ec1f27..616184df 100644 --- a/kmip/tests/unit/services/test_auth.py +++ b/kmip/tests/unit/services/test_auth.py @@ -237,3 +237,117 @@ def test_custom_ciphers_empty(self): self.assertIn('ECDHE-ECDSA-AES256-GCM-SHA384', suites) self.assertIn('ECDHE-ECDSA-AES128-SHA256', suites) self.assertIn('ECDHE-ECDSA-AES256-SHA384', suites) + + +class TestClientAuthenticationSuite(testtools.TestCase): + """ + A test suite for the ClientAuthenticationSuite. + """ + + def setUp(self): + super(TestClientAuthenticationSuite, self).setUp() + + def tearDown(self): + super(TestClientAuthenticationSuite, self).tearDown() + + def test_init(self): + auth.ClientAuthenticationSuite() + + def test_protocol(self): + suite = auth.ClientAuthenticationSuite() + protocol = suite.protocol + + self.assertIsInstance(protocol, int) + self.assertEqual(ssl.PROTOCOL_TLS_CLIENT, suite.protocol) + + def test_ciphers(self): + suite = auth.ClientAuthenticationSuite() + ciphers = suite.ciphers + + self.assertIsInstance(ciphers, str) + + cipher_string = ':'.join(( + 'TLS_AES_128_GCM_SHA256', + 'TLS_AES_256_GCM_SHA384', + 'TLS_CHACHA20_POLY1305_SHA256', + 'TLS_AES_128_CCM_SHA256', + 'TLS_AES_128_CCM_8_SHA256', + )) + + self.assertEqual(cipher_string, ciphers) + + def test_custom_ciphers_empty(self): + """ + Test that providing a custom list of cipher suites that ultimately + yields an empty suite list causes the default cipher suite list to + be provided instead. + """ + suite = auth.ClientAuthenticationSuite( + [ + 'TLS_RSA_WITH_AES_256_CBC_SHA' + ] + ) + ciphers = suite.ciphers + + self.assertIsInstance(ciphers, str) + suites = ciphers.split(':') + self.assertEqual(5, len(suites)) + self.assertIn('TLS_AES_128_GCM_SHA256', suites) + self.assertIn('TLS_CHACHA20_POLY1305_SHA256', suites) + + +class TestServerAuthenticationSuite(testtools.TestCase): + """ + A test suite for the ServerAuthenticationSuite. + """ + + def setUp(self): + super(TestServerAuthenticationSuite, self).setUp() + + def tearDown(self): + super(TestServerAuthenticationSuite, self).tearDown() + + def test_init(self): + auth.ServerAuthenticationSuite() + + def test_protocol(self): + suite = auth.ServerAuthenticationSuite() + protocol = suite.protocol + + self.assertIsInstance(protocol, int) + self.assertEqual(ssl.PROTOCOL_TLS_SERVER, suite.protocol) + + def test_ciphers(self): + suite = auth.ServerAuthenticationSuite() + ciphers = suite.ciphers + + self.assertIsInstance(ciphers, str) + + cipher_string = ':'.join(( + 'TLS_AES_128_GCM_SHA256', + 'TLS_AES_256_GCM_SHA384', + 'TLS_CHACHA20_POLY1305_SHA256', + 'TLS_AES_128_CCM_SHA256', + 'TLS_AES_128_CCM_8_SHA256', + )) + + self.assertEqual(cipher_string, ciphers) + + def test_custom_ciphers_empty(self): + """ + Test that providing a custom list of cipher suites that ultimately + yields an empty suite list causes the default cipher suite list to + be provided instead. + """ + suite = auth.ServerAuthenticationSuite( + [ + 'TLS_RSA_WITH_AES_256_CBC_SHA' + ] + ) + ciphers = suite.ciphers + + self.assertIsInstance(ciphers, str) + suites = ciphers.split(':') + self.assertEqual(5, len(suites)) + self.assertIn('TLS_AES_128_GCM_SHA256', suites) + self.assertIn('TLS_CHACHA20_POLY1305_SHA256', suites)