|
| 1 | +import json |
| 2 | + |
| 3 | +have_dynamodb = False |
| 4 | +try: |
| 5 | + import boto3 |
| 6 | + have_dynamodb = True |
| 7 | +except ImportError: |
| 8 | + pass |
| 9 | + |
| 10 | +from ldclient import log |
| 11 | +from ldclient.feature_store import CacheConfig |
| 12 | +from ldclient.feature_store_helpers import CachingStoreWrapper |
| 13 | +from ldclient.interfaces import FeatureStore, FeatureStoreCore |
| 14 | + |
| 15 | +# |
| 16 | +# Internal implementation of the DynamoDB feature store. |
| 17 | +# |
| 18 | +# Implementation notes: |
| 19 | +# |
# * Feature flags, segments, and any other kind of entity the LaunchDarkly client may wish
#   to store, are all put in the same table. The only two required attributes are "key" (which
#   is present in all storable entities) and "namespace" (a parameter from the client that is
#   used to disambiguate between flags and segments).
| 24 | +# |
| 25 | +# * Because of DynamoDB's restrictions on attribute values (e.g. empty strings are not |
| 26 | +# allowed), the standard DynamoDB marshaling mechanism with one attribute per object property |
| 27 | +# is not used. Instead, the entire object is serialized to JSON and stored in a single |
| 28 | +# attribute, "item". The "version" property is also stored as a separate attribute since it |
| 29 | +# is used for updates. |
| 30 | +# |
| 31 | +# * Since DynamoDB doesn't have transactions, the init() method - which replaces the entire data |
| 32 | +# store - is not atomic, so there can be a race condition if another process is adding new data |
| 33 | +# via upsert(). To minimize this, we don't delete all the data at the start; instead, we update |
| 34 | +# the items we've received, and then delete all other items. That could potentially result in |
| 35 | +# deleting new data from another process, but that would be the case anyway if the init() |
| 36 | +# happened to execute later than the upsert(); we are relying on the fact that normally the |
| 37 | +# process that did the init() will also receive the new data shortly and do its own upsert(). |
| 38 | +# |
| 39 | +# * DynamoDB has a maximum item size of 400KB. Since each feature flag or user segment is |
| 40 | +# stored as a single item, this mechanism will not work for extremely large flags or segments. |
| 41 | +# |
| 42 | + |
class _DynamoDBFeatureStoreCore(FeatureStoreCore):
    """Core DynamoDB persistence logic, intended to be wrapped in a CachingStoreWrapper.

    Each stored entity occupies one table item, keyed by the string pair
    (PARTITION_KEY, SORT_KEY). The whole entity is serialized to JSON and stored in
    the ITEM_JSON_ATTRIBUTE attribute; its version number is duplicated as the
    top-level VERSION_ATTRIBUTE so it can be referenced in conditional writes.
    """

    # Attribute names this store reads and writes. The code assumes the table's
    # primary key is composed of PARTITION_KEY (hash) and SORT_KEY (range), both
    # of string type, since every get_item/query uses them as key attributes.
    PARTITION_KEY = 'namespace'
    SORT_KEY = 'key'
    VERSION_ATTRIBUTE = 'version'
    ITEM_JSON_ATTRIBUTE = 'item'

    def __init__(self, table_name, prefix, dynamodb_opts):
        """Create a store core backed by the given DynamoDB table.

        :param table_name: name of the DynamoDB table to use
        :param prefix: string joined (with ':') onto every namespace; "" means no prefix
        :param dynamodb_opts: keyword options passed straight to ``boto3.client('dynamodb', ...)``
        :raises NotImplementedError: if the boto3 package is not installed
        """
        if not have_dynamodb:
            raise NotImplementedError("Cannot use DynamoDB feature store because AWS SDK (boto3 package) is not installed")
        self._table_name = table_name
        self._prefix = None if prefix == "" else prefix
        self._client = boto3.client('dynamodb', **dynamodb_opts)

    def init_internal(self, all_data):
        """Replace the entire store contents with ``all_data``.

        Not atomic (DynamoDB has no cross-item transactions here): existing items are
        overwritten/deleted via batched writes, so a concurrent upsert can race with
        this; see the implementation notes at the top of this module.

        :param all_data: dict of kind -> (dict of key -> item)
        """
        # Start by reading the existing keys; we will later delete any of these that weren't in all_data.
        unused_old_keys = self._read_existing_keys(all_data.keys())
        requests = []
        num_items = 0
        inited_key = self._inited_key()

        # Insert or update every provided item
        for kind, items in all_data.items():
            for key, item in items.items():
                encoded_item = self._marshal_item(kind, item)
                requests.append({ 'PutRequest': { 'Item': encoded_item } })
                combined_key = (self._namespace_for_kind(kind), key)
                unused_old_keys.discard(combined_key)
                num_items = num_items + 1

        # Now delete any previously existing items whose keys were not in the current data
        for combined_key in unused_old_keys:
            if combined_key[0] != inited_key:
                requests.append({ 'DeleteRequest': { 'Key': self._make_keys(combined_key[0], combined_key[1]) } })

        # Now set the special key that we check in initialized_internal()
        requests.append({ 'PutRequest': { 'Item': self._make_keys(inited_key, inited_key) } })

        _DynamoDBHelpers.batch_write_requests(self._client, self._table_name, requests)
        log.info('Initialized table %s with %d items', self._table_name, num_items)

    def get_internal(self, kind, key):
        """Return the deserialized item stored under (kind, key), or None if absent."""
        resp = self._get_item_by_keys(self._namespace_for_kind(kind), key)
        return self._unmarshal_item(resp.get('Item'))

    def get_all_internal(self, kind):
        """Return a dict of key -> item for everything stored under the given kind.

        Uses a query paginator, so result sets larger than one DynamoDB response
        page are handled transparently.
        """
        items_out = {}
        paginator = self._client.get_paginator('query')
        for resp in paginator.paginate(**self._make_query_for_kind(kind)):
            for item in resp['Items']:
                item_out = self._unmarshal_item(item)
                items_out[item_out['key']] = item_out
        return items_out

    def upsert_internal(self, kind, item):
        """Write ``item`` unless the stored copy has an equal or newer version.

        Returns the item that "won": the new item if the conditional put succeeded,
        otherwise the (newer) item currently in the database, so the caller
        (CachingStoreWrapper) can cache the authoritative value.
        """
        encoded_item = self._marshal_item(kind, item)
        try:
            # Conditional put: succeed only if the item doesn't exist yet, or our
            # version is strictly greater than the stored one. Attribute names are
            # aliased (#namespace etc.) because some are DynamoDB reserved words.
            req = {
                'TableName': self._table_name,
                'Item': encoded_item,
                'ConditionExpression': 'attribute_not_exists(#namespace) or attribute_not_exists(#key) or :version > #version',
                'ExpressionAttributeNames': {
                    '#namespace': self.PARTITION_KEY,
                    '#key': self.SORT_KEY,
                    '#version': self.VERSION_ATTRIBUTE
                },
                'ExpressionAttributeValues': {
                    ':version': { 'N': str(item['version']) }
                }
            }
            self._client.put_item(**req)
        except self._client.exceptions.ConditionalCheckFailedException:
            # The item was not updated because there's a newer item in the database. We must now
            # read the item that's in the database and return it, so CachingStoreWrapper can cache it.
            return self.get_internal(kind, item['key'])
        return item

    def initialized_internal(self):
        """Return True if the special "$inited" marker item written by init_internal exists."""
        resp = self._get_item_by_keys(self._inited_key(), self._inited_key())
        return resp.get('Item') is not None and len(resp['Item']) > 0

    def _prefixed_namespace(self, base):
        # Apply the configured prefix (if any) to a namespace string.
        return base if self._prefix is None else (self._prefix + ':' + base)

    def _namespace_for_kind(self, kind):
        # Partition-key value for a data kind (e.g. features vs. segments).
        return self._prefixed_namespace(kind.namespace)

    def _inited_key(self):
        # Namespace/key of the marker item that signals a completed init.
        return self._prefixed_namespace('$inited')

    def _make_keys(self, namespace, key):
        # Build the DynamoDB primary-key attribute map for an item.
        return {
            self.PARTITION_KEY: { 'S': namespace },
            self.SORT_KEY: { 'S': key }
        }

    def _make_query_for_kind(self, kind):
        # Query parameters selecting every item in a kind's namespace.
        # ConsistentRead is used so reads reflect the latest writes.
        return {
            'TableName': self._table_name,
            'ConsistentRead': True,
            'KeyConditions': {
                self.PARTITION_KEY: {
                    'AttributeValueList': [
                        { 'S': self._namespace_for_kind(kind) }
                    ],
                    'ComparisonOperator': 'EQ'
                }
            }
        }

    def _get_item_by_keys(self, namespace, key):
        # Raw get_item call; callers inspect resp.get('Item') themselves.
        return self._client.get_item(TableName=self._table_name, Key=self._make_keys(namespace, key))

    def _read_existing_keys(self, kinds):
        """Return the set of (namespace, key) tuples currently stored for the given kinds.

        Only the key attributes are fetched (ProjectionExpression) since the values
        are not needed for the delete-reconciliation in init_internal.
        """
        keys = set()
        for kind in kinds:
            req = self._make_query_for_kind(kind)
            req['ProjectionExpression'] = '#namespace, #key'
            req['ExpressionAttributeNames'] = {
                '#namespace': self.PARTITION_KEY,
                '#key': self.SORT_KEY
            }
            paginator = self._client.get_paginator('query')
            for resp in paginator.paginate(**req):
                for item in resp['Items']:
                    namespace = item[self.PARTITION_KEY]['S']
                    key = item[self.SORT_KEY]['S']
                    keys.add((namespace, key))
        return keys

    def _marshal_item(self, kind, item):
        # Encode an entity as a DynamoDB item: key attributes, version as a
        # number, and the whole entity as one JSON string (see module notes on
        # why per-property marshaling is not used).
        json_str = json.dumps(item)
        ret = self._make_keys(self._namespace_for_kind(kind), item['key'])
        ret[self.VERSION_ATTRIBUTE] = { 'N': str(item['version']) }
        ret[self.ITEM_JSON_ATTRIBUTE] = { 'S': json_str }
        return ret

    def _unmarshal_item(self, item):
        # Decode a DynamoDB item back into an entity dict; None-safe for misses
        # and for items lacking the JSON attribute (e.g. the "$inited" marker).
        if item is None:
            return None
        json_attr = item.get(self.ITEM_JSON_ATTRIBUTE)
        return None if json_attr is None else json.loads(json_attr['S'])
| 184 | + |
| 185 | + |
| 186 | +class _DynamoDBHelpers(object): |
| 187 | + @staticmethod |
| 188 | + def batch_write_requests(client, table_name, requests): |
| 189 | + batch_size = 25 |
| 190 | + for batch in (requests[i:i+batch_size] for i in range(0, len(requests), batch_size)): |
| 191 | + client.batch_write_item(RequestItems={ table_name: batch }) |
0 commit comments