This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/qpid-dispatch.gitThe following commit(s) were added to refs/heads/master by this push:
new b463667 DISPATCH-1907 - Separated policy-spec, distribute policy spec with connection and core subscriptions.
b463667 is described below
commit b46366762aeebcf3f03f03c200f0b6d2e3d5688e
Author: Ted Ross <
[hidden email]>
AuthorDate: Thu Jan 7 11:04:05 2021 -0500
DISPATCH-1907 - Separated policy-spec, distribute policy spec with connection and core subscriptions.
---
include/qpid/dispatch/policy_spec.h | 39 ++++++++++++++++
include/qpid/dispatch/protocol_adaptor.h | 4 +-
include/qpid/dispatch/router_core.h | 3 +-
src/adaptors/http1/http1_client.c | 3 +-
src/adaptors/http1/http1_server.c | 3 +-
src/adaptors/http2/http2_adaptor.c | 6 +--
src/adaptors/reference_adaptor.c | 3 +-
src/adaptors/tcp_adaptor.c | 58 +++++++++++------------
src/policy.c | 70 ++++++++++++++--------------
src/policy.h | 28 ++++-------
src/python_embedded.c | 2 +-
src/remote_sasl.c | 4 +-
src/router_core/agent_conn_link_route.c | 3 +-
src/router_core/agent_connection.c | 3 +-
src/router_core/connections.c | 6 +--
src/router_core/forwarder.c | 5 +-
src/router_core/management_agent.c | 2 +-
src/router_core/modules/mobile_sync/mobile.c | 11 +++--
src/router_core/router_core_private.h | 6 +--
src/router_node.c | 5 +-
src/server.c | 2 +-
21 files changed, 146 insertions(+), 120 deletions(-)
diff --git a/include/qpid/dispatch/policy_spec.h b/include/qpid/dispatch/policy_spec.h
new file mode 100644
index 0000000..69e50ed
--- /dev/null
+++ b/include/qpid/dispatch/policy_spec.h
@@ -0,0 +1,39 @@
+#ifndef __policy_spec_h__
+#define __policy_spec_h__
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+typedef struct {
+ int maxFrameSize;
+ int maxSessionWindow;
+ int maxSessions;
+ int maxSenders;
+ int maxReceivers;
+ uint64_t maxMessageSize;
+ bool allowDynamicSource;
+ bool allowAnonymousSender;
+ bool allowUserIdProxy;
+ bool allowWaypointLinks;
+ bool allowFallbackLinks;
+ bool allowDynamicLinkRoutes;
+ bool allowAdminStatusUpdate;
+ bool outgoingConnection;
+} qd_policy_spec_t;
+
+#endif
diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h
index ae7036f..537970e 100644
--- a/include/qpid/dispatch/protocol_adaptor.h
+++ b/include/qpid/dispatch/protocol_adaptor.h
@@ -20,6 +20,7 @@
*/
#include <qpid/dispatch/router_core.h>
+#include <qpid/dispatch/policy_spec.h>
typedef struct qdr_protocol_adaptor_t qdr_protocol_adaptor_t;
typedef struct qdr_connection_t qdr_connection_t;
@@ -369,10 +370,9 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
const char *remote_container_id,
bool strip_annotations_in,
bool strip_annotations_out,
- bool policy_allow_dynamic_link_routes,
- bool policy_allow_admin_status_update,
int link_capacity,
const char *vhost,
+ const qd_policy_spec_t *policy_spec,
qdr_connection_info_t *connection_info,
qdr_connection_bind_context_t context_binder,
void *bind_token);
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 2b54bb8..cd70f43 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -24,6 +24,7 @@
#include <qpid/dispatch/compose.h>
#include <qpid/dispatch/parse.h>
#include <qpid/dispatch/router.h>
+#include <qpid/dispatch/policy_spec.h>
/**
@@ -98,7 +99,7 @@ void qdr_core_route_table_handlers(qdr_core_t *core,
******************************************************************************
*/
typedef void (*qdr_receive_t) (void *context, qd_message_t *msg, int link_maskbit, int inter_router_cost,
- uint64_t conn_id);
+ uint64_t conn_id, const qd_policy_spec_t *policy);
/**
* qdr_core_subscribe
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 3c58f3b..5115ce4 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -302,10 +302,9 @@ static void _setup_client_connection(qdr_http1_connection_t *hconn)
0, // remote container id
false, // strip annotations in
false, // strip annotations out
- false, // allow dynamic link routes
- false, // allow admin status update
DEFAULT_CAPACITY,
0, // vhost
+ 0, // policy_spec
info,
0, // bind context
0); // bind token
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index 45d31f2..9481a4a 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -194,10 +194,9 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ct
0, // remote container id
false, // strip annotations in
false, // strip annotations out
- false, // allow dynamic link routes
- false, // allow admin status update
DEFAULT_CAPACITY,
0, // vhost
+ 0, // policy_spec
info,
0, // bind context
0); // bind token
diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index 250cf33..00c1041 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -1991,10 +1991,9 @@ qdr_http2_connection_t *qdr_http_connection_ingress_accept(qdr_http2_connection_
0,
false,
false,
- false,
- false,
250,
0,
+ 0,
info,
0,
0);
@@ -2216,10 +2215,9 @@ qdr_http2_connection_t *qdr_http_connection_egress(qd_http_connector_t *connecto
0,
false,
false,
- false,
- false,
250,
0,
+ 0,
info,
0,
0);
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index 08e7433..2fd3ff2 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -427,10 +427,9 @@ static void on_startup(void *context)
0, // remote_container_id
false, // strip_annotations_in
false, // strip_annotations_out
- false, // policy_allow_dynamic_link_routes
- false, // policy_allow_admin_status_update
250, // link_capacity
0, // vhost
+ 0, // policy_spec
info, // connection_info
0, // context_binder
0); // bind_token
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 3b886b9..6ea544f 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -455,21 +455,20 @@ static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc)
tc->conn_id = qd_server_allocate_connection_id(tc->server);
qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core,
tcp_adaptor->adaptor,
- true,
- QDR_ROLE_NORMAL,
- 1,
- tc->conn_id,
- 0,
- 0,
- false,
- false,
- false,
- false,
- 250,
- 0,
- info,
- 0,
- 0);
+ true, // incoming
+ QDR_ROLE_NORMAL, // role
+ 1, // cost
+ tc->conn_id, // management_id
+ 0, // label
+ 0, // remote_container_id
+ false, // strip_annotations_in
+ false, // strip_annotations_out
+ 250, // link_capacity
+ 0, // vhost
+ 0, // policy_spec
+ info, // connection_info
+ 0, // context_binder
+ 0); // bind_token
tc->qdr_conn = conn;
qdr_connection_set_context(conn, tc);
@@ -640,21 +639,20 @@ static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc)
qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core,
tcp_adaptor->adaptor,
- false,
- QDR_ROLE_NORMAL,
- 1,
- tc->conn_id,
- 0,
- 0,
- false,
- false,
- false,
- false,
- 250,
- 0,
- info,
- 0,
- 0);
+ false, // incoming
+ QDR_ROLE_NORMAL, // role
+ 1, // cost
+ tc->conn_id, // management_id
+ 0, // label
+ 0, // remote_container_id
+ false, // strip_annotations_in
+ false, // strip_annotations_out
+ 250, // link_capacity
+ 0, // vhost
+ 0, // policy_spec
+ info, // connection_info
+ 0, // context_binder
+ 0); // bind_token
tc->qdr_conn = conn;
qdr_connection_set_context(conn, tc);
diff --git a/src/policy.c b/src/policy.c
index 7486dd7..42697b6 100644
--- a/src/policy.c
+++ b/src/policy.c
@@ -549,27 +549,27 @@ bool qd_policy_open_fetch_settings(
if (result2) {
int truthy = PyObject_IsTrue(result2);
if (truthy) {
- settings->maxFrameSize = qd_entity_opt_long((qd_entity_t*)upolicy, "maxFrameSize", 0);
- settings->maxSessionWindow = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessionWindow", 0);
- settings->maxSessions = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessions", 0);
- settings->maxSenders = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSenders", 0);
- settings->maxReceivers = qd_entity_opt_long((qd_entity_t*)upolicy, "maxReceivers", 0);
- settings->maxMessageSize = qd_entity_opt_long((qd_entity_t*)upolicy, "maxMessageSize", 0);
- if (!settings->allowAnonymousSender) { //don't override if enabled by authz plugin
- settings->allowAnonymousSender = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAnonymousSender", false);
+ settings->spec.maxFrameSize = qd_entity_opt_long((qd_entity_t*)upolicy, "maxFrameSize", 0);
+ settings->spec.maxSessionWindow = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessionWindow", 0);
+ settings->spec.maxSessions = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessions", 0);
+ settings->spec.maxSenders = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSenders", 0);
+ settings->spec.maxReceivers = qd_entity_opt_long((qd_entity_t*)upolicy, "maxReceivers", 0);
+ settings->spec.maxMessageSize = qd_entity_opt_long((qd_entity_t*)upolicy, "maxMessageSize", 0);
+ if (!settings->spec.allowAnonymousSender) { //don't override if enabled by authz plugin
+ settings->spec.allowAnonymousSender = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAnonymousSender", false);
}
- if (!settings->allowDynamicSource) { //don't override if enabled by authz plugin
- settings->allowDynamicSource = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicSource", false);
+ if (!settings->spec.allowDynamicSource) { //don't override if enabled by authz plugin
+ settings->spec.allowDynamicSource = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicSource", false);
}
- settings->allowUserIdProxy = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowUserIdProxy", false);
- settings->allowWaypointLinks = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowWaypointLinks", true);
- settings->allowFallbackLinks = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowFallbackLinks", true);
- settings->allowDynamicLinkRoutes = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicLinkRoutes", true);
+ settings->spec.allowUserIdProxy = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowUserIdProxy", false);
+ settings->spec.allowWaypointLinks = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowWaypointLinks", true);
+ settings->spec.allowFallbackLinks = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowFallbackLinks", true);
+ settings->spec.allowDynamicLinkRoutes = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicLinkRoutes", true);
//
// By default, deleting connections are enabled. To disable, set the allowAdminStatusUpdate to false in a policy.
//
- settings->allowAdminStatusUpdate = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAdminStatusUpdate", true);
+ settings->spec.allowAdminStatusUpdate = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAdminStatusUpdate", true);
if (settings->sources == 0) { //don't override if configured by authz plugin
settings->sources = qd_entity_get_string((qd_entity_t*)upolicy, "sources");
}
@@ -640,8 +640,8 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn)
{
bool result = true;
if (qd_conn->policy_settings) {
- if (qd_conn->policy_settings->maxSessions) {
- if (qd_conn->n_sessions == qd_conn->policy_settings->maxSessions) {
+ if (qd_conn->policy_settings->spec.maxSessions) {
+ if (qd_conn->n_sessions == qd_conn->policy_settings->spec.maxSessions) {
qd_policy_deny_amqp_session(ssn, qd_conn);
result = false;
}
@@ -672,9 +672,9 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn)
void qd_policy_apply_session_settings(pn_session_t *ssn, qd_connection_t *qd_conn)
{
size_t capacity;
- if (qd_conn->policy_settings && qd_conn->policy_settings->maxSessionWindow
- && !qd_conn->policy_settings->outgoingConnection) {
- capacity = qd_conn->policy_settings->maxSessionWindow;
+ if (qd_conn->policy_settings && qd_conn->policy_settings->spec.maxSessionWindow
+ && !qd_conn->policy_settings->spec.outgoingConnection) {
+ capacity = qd_conn->policy_settings->spec.maxSessionWindow;
} else {
const qd_server_config_t * cf = qd_connection_config(qd_conn);
capacity = cf->incoming_capacity;
@@ -1107,8 +1107,8 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_
const char *hostip = qd_connection_remote_ip(qd_conn);
const char *vhost = pn_connection_remote_hostname(qd_connection_pn(qd_conn));
- if (qd_conn->policy_settings->maxSenders) {
- if (qd_conn->n_senders == qd_conn->policy_settings->maxSenders) {
+ if (qd_conn->policy_settings->spec.maxSenders) {
+ if (qd_conn->n_senders == qd_conn->policy_settings->spec.maxSenders) {
// Max sender limit specified and violated.
qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO,
"[C%"PRIu64"] DENY AMQP Attach sender for user '%s', rhost '%s', vhost '%s' based on maxSenders limit",
@@ -1126,7 +1126,7 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_
bool lookup;
if (target && *target) {
// a target is specified
- if (!qd_conn->policy_settings->allowWaypointLinks) {
+ if (!qd_conn->policy_settings->spec.allowWaypointLinks) {
bool waypoint = qd_policy_terminus_is_waypoint(pn_link_remote_target(pn_link));
if (waypoint) {
qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO,
@@ -1137,7 +1137,7 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_
}
}
- if (!qd_conn->policy_settings->allowFallbackLinks) {
+ if (!qd_conn->policy_settings->spec.allowFallbackLinks) {
bool fallback = qd_policy_terminus_is_fallback(pn_link_remote_target(pn_link));
if (fallback) {
qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO,
@@ -1161,7 +1161,7 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_
} else {
// A sender with no remote target.
// This happens all the time with anonymous relay
- lookup = qd_conn->policy_settings->allowAnonymousSender;
+ lookup = qd_conn->policy_settings->spec.allowAnonymousSender;
qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO),
"[C%"PRIu64"] %s AMQP Attach anonymous sender for user '%s', rhost '%s', vhost '%s'",
qd_conn->connection_id, (lookup ? "ALLOW" : "DENY"), qd_conn->user_id, hostip, vhost);
@@ -1180,8 +1180,8 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q
const char *hostip = qd_connection_remote_ip(qd_conn);
const char *vhost = pn_connection_remote_hostname(qd_connection_pn(qd_conn));
- if (qd_conn->policy_settings->maxReceivers) {
- if (qd_conn->n_receivers == qd_conn->policy_settings->maxReceivers) {
+ if (qd_conn->policy_settings->spec.maxReceivers) {
+ if (qd_conn->n_receivers == qd_conn->policy_settings->spec.maxReceivers) {
// Max sender limit specified and violated.
qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO,
"[C%"PRIu64"] DENY AMQP Attach receiver for user '%s', rhost '%s', vhost '%s' based on maxReceivers limit",
@@ -1197,7 +1197,7 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q
// Approve receiver link based on source
bool dynamic_src = pn_terminus_is_dynamic(pn_link_remote_source(pn_link));
if (dynamic_src) {
- bool lookup = qd_conn->policy_settings->allowDynamicSource;
+ bool lookup = qd_conn->policy_settings->spec.allowDynamicSource;
qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO),
"[C%"PRIu64"] %s AMQP Attach receiver dynamic source for user '%s', rhost '%s', vhost '%s',",
qd_conn->connection_id, (lookup ? "ALLOW" : "DENY"), qd_conn->user_id, hostip, vhost);
@@ -1210,7 +1210,7 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q
const char * source = pn_terminus_get_address(pn_link_remote_source(pn_link));
if (source && *source) {
// a source is specified
- if (!qd_conn->policy_settings->allowWaypointLinks) {
+ if (!qd_conn->policy_settings->spec.allowWaypointLinks) {
bool waypoint = qd_policy_terminus_is_waypoint(pn_link_remote_source(pn_link));
if (waypoint) {
qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO,
@@ -1221,7 +1221,7 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q
}
}
- if (!qd_conn->policy_settings->allowFallbackLinks) {
+ if (!qd_conn->policy_settings->spec.allowFallbackLinks) {
bool fallback = qd_policy_terminus_is_fallback(pn_link_remote_source(pn_link));
if (fallback) {
qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO,
@@ -1286,10 +1286,10 @@ void qd_policy_amqp_open(qd_connection_t *qd_conn) {
// This connection is allowed by policy.
// Apply transport policy settings
if (qd_policy_open_fetch_settings(policy, vhost, settings_name, qd_conn->policy_settings)) {
- if (qd_conn->policy_settings->maxFrameSize > 0)
- pn_transport_set_max_frame(pn_trans, qd_conn->policy_settings->maxFrameSize);
- if (qd_conn->policy_settings->maxSessions > 0)
- pn_transport_set_channel_max(pn_trans, qd_conn->policy_settings->maxSessions - 1);
+ if (qd_conn->policy_settings->spec.maxFrameSize > 0)
+ pn_transport_set_max_frame(pn_trans, qd_conn->policy_settings->spec.maxFrameSize);
+ if (qd_conn->policy_settings->spec.maxSessions > 0)
+ pn_transport_set_channel_max(pn_trans, qd_conn->policy_settings->spec.maxSessions - 1);
const qd_server_config_t *cf = qd_connection_config(qd_conn);
if (cf && cf->multi_tenant) {
char vhost_name_buf[SETTINGS_NAME_SIZE];
@@ -1348,7 +1348,7 @@ void qd_policy_amqp_open_connector(qd_connection_t *qd_conn) {
ZERO(qd_conn->policy_settings);
if (qd_policy_open_fetch_settings(policy, policy_vhost, POLICY_VHOST_GROUP, qd_conn->policy_settings)) {
- qd_conn->policy_settings->outgoingConnection = true;
+ qd_conn->policy_settings->spec.outgoingConnection = true;
qd_conn->policy_counted = true; // Count senders and receivers for this connection
} else {
qd_log(policy->log_source,
diff --git a/src/policy.h b/src/policy.h
index ea6ecd7..f05c0d9 100644
--- a/src/policy.h
+++ b/src/policy.h
@@ -25,6 +25,7 @@
#include "qpid/dispatch/static_assert.h"
#include "qpid/dispatch/alloc.h"
#include "qpid/dispatch/alloc_pool.h"
+#include "qpid/dispatch/policy_spec.h"
#include "config.h"
#include "entity.h"
@@ -44,21 +45,12 @@ struct qd_policy_denial_counts_s {
typedef struct qd_policy_t qd_policy_t;
+//
+// Policy settings are defined in include/qpid/dispatch/policy_settings.h
+//
+
struct qd_policy__settings_s {
- int maxFrameSize;
- int maxSessionWindow;
- int maxSessions;
- int maxSenders;
- int maxReceivers;
- uint64_t maxMessageSize;
- bool allowDynamicSource;
- bool allowAnonymousSender;
- bool allowUserIdProxy;
- bool allowWaypointLinks;
- bool allowFallbackLinks;
- bool allowDynamicLinkRoutes;
- bool allowAdminStatusUpdate;
- bool outgoingConnection;
+ qd_policy_spec_t spec;
char *sources;
char *targets;
char *sourcePattern;
@@ -208,10 +200,10 @@ void qd_policy_settings_free(qd_policy_settings_t *settings);
* @param[in] isReceiver indication to check using receiver settings
*/
bool qd_policy_approve_link_name(const char *username,
- const qd_policy_settings_t *settings,
- const char *proposed,
- bool isReceiver
- );
+ const qd_policy_settings_t *settings,
+ const char *proposed,
+ bool isReceiver
+ );
/** Add a hostname to the lookup parse_tree
* Note that the parse_tree may store an 'optimised' pattern for a given
diff --git a/src/python_embedded.c b/src/python_embedded.c
index 2207563..dadf768 100644
--- a/src/python_embedded.c
+++ b/src/python_embedded.c
@@ -631,7 +631,7 @@ static qd_error_t iter_to_py_attr(qd_iterator_t *iter,
}
static void qd_io_rx_handler(void *context, qd_message_t *msg, int link_id, int inter_router_cost,
- uint64_t ignore)
+ uint64_t ignore, const qd_policy_spec_t *policy_spec)
{
IoAdapter *self = (IoAdapter*) context;
diff --git a/src/remote_sasl.c b/src/remote_sasl.c
index d8ffb22..624ee91 100644
--- a/src/remote_sasl.c
+++ b/src/remote_sasl.c
@@ -331,8 +331,8 @@ static void set_policy_settings(pn_connection_t* conn, permissions_t* permission
if (permissions->sources.start && permissions->sources.capacity) {
qd_conn->policy_settings->sources = qd_policy_compile_allowed_csv(permissions->sources.start);
}
- qd_conn->policy_settings->allowDynamicSource = true;
- qd_conn->policy_settings->allowAnonymousSender = true;
+ qd_conn->policy_settings->spec.allowDynamicSource = true;
+ qd_conn->policy_settings->spec.allowAnonymousSender = true;
}
}
diff --git a/src/router_core/agent_conn_link_route.c b/src/router_core/agent_conn_link_route.c
index b2feefe..7e8b0d3 100644
--- a/src/router_core/agent_conn_link_route.c
+++ b/src/router_core/agent_conn_link_route.c
@@ -180,7 +180,8 @@ void qdra_conn_link_route_create_CT(qdr_core_t *core,
}
// fail if forbidden by policy
- if (!conn->policy_allow_dynamic_link_routes) {
+ bool allow = conn->policy_spec ? conn->policy_spec->allowDynamicLinkRoutes : true;
+ if (!allow) {
query->status = QD_AMQP_FORBIDDEN;
goto exit;
}
diff --git a/src/router_core/agent_connection.c b/src/router_core/agent_connection.c
index 3e5dbc8..b8cfa7b 100644
--- a/src/router_core/agent_connection.c
+++ b/src/router_core/agent_connection.c
@@ -608,7 +608,8 @@ void qdra_connection_update_CT(qdr_core_t *core,
admin_status_bad_or_forbidden = true;
}
else {
- if (!user_conn->policy_allow_admin_status_update) {
+ bool allow = user_conn->policy_spec ? user_conn->policy_spec->allowAdminStatusUpdate : true;
+ if (!allow) {
//
// Policy on the connection that is requesting that some other connection be deleted does not allow
// for the other connection to be deleted.Set the status to QD_AMQP_FORBIDDEN and just quit.
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 05f69c6..2d833e1 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -77,10 +77,9 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
const char *remote_container_id,
bool strip_annotations_in,
bool strip_annotations_out,
- bool policy_allow_dynamic_link_routes,
- bool policy_allow_admin_status_update,
int link_capacity,
const char *vhost,
+ const qd_policy_spec_t *policy_spec,
qdr_connection_info_t *connection_info,
qdr_connection_bind_context_t context_binder,
void *bind_token)
@@ -99,8 +98,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
conn->inter_router_cost = cost;
conn->strip_annotations_in = strip_annotations_in;
conn->strip_annotations_out = strip_annotations_out;
- conn->policy_allow_dynamic_link_routes = policy_allow_dynamic_link_routes;
- conn->policy_allow_admin_status_update = policy_allow_admin_status_update;
+ conn->policy_spec = policy_spec;
conn->link_capacity = link_capacity;
conn->mask_bit = -1;
conn->admin_status = QDR_CONN_ADMIN_ENABLED;
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 1d1f884..09df514 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -309,7 +309,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery
void qdr_forward_on_message(qdr_core_t *core, qdr_general_work_t *work)
{
- work->on_message(work->on_message_context, work->msg, work->maskbit, work->inter_router_cost, work->in_conn_id);
+ work->on_message(work->on_message_context, work->msg, work->maskbit, work->inter_router_cost, work->in_conn_id, work->policy_spec);
qd_message_free(work->msg);
}
@@ -324,7 +324,7 @@ void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_li
//
// The handler runs in-core. Invoke it right now.
//
- sub->on_message(sub->on_message_context, msg, mask_bit, cost, identity);
+ sub->on_message(sub->on_message_context, msg, mask_bit, cost, identity, link ? link->conn->policy_spec : 0);
} else {
//
// The handler runs in an IO thread. Defer its invocation.
@@ -336,6 +336,7 @@ void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_li
work->maskbit = mask_bit;
work->inter_router_cost = cost;
work->in_conn_id = identity;
+ work->policy_spec = link ? link->conn->policy_spec : 0;
qdr_post_general_work_CT(core, work);
}
}
diff --git a/src/router_core/management_agent.c b/src/router_core/management_agent.c
index 9f729e9..564991c 100644
--- a/src/router_core/management_agent.c
+++ b/src/router_core/management_agent.c
@@ -499,7 +499,7 @@ static bool qd_can_handle_request(qd_parsed_field_t *properties_fld,
*
*/
void qdr_management_agent_on_message(void *context, qd_message_t *msg, int unused_link_id, int unused_cost,
- uint64_t in_conn_id)
+ uint64_t in_conn_id, const qd_policy_spec_t *policy_spec)
{
qdr_core_t *core = (qdr_core_t*) context;
qd_iterator_t *app_properties_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
diff --git a/src/router_core/modules/mobile_sync/mobile.c b/src/router_core/modules/mobile_sync/mobile.c
index 948bc38..daf947c 100644
--- a/src/router_core/modules/mobile_sync/mobile.c
+++ b/src/router_core/modules/mobile_sync/mobile.c
@@ -694,11 +694,12 @@ static void qcm_mobile_sync_on_mau_CT(qdrm_mobile_sync_t *msync, qd_parsed_field
}
-static void qcm_mobile_sync_on_message_CT(void *context,
- qd_message_t *msg,
- int unused_link_maskbit,
- int unused_inter_router_cost,
- uint64_t unused_conn_id)
+static void qcm_mobile_sync_on_message_CT(void *context,
+ qd_message_t *msg,
+ int unused_link_maskbit,
+ int unused_inter_router_cost,
+ uint64_t unused_conn_id,
+ const qd_policy_spec_t *unused_policy_spec)
{
qdrm_mobile_sync_t *msync = (qdrm_mobile_sync_t*) context;
qd_iterator_t *ap_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 14da0b8..e877bbd 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -239,6 +239,7 @@ struct qdr_general_work_t {
void *on_message_context;
uint64_t in_conn_id;
uint64_t mobile_seq;
+ const qd_policy_spec_t *policy_spec;
qdr_delivery_cleanup_list_t delivery_cleanup_list;
qdr_global_stats_handler_t stats_handler;
void *context;
@@ -671,8 +672,6 @@ struct qdr_connection_t {
qdr_conn_identifier_t *alt_conn_id;
bool strip_annotations_in;
bool strip_annotations_out;
- bool policy_allow_dynamic_link_routes;
- bool policy_allow_admin_status_update;
int link_capacity;
int mask_bit; ///< set only if inter-router connection
qdr_connection_work_list_t work_list;
@@ -693,6 +692,7 @@ struct qdr_connection_t {
bool enable_protocol_trace; // Has trace level logging been turned on for this connection.
bool has_streaming_links; ///< one or more of this connection's links are for streaming messages
qdr_link_list_t streaming_link_pool; ///< pool of links available for streaming messages
+ const qd_policy_spec_t *policy_spec;
};
DEQ_DECLARE(qdr_connection_t, qdr_connection_list_t);
@@ -942,7 +942,7 @@ ALLOC_DECLARE(qdr_terminus_t);
void *router_core_thread(void *arg);
uint64_t qdr_identifier(qdr_core_t* core);
-void qdr_management_agent_on_message(void *context, qd_message_t *msg, int link_id, int cost, uint64_t in_conn_id);
+void qdr_management_agent_on_message(void *context, qd_message_t *msg, int link_id, int cost, uint64_t in_conn_id, const qd_policy_spec_t *policy_spec);
void qdr_route_table_setup_CT(qdr_core_t *core);
qdr_agent_t *qdr_agent(qdr_core_t *core);
void qdr_agent_setup_subscriptions(qdr_agent_t *agent, qdr_core_t *core);
diff --git a/src/router_node.c b/src/router_node.c
index a61fe87..f1a2496 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -508,7 +508,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
int tenant_space_len;
const char *tenant_space = qdr_connection_get_tenant_space(qdr_conn, &tenant_space_len);
if (conn->policy_settings)
- check_user = !conn->policy_settings->allowUserIdProxy;
+ check_user = !conn->policy_settings->spec.allowUserIdProxy;
//
// Validate the content of the delivery as an AMQP message. This is done partially, only
@@ -1225,10 +1225,9 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
pn_connection_remote_container(pn_conn),
conn->strip_annotations_in,
conn->strip_annotations_out,
- conn->policy_settings ? conn->policy_settings->allowDynamicLinkRoutes : true,
- conn->policy_settings ? conn->policy_settings->allowAdminStatusUpdate : true,
link_capacity,
vhost,
+ !!conn->policy_settings ? &conn->policy_settings->spec : 0,
connection_info,
bind_connection_context,
conn);
diff --git a/src/server.c b/src/server.c
index 4c8cb00..58d06a6 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1782,5 +1782,5 @@ sys_mutex_t *qd_server_get_activation_lock(qd_server_t * server)
}
uint64_t qd_connection_max_message_size(const qd_connection_t *c) {
- return (c && c->policy_settings) ? c->policy_settings->maxMessageSize : 0;
+ return (c && c->policy_settings) ? c->policy_settings->spec.maxMessageSize : 0;
}
---------------------------------------------------------------------
To unsubscribe, e-mail:
[hidden email]
For additional commands, e-mail:
[hidden email]