Skip to content

Commit

Permalink
Controller event handling pipeline
Browse files Browse the repository at this point in the history
This patch introduces ControllerPipeline which is an entry point
handler for the Kuryr-Kubernetes controller. ControllerPipeline
allows registering ResourceEventHandlers and ensures the proper
handler is called for each event that is passed to the
ControllerPipeline. Also it ensures the following behavior:

- multiple ResourceEventHandlers can be registered for the same
  resource type (OBJECT_KIND)

- failing handlers (i.e. ones that raise Exceptions) are retried
  until either the handler succeeds or a finite amount of time
  passes, in which case the most recent exception is logged

- in case there are multiple handlers registered for the same
  resource type, all such handlers are considered independent (i.e.
  if one handler fails, other handlers will still be called
  regardless; and the order in which such handlers are called is not
  determined)

- events for different Kubernetes objects can be handled concurrently

- events for the same Kubernetes object are handled sequentially in
  the order of arrival

Change-Id: Ib17e0c7a2790cdbc31be4f59d50972b43c272480
Partially-Implements: blueprint kuryr-k8s-integration
  • Loading branch information
ivc committed Nov 3, 2016
1 parent ac672fd commit f0e1c20
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 18 deletions.
Empty file.
64 changes: 64 additions & 0 deletions kuryr_kubernetes/controller/handlers/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from oslo_log import log as logging

from kuryr_kubernetes.handlers import asynchronous as h_async
from kuryr_kubernetes.handlers import dispatch as h_dis
from kuryr_kubernetes.handlers import k8s_base as h_k8s
from kuryr_kubernetes.handlers import logging as h_log
from kuryr_kubernetes.handlers import retry as h_retry

LOG = logging.getLogger(__name__)


class ControllerPipeline(h_dis.EventPipeline):
"""Serves as an entry point for controller Kubernetes events.
`ControllerPipeline` is an entry point handler for the Kuryr-Kubernetes
controller. `ControllerPipeline` allows registering
:class:`kuryr_kubernetes.handlers.k8s_base.ResourceEventHandler`s and
ensures the proper handler is called for each event that is passed to the
`ControllerPipeline`. Also it ensures the following behavior:
- multiple `ResourceEventHandler`s can be registered for the same
resource type (`OBJECT_KIND`)
- failing handlers (i.e. ones that raise `Exception`s) are retried
until either the handler succeeds or a finite amount of time passes,
in which case the most recent exception is logged
- in case there are multiple handlers registered for the same resource
type, all such handlers are considered independent (i.e. if one
handler fails, other handlers will still be called regardless; and the
order in which such handlers are called is not determined)
- events for different Kubernetes objects can be handled concurrently
- events for the same Kubernetes object are handled sequentially in
the order of arrival
"""

def __init__(self, thread_group):
self._tg = thread_group
super(ControllerPipeline, self).__init__()

def _wrap_consumer(self, consumer):
# TODO(ivc): tune retry interval/timeout
return h_log.LogExceptions(h_retry.Retry(consumer))

def _wrap_dispatcher(self, dispatcher):
return h_log.LogExceptions(h_async.Async(dispatcher, self._tg,
h_k8s.object_uid))
33 changes: 15 additions & 18 deletions kuryr_kubernetes/controller/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
from kuryr_kubernetes import clients
from kuryr_kubernetes import config
from kuryr_kubernetes import constants
from kuryr_kubernetes.handlers import asynchronous as h_async
from kuryr_kubernetes.handlers import dispatch as h_dis
from kuryr_kubernetes.controller.handlers import pipeline as h_pipeline
from kuryr_kubernetes.handlers import k8s_base as h_k8s
from kuryr_kubernetes.handlers import retry as h_retry
from kuryr_kubernetes import watcher

LOG = logging.getLogger(__name__)
Expand All @@ -38,7 +36,7 @@ def __init__(self):
super(KuryrK8sService, self).__init__()

class DummyHandler(h_k8s.ResourceEventHandler):
OBJECT_KIND = constants.K8S_OBJ_NAMESPACE
# TODO(ivc): remove once real handlers are ready

def __init__(self):
self.event_seq = 0
Expand All @@ -65,24 +63,23 @@ def on_present(self, event):
LOG.debug("present: %s",
event['object']['metadata']['selfLink'])

class DummyPipeline(h_dis.EventPipeline):
def __init__(self, thread_group):
self._tg = thread_group
super(DummyPipeline, self).__init__()
class DummyPodHandler(DummyHandler):
OBJECT_KIND = constants.K8S_OBJ_POD

def _wrap_consumer(self, consumer):
retry = h_retry.Retry(consumer)
return super(DummyPipeline, self)._wrap_consumer(retry)
class DummyServiceHandler(DummyHandler):
OBJECT_KIND = constants.K8S_OBJ_SERVICE

def _wrap_dispatcher(self, dispatcher):
handler = super(DummyPipeline, self)._wrap_dispatcher(
dispatcher)
return h_async.Async(handler, self._tg, h_k8s.object_uid)
class DummyEndpointsHandler(DummyHandler):
OBJECT_KIND = constants.K8S_OBJ_ENDPOINTS

pipeline = DummyPipeline(self.tg)
pipeline.register(DummyHandler())
pipeline = h_pipeline.ControllerPipeline(self.tg)
self.watcher = watcher.Watcher(pipeline, self.tg)
self.watcher.add(constants.K8S_API_NAMESPACES)
# TODO(ivc): pluggable resource/handler registration
for resource in ["pods", "services", "endpoints"]:
self.watcher.add("%s/%s" % (constants.K8S_API_BASE, resource))
pipeline.register(DummyPodHandler())
pipeline.register(DummyServiceHandler())
pipeline.register(DummyEndpointsHandler())

def start(self):
LOG.info(_LI("Service '%s' starting"), self.__class__.__name__)
Expand Down
Empty file.
60 changes: 60 additions & 0 deletions kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import mock

from kuryr_kubernetes.controller.handlers import pipeline as h_pipeline
from kuryr_kubernetes.handlers import dispatch as h_dis
from kuryr_kubernetes.handlers import k8s_base as h_k8s
from kuryr_kubernetes.tests import base as test_base


class TestControllerPipeline(test_base.TestCase):
@mock.patch('kuryr_kubernetes.handlers.logging.LogExceptions')
@mock.patch('kuryr_kubernetes.handlers.retry.Retry')
def test_wrap_consumer(self, m_retry_type, m_logging_type):
consumer = mock.sentinel.consumer
retry_handler = mock.sentinel.retry_handler
logging_handler = mock.sentinel.logging_handler
m_retry_type.return_value = retry_handler
m_logging_type.return_value = logging_handler
thread_group = mock.sentinel.thread_group

with mock.patch.object(h_dis.EventPipeline, '__init__'):
pipeline = h_pipeline.ControllerPipeline(thread_group)
ret = pipeline._wrap_consumer(consumer)

self.assertEqual(logging_handler, ret)
m_logging_type.assert_called_with(retry_handler)
m_retry_type.assert_called_with(consumer)

@mock.patch('kuryr_kubernetes.handlers.logging.LogExceptions')
@mock.patch('kuryr_kubernetes.handlers.asynchronous.Async')
def test_wrap_dispatcher(self, m_async_type, m_logging_type):
dispatcher = mock.sentinel.dispatcher
async_handler = mock.sentinel.async_handler
logging_handler = mock.sentinel.logging_handler
m_async_type.return_value = async_handler
m_logging_type.return_value = logging_handler
thread_group = mock.sentinel.thread_group

with mock.patch.object(h_dis.EventPipeline, '__init__'):
pipeline = h_pipeline.ControllerPipeline(thread_group)
ret = pipeline._wrap_dispatcher(dispatcher)

self.assertEqual(logging_handler, ret)
m_logging_type.assert_called_with(async_handler)
m_async_type.assert_called_with(dispatcher, thread_group,
h_k8s.object_uid)

0 comments on commit f0e1c20

Please sign in to comment.