This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 806188101578c3d7d5cb1b78b115d15a43784d90
Author: Kenneth Giusti <[hidden email]>
AuthorDate: Sun Jan 3 16:39:18 2021 -0500
DISPATCH-1744: re-factor how requests are retired
This patch makes the process of finishing up the HTTP request more
synchronous by preventing the core from pushing the next outgoing
request delivery until after the current request is completed.
---
src/adaptors/http1/http1_server.c | 239 +++++++++++++++++++-------------------
1 file changed, 119 insertions(+), 120 deletions(-)
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index c8adc62..1e062c6 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -127,7 +127,9 @@ static void _server_response_msg_free(_server_request_t *req, _server_response_m
static void _server_request_free(_server_request_t *hreq);
static void _write_pending_request(_server_request_t *req);
static void _cancel_request(_server_request_t *req);
-static bool _process_requests(qdr_http1_connection_t *hconn);
+static bool _process_request(_server_request_t *req);
+static void _encode_request_message(_server_request_t *hreq);
+static void _send_request_message(_server_request_t *hreq);
////////////////////////////////////////////////////////
@@ -424,20 +426,27 @@ static void _do_reconnect(void *context)
return;
}
- _process_requests(hconn);
+ _process_request((_server_request_t*) DEQ_HEAD(hconn->requests));
}
- // lock out core activation
- sys_mutex_lock(qdr_http1_adaptor->lock);
- hconn->raw_conn = pn_raw_connection();
- pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
- // this next call may immediately reschedule the connection on another I/O
- // thread. After this call hconn may no longer be valid!
- pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port);
- sys_mutex_unlock(qdr_http1_adaptor->lock);
+ // Do not attempt to re-connect if the current request is still in
+ // progress. This happens when the server has closed the connection before
+ // the request message has fully arrived (!rx_complete).
+ // qdr_connection_process() will continue to invoke the
+ // qdr_http1_server_core_link_deliver callback until the request message is
+ // complete.
- qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
- "[C%"PRIu64"] Connecting to HTTP server...", conn_id);
+ if (!_is_request_in_progress((_server_request_t*) DEQ_HEAD(hconn->requests))) {
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"] Connecting to HTTP server...", conn_id);
+ sys_mutex_lock(qdr_http1_adaptor->lock);
+ hconn->raw_conn = pn_raw_connection();
+ pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
+ // this next call may immediately reschedule the connection on another I/O
+ // thread. After this call hconn may no longer be valid!
+ pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port);
+ sys_mutex_unlock(qdr_http1_adaptor->lock);
+ }
}
static void _accept_and_settle_request(_server_request_t *hreq)
@@ -480,17 +489,6 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
// notify the codec so it can complete the current response
// message (response body terminated on connection closed)
h1_codec_connection_rx_closed(hconn->http_conn);
-
- // if the response for the current request has not fully arrived cancel
- // the request
- _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
- if (hreq) {
- if (hreq->base.out_http1_octets > 0) { // req msg written to server
- if (!hreq->response_complete) {
- _cancel_request(hreq);
- }
- }
- }
pn_raw_connection_close(hconn->raw_conn);
break;
}
@@ -520,12 +518,13 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
_server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
if (_is_request_in_progress(hreq) && !hreq->response_complete)
_cancel_request(hreq);
- _process_requests(hconn);
+ _process_request(hreq);
//
- // reconnect to the server. Leave the links intact so pending requests
- // are not aborted. If we fail to reconnect after LINK_TIMEOUT_MSECS
- // drop the links to prevent additional request from arriving.
+ // Try to reconnect to the server. Leave the links intact so pending
+ // requests are not aborted. If we fail to reconnect after
+ // LINK_TIMEOUT_MSECS drop the links to prevent additional request from
+ // arriving.
//
bool reconnect = false;
@@ -554,12 +553,10 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
return;
}
case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need write buffers", hconn->conn_id);
- _write_pending_request((_server_request_t*) DEQ_HEAD(hconn->requests));
+ _send_request_message((_server_request_t*) DEQ_HEAD(hconn->requests));
break;
}
case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
- qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", hconn->conn_id);
// @TODO(kgiusti): backpressure if no credit
// if (hconn->in_link_credit > 0 */)
int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
@@ -614,8 +611,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
qdr_http1_connection_free(hconn);
} else {
- bool need_close = _process_requests(hconn);
-
+ bool need_close = _process_request((_server_request_t*) DEQ_HEAD(hconn->requests));
if (need_close) {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closing connection!", hconn->conn_id);
qdr_http1_close_connection(hconn, "HTTP Request requires connection close");
@@ -624,27 +620,26 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
}
-// See if the current request can be completed and the next pending request
-// started. Return true if the connection must be closed before starting the
-// next request.
-static bool _process_requests(qdr_http1_connection_t *hconn)
+// Check the head request for completion. Return true if the connection must be
+// closed before starting the next request.
+static bool _process_request(_server_request_t *hreq)
{
- bool need_close = false;
- _server_request_t *next_hreq = 0;
- _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+ bool need_close = false;
if (!hreq)
return need_close;
- qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
- "[C%"PRIu64"] Processing current HTTP request msg-id=%"PRIu64", state=%s",
- hconn->conn_id, hreq->base.msg_id,
- hreq->codec_completed ? "codec complete"
- : hreq->cancelled ? "request cancelled"
- : "in-progress");
+ assert(DEQ_PREV(&hreq->base) == 0); // preserve order!
+
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
if (hreq->cancelled) {
+ // have to wait until all buffers returned from proton
+ // before we can release the request
+ if (qdr_http1_out_data_buffers_outstanding(&hreq->out_data))
+ return false;
+
// clean up the request message delivery
if (hreq->request_dlv) {
@@ -655,6 +650,8 @@ static bool _process_requests(qdr_http1_connection_t *hconn)
hreq->request_dispo = (hreq->base.out_http1_octets > 0
? PN_MODIFIED : PN_RELEASED);
+ qd_message_set_send_complete(qdr_delivery_message(hreq->request_dlv));
+ qdr_link_complete_sent_message(qdr_http1_adaptor->core, hconn->out_link);
qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
hreq->request_dlv,
hreq->request_dispo,
@@ -680,19 +677,12 @@ static bool _process_requests(qdr_http1_connection_t *hconn)
rmsg = DEQ_HEAD(hreq->responses);
}
- // have to wait until all buffers returned from proton
- // before we can release the request
- if (qdr_http1_out_data_buffers_outstanding(&hreq->out_data))
- return false;
-
// it is safe to keep the connection up if this request has never been
// written to the connection, otherwise the state of the connection is
// unknown so close it
if (hreq->base.out_http1_octets > 0)
need_close = true;
- else
- next_hreq = (_server_request_t*) DEQ_NEXT(&hreq->base);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] HTTP request msg-id=%"PRIu64" cancelled",
hconn->conn_id, hreq->base.msg_id);
@@ -701,7 +691,6 @@ static bool _process_requests(qdr_http1_connection_t *hconn)
if (hconn->out_link)
qdr_link_flow(qdr_http1_adaptor->core, hconn->out_link, 1, false);
-
} else if (hreq->codec_completed) {
// The request message has been fully encoded and the response msg(s)
@@ -719,6 +708,11 @@ static bool _process_requests(qdr_http1_connection_t *hconn)
assert(hreq->request_dlv);
assert(hreq->request_dispo == PN_ACCEPTED);
hreq->request_settled = DEQ_IS_EMPTY(hreq->responses);
+
+ if (!hreq->request_acked) {
+ qd_message_set_send_complete(qdr_delivery_message(hreq->request_dlv));
+ qdr_link_complete_sent_message(qdr_http1_adaptor->core, hconn->out_link);
+ }
qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
hreq->request_dlv,
hreq->request_dispo,
@@ -737,12 +731,6 @@ static bool _process_requests(qdr_http1_connection_t *hconn)
if (hreq->request_acked && hreq->request_settled && DEQ_SIZE(hreq->out_data.fifo) == 0) {
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request msg-id=%"PRIu64" completed!",
hconn->conn_id, hreq->base.msg_id);
-
- if (hreq->close_on_complete)
- need_close = true;
- else
- next_hreq = (_server_request_t*) DEQ_NEXT(&hreq->base);
-
_server_request_free(hreq);
if (hconn->out_link)
@@ -750,13 +738,6 @@ static bool _process_requests(qdr_http1_connection_t *hconn)
}
}
- if (next_hreq) {
- qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
- "[C%"PRIu64"] starting new HTTP request msg-id=%"PRIu64,
- hconn->conn_id, next_hreq->base.msg_id);
- _write_pending_request(next_hreq);
- }
-
return need_close;
}
@@ -1357,28 +1338,28 @@ exit:
}
-// Encode an outbound AMQP message as an HTTP Request. Returns PN_ACCEPTED
-// when complete, 0 if incomplete and PN_REJECTED if encoding error.
+// Encode an outbound AMQP message as an HTTP Request. Sets the request_dispo
+// when the encoding completes either successfully or in error.
//
-static uint64_t _encode_request_message(_server_request_t *hreq)
+static void _encode_request_message(_server_request_t *hreq)
{
qdr_http1_connection_t *hconn = hreq->base.hconn;
qd_message_t *msg = qdr_delivery_message(hreq->request_dlv);
if (!hreq->headers_encoded) {
- uint64_t outcome = _send_request_headers(hreq, msg);
+ hreq->request_dispo = _send_request_headers(hreq, msg);
hreq->headers_encoded = true;
- if (outcome) {
+ if (hreq->request_dispo) {
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Rejecting malformed message msg-id=%"PRIu64,
hconn->conn_id, hconn->out_link_id, hreq->base.msg_id);
- return outcome;
+ return;
}
}
- qd_message_stream_data_t *stream_data = 0;
+ while (hreq->request_dispo == 0) {
- while (true) {
+ qd_message_stream_data_t *stream_data = 0;
switch (qd_message_next_stream_data(msg, &stream_data)) {
case QD_MESSAGE_STREAM_DATA_BODY_OK: {
@@ -1390,7 +1371,7 @@ static uint64_t _encode_request_message(_server_request_t *hreq)
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] body data encode failed",
hconn->conn_id, hconn->out_link_id);
- return PN_REJECTED;
+ hreq->request_dispo = PN_REJECTED;
}
break;
}
@@ -1401,20 +1382,58 @@ static uint64_t _encode_request_message(_server_request_t *hreq)
case QD_MESSAGE_STREAM_DATA_NO_MORE:
// indicate this message is complete
- return PN_ACCEPTED;
+ qd_log(hconn->adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Request %p body data encode complete",
+ hconn->conn_id, hconn->out_link_id, (void*) hreq);
+ hreq->request_dispo = PN_ACCEPTED;
+ break;
case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
- qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
- "[C%"PRIu64"][L%"PRIu64"] body data need more",
- hconn->conn_id, hconn->out_link_id);
- return 0; // wait for more
+ return; // wait for more
case QD_MESSAGE_STREAM_DATA_INVALID:
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Rejecting corrupted body data.",
hconn->conn_id, hconn->out_link_id);
- return PN_REJECTED;
+ hreq->request_dispo = PN_REJECTED;
+ break;
+ }
+ }
+}
+
+
+// encode the request message and write it out to the server.
+static void _send_request_message(_server_request_t *hreq)
+{
+ if (hreq) {
+ assert(DEQ_PREV(&hreq->base) == 0); // preserve order!
+ qdr_http1_connection_t *hconn = hreq->base.hconn;
+ if (hreq->request_dispo == 0) {
+ _encode_request_message(hreq);
+ switch (hreq->request_dispo) {
+
+ case 0:
+ // streaming, not complete
+ break;
+
+ case PN_ACCEPTED: {
+ // completed successfully
+ bool ignore = false; // used client-facing only
+ h1_codec_tx_done(hreq->base.lib_rs, &ignore);
+ qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] HTTP %p request message msg-id=%"PRIu64" encoding complete",
+ hconn->conn_id, hconn->out_link_id, (void*)hreq, hreq->base.msg_id);
+ break;
+ }
+
+ default:
+ // encoding failure
+ _cancel_request(hreq);
+ return;
+ }
}
+ // write encoded data to raw conn
+ _write_pending_request(hreq);
}
}
@@ -1463,32 +1482,8 @@ uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t *adaptor,
}
}
- if (!hreq->request_dispo) {
- hreq->request_dispo = _encode_request_message(hreq);
- if (hreq->request_dispo) {
- qd_message_set_send_complete(msg);
- if (hreq->request_dispo == PN_ACCEPTED) {
- bool need_close = false;
- h1_codec_tx_done(hreq->base.lib_rs, &need_close);
- hreq->close_on_complete = need_close || hreq->close_on_complete;
-
- qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
- "[C%"PRIu64"][L%"PRIu64"] HTTP request message msg-id=%"PRIu64" encoding complete",
- hconn->conn_id, link->identity, hreq->base.msg_id);
- } else {
- // message invalid
- _cancel_request(hreq);
-
- // returning a terminal disposition will cause the delivery to be updated and settled,
- // so drop our reference
- qdr_delivery_set_context(hreq->request_dlv, 0);
- qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 server releasing malformed HTTP1 request delivery");
- hreq->request_dlv = 0;
- hreq->request_acked = hreq->request_settled = true;
- return hreq->request_dispo;
- }
- }
- }
+ if (DEQ_HEAD(hconn->requests) == &hreq->base)
+ _send_request_message(hreq);
return 0;
}
@@ -1544,8 +1539,9 @@ static void _write_pending_request(_server_request_t *hreq)
uint64_t written = qdr_http1_write_out_data(hreq->base.hconn, &hreq->out_data);
hreq->base.out_http1_octets += written;
if (written)
- qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %"PRIu64" octets written",
- hreq->base.hconn->conn_id, written);
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] %"PRIu64" request octets written to server",
+ hreq->base.hconn->conn_id, hreq->base.hconn->out_link_id, written);
}
}
@@ -1562,18 +1558,21 @@ void qdr_http1_server_conn_cleanup(qdr_http1_connection_t *hconn)
static void _cancel_request(_server_request_t *hreq)
{
- qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
- "[C%"PRIu64"][L%"PRIu64"] Cancelling HTTP Request msg-id=%"PRIu64,
- hreq->base.hconn->conn_id, hreq->base.hconn->out_link_id,
- hreq->base.msg_id);
+ if (!hreq->cancelled) {
- if (!hreq->base.lib_rs) {
- // never even got to encoding it - manually mark it cancelled
- hreq->cancelled = true;
- } else {
- // cleanup codec state - this will call _server_request_complete_cb()
- // with cancelled = true
- h1_codec_request_state_cancel(hreq->base.lib_rs);
+ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+ "[C%"PRIu64"][L%"PRIu64"] Cancelling HTTP Request msg-id=%"PRIu64,
+ hreq->base.hconn->conn_id, hreq->base.hconn->out_link_id,
+ hreq->base.msg_id);
+
+ if (!hreq->base.lib_rs) {
+ // never even got to encoding it - manually mark it cancelled
+ hreq->cancelled = true;
+ } else {
+ // cleanup codec state - this will call _server_request_complete_cb()
+ // with cancelled = true
+ h1_codec_request_state_cancel(hreq->base.lib_rs);
+ }
}
// cleanup occurs at the end of the connection event handler
---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]