[qpid-proton] branch master updated (981ec3e -> ebfe89f)


Andrew Stitcher-2
This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git.


    from 981ec3e  PROTON-2319: A (hopefully) final piece of python 2.6 detritus
     new a7257c9  PROTON-2314: [Python] reconnect/failover makeover - Trivial typos - Fixed failover to interact more sensibly with reconnect delays - Added useful keyword parameters to Backoff class - Allow specification of backoff delay by using an iterator/generator - Improved connection logging a bit   - Can now see connection messages without all events - Remove unused (and now obsolete) address keyword to Container.connect - Added some failover tests - Fixed tests to allow logging I [...]
     new ebfe89f  PROTON-2315: [Python] Support multiple connection URLs in the BlockingConnection class - [Originated by mprahl. modified by astitcher] - closes #243

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 python/proton/_reactor.py            | 191 ++++++++++++++++++++---------------
 python/proton/_utils.py              |  29 ++++--
 python/tests/proton_tests/main.py    |   3 +-
 python/tests/proton_tests/reactor.py |  68 ++++++++++---
 4 files changed, 192 insertions(+), 99 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]


[qpid-proton] 01/02: PROTON-2314: [Python] reconnect/failover makeover - Trivial typos - Fixed failover to interact more sensibly with reconnect delays - Added useful keyword parameters to Backoff class - Allow specification of backoff delay by using an iterator/generator - Improved connection logging a bit - Can now see connection messages without all events - Remove unused (and now obsolete) address keyword to Container.connect - Added some failover tests - Fixed tests to allow logging INFO messages

Andrew Stitcher-2
This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit a7257c9c8e2500d126616aaf501e4819f48a9a36
Author: Andrew Stitcher <[hidden email]>
AuthorDate: Fri Dec 18 16:00:59 2020 -0500

    PROTON-2314: [Python] reconnect/failover makeover
    - Trivial typos
    - Fixed failover to interact more sensibly with reconnect delays
    - Added useful keyword parameters to Backoff class
    - Allow specification of backoff delay by using an iterator/generator
    - Improved connection logging a bit
      - Can now see connection messages without all events
    - Remove unused (and now obsolete) address keyword to Container.connect
    - Added some failover tests
    - Fixed tests to allow logging INFO messages
---
 python/proton/_reactor.py            | 191 ++++++++++++++++++++---------------
 python/proton/_utils.py              |   2 +-
 python/tests/proton_tests/main.py    |   3 +-
 python/tests/proton_tests/reactor.py |  68 ++++++++++---
 4 files changed, 171 insertions(+), 93 deletions(-)

diff --git a/python/proton/_reactor.py b/python/proton/_reactor.py
index 3a82e9b..fdffbaf 100644
--- a/python/proton/_reactor.py
+++ b/python/proton/_reactor.py
@@ -885,7 +885,7 @@ class Acceptor(Handler):
         s = event.selectable
 
         sock, name = IO.accept(self._selectable)
-        _logger.debug("Accepted connection from %s", name)
+        _logger.info("Accepted connection from %s", name)
 
         r = self._reactor
         handler = self._handler or r.handler
@@ -902,7 +902,73 @@ class Acceptor(Handler):
         t._selectable = s
         IOHandler.update(t, s, r.now)
 
-class Connector(Handler):
+
+def delay_iter(initial=0.1, factor=2.0, max_delay=10.0, max_tries=None):
+    """
+    iterator yielding the next delay in the sequence of delays. The first
+    delay is 0 seconds, the second 0.1 seconds, and each subsequent
+    call to :meth:`next` doubles the next delay period until a
+    maximum value of 10 seconds is reached.
+    """
+    yield 0.0
+    tries = 1
+    delay = initial
+    while max_tries is None or tries < max_tries:
+        yield delay
+        tries += 1
+        delay = min(max_delay, factor * delay)
+
+
+class Backoff(object):
+    """
+    A reconnect strategy involving an increasing delay between
+    retries, up to a maximum or 10 seconds. Repeated calls
+    to :meth:`next` returns a value for the next delay, starting
+    with an initial value of 0 seconds.
+    """
+
+    def __init__(self, **kwargs):
+        self.kwargs = kwargs
+        self.iter = delay_iter(**self.kwargs)
+
+    def __iter__(self):
+        return self.iter
+
+
+def make_backoff_wrapper(backoff):
+    """
+    Make a wrapper for a backoff object:
+    If the object conforms to the old protocol (has reset and next methods) then
+    wrap it in an iterable that returns an iterator suitable for the new backoff approach
+    otherwise assume it is fine as it is!
+    :param backoff:
+    :return:
+    """
+    class WrappedBackoff(object):
+        def __init__(self, backoff):
+            self.backoff = backoff
+
+        def __iter__(self):
+            self.backoff.reset()
+            return self
+
+        def __next__(self):
+            return self.backoff.next()
+    if hasattr(backoff, 'reset') and hasattr(backoff, 'next'):
+        return WrappedBackoff(backoff)
+    else:
+        return backoff
+
+
+class Urls(object):
+    def __init__(self, values):
+        self.values = [Url(v) for v in values]
+
+    def __iter__(self):
+        return iter(self.values)
+
+
+class _Connector(Handler):
     """
     Internal handler that triggers the necessary socket connect for an
     opened connection.
@@ -922,14 +988,15 @@ class Connector(Handler):
         self.virtual_host = None
         self.ssl_sni = None
         self.max_frame_size = None
+        self._connect_sequence = None
+        self._next_url = None
 
-    def _connect(self, connection):
-        url = self.address.next()
+    def _connect(self, connection, url):
         connection.url = url
         # if virtual-host not set, use host from address as default
         if self.virtual_host is None:
             connection.hostname = url.host
-        _logger.debug("connecting to %r..." % url)
+        _logger.info("Connecting to %r..." % url)
 
         transport = Transport()
         if self.sasl_enabled:
@@ -957,86 +1024,55 @@ class Connector(Handler):
             transport.max_frame_size = self.max_frame_size
 
     def on_connection_local_open(self, event):
-        self._connect(event.connection)
+        if self.reconnect is None:
+            self._connect_sequence = ((delay, url) for delay in delay_iter() for url in self.address)
+        elif self.reconnect is False:
+            self._connect_sequence = ((delay, url) for delay in delay_iter(max_tries=1) for url in self.address)
+        else:
+            self._connect_sequence = ((delay, url) for delay in self.reconnect for url in self.address)
+        _, url = next(self._connect_sequence) # Ignore delay as we assume first delay must be 0
+        self._connect(event.connection, url)
 
     def on_connection_remote_open(self, event):
-        _logger.debug("connected to %s" % event.connection.hostname)
-        if self.reconnect:
-            self.reconnect.reset()
+        _logger.info("Connected to %s" % event.connection.hostname)
+        if self.reconnect is None:
+            self._connect_sequence = ((delay, url) for delay in delay_iter() for url in self.address)
+        elif self.reconnect:
+            self._connect_sequence = ((delay, url) for delay in self.reconnect for url in self.address)
+        else:
+            self._connect_sequence = None # Help take out the garbage
 
     def on_transport_closed(self, event):
-        if self.connection is None: return
-        if self.connection.state & Endpoint.LOCAL_ACTIVE:
+        if self.connection is None:
+            return
 
-            if self.reconnect:
+        if not self.connection.state & Endpoint.LOCAL_ACTIVE:
+            _logger.info("Disconnected, already closed")
+        elif self.reconnect is False:
+            _logger.info("Disconnected, reconnect disabled")
+        else:
+            try:
                 event.transport.unbind()
-                delay = self.reconnect.next()
+                delay, url = next(self._connect_sequence)
                 if delay == 0:
-                    _logger.info("Disconnected, reconnecting...")
-                    self._connect(self.connection)
+                    _logger.info("Disconnected, reconnecting immediately...")
+                    self._connect(self.connection, url)
                     return
                 else:
                     _logger.info("Disconnected will try to reconnect after %s seconds" % delay)
+                    self._next_url = url
                     event.reactor.schedule(delay, self)
                     return
-            else:
-                _logger.debug("Disconnected")
+            except StopIteration:
+                _logger.info("Disconnected, giving up retrying")
+
         # See connector.cpp: conn.free()/pn_connection_release() here?
         self.connection = None
 
     def on_timer_task(self, event):
-        self._connect(self.connection)
-
-
-class Backoff(object):
-    """
-    A reconnect strategy involving an increasing delay between
-    retries, up to a maximum or 10 seconds. Repeated calls
-    to :meth:`next` returns a value for the next delay, starting
-    with an initial value of 0 seconds.
-    """
-
-    def __init__(self):
-        self.delay = 0
-
-    def reset(self):
-        """
-        Reset the backoff delay to 0 seconds.
-        """
-        self.delay = 0
-
-    def next(self):
-        """
-        Start the next delay in the sequence of delays. The first
-        delay is 0 seconds, the second 0.1 seconds, and each subsequent
-        call to :meth:`next` doubles the next delay period until a
-        maximum value of 10 seconds is reached.
-
-        :return: The next delay in seconds.
-        :rtype: ``float``
-        """
-        current = self.delay
-        if current == 0:
-            self.delay = 0.1
-        else:
-            self.delay = min(10, 2 * current)
-        return current
-
-
-class Urls(object):
-    def __init__(self, values):
-        self.values = [Url(v) for v in values]
-        self.i = iter(self.values)
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        try:
-            return next(self.i)
-        except StopIteration:
-            self.i = iter(self.values)
-            return next(self.i)
+        if self._next_url:
+            self._connect(self.connection, self._next_url)
+            self._next_url = None
 
 
 class SSLConfig(object):
@@ -1126,7 +1162,7 @@ class Container(Reactor):
         for it as follows:
 
             1.  The location set in the environment variable ``MESSAGING_CONNECT_FILE``
-            2.  ``.connect.json``
+            2.  ``./connect.json``
             3.  ``~/.config/messaging/connect.json``
             4.  ``/etc/messaging/connect.json``
 
@@ -1250,14 +1286,14 @@ class Container(Reactor):
         else:
             return self._connect(url=url, urls=urls, handler=handler, reconnect=reconnect, heartbeat=heartbeat, ssl_domain=ssl_domain, **kwargs)
 
-    def _connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
+    def _connect(self, url=None, urls=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
         conn = self.connection(handler)
         conn.container = self.container_id or str(_generate_uuid())
         conn.offered_capabilities = kwargs.get('offered_capabilities')
         conn.desired_capabilities = kwargs.get('desired_capabilities')
         conn.properties = kwargs.get('properties')
 
-        connector = Connector(conn)
+        connector = _Connector(conn)
         connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
         connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
         connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
@@ -1275,16 +1311,13 @@ class Container(Reactor):
             connector.address = Urls([url])
         elif urls:
             connector.address = Urls(urls)
-        elif address:
-            connector.address = address
         else:
-            raise ValueError("One of url, urls or address required")
+            raise ValueError("One of url or urls required")
         if heartbeat:
             connector.heartbeat = heartbeat
-        if reconnect:
-            connector.reconnect = reconnect
-        elif reconnect is None:
-            connector.reconnect = Backoff()
+
+        connector.reconnect = make_backoff_wrapper(reconnect)
+
         # use container's default client domain if none specified.  This is
         # only necessary of the URL specifies the "amqps:" scheme
         connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
diff --git a/python/proton/_utils.py b/python/proton/_utils.py
index 8a10b0f..a9812bb 100644
--- a/python/proton/_utils.py
+++ b/python/proton/_utils.py
@@ -523,7 +523,7 @@ class AtomicCount(object):
     def next(self):
         """Get the next value"""
         self.lock.acquire()
-        self.count += self.step;
+        self.count += self.step
         result = self.count
         self.lock.release()
         return result
diff --git a/python/tests/proton_tests/main.py b/python/tests/proton_tests/main.py
index bb1009d..e560cf1 100644
--- a/python/tests/proton_tests/main.py
+++ b/python/tests/proton_tests/main.py
@@ -22,7 +22,7 @@
 import optparse, os, struct, sys, time, traceback, types, cgi
 from fnmatch import fnmatchcase as match
 from logging import getLogger, StreamHandler, Formatter, Filter, \
-    WARN, DEBUG, ERROR
+    WARN, DEBUG, ERROR, INFO
 
 from .common import SkipTest
 
@@ -33,6 +33,7 @@ else:
 
 levels = {
   "DEBUG": DEBUG,
+  "INFO": INFO,
   "WARN": WARN,
   "ERROR": ERROR
   }
diff --git a/python/tests/proton_tests/reactor.py b/python/tests/proton_tests/reactor.py
index 99ea996..c4e1227 100644
--- a/python/tests/proton_tests/reactor.py
+++ b/python/tests/proton_tests/reactor.py
@@ -21,11 +21,11 @@ from __future__ import absolute_import
 
 import time
 
-from proton.reactor import Container, ApplicationEvent, EventInjector, Selector
+from proton.reactor import Container, ApplicationEvent, EventInjector, Selector, Backoff
 from proton.handlers import Handshaker, MessagingHandler
 from proton import Handler, Url, symbol
 
-from .common import Test, SkipTest, TestServer, free_tcp_port, ensureCanTestExtendedSASL
+from .common import Test, SkipTest, TestServer, free_tcp_port, free_tcp_ports, ensureCanTestExtendedSASL
 
 class Barf(Exception):
     pass
@@ -442,18 +442,21 @@ class ContainerTest(Test):
         def __init__(self):
             super(ContainerTest._ClientHandler, self).__init__()
             self.server_addr = None
+            self.errors = 0
 
         def on_connection_opened(self, event):
             self.server_addr = event.connected_address
             event.connection.close()
 
+        def on_transport_error(self, event):
+            self.errors += 1
+
     def test_numeric_hostname(self):
         ensureCanTestExtendedSASL()
         server_handler = ContainerTest._ServerHandler("127.0.0.1")
         client_handler = ContainerTest._ClientHandler()
         container = Container(server_handler)
-        container.connect(url=Url(host="127.0.0.1",
-                                  port=server_handler.port),
+        container.connect(url="127.0.0.1:%s" % (server_handler.port),
                           handler=client_handler)
         container.run()
         assert server_handler.client_addr
@@ -466,8 +469,7 @@ class ContainerTest(Test):
         server_handler = ContainerTest._ServerHandler("localhost")
         client_handler = ContainerTest._ClientHandler()
         container = Container(server_handler)
-        container.connect(url=Url(host="localhost",
-                                  port=server_handler.port),
+        container.connect(url="localhost:%s" % (server_handler.port),
                           handler=client_handler)
         container.run()
         assert server_handler.client_addr
@@ -479,8 +481,7 @@ class ContainerTest(Test):
         ensureCanTestExtendedSASL()
         server_handler = ContainerTest._ServerHandler("localhost")
         container = Container(server_handler)
-        conn = container.connect(url=Url(host="localhost",
-                                         port=server_handler.port),
+        conn = container.connect(url="localhost:%s" % (server_handler.port),
                                  handler=ContainerTest._ClientHandler(),
                                  virtual_host="a.b.c.org")
         container.run()
@@ -492,8 +493,7 @@ class ContainerTest(Test):
         # Python Container.
         server_handler = ContainerTest._ServerHandler("localhost")
         container = Container(server_handler)
-        conn = container.connect(url=Url(host="localhost",
-                                         port=server_handler.port),
+        conn = container.connect(url="localhost:%s" % (server_handler.port),
                                  handler=ContainerTest._ClientHandler(),
                                  virtual_host="")
         container.run()
@@ -536,11 +536,55 @@ class ContainerTest(Test):
             self.connect_failed = True
             self.server_handler.listen(event.container)
 
+    def test_failover(self):
+        server_handler = ContainerTest._ServerHandler("localhost")
+        client_handler = ContainerTest._ClientHandler()
+        free_ports = free_tcp_ports(2)
+        container = Container(server_handler)
+        container.connect(urls=["localhost:%s" % (free_ports[0]), "localhost:%s" % (free_ports[1]),
+                                "localhost:%s" % (server_handler.port)],
+                          handler=client_handler)
+        container.run()
+        assert server_handler.peer_hostname == 'localhost', server_handler.peer_hostname
+        assert client_handler.server_addr == Url(host='localhost', port=server_handler.port), client_handler.server_addr
+
+    def test_failover_fail(self):
+        client_handler = ContainerTest._ClientHandler()
+        free_ports = free_tcp_ports(2)
+        container = Container(client_handler)
+        start = time.time()
+        container.connect(urls=["localhost:%s" % (free_ports[0]), "localhost:%s" % (free_ports[1])],
+                          reconnect=Backoff(max_tries=5),
+                          handler=client_handler)
+        container.run()
+        end = time.time()
+        assert client_handler.errors == 10
+        # Total time for failure should be greater than but close to 3s
+        # would like to have an upper bound of about 3.2 too - but loaded CI machines can take a loooong time!
+        assert 3.0 < end-start, end-start
+        assert client_handler.server_addr is None, client_handler.server_addr
+
+    def test_failover_fail_custom_reconnect(self):
+        client_handler = ContainerTest._ClientHandler()
+        free_ports = free_tcp_ports(2)
+        container = Container(client_handler)
+        start = time.time()
+        container.connect(urls=["localhost:%s" % (free_ports[0]), "localhost:%s" % (free_ports[1])],
+                          reconnect=[0, 0.5, 1],
+                          handler=client_handler)
+        container.run()
+        end = time.time()
+        assert client_handler.errors == 6
+        # Total time for failure should be greater than but close to 3s
+        # would like to have an upper bound of about 3.2 too - but loaded CI machines can take a loooong time!
+        assert 3.0 < end-start, end-start
+        assert client_handler.server_addr is None, client_handler.server_addr
+
     def test_reconnect(self):
         server_handler = ContainerTest._ReconnectServerHandler("localhost", listen_on_error=True)
         client_handler = ContainerTest._ReconnectClientHandler(server_handler)
         container = Container(server_handler)
-        container.connect(url=Url(host="localhost", port=server_handler.port),
+        container.connect(url="localhost:%s" % (server_handler.port),
                           handler=client_handler)
         container.run()
         assert server_handler.peer_hostname == 'localhost', server_handler.peer_hostname
@@ -551,7 +595,7 @@ class ContainerTest(Test):
         server_handler = ContainerTest._ReconnectServerHandler("localhost", listen_on_error=False)
         client_handler = ContainerTest._ReconnectClientHandler(server_handler)
         container = Container(server_handler)
-        container.connect(url=Url(host="localhost", port=server_handler.port),
+        container.connect(url="localhost:%s" % (server_handler.port),
                           handler=client_handler, reconnect=False)
         container.run()
         assert server_handler.peer_hostname == None, server_handler.peer_hostname

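The patch above replaces the old fixed-schedule `Backoff` with the `delay_iter` generator and builds the connect sequence as a cross-product of delays and URLs, so every URL is tried once per backoff step (see `on_connection_local_open`). A minimal standalone sketch of that behaviour, in plain Python with no proton dependency (the host URLs are hypothetical; `delay_iter` mirrors the function added in the patch):

```python
def delay_iter(initial=0.1, factor=2.0, max_delay=10.0, max_tries=None):
    """Yield reconnect delays: first attempt immediate, then geometric
    growth by `factor` up to `max_delay`, for at most `max_tries` tries."""
    yield 0.0
    tries = 1
    delay = initial
    while max_tries is None or tries < max_tries:
        yield delay
        tries += 1
        delay = min(max_delay, factor * delay)

# Hypothetical failover targets.
urls = ["amqp://host-a:5672", "amqp://host-b:5672"]

# As in _Connector.on_connection_local_open: pair every delay with every
# URL, so all URLs are attempted before backing off further.
sequence = [(d, u) for d in delay_iter(max_tries=3) for u in urls]
# -> [(0.0, host-a), (0.0, host-b), (0.1, host-a), (0.1, host-b),
#     (0.2, host-a), (0.2, host-b)]
```

Passing `reconnect=False` corresponds to `delay_iter(max_tries=1)` (one immediate attempt per URL, no retries), and a user-supplied iterable such as `reconnect=[0, 0.5, 1]` simply takes the place of `delay_iter()` in the generator expression.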



[qpid-proton] 02/02: PROTON-2315: [Python] Support multiple connection URLs in the BlockingConnection class - [Originated by mprahl. modified by astitcher] - closes #243

Andrew Stitcher-2
This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit ebfe89f10e9f16e9347ebdb766480b7f4fdb5780
Author: mprahl <[hidden email]>
AuthorDate: Mon Apr 27 14:24:21 2020 -0400

    PROTON-2315: [Python] Support multiple connection URLs in the BlockingConnection class
    - [Originated by mprahl. modified by astitcher]
    - closes #243
---
 python/proton/_utils.py | 27 +++++++++++++++++++++------
 1 file changed, 21 insertions(+), 6 deletions(-)

diff --git a/python/proton/_utils.py b/python/proton/_utils.py
index a9812bb..5100f4c 100644
--- a/python/proton/_utils.py
+++ b/python/proton/_utils.py
@@ -331,8 +331,8 @@ class BlockingConnection(Handler):
     object operations are enclosed in a try block and that close() is
     always executed on exit.
 
-    :param url: Connection URL
-    :type url: :class:`proton.Url` or ``str``
+    :param url: The connection URL.
+    :type url: ``str``
     :param timeout: Connection timeout in seconds. If ``None``, defaults to 60 seconds.
     :type timeout: ``None`` or float
     :param container: Container to process the events on the connection. If ``None``,
@@ -341,23 +341,29 @@ class BlockingConnection(Handler):
     :param heartbeat: A value in seconds indicating the desired frequency of
         heartbeats used to test the underlying socket is alive.
     :type heartbeat: ``float``
+    :param urls: A list of connection URLs to try to connect to.
+    :type urls: ``list``[``str``]
     :param kwargs: Container keyword arguments. See :class:`proton.reactor.Container`
         for a list of the valid kwargs.
     """
 
-    def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):
+    def __init__(self, url=None, timeout=None, container=None, ssl_domain=None, heartbeat=None, urls=None,
+                 reconnect=None, **kwargs):
         self.disconnected = False
         self.timeout = timeout or 60
         self.container = container or Container()
         self.container.timeout = self.timeout
         self.container.start()
-        self.url = Url(url).defaults()
         self.conn = None
         self.closing = False
+        # Preserve previous behaviour if neither reconnect nor urls are supplied
+        if url is not None and urls is None and reconnect is None:
+            reconnect = False
+            url = Url(url).defaults()
         failed = True
         try:
-            self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False,
-                                               heartbeat=heartbeat, **kwargs)
+            self.conn = self.container.connect(url=url, handler=self, ssl_domain=ssl_domain, reconnect=reconnect,
+                                               heartbeat=heartbeat, urls=urls, **kwargs)
             self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
                       msg="Opening connection")
             failed = False
@@ -437,6 +443,15 @@ class BlockingConnection(Handler):
             self.container.stop_events()
             self.container = None
 
+    @property
+    def url(self):
+        """
+        The address for this connection.
+
+        :type: ``str``
+        """
+        return self.conn and self.conn.connected_address
+
     def _is_closed(self):
         return self.conn.state & (Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED)
 

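Note how the new constructor keeps backward compatibility: the historical single-attempt behaviour (`reconnect=False`) is forced only when a bare `url` is given with neither `urls` nor `reconnect`; otherwise the arguments pass through to `Container.connect`, which applies its default backoff and failover. A sketch of just that defaulting logic (the `Url(url).defaults()` normalisation from the patch is omitted here, since it needs the proton library; the helper name is illustrative, not part of the API):

```python
def resolve_connect_args(url=None, urls=None, reconnect=None):
    """Mirror the argument defaulting in BlockingConnection.__init__:
    a lone url with no urls/reconnect keeps the old one-shot behaviour."""
    if url is not None and urls is None and reconnect is None:
        reconnect = False
    return url, urls, reconnect

# Old-style call: single URL, reconnect forced off.
assert resolve_connect_args(url="amqp://host:5672") == \
    ("amqp://host:5672", None, False)

# New-style call: a URL list passes through untouched, so Container.connect
# fails over between the hosts with its default backoff.
assert resolve_connect_args(urls=["amqp://a:5672", "amqp://b:5672"]) == \
    (None, ["amqp://a:5672", "amqp://b:5672"], None)
```

With failover in play the connected endpoint is no longer knowable up front, which is why the patch also turns `url` from a constructor-time attribute into a property reading `conn.connected_address`.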
