qpid-dispatch git commit: DISPATCH-803 - The following changes were made to support unavailable distribution 1. Added a new attribute to the router entity called called defaultDistribution which defaults to balanced but can be set to unavailable 2. Attac

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

qpid-dispatch git commit: DISPATCH-803 - The following changes were made to support unavailable distribution 1. Added a new attribute to the router entity called called defaultDistribution which defaults to balanced but can be set to unavailable 2. Attac

gmurthy
Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 043c5ccf8 -> 1ca80b6e2


DISPATCH-803 - The following changes were made to support unavailable distribution
1. Added a new attribute to the router entity called called defaultDistribution which defaults to balanced
but can be set to unavailable
2. Attaches to addresses with distribution unavailable are rejected and the link is detached
3. Anonymous senders sending to unavailable addresses will be sent back a disposition of PN_REJECTED but link will not be closed
4. Added system test system_tests_default_distribution.py to test the above cases

(cherry picked from commit 49f643e9fabfe381934b26b679b4f2bda39f2e4a)


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/1ca80b6e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/1ca80b6e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/1ca80b6e

Branch: refs/heads/master
Commit: 1ca80b6e29d105a3de43a57aa800855276c64caf
Parents: 043c5cc
Author: Ganesh Murthy <[hidden email]>
Authored: Wed Aug 9 13:41:08 2017 -0400
Committer: Ganesh Murthy <[hidden email]>
Committed: Thu Aug 10 12:07:30 2017 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/router.h                |   3 +-
 python/qpid_dispatch/management/qdrouter.json |  13 +-
 src/dispatch.c                                |  23 ++-
 src/dispatch_private.h                        |   1 +
 src/router_core/agent_address.c               |  11 +-
 src/router_core/agent_config_address.c        |   1 +
 src/router_core/connections.c                 |  44 ++++--
 src/router_core/router_core.c                 |  12 +-
 src/router_core/transfer.c                    |  17 +++
 tests/CMakeLists.txt                          |   1 +
 tests/system_tests_default_distribution.py    | 162 +++++++++++++++++++++
 tests/system_tests_one_router.py              |  69 +++++++++
 12 files changed, 332 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ca80b6e/include/qpid/dispatch/router.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router.h b/include/qpid/dispatch/router.h
index 78a7599..8004670 100644
--- a/include/qpid/dispatch/router.h
+++ b/include/qpid/dispatch/router.h
@@ -42,7 +42,8 @@ typedef enum {
     QD_TREATMENT_MULTICAST_ONCE   = 1,
     QD_TREATMENT_ANYCAST_CLOSEST  = 2,
     QD_TREATMENT_ANYCAST_BALANCED = 3,
-    QD_TREATMENT_LINK_BALANCED    = 4
+    QD_TREATMENT_LINK_BALANCED    = 4,
+    QD_TREATMENT_UNAVAILABLE      = 5
 } qd_address_treatment_t;
 
 #include <qpid/dispatch/router_core.h>

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ca80b6e/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 1101961..1a8eded 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -504,6 +504,13 @@
                     "deprecated": true,
                     "description": "(DEPRECATED) This value is no longer used in the router.",
                     "create": true
+                },
+                "defaultDistribution": {
+                    "type": ["multicast", "closest", "balanced", "unavailable"],
+                    "description": "Default forwarding treatment for any address without a specified treatment. multicast - one copy of each message delivered to all subscribers; closest - messages delivered to only the closest subscriber; balanced - messages delivered to one subscriber with load balanced across subscribers; unavailable - this address is unavailable, link attaches to an address of unavilable distribution will be rejected.",
+                    "create": true,
+                    "required": false,
+                    "default": "balanced"
                 }
             }
         },
@@ -962,7 +969,7 @@
                     "required": false
                 },
                 "distribution": {
-                    "type": ["multicast", "closest", "balanced"],
+                    "type": ["multicast", "closest", "balanced", "unavailable"],
                     "description": "Treatment of traffic associated with the address",
                     "create": true,
                     "required": false,
@@ -1178,8 +1185,8 @@
             "extends": "operationalEntity",
             "attributes": {
                 "distribution": {
-                    "type": ["flood", "multicast", "closest", "balanced", "linkBalanced"],
-                    "description": "Forwarding treatment for the address: flood - messages delivered to all subscribers along all available paths (this will cause duplicate deliveries if there are redundant paths); multi - one copy of each message delivered to all subscribers; anyClosest - messages delivered to only the closest subscriber; anyBalanced - messages delivered to one subscriber with load balanced across subscribers; linkBalanced - for link-routing, link attaches balanced across destinations."
+                    "type": ["flood", "multicast", "closest", "balanced", "linkBalanced", "unavailable"],
+                    "description": "Forwarding treatment for the address: flood - messages delivered to all subscribers along all available paths (this will cause duplicate deliveries if there are redundant paths); multicast - one copy of each message delivered to all subscribers; closest - messages delivered to only the closest subscriber; balanced - messages delivered to one subscriber with load balanced across subscribers; linkBalanced - for link-routing, link attaches balanced across destinations; unavailable - this address is unavailable, link attaches to an address of unavailable distribution will be rejected."
                 },
                 "inProcess": {
                     "type": "integer",

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ca80b6e/src/dispatch.c
----------------------------------------------------------------------
diff --git a/src/dispatch.c b/src/dispatch.c
index eb0c6eb..74dd0c9 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -50,7 +50,10 @@ qd_router_t    *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *
 void            qd_router_setup_late(qd_dispatch_t *qd);
 void            qd_router_free(qd_router_t *router);
 void            qd_error_initialize();
-
+const char     *CLOSEST_DISTRIBUTION   = "closest";
+const char     *MULTICAST_DISTRIBUTION = "multicast";
+const char     *BALANCED_DISTRIBUTION  = "balanced";
+const char     *UNAVAILABLE_DISTRIBUTION = "unavailable";
 qd_dispatch_t *qd_dispatch(const char *python_pkgdir)
 {
     qd_dispatch_t *qd = NEW(qd_dispatch_t);
@@ -76,6 +79,7 @@ qd_dispatch_t *qd_dispatch(const char *python_pkgdir)
     qd_dispatch_set_router_area(qd, strdup("0"));
     qd_dispatch_set_router_id(qd, strdup("0"));
     qd->router_mode = QD_ROUTER_MODE_ENDPOINT;
+    qd->default_treatment   = QD_TREATMENT_LINK_BALANCED;
 
     qd_python_initialize(qd, python_pkgdir);
     if (qd_error_code()) { qd_dispatch_free(qd); return 0; }
@@ -163,10 +167,27 @@ qd_error_t qd_dispatch_configure_container(qd_dispatch_t *qd, qd_entity_t *entit
     return QD_ERROR_NONE;
 }
 
+void qd_dispatch_set_router_default_distribution(qd_dispatch_t *qd, char *distribution)
+{
+    if (distribution) {
+        if (strcmp(distribution, MULTICAST_DISTRIBUTION) == 0)
+            qd->default_treatment = QD_TREATMENT_MULTICAST_ONCE;
+        else if (strcmp(distribution, CLOSEST_DISTRIBUTION) == 0)
+            qd->default_treatment = QD_TREATMENT_ANYCAST_CLOSEST;
+        else if (strcmp(distribution, BALANCED_DISTRIBUTION) == 0)
+            qd->default_treatment = QD_TREATMENT_ANYCAST_BALANCED;
+        else if (strcmp(distribution, UNAVAILABLE_DISTRIBUTION) == 0)
+            qd->default_treatment = QD_TREATMENT_UNAVAILABLE;
+    }
+    else
+        // The default for the router defaultDistribution field is QD_TREATMENT_ANYCAST_BALANCED
+        qd->default_treatment = QD_TREATMENT_ANYCAST_BALANCED;
+}
 
 qd_error_t qd_dispatch_configure_router(qd_dispatch_t *qd, qd_entity_t *entity)
 {
     qd_dispatch_set_router_id(qd, qd_entity_opt_string(entity, "routerId", 0)); QD_ERROR_RET();
+    qd_dispatch_set_router_default_distribution(qd, qd_entity_opt_string(entity, "defaultDistribution", 0)); QD_ERROR_RET();
     if (! qd->router_id) {
         qd_dispatch_set_router_id(qd, qd_entity_opt_string(entity, "id", 0)); QD_ERROR_RET();
     }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ca80b6e/src/dispatch_private.h
----------------------------------------------------------------------
diff --git a/src/dispatch_private.h b/src/dispatch_private.h
index 8b46d71..6d67e38 100644
--- a/src/dispatch_private.h
+++ b/src/dispatch_private.h
@@ -52,6 +52,7 @@ struct qd_dispatch_t {
     qd_connection_manager_t *connection_manager;
     qd_policy_t             *policy;
     void                    *dl_handle;
+    qd_address_treatment_t   default_treatment;
 
     int    thread_count;
     char  *sasl_config_path;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ca80b6e/src/router_core/agent_address.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_address.c b/src/router_core/agent_address.c
index 43909b4..72fb310 100644
--- a/src/router_core/agent_address.c
+++ b/src/router_core/agent_address.c
@@ -80,11 +80,12 @@ static void qdr_insert_address_columns_CT(qdr_core_t          *core,
 
     case QDR_ADDRESS_DISTRIBUTION: {
         switch (addr->treatment) {
-        case QD_TREATMENT_MULTICAST_FLOOD:  qd_compose_insert_string(body, "flood");        break;
-        case QD_TREATMENT_MULTICAST_ONCE:   qd_compose_insert_string(body, "multicast");    break;
-        case QD_TREATMENT_ANYCAST_CLOSEST:  qd_compose_insert_string(body, "closest");      break;
-        case QD_TREATMENT_ANYCAST_BALANCED: qd_compose_insert_string(body, "balanced");     break;
-        case QD_TREATMENT_LINK_BALANCED:    qd_compose_insert_string(body, "linkBalanced"); break;
+            case QD_TREATMENT_MULTICAST_FLOOD:  qd_compose_insert_string(body, "flood");        break;
+            case QD_TREATMENT_MULTICAST_ONCE:   qd_compose_insert_string(body, "multicast");    break;
+            case QD_TREATMENT_ANYCAST_CLOSEST:  qd_compose_insert_string(body, "closest");      break;
+            case QD_TREATMENT_ANYCAST_BALANCED: qd_compose_insert_string(body, "balanced");     break;
+            case QD_TREATMENT_LINK_BALANCED:    qd_compose_insert_string(body, "linkBalanced"); break;
+            case QD_TREATMENT_UNAVAILABLE:      qd_compose_insert_string(body, "unavailable");  break;
         }
         break;
     }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ca80b6e/src/router_core/agent_config_address.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_config_address.c b/src/router_core/agent_config_address.c
index f73ecf3..e054c7d 100644
--- a/src/router_core/agent_config_address.c
+++ b/src/router_core/agent_config_address.c
@@ -224,6 +224,7 @@ static qd_address_treatment_t qdra_address_treatment_CT(qd_parsed_field_t *field
         if (qd_iterator_equal(iter, (unsigned char*) "multicast"))    return QD_TREATMENT_MULTICAST_ONCE;
         if (qd_iterator_equal(iter, (unsigned char*) "closest"))      return QD_TREATMENT_ANYCAST_CLOSEST;
         if (qd_iterator_equal(iter, (unsigned char*) "balanced"))     return QD_TREATMENT_ANYCAST_BALANCED;
+        if (qd_iterator_equal(iter, (unsigned char*) "unavailable"))  return QD_TREATMENT_UNAVAILABLE;
     }
     return QD_TREATMENT_ANYCAST_BALANCED;
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ca80b6e/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 7db6ecb..f9638bf 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -873,7 +873,8 @@ qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connec
     if (in_phase)  *in_phase  = addr ? addr->in_phase  : 0;
     if (out_phase) *out_phase = addr ? addr->out_phase : 0;
 
-    return addr ? addr->treatment : QD_TREATMENT_ANYCAST_BALANCED;
+
+    return addr ? addr->treatment : core->qd->default_treatment;
 }
 
 
@@ -884,7 +885,7 @@ qd_address_treatment_t qdr_treatment_for_address_hash_CT(qdr_core_t *core, qd_it
     char *copy    = storage;
     bool  on_heap = false;
     int   length  = qd_iterator_length(iter);
-    qd_address_treatment_t trt = QD_TREATMENT_ANYCAST_BALANCED;
+    qd_address_treatment_t trt = core->qd->default_treatment;
 
     if (length > HASH_STORAGE_SIZE) {
         copy    = (char*) malloc(length + 1);
@@ -975,7 +976,8 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t       *core,
                                                      qdr_terminus_t   *terminus,
                                                      bool              create_if_not_found,
                                                      bool              accept_dynamic,
-                                                     bool             *link_route)
+                                                     bool             *link_route,
+                                                     bool             *unavailable)
 {
     qdr_address_t *addr = 0;
 
@@ -983,6 +985,7 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t       *core,
     // Unless expressly stated, link routing is not indicated for this terminus.
     //
     *link_route = false;
+    *unavailable = false;
 
     if (qdr_terminus_is_dynamic(terminus)) {
         //
@@ -1085,6 +1088,11 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t       *core,
     int addr_phase;
     qd_address_treatment_t treat = qdr_treatment_for_address_CT(core, conn, iter, &in_phase, &out_phase);
 
+    if (treat == QD_TREATMENT_UNAVAILABLE) {
+        *unavailable = true;
+        return 0;
+    }
+
     qd_iterator_annotate_prefix(iter, '\0'); // Cancel previous override
     addr_phase = dir == QD_INCOMING ? in_phase : out_phase;
     qd_iterator_annotate_phase(iter, (char) addr_phase + '0');
@@ -1092,8 +1100,10 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t       *core,
     qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
     if (!addr && create_if_not_found) {
         addr = qdr_address_CT(core, treat);
-        qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
-        DEQ_INSERT_TAIL(core->addrs, addr);
+        if (addr) {
+            qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
+            DEQ_INSERT_TAIL(core->addrs, addr);
+        }
     }
 
     return addr;
@@ -1299,9 +1309,15 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                 //
                 // This link has a target address
                 //
-                bool           link_route;
-                qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, target, true, true, &link_route);
-                if (!addr) {
+                bool  link_route;
+                bool  unavailable;
+                qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, target, true, true, &link_route, &unavailable);
+                if (unavailable) {
+                    qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true);
+                    qdr_terminus_free(source);
+                    qdr_terminus_free(target);
+                }
+                else if (!addr) {
                     //
                     // No route to this destination, reject the link
                     //
@@ -1356,9 +1372,15 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
         //
         switch (link->link_type) {
         case QD_LINK_ENDPOINT: {
-            bool           link_route;
-            qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, source, true, true, &link_route);
-            if (!addr) {
+            bool  link_route;
+            bool  unavailable;
+            qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, source, true, true, &link_route, &unavailable);
+            if (unavailable) {
+                qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true);
+                qdr_terminus_free(source);
+                qdr_terminus_free(target);
+            }
+            else if (!addr) {
                 //
                 // No route to this destination, reject the link
                 //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ca80b6e/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 13e0e62..510b427 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -274,6 +274,8 @@ void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action)
 
 qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_treatment_t treatment)
 {
+    if (treatment == QD_TREATMENT_UNAVAILABLE)
+        return 0;
     qdr_address_t *addr = new_qdr_address_t();
     ZERO(addr);
     addr->treatment = treatment;
@@ -295,10 +297,12 @@ qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const cha
     qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
     if (!addr) {
         addr = qdr_address_CT(core, treatment);
-        qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
-        DEQ_INSERT_TAIL(core->addrs, addr);
-        addr->block_deletion = true;
-        addr->local = (aclass == 'L');
+        if (addr) {
+            qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
+            DEQ_INSERT_TAIL(core->addrs, addr);
+            addr->block_deletion = true;
+            addr->local = (aclass == 'L');
+        }
     }
     qd_iterator_free(iter);
     return addr;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ca80b6e/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 66b9687..9453996 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -634,6 +634,23 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
             addr->deliveries_ingress++;
         link->total_deliveries++;
     }
+    //
+    // There is no address that we can send this delivery to, which means the addr was not found in our hastable. This
+    // can be because there were no receivers or because the address was not defined in the config file.
+    // If the treatment for such addresses is set to be unavailable, we send back a rejected disposition and detach the link
+    //
+    else if (core->qd->default_treatment == QD_TREATMENT_UNAVAILABLE) {
+        dlv->disposition = PN_REJECTED;
+        dlv->error = qdr_error(QD_AMQP_COND_NOT_FOUND, "Deliveries cannot be sent to an unavailable address");
+        qdr_delivery_push_CT(core, dlv);
+        //
+        // We will not detach this link because this could be anonymous sender. We don't know
+        // which address the sender will be sending to next
+        // If this was not an anonymous sender, the initial attach would have been rejected if the target address was unavailable.
+        //
+        return;
+    }
+
 
     if (fanout == 0) {
         //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ca80b6e/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 8a5cbd1..fbf2f94 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -81,6 +81,7 @@ foreach(py_test_module
     system_tests_drain
     system_tests_management
     system_tests_one_router
+    system_tests_default_distribution
     system_tests_policy
     system_tests_protocol_family
     system_tests_protocol_settings

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ca80b6e/tests/system_tests_default_distribution.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_default_distribution.py b/tests/system_tests_default_distribution.py
new file mode 100644
index 0000000..e97acf8
--- /dev/null
+++ b/tests/system_tests_default_distribution.py
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from system_test import TestCase, Qdrouterd, TIMEOUT
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+from proton import Message
+
+class DefaultDistributionTest(TestCase):
+    """System tests for testing the defaultDistribution attribute of the router entity"""
+    @classmethod
+    def setUpClass(cls):
+        super(DefaultDistributionTest, cls).setUpClass()
+        name = "test-router"
+        config = Qdrouterd.Config([
+            ('router', {'mode': 'standalone', 'id': 'QDR', "defaultDistribution": 'unavailable'}),
+
+            ('listener', {'port': cls.tester.get_port()}),
+
+            ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+            ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
+            ('address', {'prefix': 'multicast', 'distribution': 'multicast'})
+        ])
+        cls.router = cls.tester.qdrouterd(name, config)
+        cls.router.wait_ready()
+        cls.address = cls.router.addresses[0]
+
+    def test_create_unavailable_sender(self):
+        test = UnavailableSender(self.address)
+        test.run()
+        self.assertTrue(test.passed)
+
+    def test_create_unavailable_receiver(self):
+        test = UnavailableReceiver(self.address)
+        test.run()
+        self.assertTrue(test.passed)
+
+    def test_anonymous_sender(self):
+        test = UnavailableAnonymousSender(self.address)
+        test.run()
+        self.assertTrue(test.received_error)
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+class UnavailableBase(MessagingHandler):
+    def __init__(self, address):
+        super(UnavailableBase, self).__init__()
+        self.address = address
+        self.dest = "UnavailableBase"
+        self.conn = None
+        self.sender = None
+        self.receiver = None
+        self.link_error = False
+        self.link_closed = False
+        self.passed = False
+        self.timer = None
+        self.link_name = "base_link"
+
+    def check_if_done(self):
+        if self.link_error and self.link_closed:
+            self.passed = True
+            self.conn.close()
+            self.timer.cancel()
+
+    def on_link_error(self, event):
+        link = event.link
+        if event.link.name == self.link_name and link.remote_condition.description \
+                == "Node not found":
+            self.link_error = True
+        self.check_if_done()
+
+    def on_link_remote_close(self, event):
+        if event.link.name == self.link_name:
+            self.link_closed = True
+            self.check_if_done()
+
+    def run(self):
+        Container(self).run()
+
+class UnavailableSender(UnavailableBase):
+    def __init__(self, address):
+        super(UnavailableSender, self).__init__(address)
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn = event.container.connect(self.address)
+        # Creating a sender to an address with unavailable distribution
+        # The router will not allow this link to be established. It will close the link with an error of
+        # "Node not found"
+        self.sender = event.container.create_sender(self.conn, self.dest, name=self.link_name)
+
+class UnavailableReceiver(UnavailableBase):
+    def __init__(self, address):
+        super(UnavailableReceiver, self).__init__(address)
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn = event.container.connect(self.address)
+        # Creating a receiver to an address with unavailable distribution
+        # The router will not allow this link to be established. It will close the link with an error of
+        # "Node not found"
+        self.receiver = event.container.create_receiver(self.conn, self.dest, name=self.link_name)
+
+class UnavailableAnonymousSender(MessagingHandler):
+    def __init__(self, address):
+        super(UnavailableAnonymousSender, self).__init__()
+        self.address = address
+        self.dest = "UnavailableBase"
+        self.conn = None
+        self.sender = None
+        self.receiver = None
+        self.received_error = False
+        self.timer = None
+        self.link_name = "anon_link"
+        self.error_description = "Deliveries cannot be sent to an unavailable address"
+        self.error_name = u'amqp:not-found'
+        self.num_sent = 0
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn = event.container.connect(self.address)
+        # Creating an anonymous sender
+        self.sender = event.container.create_sender(self.conn, name=self.link_name)
+
+    def on_sendable(self, event):
+        if self.num_sent < 1:
+            msg = Message(id=1, body='Hello World')
+            # this is a unavailable address
+            msg.address = "SomeUnavailableAddress"
+            event.sender.send(msg)
+            self.num_sent += 1
+
+    def on_rejected(self, event):
+        if event.link.name == self.link_name and event.delivery.remote.condition.name == self.error_name \
+                and self.error_description == event.delivery.remote.condition.description:
+            self.received_error = True
+            self.conn.close()
+            self.timer.cancel()
+
+    def run(self):
+        Container(self).run()
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1ca80b6e/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index be7c731..b8f6f9c 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -53,6 +53,7 @@ class RouterTest(TestCase):
             ('address', {'prefix': 'closest', 'distribution': 'closest'}),
             ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
             ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+            ('address', {'prefix': 'unavailable', 'distribution': 'unavailable'})
         ])
         cls.router = cls.tester.qdrouterd(name, config)
         cls.router.wait_ready()
@@ -1124,6 +1125,16 @@ class RouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_27_create_unavailable_sender(self):
+        test = UnavailableSender(self.address)
+        test.run()
+        self.assertTrue(test.passed)
+
+    def test_28_create_unavailable_receiver(self):
+        test = UnavailableReceiver(self.address)
+        test.run()
+        self.assertTrue(test.passed)
+
     def test_reject_disposition(self):
         test = RejectDispositionTest(self.address)
         test.run()
@@ -1234,6 +1245,64 @@ class ExcessDeliveriesReleasedTest(MessagingHandler):
     def run(self):
         Container(self).run()
 
+class UnavailableBase(MessagingHandler):
+    def __init__(self, address):
+        super(UnavailableBase, self).__init__()
+        self.address = address
+        self.dest = "unavailable"
+        self.conn = None
+        self.sender = None
+        self.receiver = None
+        self.link_error = False
+        self.link_closed = False
+        self.passed = False
+        self.timer = None
+        self.link_name = "test_link"
+
+    def check_if_done(self):
+        if self.link_error and self.link_closed:
+            self.passed = True
+            self.conn.close()
+            self.timer.cancel()
+
+    def on_link_error(self, event):
+        link = event.link
+        if event.link.name == self.link_name and link.remote_condition.description \
+                == "Node not found":
+            self.link_error = True
+        self.check_if_done()
+
+    def on_link_remote_close(self, event):
+        if event.link.name == self.link_name:
+            self.link_closed = True
+            self.check_if_done()
+
+    def run(self):
+        Container(self).run()
+
+class UnavailableSender(UnavailableBase):
+    def __init__(self, address):
+        super(UnavailableSender, self).__init__(address)
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn = event.container.connect(self.address)
+        # Creating a sender to an address with unavailable distribution
+        # The router will not allow this link to be established. It will close the link with an error of
+        # "Node not found"
+        self.sender = event.container.create_sender(self.conn, self.dest, name=self.link_name)
+
+class UnavailableReceiver(UnavailableBase):
+    def __init__(self, address):
+        super(UnavailableReceiver, self).__init__(address)
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn = event.container.connect(self.address)
+        # Creating a receiver to an address with unavailable distribution
+        # The router will not allow this link to be established. It will close the link with an error of
+        # "Node not found"
+        self.receiver = event.container.create_receiver(self.conn, self.dest, name=self.link_name)
 
 class MulticastUnsettledTest(MessagingHandler):
     def __init__(self, address):


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Loading...