diff --git a/django_mongodb_backend/__init__.py b/django_mongodb_backend/__init__.py index 00700421..3181b927 100644 --- a/django_mongodb_backend/__init__.py +++ b/django_mongodb_backend/__init__.py @@ -2,7 +2,12 @@ # Check Django compatibility before other imports which may fail if the # wrong version of Django is installed. -from .utils import check_django_compatability, parse_uri +from .utils import ( + check_django_compatability, + get_auto_encryption_opts, + get_kms_providers, + parse_uri, +) check_django_compatability() @@ -15,7 +20,7 @@ from .lookups import register_lookups # noqa: E402 from .query import register_nodes # noqa: E402 -__all__ = ["parse_uri"] +__all__ = ["get_auto_encryption_opts", "get_kms_providers", "parse_uri"] register_aggregates() register_checks() diff --git a/django_mongodb_backend/utils.py b/django_mongodb_backend/utils.py index ced60bc8..419c4d80 100644 --- a/django_mongodb_backend/utils.py +++ b/django_mongodb_backend/utils.py @@ -8,6 +8,7 @@ from django.utils.functional import SimpleLazyObject from django.utils.text import format_lazy from django.utils.version import get_version_tuple +from pymongo.encryption_options import AutoEncryptionOpts from pymongo.uri_parser import parse_uri as pymongo_parse_uri @@ -28,7 +29,54 @@ def check_django_compatability(): ) -def parse_uri(uri, *, db_name=None, test=None): +def get_auto_encryption_opts(crypt_shared_lib_path=None, kms_providers=None): + key_vault_database_name = "encryption" + key_vault_collection_name = "__keyVault" + key_vault_namespace = f"{key_vault_database_name}.{key_vault_collection_name}" + return AutoEncryptionOpts( + key_vault_namespace=key_vault_namespace, + kms_providers=kms_providers, + crypt_shared_lib_path=crypt_shared_lib_path, + ) + + +# This file is intended for local testing only. +# The returned key is hard-coded and should NOT be used in production. + + +def get_customer_master_key(): + """ + Returns a 96-byte local master key for use with MongoDB Client-Side Field Level Encryption. + + For local testing purposes only. In production, use a secure KMS like AWS, Azure, GCP, or KMIP. + + Returns: + bytes: A 96-byte key. + """ + # WARNING: This is a static key for testing only. + # Generate with: os.urandom(96) + return bytes.fromhex( + "000102030405060708090a0b0c0d0e0f" + "101112131415161718191a1b1c1d1e1f" + "202122232425262728292a2b2c2d2e2f" + "303132333435363738393a3b3c3d3e3f" + "404142434445464748494a4b4c4d4e4f" + "505152535455565758595a5b5c5d5e5f" + ) + + +def get_kms_providers(): + """ + Return supported KMS providers for MongoDB Client-Side Field Level Encryption. + """ + return { + "local": { + "key": get_customer_master_key(), + }, + } + + +def parse_uri(uri, *, db_name=None, test=None, options=None): """ Convert the given uri into a dictionary suitable for Django's DATABASES setting. @@ -48,6 +96,9 @@ def parse_uri(uri, *, db_name=None, test=None): db_name = db_name or uri["database"] if not db_name: raise ImproperlyConfigured("You must provide the db_name parameter.") + opts = uri.get("options") + if options: + opts = {**opts, **options} settings_dict = { "ENGINE": "django_mongodb_backend", "NAME": db_name, @@ -55,7 +106,7 @@ def parse_uri(uri, *, db_name=None, test=None): "PORT": port, "USER": uri.get("username"), "PASSWORD": uri.get("password"), - "OPTIONS": uri.get("options"), + "OPTIONS": opts, } if "authSource" not in settings_dict["OPTIONS"] and uri["database"]: settings_dict["OPTIONS"]["authSource"] = uri["database"] diff --git a/tests/backend_/utils/test_parse_uri.py b/tests/backend_/utils/test_parse_uri.py index 3198a463..65d5c43e 100644 --- a/tests/backend_/utils/test_parse_uri.py +++ b/tests/backend_/utils/test_parse_uri.py @@ -4,7 +4,7 @@ from django.core.exceptions import ImproperlyConfigured from django.test import SimpleTestCase -from django_mongodb_backend import parse_uri +from django_mongodb_backend import get_auto_encryption_opts, parse_uri class ParseURITests(SimpleTestCase): @@ -94,3 +94,13 @@ def test_invalid_credentials(self): def test_no_scheme(self): with self.assertRaisesMessage(pymongo.errors.InvalidURI, "Invalid URI scheme"): parse_uri("cluster0.example.mongodb.net") + + def test_options(self): + settings_dict = parse_uri( + "mongodb://cluster0.example.mongodb.net/myDatabase", + options={"auto_encryption_opts": get_auto_encryption_opts()}, + ) + self.assertIsInstance( + settings_dict["OPTIONS"]["auto_encryption_opts"], + pymongo.encryption_options.AutoEncryptionOpts, + )