|
Author: rhs
Date: Thu May 3 17:06:47 2012 New Revision: 1333552 URL: http://svn.apache.org/viewvc?rev=1333552&view=rev Log: added arg checking; fixed shutdown for sasl failure; removed pn_connector_eos; fixed driver bug Modified: qpid/proton/trunk/proton-c/bindings/php/examples/client.php qpid/proton/trunk/proton-c/bindings/php/examples/server.php qpid/proton/trunk/proton-c/include/proton/driver.h qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h qpid/proton/trunk/proton-c/src/driver.c qpid/proton/trunk/proton-c/src/engine/engine.c qpid/proton/trunk/proton-c/src/sasl/sasl.c Modified: qpid/proton/trunk/proton-c/bindings/php/examples/client.php URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/php/examples/client.php?rev=1333552&r1=1333551&r2=1333552&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/bindings/php/examples/client.php (original) +++ qpid/proton/trunk/proton-c/bindings/php/examples/client.php Thu May 3 17:06:47 2012 @@ -21,14 +21,12 @@ $handler = function($c) { $sasl = pn_connector_sasl($c); switch (pn_sasl_state($sasl)) { case PN_SASL_CONF: + case PN_SASL_STEP: + case PN_SASL_IDLE: case PN_SASL_FAIL: - pn_connector_eos($c); return; case PN_SASL_PASS: break; - case PN_SASL_STEP: - case PN_SASL_IDLE: - return; } global $count, $counter, $sent, $rcvd; @@ -88,7 +86,9 @@ $handler = function($c) { if (pn_updated($delivery)) { // the disposition was updated, let's report it and settle the delivery - //print("disposition for $tag: " . pn_remote_disp($delivery) . "\n"); + print "disposition for $tag: " . + pn_local_disp($delivery) . " " . + pn_remote_disp($delivery) . "\n"; // we could clear the updated flag if we didn't want to settle // pn_clear($delivery); pn_settle($delivery); Modified: qpid/proton/trunk/proton-c/bindings/php/examples/server.php URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/php/examples/server.php?rev=1333552&r1=1333551&r2=1333552&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/bindings/php/examples/server.php (original) +++ qpid/proton/trunk/proton-c/bindings/php/examples/server.php Thu May 3 17:06:47 2012 @@ -37,12 +37,10 @@ $handler = function($c) { } break; case PN_SASL_FAIL: - pn_connector_eos($c); + case PN_SASL_IDLE: return; case PN_SASL_PASS: break; - case PN_SASL_IDLE: - return; } } @@ -173,7 +171,6 @@ while (TRUE) { // cycle through all listeners with I/O activity while ($l = pn_driver_listener($driver)) { $c = pn_listener_accept($l); - print("listener $l -> $c\n"); pn_connector_set_context($c, $handler); } Modified: qpid/proton/trunk/proton-c/include/proton/driver.h URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/driver.h?rev=1333552&r1=1333551&r2=1333552&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/include/proton/driver.h (original) +++ qpid/proton/trunk/proton-c/include/proton/driver.h Thu May 3 17:06:47 2012 @@ -57,7 +57,6 @@ pn_sasl_t *pn_connector_sasl(pn_connecto pn_connection_t *pn_connector_connection(pn_connector_t *ctor); void *pn_connector_context(pn_connector_t *ctor); void pn_connector_set_context(pn_connector_t *ctor, void *context); -void pn_connector_eos(pn_connector_t *ctor); void pn_connector_close(pn_connector_t *ctor); bool pn_connector_closed(pn_connector_t *ctor); void pn_connector_destroy(pn_connector_t *ctor); Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1333552&r1=1333551&r2=1333552&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original) +++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Thu May 3 17:06:47 2012 @@ -48,6 +48,7 @@ pn_dispatcher_t *pn_dispatcher(uint8_t f disp->available = 0; disp->halt = false; + disp->batch = true; return disp; } @@ -127,7 +128,7 @@ ssize_t pn_dispatcher_input(pn_dispatche available -= n; read += n; - if (disp->halt) break; + if (!disp->batch) break; } else { break; } Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h?rev=1333552&r1=1333551&r2=1333552&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h (original) +++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h Thu May 3 17:06:47 2012 @@ -50,6 +50,7 @@ struct pn_dispatcher_t { char *output; void *context; bool halt; + bool batch; char scratch[SCRATCH]; }; Modified: qpid/proton/trunk/proton-c/src/driver.c URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1333552&r1=1333551&r2=1333552&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/driver.c (original) +++ qpid/proton/trunk/proton-c/src/driver.c Thu May 3 17:06:47 2012 @@ -61,6 +61,7 @@ struct pn_listener_t { pn_listener_t *next; pn_listener_t *prev; int idx; + bool pending; int fd; void *context; }; @@ -72,6 +73,9 @@ struct pn_connector_t { pn_connector_t *next; pn_connector_t *prev; int idx; + bool pending_tick; + bool pending_read; + bool pending_write; int fd; int status; bool closed; @@ -101,6 +105,7 @@ struct pn_connector_t { static void pn_driver_add_listener(pn_driver_t *d, pn_listener_t *l) { + if (!l->driver) return; LL_ADD(d->listener_head, d->listener_tail, l); l->driver = d; d->listener_count++; @@ -108,6 +113,8 @@ static void pn_driver_add_listener(pn_dr static void pn_driver_remove_listener(pn_driver_t *d, pn_listener_t *l) { + if (!l->driver) return; + if (l == d->listener_next) { d->listener_next = l->next; } @@ -155,14 +162,17 @@ pn_listener_t *pn_listener(pn_driver_t * pn_listener_t *pn_listener_fd(pn_driver_t *driver, int fd, void *context) { + if (!driver) return NULL; + pn_listener_t *l = malloc(sizeof(pn_listener_t)); if (!l) return NULL; l->driver = driver; l->next = NULL; l->prev = NULL; + l->idx = 0; + l->pending = false; l->fd = fd; l->context = context; - l->idx = 0; pn_driver_add_listener(driver, l); return l; @@ -188,9 +198,7 @@ static void pn_configure_sock(int sock) pn_connector_t *pn_listener_accept(pn_listener_t *l) { - if (!(l->idx && l->driver && l->driver->fds[l->idx].revents & POLLIN)) { - return NULL; - } + if (!l || !l->pending) return NULL; struct sockaddr_in addr = {0}; addr.sin_family = AF_INET; @@ -237,6 +245,7 @@ void pn_listener_destroy(pn_listener_t * static void pn_driver_add_connector(pn_driver_t *d, pn_connector_t *c) { + if (!c->driver) return; LL_ADD(d->connector_head, d->connector_tail, c); c->driver = d; d->connector_count++; @@ -244,6 +253,8 @@ static void pn_driver_add_connector(pn_d static void pn_driver_remove_connector(pn_driver_t *d, pn_connector_t *c) { + if (!c->driver) return; + if (c == d->connector_next) { d->connector_next = c->next; } @@ -256,6 +267,8 @@ static void pn_driver_remove_connector(p pn_connector_t *pn_connector(pn_driver_t *driver, const char *host, const char *port, void *context) { + if (!driver) return NULL; + struct addrinfo *addr; int code = getaddrinfo(host, port, NULL, &addr); if (code) { @@ -293,15 +306,20 @@ static ssize_t pn_connector_write_sasl_h static ssize_t pn_connector_write_sasl(pn_connector_t *ctor); static ssize_t pn_connector_write_amqp_header(pn_connector_t *ctor); static ssize_t pn_connector_write_amqp(pn_connector_t *ctor); -static ssize_t pn_connector_write_eos(pn_connector_t *ctor); pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context) { + if (!driver) return NULL; + pn_connector_t *c = malloc(sizeof(pn_connector_t)); if (!c) return NULL; c->driver = driver; c->next = NULL; c->prev = NULL; + c->pending_tick = false; + c->pending_read = false; + c->pending_write = false; + c->idx = 0; c->fd = fd; c->status = PN_SEL_RD | PN_SEL_WR; c->closed = false; @@ -321,7 +339,6 @@ pn_connector_t *pn_connector_fd(pn_drive c->output_done = false; c->context = context; c->listener = NULL; - c->idx = 0; pn_connector_trace(c, driver->trace); @@ -361,13 +378,6 @@ pn_listener_t *pn_connector_listener(pn_ return ctor ? ctor->listener : NULL; } -void pn_connector_eos(pn_connector_t *ctor) -{ - if (!ctor) return; - - ctor->process_input = pn_connector_write_eos; -} - void pn_connector_close(pn_connector_t *ctor) { // XXX: should probably signal engine and callback here @@ -585,11 +595,6 @@ static ssize_t pn_connector_write_amqp(p return pn_output(transport, pn_connector_output(ctor), pn_connector_available(ctor)); } -static ssize_t pn_connector_write_eos(pn_connector_t *ctor) -{ - return PN_EOS; -} - static time_t pn_connector_tick(pn_connector_t *ctor, time_t now) { // XXX: should probably have a function pointer for this and switch it with different layers @@ -601,18 +606,21 @@ static time_t pn_connector_tick(pn_conne void pn_connector_process(pn_connector_t *c) { if (c) { - int idx = c->idx; - if (!idx) return; - pn_driver_t *d = c->driver; - if (d->fds[idx].revents & POLLIN) { + if (c->pending_tick) { + // XXX: should handle timing also + c->tick(c, 0); + c->pending_tick = false; + } + + if (c->pending_read) { c->read(c); - d->fds[idx].revents &= ~POLLIN; + c->pending_read = false; } pn_connector_process_input(c); pn_connector_process_output(c); - if (d->fds[idx].revents & POLLOUT) { + if (c->pending_write) { c->write(c); - d->fds[idx].revents &= ~POLLOUT; + c->pending_write = false; } if (c->output_size == 0 && c->input_done && c->output_done) { fprintf(stderr, "closed\n"); @@ -672,7 +680,9 @@ void pn_driver_destroy(pn_driver_t *d) void pn_driver_wakeup(pn_driver_t *d) { - write(d->ctrl[1], "x", 1); + if (d) { + write(d->ctrl[1], "x", 1); + } } static void pn_driver_rebuild(pn_driver_t *d) @@ -713,18 +723,6 @@ static void pn_driver_rebuild(pn_driver_ void pn_driver_wait(pn_driver_t *d, int timeout) { pn_driver_rebuild(d); - pn_connector_t *c = d->connector_head; - while (c) { - // XXX: should do this in process - // XXX: should handle timing also - c->tick(c, 0); - c = c->next; - } - - // XXX: double rebuild necessary now due to separating of read/write - // and processing - pn_driver_rebuild(d); - DIE_IFE(poll(d->fds, 1 + d->listener_count + d->connector_count, timeout)); if (d->fds[0].revents & POLLIN) { @@ -740,21 +738,35 @@ void pn_driver_wait(pn_driver_t *d, int pn_listener_t *pn_driver_listener(pn_driver_t *d) { if (!d) return NULL; - pn_listener_t *l = d->listener_next; - if (!l) return NULL; + while (d->listener_next) { + pn_listener_t *l = d->listener_next; + d->listener_next = l->next; - if (!(l->idx && d->fds[l->idx].revents & POLLIN)) { - return NULL; + l->pending = (l->idx && d->fds[l->idx].revents & POLLIN); + + if (l->pending) { + return l; + } } - d->listener_next = l->next; - return l; + return NULL; } pn_connector_t *pn_driver_connector(pn_driver_t *d) { if (!d) return NULL; - pn_connector_t *c = d->connector_next; - if (c) { d->connector_next = c->next; } - return c; + while (d->connector_next) { + pn_connector_t *c = d->connector_next; + d->connector_next = c->next; + + int idx = c->idx; + c->pending_read = (idx && d->fds[idx].revents & POLLIN); + c->pending_write = (idx && d->fds[idx].revents & POLLOUT); + + if (c->pending_read || c->pending_write || c->pending_tick) { + return c; + } + } + + return NULL; } Modified: qpid/proton/trunk/proton-c/src/engine/engine.c URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1333552&r1=1333551&r2=1333552&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/engine/engine.c (original) +++ qpid/proton/trunk/proton-c/src/engine/engine.c Thu May 3 17:06:47 2012 @@ -419,12 +419,14 @@ pn_error_t *pn_connection_error(pn_conne void pn_connection_set_container(pn_connection_t *connection, const char *container) { + if (!connection) return; if (connection->container) free(connection->container); connection->container = strdup(container); } void pn_connection_set_hostname(pn_connection_t *connection, const char *hostname) { + if (!connection) return; if (connection->hostname) free(connection->hostname); connection->hostname = strdup(hostname); } Modified: qpid/proton/trunk/proton-c/src/sasl/sasl.c URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/sasl/sasl.c?rev=1333552&r1=1333551&r2=1333552&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/sasl/sasl.c (original) +++ qpid/proton/trunk/proton-c/src/sasl/sasl.c Thu May 3 17:06:47 2012 @@ -58,6 +58,7 @@ pn_sasl_t *pn_sasl() { pn_sasl_t *sasl = malloc(sizeof(pn_sasl_t)); sasl->disp = pn_dispatcher(1, sasl); + sasl->disp->batch = false; pn_dispatcher_action(sasl->disp, SASL_INIT, "SASL-INIT", pn_do_init); pn_dispatcher_action(sasl->disp, SASL_MECHANISMS, "SASL-MECHANISMS", pn_do_mechanisms); @@ -277,20 +278,28 @@ void pn_sasl_process(pn_sasl_t *sasl) pn_server_done(sasl); sasl->sent_done = true; sasl->rcvd_done = true; + sasl->disp->halt = true; } } ssize_t pn_sasl_input(pn_sasl_t *sasl, char *bytes, size_t available) { + ssize_t n = pn_dispatcher_input(sasl->disp, bytes, available); + if (n < 0) return n; + if (sasl->rcvd_done) { if (pn_sasl_state(sasl) == PN_SASL_PASS) { - return PN_EOS; + if (n) { + return n; + } else { + return PN_EOS; + } } else { // XXX: should probably do something better here return PN_ERR; } } else { - return pn_dispatcher_input(sasl->disp, bytes, available); + return n; } } @@ -341,4 +350,5 @@ void pn_do_outcome(pn_dispatcher_t *disp sasl->outcome = pn_to_uint8(pn_list_get(disp->args, SASL_OUTCOME_CODE)); sasl->rcvd_done = true; sasl->sent_done = true; + disp->halt = true; } --------------------------------------------------------------------- To unsubscribe, e-mail: [hidden email] For additional commands, e-mail: [hidden email] |
| Powered by Nabble | Edit this page |
