Skip to content

Commit

Permalink
Remove config side effects from tests (apache#8607)
Browse files Browse the repository at this point in the history
* Remove config side effects

* Fix LatestOnlyOperator return type to be json serializable

* Fix tests/test_configuration.py

* Fix tests/executors/test_dask_executor.py

* Fix tests/jobs/test_scheduler_job.py

* Fix tests/models/test_cleartasks.py

* Fix tests/models/test_taskinstance.py

* Fix tests/models/test_xcom.py

* Fix tests/security/test_kerberos.py

* Fix tests/test_configuration.py

* Fix tests/test_logging_config.py

* Fix tests/utils/test_dag_processing.py

* Apply isort

* Fix tests/utils/test_email.py

* Fix tests/utils/test_task_handler_with_custom_formatter.py

* Fix tests/www/api/experimental/test_kerberos_endpoints.py

* Fix tests/www/test_views.py

* Code refactor

* Fix tests/www/api/experimental/test_kerberos_endpoints.py

* Fix requirements

* fixup! Fix tests/www/test_views.py
  • Loading branch information
turbaszek authored May 4, 2020
1 parent 5ddc458 commit caa60b1
Show file tree
Hide file tree
Showing 16 changed files with 161 additions and 317 deletions.
3 changes: 1 addition & 2 deletions airflow/models/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,9 @@ def serialize_value(value: Any):
# "pickling" will be removed in Airflow 2.0.
if conf.getboolean('core', 'enable_xcom_pickling'):
return pickle.dumps(value)

try:
return json.dumps(value).encode('UTF-8')
except ValueError:
except (ValueError, TypeError):
log.error("Could not serialize the XCOM value into JSON. "
"If you are using pickles instead of JSON "
"for XCOM, then you need to enable pickle "
Expand Down
4 changes: 2 additions & 2 deletions airflow/operators/latest_only_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]:
if context['dag_run'] and context['dag_run'].external_trigger:
self.log.info(
"Externally triggered DAG_Run: allowing execution to proceed.")
return context['task'].get_direct_relative_ids(upstream=False)
return list(context['task'].get_direct_relative_ids(upstream=False))

now = pendulum.utcnow()
left_window = context['dag'].following_schedule(
Expand All @@ -64,4 +64,4 @@ def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]:
return []
else:
self.log.info('Latest, allowing execution to proceed.')
return context['task'].get_direct_relative_ids(upstream=False)
return list(context['task'].get_direct_relative_ids(upstream=False))
42 changes: 20 additions & 22 deletions tests/executors/test_dask_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

import pytest

from airflow.configuration import conf
from airflow.jobs.backfill_job import BackfillJob
from airflow.models import DagBag
from airflow.utils import timezone
from tests.test_utils.config import conf_vars

try:
from airflow.executors.dask_executor import DaskExecutor
Expand Down Expand Up @@ -112,29 +112,27 @@ class TestDaskExecutorTLS(TestBaseDask):
def setUp(self):
self.dagbag = DagBag(include_examples=True)

@conf_vars({
('dask', 'tls_ca'): get_cert('tls-ca-cert.pem'),
('dask', 'tls_cert'): get_cert('tls-key-cert.pem'),
('dask', 'tls_key'): get_cert('tls-key.pem'),
})
def test_tls(self):
# These use test certs that ship with dask/distributed and should not be
# used in production
with dask_testing_cluster(
worker_kwargs={'security': tls_security(), "protocol": "tls"},
scheduler_kwargs={'security': tls_security(), "protocol": "tls"}) as (cluster, _):

# These use test certs that ship with dask/distributed and should not be
# used in production
conf.set('dask', 'tls_ca', get_cert('tls-ca-cert.pem'))
conf.set('dask', 'tls_cert', get_cert('tls-key-cert.pem'))
conf.set('dask', 'tls_key', get_cert('tls-key.pem'))
try:
executor = DaskExecutor(cluster_address=cluster['address'])

self.assert_tasks_on_executor(executor)

executor.end()
# close the executor, the cluster context manager expects all listeners
# and tasks to have completed.
executor.client.close()
finally:
conf.set('dask', 'tls_ca', '')
conf.set('dask', 'tls_key', '')
conf.set('dask', 'tls_cert', '')
worker_kwargs={'security': tls_security(), "protocol": "tls"},
scheduler_kwargs={'security': tls_security(), "protocol": "tls"}
) as (cluster, _):

executor = DaskExecutor(cluster_address=cluster['address'])

self.assert_tasks_on_executor(executor)

executor.end()
# close the executor, the cluster context manager expects all listeners
# and tasks to have completed.
executor.client.close()

@mock.patch('airflow.executors.dask_executor.DaskExecutor.sync')
@mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks')
Expand Down
30 changes: 8 additions & 22 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@
TEMP_DAG_FILENAME = "temp_dag.py"


@pytest.fixture(scope="class")
def disable_load_example():
with conf_vars({('core', 'load_examples'): 'false'}):
yield


@pytest.mark.usefixtures("disable_load_example")
class TestDagFileProcessor(unittest.TestCase):
def setUp(self):
clear_db_runs()
Expand Down Expand Up @@ -107,17 +114,6 @@ def create_test_dag(self, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + timed
@classmethod
def setUpClass(cls):
cls.dagbag = DagBag()
cls.old_val = None
if conf.has_option('core', 'load_examples'):
cls.old_val = conf.get('core', 'load_examples')
conf.set('core', 'load_examples', 'false')

@classmethod
def tearDownClass(cls):
if cls.old_val is not None:
conf.set('core', 'load_examples', cls.old_val)
else:
conf.remove_option('core', 'load_examples')

def test_dag_file_processor_sla_miss_callback(self):
"""
Expand Down Expand Up @@ -1262,6 +1258,7 @@ def test_process_file_queries_count(
processor.process_file(ELASTIC_DAG_FILE, [])


@pytest.mark.usefixtures("disable_load_example")
class TestSchedulerJob(unittest.TestCase):

def setUp(self):
Expand All @@ -1278,17 +1275,6 @@ def setUp(self):
@classmethod
def setUpClass(cls):
cls.dagbag = DagBag()
cls.old_val = None
if conf.has_option('core', 'load_examples'):
cls.old_val = conf.get('core', 'load_examples')
conf.set('core', 'load_examples', 'false')

@classmethod
def tearDownClass(cls):
if cls.old_val is not None:
conf.set('core', 'load_examples', cls.old_val)
else:
conf.remove_option('core', 'load_examples')

def test_is_alive(self):
job = SchedulerJob(None, heartrate=10, state=State.RUNNING)
Expand Down
16 changes: 5 additions & 11 deletions tests/models/test_cleartasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import unittest

from airflow import settings
from airflow.configuration import conf
from airflow.models import DAG, TaskInstance as TI, XCom, clear_task_instances
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State
from tests.models import DEFAULT_DATE
from tests.test_utils.config import conf_vars


class TestClearTasks(unittest.TestCase):
Expand Down Expand Up @@ -237,15 +237,13 @@ def test_operator_clear(self):
# try_number (0) + retries(1)
self.assertEqual(ti2.max_tries, 1)

@conf_vars({("core", "enable_xcom_pickling"): "False"})
def test_xcom_disable_pickle_type(self):
json_obj = {"key": "value"}
execution_date = timezone.utcnow()
key = "xcom_test1"
dag_id = "test_dag1"
task_id = "test_task1"

conf.set("core", "enable_xcom_pickling", "False")

XCom.set(key=key,
value=json_obj,
dag_id=dag_id,
Expand All @@ -267,15 +265,13 @@ def test_xcom_disable_pickle_type(self):

self.assertEqual(ret_value, json_obj)

@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_xcom_enable_pickle_type(self):
json_obj = {"key": "value"}
execution_date = timezone.utcnow()
key = "xcom_test2"
dag_id = "test_dag2"
task_id = "test_task2"

conf.set("core", "enable_xcom_pickling", "True")

XCom.set(key=key,
value=json_obj,
dag_id=dag_id,
Expand All @@ -297,20 +293,20 @@ def test_xcom_enable_pickle_type(self):

self.assertEqual(ret_value, json_obj)

@conf_vars({("core", "xcom_enable_pickling"): "False"})
def test_xcom_disable_pickle_type_fail_on_non_json(self):
class PickleRce:
def __reduce__(self):
return os.system, ("ls -alt",)

conf.set("core", "xcom_enable_pickling", "False")

self.assertRaises(TypeError, XCom.set,
key="xcom_test3",
value=PickleRce(),
dag_id="test_dag3",
task_id="test_task3",
execution_date=timezone.utcnow())

@conf_vars({("core", "xcom_enable_pickling"): "True"})
def test_xcom_get_many(self):
json_obj = {"key": "value"}
execution_date = timezone.utcnow()
Expand All @@ -320,8 +316,6 @@ def test_xcom_get_many(self):
dag_id2 = "test_dag5"
task_id2 = "test_task5"

conf.set("core", "xcom_enable_pickling", "True")

XCom.set(key=key,
value=json_obj,
dag_id=dag_id1,
Expand Down
9 changes: 5 additions & 4 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from sqlalchemy.orm.session import Session

from airflow import models, settings
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models import (
DAG, DagRun, Pool, RenderedTaskInstanceFields, TaskFail, TaskInstance as TI, TaskReschedule, Variable,
Expand All @@ -50,6 +49,7 @@
from airflow.utils.state import State
from tests.models import DEFAULT_DATE
from tests.test_utils import db
from tests.test_utils.config import conf_vars


class CallbackWrapper:
Expand Down Expand Up @@ -1160,6 +1160,10 @@ def test_email_alert(self, mock_send_email):
self.assertIn('test_email_alert', body)
self.assertIn('Try 1', body)

@conf_vars({
('email', 'subject_template'): '/subject/path',
('email', 'html_content_template'): '/html_content/path',
})
@patch('airflow.models.taskinstance.send_email')
def test_email_alert_with_config(self, mock_send_email):
dag = models.DAG(dag_id='test_failure_email')
Expand All @@ -1173,9 +1177,6 @@ def test_email_alert_with_config(self, mock_send_email):
ti = TI(
task=task, execution_date=datetime.datetime.now())

conf.set('email', 'subject_template', '/subject/path')
conf.set('email', 'html_content_template', '/html_content/path')

opener = mock_open(read_data='template: {{ti.task_id}}')
with patch('airflow.models.taskinstance.open', opener, create=True):
try:
Expand Down
3 changes: 1 addition & 2 deletions tests/models/test_xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ def test_resolve_xcom_class_fallback_to_basexcom(self):
assert cls().serialize_value([1]) == b"[1]"

@conf_vars({("core", "enable_xcom_pickling"): "False"})
@conf_vars({("core", "xcom_backend"): "to be removed"})
def test_resolve_xcom_class_fallback_to_basexcom_no_config(self):
init = conf.get("core", "xcom_backend")
conf.remove_option("core", "xcom_backend")
cls = resolve_xcom_backend()
assert issubclass(cls, BaseXCom)
assert cls().serialize_value([1]) == b"[1]"
conf.set("core", "xcom_backend", init)
4 changes: 2 additions & 2 deletions tests/providers/amazon/aws/operators/test_s3_to_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import boto3
from moto import mock_s3

from airflow.configuration import conf
from airflow.models import DAG, TaskInstance
from airflow.providers.amazon.aws.operators.s3_to_sftp import S3ToSFTPOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.utils import timezone
from airflow.utils.timezone import datetime
from tests.test_utils.config import conf_vars

TASK_ID = 'test_s3_to_sftp'
BUCKET = 'test-s3-bucket'
Expand Down Expand Up @@ -71,9 +71,9 @@ def setUp(self):
self.s3_key = S3_KEY

@mock_s3
@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_s3_to_sftp_operation(self):
# Setting
conf.set("core", "enable_xcom_pickling", "True")
test_remote_file_content = \
"This is remote file content \n which is also multiline " \
"another line here \n this is last line. EOF"
Expand Down
35 changes: 15 additions & 20 deletions tests/security/test_kerberos.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,47 +20,42 @@
import unittest
from argparse import Namespace

from airflow.configuration import conf
from airflow.security import kerberos
from airflow.security.kerberos import renew_from_kt
from tests.test_utils.config import conf_vars

KRB5_KTNAME = os.environ.get('KRB5_KTNAME')

@unittest.skipIf('KRB5_KTNAME' not in os.environ,
'Skipping Kerberos API tests due to missing KRB5_KTNAME')

@unittest.skipIf(KRB5_KTNAME is None, 'Skipping Kerberos API tests due to missing KRB5_KTNAME')
class TestKerberos(unittest.TestCase):
def setUp(self):

if not conf.has_section("kerberos"):
conf.add_section("kerberos")
conf.set("kerberos", "keytab",
os.environ['KRB5_KTNAME'])
keytab_from_cfg = conf.get("kerberos", "keytab")
self.args = Namespace(keytab=keytab_from_cfg, principal=None, pid=None,
self.args = Namespace(keytab=KRB5_KTNAME, principal=None, pid=None,
daemon=None, stdout=None, stderr=None, log_file=None)

@conf_vars({('kerberos', 'keytab'): KRB5_KTNAME})
def test_renew_from_kt(self):
"""
We expect no result, but a successful run. No more TypeError
"""
self.assertIsNone(renew_from_kt(principal=self.args.principal, # pylint: disable=no-member
keytab=self.args.keytab))

@conf_vars({('kerberos', 'keytab'): ''})
def test_args_from_cli(self):
"""
We expect no result, but a run with sys.exit(1) because keytab not exist.
"""
self.args.keytab = "test_keytab"

with conf_vars({('kerberos', 'keytab'): ''}):
with self.assertRaises(SystemExit) as err:
renew_from_kt(principal=self.args.principal, # pylint: disable=no-member
keytab=self.args.keytab)
with self.assertRaises(SystemExit) as err:
renew_from_kt(principal=self.args.principal, # pylint: disable=no-member
keytab=self.args.keytab)

with self.assertLogs(kerberos.log) as log:
self.assertIn(
'kinit: krb5_init_creds_set_keytab: Failed to find '
'[email protected] in keytab FILE:{} '
'(unknown enctype)'.format(self.args.keytab), log.output)
with self.assertLogs(kerberos.log) as log:
self.assertIn(
'kinit: krb5_init_creds_set_keytab: Failed to find '
'[email protected] in keytab FILE:{} '
'(unknown enctype)'.format(self.args.keytab), log.output)

self.assertEqual(err.exception.code, 1)
self.assertEqual(err.exception.code, 1)
Loading

0 comments on commit caa60b1

Please sign in to comment.