Skip to content

Commit

Permalink
Change prefix of AwsDynamoDB hook module (apache#11209)
Browse files Browse the repository at this point in the history
* align import path of AwsDynamoDBHook in aws providers

Co-authored-by: Tomek Urbaszek <[email protected]>
  • Loading branch information
eladkal and turbaszek authored Oct 11, 2020
1 parent 42a23d1 commit c3e3405
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 62 deletions.
6 changes: 3 additions & 3 deletions airflow/contrib/hooks/aws_dynamodb_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module is deprecated. Please use `airflow.providers.amazon.aws.hooks.aws_dynamodb`."""
"""This module is deprecated. Please use `airflow.providers.amazon.aws.hooks.dynamodb`."""

import warnings

# pylint: disable=unused-import
from airflow.providers.amazon.aws.hooks.aws_dynamodb import AwsDynamoDBHook # noqa
from airflow.providers.amazon.aws.hooks.dynamodb import AwsDynamoDBHook # noqa

warnings.warn(
"This module is deprecated. Please use `airflow.providers.amazon.aws.hooks.aws_dynamodb`.",
"This module is deprecated. Please use `airflow.providers.amazon.aws.hooks.dynamodb`.",
DeprecationWarning, stacklevel=2
)
26 changes: 26 additions & 0 deletions airflow/providers/amazon/aws/ADDITIONAL_INFO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

## Change in import paths

If you are upgrading from release 2020.10.5, note the following changes in import paths:

| Old path | New path |
| --------------------------------------------------------------- | ----------------------------------------------------------- |
| airflow.providers.amazon.aws.hooks.aws_dynamodb.AwsDynamoDBHook | airflow.providers.amazon.aws.hooks.dynamodb.AwsDynamoDBHook |
56 changes: 9 additions & 47 deletions airflow/providers/amazon/aws/hooks/aws_dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module is deprecated. Please use `airflow.providers.amazon.aws.hooks.dynamodb`."""

import warnings

"""
This module contains the AWS DynamoDB hook
"""
from typing import Iterable, List, Optional
# pylint: disable=unused-import
from airflow.providers.amazon.aws.hooks.dynamodb import AwsDynamoDBHook # noqa

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


class AwsDynamoDBHook(AwsBaseHook):
"""
Interact with AWS DynamoDB.
Additional arguments (such as ``aws_conn_id``) may be specified and
are passed down to the underlying AwsBaseHook.
.. seealso::
:class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
:param table_keys: partition key and sort key
:type table_keys: list
:param table_name: target DynamoDB table
:type table_name: str
"""

def __init__(
self, *args, table_keys: Optional[List] = None, table_name: Optional[str] = None, **kwargs
) -> None:
self.table_keys = table_keys
self.table_name = table_name
kwargs["resource_type"] = "dynamodb"
super().__init__(*args, **kwargs)

def write_batch_data(self, items: Iterable) -> bool:
"""
Write batch items to DynamoDB table with provisioned throughout capacity.
"""
try:
table = self.get_conn().Table(self.table_name)

with table.batch_writer(overwrite_by_pkeys=self.table_keys) as batch:
for item in items:
batch.put_item(Item=item)
return True
except Exception as general_error:
raise AirflowException(
"Failed to insert items in dynamodb, error: {error}".format(error=str(general_error))
)
warnings.warn(
"This module is deprecated. Please use `airflow.providers.amazon.aws.hooks.dynamodb`.",
DeprecationWarning,
stacklevel=2,
)
67 changes: 67 additions & 0 deletions airflow/providers/amazon/aws/hooks/dynamodb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


"""
This module contains the AWS DynamoDB hook
"""
from typing import Iterable, List, Optional

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


class AwsDynamoDBHook(AwsBaseHook):
    """
    Interact with AWS DynamoDB.

    Additional arguments (such as ``aws_conn_id``) may be specified and
    are passed down to the underlying AwsBaseHook.

    .. seealso::
        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`

    :param table_keys: partition key and sort key
    :type table_keys: list
    :param table_name: target DynamoDB table
    :type table_name: str
    """

    def __init__(
        self, *args, table_keys: Optional[List] = None, table_name: Optional[str] = None, **kwargs
    ) -> None:
        self.table_keys = table_keys
        self.table_name = table_name
        # Make AwsBaseHook hand back a boto3 DynamoDB *resource* (not a low-level client),
        # so get_conn() exposes the high-level Table API used below.
        kwargs["resource_type"] = "dynamodb"
        super().__init__(*args, **kwargs)

    def write_batch_data(self, items: Iterable) -> bool:
        """
        Write batch items to DynamoDB table with provisioned throughput capacity.

        Uses boto3's ``batch_writer`` context manager, which buffers the puts
        and automatically resends unprocessed items.

        :param items: iterable of item dicts to write to ``self.table_name``
        :return: True if all items were written successfully
        :raises AirflowException: if any put fails; the original boto3 error
            is preserved as the exception's cause
        """
        try:
            table = self.get_conn().Table(self.table_name)

            with table.batch_writer(overwrite_by_pkeys=self.table_keys) as batch:
                for item in items:
                    batch.put_item(Item=item)
            return True
        except Exception as general_error:
            # Chain the cause so the underlying boto3/botocore error stays
            # visible in the traceback instead of being swallowed.
            raise AirflowException(
                "Failed to insert items in dynamodb, error: {error}".format(error=str(general_error))
            ) from general_error
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from uuid import uuid4

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.aws_dynamodb import AwsDynamoDBHook
from airflow.providers.amazon.aws.hooks.dynamodb import AwsDynamoDBHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.decorators import apply_defaults

Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/transfers/hive_to_dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import json

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.aws_dynamodb import AwsDynamoDBHook
from airflow.providers.amazon.aws.hooks.dynamodb import AwsDynamoDBHook
from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook
from airflow.utils.decorators import apply_defaults

Expand Down
2 changes: 1 addition & 1 deletion docs/operators-and-hooks-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ These integrations allow you to perform various operations within the Amazon Web

* - `Amazon DynamoDB <https://aws.amazon.com/dynamodb/>`__
-
- :mod:`airflow.providers.amazon.aws.hooks.aws_dynamodb`
- :mod:`airflow.providers.amazon.aws.hooks.dynamodb`
-
-

Expand Down
34 changes: 28 additions & 6 deletions tests/core/test_project_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,23 @@ def test_providers_modules_should_have_tests(self):
"""
Assert every module in /airflow/providers has a corresponding test_ file in tests/airflow/providers.
"""
# Deprecated modules that don't have corresponded test
expected_missing_providers_modules = {('airflow/providers/amazon/aws/hooks/aws_dynamodb.py',
'tests/providers/amazon/aws/hooks/test_aws_dynamodb.py')}

# TODO: Should we extend this test to cover other directories?
expected_test_files = glob.glob(f"{ROOT_FOLDER}/airflow/providers/**/*.py", recursive=True)
modules_files = glob.glob(f"{ROOT_FOLDER}/airflow/providers/**/*.py", recursive=True)

# Make path relative
expected_test_files = (os.path.relpath(f, ROOT_FOLDER) for f in expected_test_files)
modules_files = (os.path.relpath(f, ROOT_FOLDER) for f in modules_files)
# Exclude example_dags
expected_test_files = (f for f in expected_test_files if "/example_dags/" not in f)
modules_files = (f for f in modules_files if "/example_dags/" not in f)
# Exclude __init__.py
expected_test_files = (f for f in expected_test_files if not f.endswith("__init__.py"))
modules_files = (f for f in modules_files if not f.endswith("__init__.py"))
# Change airflow/ to tests/
expected_test_files = (
f'tests/{f.partition("/")[2]}'
for f in expected_test_files if not f.endswith("__init__.py")
for f in modules_files if not f.endswith("__init__.py")
)
# Add test_ prefix to filename
expected_test_files = (
Expand All @@ -81,11 +86,28 @@ def test_providers_modules_should_have_tests(self):
# Exclude __init__.py
current_test_files = (f for f in current_test_files if not f.endswith("__init__.py"))

modules_files = set(modules_files)
expected_test_files = set(expected_test_files)
current_test_files = set(current_test_files)

missing_tests_files = expected_test_files - expected_test_files.intersection(current_test_files)
self.assertEqual(set(), missing_tests_files)

with self.subTest("Detect missing tests in providers module"):
expected_missing_test_modules = set(pair[1] for pair in expected_missing_providers_modules)
missing_tests_files = missing_tests_files - set(expected_missing_test_modules)
self.assertEqual(set(), missing_tests_files)

with self.subTest("Verify removed deprecated module also removed from deprecated list"):
expected_missing_modules = set(pair[0] for pair in expected_missing_providers_modules)
removed_deprecated_module = expected_missing_modules - modules_files
if removed_deprecated_module:
self.fail(
"You've removed a deprecated module:\n"
f"{removed_deprecated_module}"
"\n"
"Thank you very much.\n"
"Can you remove it from the list of expected missing modules tests, please?"
)


class TestGoogleProviderProjectStructure(unittest.TestCase):
Expand Down
2 changes: 1 addition & 1 deletion tests/deprecated_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@
'airflow.contrib.hooks.aws_hook.AwsHook',
),
(
'airflow.providers.amazon.aws.hooks.aws_dynamodb.AwsDynamoDBHook',
'airflow.providers.amazon.aws.hooks.dynamodb.AwsDynamoDBHook',
'airflow.contrib.hooks.aws_dynamodb_hook.AwsDynamoDBHook',
),
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import unittest
import uuid

from airflow.providers.amazon.aws.hooks.aws_dynamodb import AwsDynamoDBHook
from airflow.providers.amazon.aws.hooks.dynamodb import AwsDynamoDBHook

try:
from moto import mock_dynamodb2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import airflow.providers.amazon.aws.transfers.hive_to_dynamodb
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.hooks.aws_dynamodb import AwsDynamoDBHook
from airflow.providers.amazon.aws.hooks.dynamodb import AwsDynamoDBHook

DEFAULT_DATE = datetime.datetime(2015, 1, 1)
DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
Expand Down

0 comments on commit c3e3405

Please sign in to comment.