From 6f9b0b0dcb399000c847f6753ccbde6bb27c6080 Mon Sep 17 00:00:00 2001 From: valentin benozillo Date: Tue, 24 Apr 2018 15:28:46 +0000 Subject: [PATCH 1/4] Add compatibility for Ipv6 address Signed-off-by: Matt Cipperly --- fluent/sender.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/fluent/sender.py b/fluent/sender.py index 72e8c36..69fdd1c 100644 --- a/fluent/sender.py +++ b/fluent/sender.py @@ -120,6 +120,13 @@ def close(self): self._close() self.pendings = None + def _is_ipv4_host(self): + try: + socket.getaddrinfo(self.host, None, socket.AF_INET) + return True + except socket.error: + return False + def _make_packet(self, label, timestamp, data): if label: tag = '.'.join((self.tag, label)) @@ -201,7 +208,12 @@ def _reconnect(self): sock.settimeout(self.timeout) sock.connect(self.host[len('unix://'):]) else: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + if self._is_ipv4_host(): + sock = socket.socket(socket.AF_INET, + socket.SOCK_STREAM) + else: + sock = socket.socket(socket.AF_INET6, + socket.SOCK_STREAM) sock.settimeout(self.timeout) # This might be controversial and may need to be removed sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) From 4a30c098444aa3ac026fd36f878c728d4e7d1a35 Mon Sep 17 00:00:00 2001 From: Matt Cipperly Date: Wed, 21 Apr 2021 14:12:46 -0400 Subject: [PATCH 2/4] add test for ipv6 functionality Signed-off-by: Matt Cipperly --- tests/mockserver.py | 8 ++++---- tests/test_sender.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/tests/mockserver.py b/tests/mockserver.py index 77ecdd3..eab105c 100644 --- a/tests/mockserver.py +++ b/tests/mockserver.py @@ -16,7 +16,7 @@ class MockRecvServer(threading.Thread): Single threaded server accepts one connection and recv until EOF. """ - def __init__(self, host='localhost', port=0): + def __init__(self, host='localhost', port=0, inet_family=socket.AF_INET): super(MockRecvServer, self).__init__() if host.startswith('unix://'): @@ -24,13 +24,13 @@ def __init__(self, host='localhost', port=0): self.socket_type = socket.SOCK_STREAM self.socket_addr = host[len('unix://'):] else: - self.socket_proto = socket.AF_INET + self.socket_proto = inet_family self.socket_type = socket.SOCK_STREAM self.socket_addr = (host, port) self._sock = socket.socket(self.socket_proto, self.socket_type) self._sock.bind(self.socket_addr) - if self.socket_proto == socket.AF_INET: + if self.socket_proto == inet_family: self.port = self._sock.getsockname()[1] self._sock.listen(1) @@ -76,7 +76,7 @@ def close(self): pass try: - conn = socket.socket(socket.AF_INET, + conn = socket.socket(self.socket_proto, socket.SOCK_STREAM) try: conn.connect((self.socket_addr[0], self.port)) diff --git a/tests/test_sender.py b/tests/test_sender.py index 1c0fbe9..e389698 100644 --- a/tests/test_sender.py +++ b/tests/test_sender.py @@ -8,6 +8,7 @@ import unittest from shutil import rmtree from tempfile import mkdtemp +from unittest.mock import patch import msgpack @@ -291,6 +292,37 @@ def recv(self, bufsize, flags=0): finally: self._sender.socket = old_sock + def test_ipv6_only(self): + # Test if our host supports IPv6 before running this test + try: + socket.gethostbyaddr('::1') + except socket.herror: + self.skipTest("Host does not support IPv6, cannot run this test") + + self.tearDown() + + real_getaddrinfo = socket.getaddrinfo + + def _fake_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0): + if family == socket.AF_INET: + raise socket.gaierror("mock: IPv6 Only") + else: + return real_getaddrinfo(host, port, family, type, proto, flags) + + self._server = mockserver.MockRecvServer(host='localhost', + inet_family=socket.AF_INET6) + + + with patch('socket.getaddrinfo', side_effect=_fake_getaddrinfo): + sender = fluent.sender.FluentSender(tag='test', + host='localhost', + port=self._server.port) + sender.emit('foo', {'bar': 'baz'}) + sender._close() + data = self.get_data() + self.assertEqual(len(data), 1) + self.assertEqual(data[0][2], {'bar': 'baz'}) + @unittest.skipIf(sys.platform == "win32", "Unix socket not supported") def test_unix_socket(self): self.tearDown() From 291a76ad222ba9daa5c1898008ec62020fc7c6d1 Mon Sep 17 00:00:00 2001 From: Matt Cipperly Date: Tue, 11 May 2021 17:01:44 -0400 Subject: [PATCH 3/4] add prefer_ipv6 option, remember if we are in v4 or v6 mode Signed-off-by: Matt Cipperly --- README.rst | 8 +++++++ fluent/sender.py | 32 +++++++++++++++++---------- tests/test_sender.py | 52 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 80 insertions(+), 12 deletions(-) diff --git a/README.rst b/README.rst index 5a31463..4104bce 100644 --- a/README.rst +++ b/README.rst @@ -80,6 +80,14 @@ can also specify remote logger by passing the options. # for remote fluent logger = sender.FluentSender('app', host='host', port=24224) +The logger will prefer using IPv4 and fall back to IPv6 by default. Should you wish to prefer +IPv6 and fall back to IPv4, specify `prefer_ipv6` option as `True`. + +.. code:: python + + # for remote fluent preferring IPv6, falling back to IPv4 + logger = sender.FluentSender('app', host='host', port=24224, prefer_ipv6=True) + For sending event, call `emit` method with your event. Following example will send the event to fluentd, with tag 'app.follow' and the attributes 'from' and 'to'. diff --git a/fluent/sender.py b/fluent/sender.py index 69fdd1c..fe514d7 100644 --- a/fluent/sender.py +++ b/fluent/sender.py @@ -54,6 +54,7 @@ def __init__(self, buffer_overflow_handler=None, nanosecond_precision=False, msgpack_kwargs=None, + prefer_ipv6=False, **kwargs): """ :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version. @@ -69,6 +70,8 @@ def __init__(self, self.msgpack_kwargs = {} if msgpack_kwargs is None else msgpack_kwargs self.socket = None + self.prefer_ipv6 = prefer_ipv6 + self.ip_addr_family = None self.pendings = None self.lock = threading.Lock() self._closed = False @@ -120,12 +123,19 @@ def close(self): self._close() self.pendings = None - def _is_ipv4_host(self): - try: - socket.getaddrinfo(self.host, None, socket.AF_INET) - return True - except socket.error: - return False + def _find_ip_addr_family(self): + if not self.prefer_ipv6: + try: + socket.getaddrinfo(self.host, None, socket.AF_INET) + return socket.AF_INET + except socket.error: + return socket.AF_INET6 + else: + try: + socket.getaddrinfo(self.host, None, socket.AF_INET6) + return socket.AF_INET6 + except socket.error: + return socket.AF_INET def _make_packet(self, label, timestamp, data): if label: @@ -208,12 +218,10 @@ def _reconnect(self): sock.settimeout(self.timeout) sock.connect(self.host[len('unix://'):]) else: - if self._is_ipv4_host(): - sock = socket.socket(socket.AF_INET, - socket.SOCK_STREAM) - else: - sock = socket.socket(socket.AF_INET6, - socket.SOCK_STREAM) + if not self.ip_addr_family: + self.ip_addr_family = self._find_ip_addr_family() + sock = socket.socket(self.ip_addr_family, + socket.SOCK_STREAM) sock.settimeout(self.timeout) # This might be controversial and may need to be removed sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) diff --git a/tests/test_sender.py b/tests/test_sender.py index e389698..c75f4e7 100644 --- a/tests/test_sender.py +++ b/tests/test_sender.py @@ -292,6 +292,25 @@ def recv(self, bufsize, flags=0): finally: self._sender.socket = old_sock + def test_ipv6_preferred_but_not_avail(self): + real_getaddrinfo = socket.getaddrinfo + + def _fake_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0): + if family == socket.AF_INET6: + raise socket.gaierror("mock: IPv4 Only") + else: + return real_getaddrinfo(host, port, family, type, proto, flags) + with patch('socket.getaddrinfo', side_effect=_fake_getaddrinfo): + sender = fluent.sender.FluentSender(tag='test', + host='localhost', + port=self._server.port, + prefer_ipv6=True) + sender.emit('foo', {'bar': 'baz'}) + sender._close() + data = self.get_data() + self.assertEqual(len(data), 1) + self.assertEqual(data[0][2], {'bar': 'baz'}) + def test_ipv6_only(self): # Test if our host supports IPv6 before running this test try: @@ -323,6 +342,39 @@ def _fake_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0): self.assertEqual(len(data), 1) self.assertEqual(data[0][2], {'bar': 'baz'}) + def test_ipv6_preferred(self): + # Test if our host supports IPv6 before running this test + try: + socket.gethostbyaddr('::1') + except socket.herror: + self.skipTest("Host does not support IPv6, cannot run this test") + + self.tearDown() + + real_getaddrinfo = socket.getaddrinfo + + def _fake_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0): + if family == socket.AF_INET: + raise socket.gaierror("mock: IPv6 Only") + else: + return real_getaddrinfo(host, port, family, type, proto, flags) + + self._server = mockserver.MockRecvServer(host='localhost', + inet_family=socket.AF_INET6) + + + with patch('socket.getaddrinfo', side_effect=_fake_getaddrinfo): + sender = fluent.sender.FluentSender(tag='test', + host='localhost', + port=self._server.port, + prefer_ipv6=True) + sender.emit('foo', {'bar': 'baz'}) + sender._close() + data = self.get_data() + self.assertEqual(len(data), 1) + self.assertEqual(data[0][2], {'bar': 'baz'}) + + @unittest.skipIf(sys.platform == "win32", "Unix socket not supported") def test_unix_socket(self): self.tearDown() From a1017fa8064885733ac7db759bc7fa4a808fdce3 Mon Sep 17 00:00:00 2001 From: Matt Cipperly Date: Tue, 11 May 2021 18:20:29 -0400 Subject: [PATCH 4/4] redetermine IPv4/v6 capabilities on failure to connect Signed-off-by: Matt Cipperly --- fluent/sender.py | 9 +++++++-- tests/test_sender.py | 14 ++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/fluent/sender.py b/fluent/sender.py index fe514d7..16924da 100644 --- a/fluent/sender.py +++ b/fluent/sender.py @@ -197,8 +197,13 @@ def _check_recv_side(self): self.socket.settimeout(self.timeout) def _send_data(self, bytes_): - # reconnect if possible - self._reconnect() + try: + # reconnect if possible + self._reconnect() + except Exception as e: + # try once more but redetermine v4/v6 capability + self.ip_addr_family = None + self._reconnect() # send message bytes_to_send = len(bytes_) bytes_sent = 0 diff --git a/tests/test_sender.py b/tests/test_sender.py index c75f4e7..330ef82 100644 --- a/tests/test_sender.py +++ b/tests/test_sender.py @@ -311,6 +311,20 @@ def _fake_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0): self.assertEqual(len(data), 1) self.assertEqual(data[0][2], {'bar': 'baz'}) + def test_ipv6_disappeared(self): + sender = fluent.sender.FluentSender(tag='test', + host='127.0.0.1', + port=self._server.port, + prefer_ipv6=True) + # here we cause sender to believe it already determined IPv6, but want + # it to re-test when IPv6 stops working + sender.ip_addr_family = socket.AF_INET6 + sender.emit('foo', {'bar': 'baz'}) + sender._close() + data = self.get_data() + self.assertEqual(len(data), 1) + self.assertEqual(data[0][2], {'bar': 'baz'}) + def test_ipv6_only(self): # Test if our host supports IPv6 before running this test try: