diff --git a/README.rst b/README.rst index 5a31463..4104bce 100644 --- a/README.rst +++ b/README.rst @@ -80,6 +80,14 @@ can also specify remote logger by passing the options. # for remote fluent logger = sender.FluentSender('app', host='host', port=24224) +The logger will prefer using IPv4 and fall back to IPv6 by default. Should you wish to prefer +IPv6 and fall back to IPv4, specify `prefer_ipv6` option as `True`. + +.. code:: python + + # for remote fluent preferring IPv6, falling back to IPv4 + logger = sender.FluentSender('app', host='host', port=24224, prefer_ipv6=True) + For sending event, call `emit` method with your event. Following example will send the event to fluentd, with tag 'app.follow' and the attributes 'from' and 'to'. diff --git a/fluent/sender.py b/fluent/sender.py index 72e8c36..16924da 100644 --- a/fluent/sender.py +++ b/fluent/sender.py @@ -54,6 +54,7 @@ def __init__(self, buffer_overflow_handler=None, nanosecond_precision=False, msgpack_kwargs=None, + prefer_ipv6=False, **kwargs): """ :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version. @@ -69,6 +70,8 @@ def __init__(self, self.msgpack_kwargs = {} if msgpack_kwargs is None else msgpack_kwargs self.socket = None + self.prefer_ipv6 = prefer_ipv6 + self.ip_addr_family = None self.pendings = None self.lock = threading.Lock() self._closed = False @@ -120,6 +123,20 @@ def close(self): self._close() self.pendings = None + def _find_ip_addr_family(self): + if not self.prefer_ipv6: + try: + socket.getaddrinfo(self.host, None, socket.AF_INET) + return socket.AF_INET + except socket.error: + return socket.AF_INET6 + else: + try: + socket.getaddrinfo(self.host, None, socket.AF_INET6) + return socket.AF_INET6 + except socket.error: + return socket.AF_INET + def _make_packet(self, label, timestamp, data): if label: tag = '.'.join((self.tag, label)) @@ -180,8 +197,13 @@ def _check_recv_side(self): self.socket.settimeout(self.timeout) def _send_data(self, bytes_): - # reconnect if possible - self._reconnect() + try: + # reconnect if possible + self._reconnect() + except Exception as e: + # try once more but redetermine v4/v6 capability + self.ip_addr_family = None + self._reconnect() # send message bytes_to_send = len(bytes_) bytes_sent = 0 @@ -201,7 +223,10 @@ def _reconnect(self): sock.settimeout(self.timeout) sock.connect(self.host[len('unix://'):]) else: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + if not self.ip_addr_family: + self.ip_addr_family = self._find_ip_addr_family() + sock = socket.socket(self.ip_addr_family, + socket.SOCK_STREAM) sock.settimeout(self.timeout) # This might be controversial and may need to be removed sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) diff --git a/tests/mockserver.py b/tests/mockserver.py index 77ecdd3..eab105c 100644 --- a/tests/mockserver.py +++ b/tests/mockserver.py @@ -16,7 +16,7 @@ class MockRecvServer(threading.Thread): Single threaded server accepts one connection and recv until EOF. """ - def __init__(self, host='localhost', port=0): + def __init__(self, host='localhost', port=0, inet_family=socket.AF_INET): super(MockRecvServer, self).__init__() if host.startswith('unix://'): @@ -24,13 +24,13 @@ def __init__(self, host='localhost', port=0): self.socket_type = socket.SOCK_STREAM self.socket_addr = host[len('unix://'):] else: - self.socket_proto = socket.AF_INET + self.socket_proto = inet_family self.socket_type = socket.SOCK_STREAM self.socket_addr = (host, port) self._sock = socket.socket(self.socket_proto, self.socket_type) self._sock.bind(self.socket_addr) - if self.socket_proto == socket.AF_INET: + if self.socket_proto == inet_family: self.port = self._sock.getsockname()[1] self._sock.listen(1) @@ -76,7 +76,7 @@ def close(self): pass try: - conn = socket.socket(socket.AF_INET, + conn = socket.socket(self.socket_proto, socket.SOCK_STREAM) try: conn.connect((self.socket_addr[0], self.port)) diff --git a/tests/test_sender.py b/tests/test_sender.py index 1c0fbe9..330ef82 100644 --- a/tests/test_sender.py +++ b/tests/test_sender.py @@ -8,6 +8,7 @@ import unittest from shutil import rmtree from tempfile import mkdtemp +from unittest.mock import patch import msgpack @@ -291,6 +292,103 @@ def recv(self, bufsize, flags=0): finally: self._sender.socket = old_sock + def test_ipv6_preferred_but_not_avail(self): + real_getaddrinfo = socket.getaddrinfo + + def _fake_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0): + if family == socket.AF_INET6: + raise socket.gaierror("mock: IPv4 Only") + else: + return real_getaddrinfo(host, port, family, type, proto, flags) + with patch('socket.getaddrinfo', side_effect=_fake_getaddrinfo): + sender = fluent.sender.FluentSender(tag='test', + host='localhost', + port=self._server.port, + prefer_ipv6=True) + sender.emit('foo', {'bar': 'baz'}) + sender._close() + data = self.get_data() + self.assertEqual(len(data), 1) + self.assertEqual(data[0][2], {'bar': 'baz'}) + + def test_ipv6_disappeared(self): + sender = fluent.sender.FluentSender(tag='test', + host='127.0.0.1', + port=self._server.port, + prefer_ipv6=True) + # here we cause sender to believe it already determined IPv6, but want + # it to re-test when IPv6 stops working + sender.ip_addr_family = socket.AF_INET6 + sender.emit('foo', {'bar': 'baz'}) + sender._close() + data = self.get_data() + self.assertEqual(len(data), 1) + self.assertEqual(data[0][2], {'bar': 'baz'}) + + def test_ipv6_only(self): + # Test if our host supports IPv6 before running this test + try: + socket.gethostbyaddr('::1') + except socket.herror: + self.skipTest("Host does not support IPv6, cannot run this test") + + self.tearDown() + + real_getaddrinfo = socket.getaddrinfo + + def _fake_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0): + if family == socket.AF_INET: + raise socket.gaierror("mock: IPv6 Only") + else: + return real_getaddrinfo(host, port, family, type, proto, flags) + + self._server = mockserver.MockRecvServer(host='localhost', + inet_family=socket.AF_INET6) + + + with patch('socket.getaddrinfo', side_effect=_fake_getaddrinfo): + sender = fluent.sender.FluentSender(tag='test', + host='localhost', + port=self._server.port) + sender.emit('foo', {'bar': 'baz'}) + sender._close() + data = self.get_data() + self.assertEqual(len(data), 1) + self.assertEqual(data[0][2], {'bar': 'baz'}) + + def test_ipv6_preferred(self): + # Test if our host supports IPv6 before running this test + try: + socket.gethostbyaddr('::1') + except socket.herror: + self.skipTest("Host does not support IPv6, cannot run this test") + + self.tearDown() + + real_getaddrinfo = socket.getaddrinfo + + def _fake_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0): + if family == socket.AF_INET: + raise socket.gaierror("mock: IPv6 Only") + else: + return real_getaddrinfo(host, port, family, type, proto, flags) + + self._server = mockserver.MockRecvServer(host='localhost', + inet_family=socket.AF_INET6) + + + with patch('socket.getaddrinfo', side_effect=_fake_getaddrinfo): + sender = fluent.sender.FluentSender(tag='test', + host='localhost', + port=self._server.port, + prefer_ipv6=True) + sender.emit('foo', {'bar': 'baz'}) + sender._close() + data = self.get_data() + self.assertEqual(len(data), 1) + self.assertEqual(data[0][2], {'bar': 'baz'}) + + @unittest.skipIf(sys.platform == "win32", "Unix socket not supported") def test_unix_socket(self): self.tearDown()