This repository has been archived by the owner on Jul 15, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_pubsub_pipeline.py
143 lines (111 loc) · 4.08 KB
/
test_pubsub_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import json
from unittest.mock import MagicMock, DEFAULT
import pytest
from google.api_core.exceptions import DeadlineExceeded
from pubsub_pipeline import PubSubPipeline, BulkPubSubPipeline
def _mock_future(on_add_done_callback=None):
mock_future_ = MagicMock()
# python mock magic: when side_effect is a callable,
# it will be called with the same arguments as the
# mock function
if on_add_done_callback is not None:
mock_future_.add_done_callback.side_effect = on_add_done_callback
return mock_future_
def _mock_publisher(on_publish=None):
publisher = MagicMock()
publisher.topic_path.return_value = 'some/topic/path'
if on_publish is not None:
publisher.publish.side_effect = on_publish
return publisher
def _message_data():
return {
"data": "This is some json data that is to processed",
"nested": {
"nestedData": "This is just some more data"
}
}
def processor(_):
    """Identity processor: hand back the argument unchanged."""
    unchanged = _
    return unchanged
def _mock_message():
    """Return a MagicMock shaped like a received Pub/Sub message.

    ``message.data`` holds the sample payload from ``_message_data()``
    serialised to JSON and encoded to bytes; ``ack_id`` is
    ``'some_ack_id'``.
    """
    encoded_payload = json.dumps(_message_data()).encode()
    received = MagicMock()
    received.ack_id = 'some_ack_id'
    received.message.data = encoded_payload
    return received
def _mock_subscriber(received_messages=(_mock_message(),)):
subscriber = MagicMock()
subscriber.subscription_path.return_value = 'some/subscription/path'
subscriber.pull.return_value.received_messages = list(received_messages)
return subscriber
@pytest.mark.parametrize('pipeline', [PubSubPipeline, BulkPubSubPipeline])
def test_message_is_acknowledged_on_successful_publish(pipeline):
    """The pulled message is acknowledged once its publish future completes.

    The publish hook checks the outgoing topic and payload; the
    done-callback hook fires the registered callback immediately and then
    checks that the subscriber acknowledged the sample ack id.
    """
    subscriber = _mock_subscriber()

    def fire_callback_then_check_ack(callback):
        # Simulate the future completing as soon as a callback is added.
        callback(publish_future)
        subscriber.acknowledge.assert_called_with(
            'some/subscription/path',
            ['some_ack_id'],
        )

    publish_future = _mock_future(fire_callback_then_check_ack)

    def check_payload_and_return_future(topic_path, data):
        assert topic_path == 'some/topic/path'
        assert isinstance(data, bytes)
        assert json.loads(data) == _message_data()
        return publish_future

    publisher = _mock_publisher(check_payload_and_return_future)

    pipeline_under_test = pipeline(
        google_cloud_project='',
        incoming_subscription='',
        outgoing_topic='',
        processor=processor,
        subscriber=subscriber,
        publisher=publisher,
    )
    pipeline_under_test.process(max_processed_messages=1)
@pytest.mark.parametrize('pipeline', [PubSubPipeline, BulkPubSubPipeline])
def test_message_is_not_acknowledged_on_failure(pipeline):
    """A message whose publish future fails must not be acknowledged.

    The publish future's ``result()`` raises, and the done-callback hook
    fires the registered callback immediately so the failure is observed
    while the pipeline is processing.
    """
    def on_add_done_callback(callback):
        callback(mock_future)
        subscriber.acknowledge.assert_not_called()

    mock_future = _mock_future(on_add_done_callback)
    mock_future.result.side_effect = Exception('Boom!')
    subscriber = _mock_subscriber()
    publisher = _mock_publisher()
    # Hand the failing future to the pipeline. Without this, publish()
    # returns an unrelated auto-created mock, the hook above never runs,
    # and the test passes without exercising the failure path.
    publisher.publish.return_value = mock_future

    pipeline(
        google_cloud_project='',
        incoming_subscription='',
        outgoing_topic='',
        processor=processor,
        subscriber=subscriber,
        publisher=publisher
    ).process(max_processed_messages=1)

    # Checked again outside the pipeline so the expectation holds even if
    # an assertion raised inside the callback chain were swallowed.
    subscriber.acknowledge.assert_not_called()
@pytest.mark.parametrize('pipeline', [PubSubPipeline, BulkPubSubPipeline])
def test_ack_deadline_is_not_respected(pipeline):
    """A message is still processed after the first pull times out.

    The first ``pull`` raises ``DeadlineExceeded``; the second returns one
    message. The done-callback hook then checks that the message was
    acknowledged. ``deadline_exceeded_retry_wait_secs=0`` is passed to the
    pipeline, presumably so the retry does not sleep (confirm against the
    pipeline implementation).
    """
    subscriber = _mock_subscriber()
    publisher = _mock_publisher()

    def on_add_done_callback(callback):
        callback(mock_future)
        subscriber.acknowledge.assert_called_with(
            'some/subscription/path',
            ['some_ack_id']
        )

    mock_future = _mock_future(on_add_done_callback)
    # Hand the future to the pipeline. Without this, publish() returns an
    # unrelated auto-created mock and the acknowledge assertion above is
    # never executed.
    publisher.publish.return_value = mock_future

    class Pull:
        """Callable that raises DeadlineExceeded once, then yields a message."""

        do_raise = True

        def __call__(self, *args, **kwargs):
            if self.do_raise:
                # Shadow the class attribute so only the first call raises.
                self.do_raise = False
                raise DeadlineExceeded("")
            else:
                result = MagicMock()
                result.received_messages = [_mock_message()]
                return result

    subscriber.pull.side_effect = Pull()

    pipeline(
        google_cloud_project='',
        incoming_subscription='',
        outgoing_topic='',
        processor=processor,
        subscriber=subscriber,
        publisher=publisher,
        deadline_exceeded_retry_wait_secs=0
    ).process(max_processed_messages=1)