|
Author: rhs
Date: Fri Apr 27 16:12:46 2012 New Revision: 1331487 URL: http://svn.apache.org/viewvc?rev=1331487&view=rev Log: improved state matching to allow querying of local and remote state independent of each other; added session flow control state; added timeout to pn_driver_wait; fixed bug in pn_driver_rebuild Modified: qpid/proton/trunk/proton-c/include/proton/driver.h qpid/proton/trunk/proton-c/include/proton/engine.h qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h qpid/proton/trunk/proton-c/src/driver.c qpid/proton/trunk/proton-c/src/engine/engine-internal.h qpid/proton/trunk/proton-c/src/engine/engine.c qpid/proton/trunk/proton-c/src/proton.c Modified: qpid/proton/trunk/proton-c/include/proton/driver.h URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/driver.h?rev=1331487&r1=1331486&r2=1331487&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/include/proton/driver.h (original) +++ qpid/proton/trunk/proton-c/include/proton/driver.h Fri Apr 27 16:12:46 2012 @@ -33,7 +33,7 @@ typedef struct pn_connector_t pn_connect pn_driver_t *pn_driver(void); void pn_driver_trace(pn_driver_t *d, pn_trace_t trace); void pn_driver_wakeup(pn_driver_t *d); -void pn_driver_wait(pn_driver_t *d); +void pn_driver_wait(pn_driver_t *d, int timeout); pn_listener_t *pn_driver_listener(pn_driver_t *d); pn_connector_t *pn_driver_connector(pn_driver_t *d); void pn_driver_destroy(pn_driver_t *d); Modified: qpid/proton/trunk/proton-c/include/proton/engine.h URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1331487&r1=1331486&r2=1331487&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/include/proton/engine.h (original) +++ qpid/proton/trunk/proton-c/include/proton/engine.h Fri Apr 27 16:12:46 2012 @@ -50,6 +50,9 @@ typedef int pn_state_t; #define PN_REMOTE_ACTIVE (16) #define PN_REMOTE_CLOSED (32) +#define PN_LOCAL_MASK (PN_LOCAL_UNINIT | PN_LOCAL_ACTIVE | PN_LOCAL_CLOSED) +#define PN_REMOTE_MASK (PN_REMOTE_UNINIT | PN_REMOTE_ACTIVE | PN_REMOTE_CLOSED) + typedef enum pn_disposition_t { PN_RECEIVED=1, PN_ACCEPTED=2, Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1331487&r1=1331486&r2=1331487&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original) +++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Fri Apr 27 16:12:46 2012 @@ -47,6 +47,8 @@ pn_dispatcher_t *pn_dispatcher(uint8_t f disp->output = malloc(disp->capacity); disp->available = 0; + disp->halt = false; + return disp; } @@ -88,7 +90,7 @@ static void pn_do_trace(pn_dispatcher_t ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, char *bytes, size_t available) { size_t read = 0; - while (true) { + while (!disp->halt) { pn_frame_t frame; size_t n = pn_read_frame(&frame, bytes + read, available); if (n) { @@ -124,6 +126,8 @@ ssize_t pn_dispatcher_input(pn_dispatche available -= n; read += n; + + if (disp->halt) break; } else { break; } Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h?rev=1331487&r1=1331486&r2=1331487&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h (original) +++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h Fri Apr 27 16:12:46 2012 @@ -49,6 +49,7 @@ struct pn_dispatcher_t { size_t available; char *output; void *context; + bool halt; char scratch[SCRATCH]; }; Modified: qpid/proton/trunk/proton-c/src/driver.c URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1331487&r1=1331486&r2=1331487&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/driver.c (original) +++ qpid/proton/trunk/proton-c/src/driver.c Fri Apr 27 16:12:46 2012 @@ -81,6 +81,7 @@ struct pn_connector_t { time_t (*tick)(pn_connector_t *sel, time_t now); size_t input_size; char input[IO_BUF_SIZE]; + bool input_eos; size_t output_size; char output[IO_BUF_SIZE]; pn_sasl_t *sasl; @@ -88,6 +89,8 @@ struct pn_connector_t { pn_transport_t *transport; ssize_t (*process_input)(pn_connector_t *); ssize_t (*process_output)(pn_connector_t *); + bool input_done; + bool output_done; pn_listener_t *listener; void *context; }; @@ -294,12 +297,15 @@ pn_connector_t *pn_connector_fd(pn_drive c->write = pn_connector_write; c->tick = pn_connector_tick; c->input_size = 0; + c->input_eos = false; c->output_size = 0; c->sasl = pn_sasl(); c->connection = pn_connection(); c->transport = pn_transport(c->connection); c->process_input = pn_connector_read_sasl_header; c->process_output = pn_connector_write_sasl_header; + c->input_done = false; + c->output_done = false; c->context = context; c->listener = NULL; c->idx = 0; @@ -375,34 +381,39 @@ void pn_connector_destroy(pn_connector_t free(ctor); } -static void pn_connector_consume(pn_connector_t *ctor, int n) -{ - ctor->input_size -= n; - memmove(ctor->input, ctor->input + n, ctor->input_size); -} - static void pn_connector_read(pn_connector_t *ctor) { ssize_t n = recv(ctor->fd, ctor->input + ctor->input_size, IO_BUF_SIZE - ctor->input_size, 0); - if (n <= 0) { printf("disconnected: %zi\n", n); - pn_connector_close(ctor); - return; + ctor->status &= ~PN_SEL_RD; + ctor->input_eos = true; } else { ctor->input_size += n; } +} - while (ctor->input_size > 0) { - n = ctor->process_input(ctor); +static void pn_connector_consume(pn_connector_t *ctor, int n) +{ + ctor->input_size -= n; + memmove(ctor->input, ctor->input + n, ctor->input_size); +} + +static void pn_connector_process_input(pn_connector_t *ctor) +{ + while (!ctor->input_done && (ctor->input_size > 0 || ctor->input_eos)) { + ssize_t n = ctor->process_input(ctor); if (n > 0) { pn_connector_consume(ctor, n); } else if (n == 0) { - return; + break; } else { - if (n != PN_EOS) printf("error in process_input: %zi\n", n); - pn_connector_close(ctor); - return; + if (n != PN_EOS) { + printf("error in process_input: %zi\n", n); + } + ctor->input_done = true; + ctor->output_done = true; + break; } } } @@ -411,13 +422,20 @@ static ssize_t pn_connector_read_sasl_he { if (ctor->input_size >= 8) { if (memcmp(ctor->input, "AMQP\x03\x01\x00\x00", 8)) { - fprintf(stderr, "sasl header missmatch\n"); + fprintf(stderr, "sasl header missmatch: "); + pn_fprint_data(stderr, ctor->input, ctor->input_size); + fprintf(stderr, "\n"); return PN_ERR; } else { fprintf(stderr, " <- AMQP SASL 1.0\n"); ctor->process_input = pn_connector_read_sasl; return 8; } + } else if (ctor->input_eos) { + fprintf(stderr, "sasl header missmatch: "); + pn_fprint_data(stderr, ctor->input, ctor->input_size); + fprintf(stderr, "\n"); + return PN_ERR; } return 0; @@ -448,6 +466,11 @@ static ssize_t pn_connector_read_amqp_he ctor->process_input = pn_connector_read_amqp; return 8; } + } else if (ctor->input_eos) { + fprintf(stderr, "amqp header missmatch: "); + pn_fprint_data(stderr, ctor->input, ctor->input_size); + fprintf(stderr, "\n"); + return PN_ERR; } return 0; @@ -456,7 +479,13 @@ static ssize_t pn_connector_read_amqp_he static ssize_t pn_connector_read_amqp(pn_connector_t *ctor) { pn_transport_t *transport = ctor->transport; - return pn_input(transport, ctor->input, ctor->input_size); + size_t n = 0; + if (ctor->input_size) { + n = pn_input(transport, ctor->input, ctor->input_size); + } else if (ctor->input_eos) { + ctor->input_done = true; + } + return n; } static char *pn_connector_output(pn_connector_t *ctor) @@ -469,38 +498,46 @@ static size_t pn_connector_available(pn_ return IO_BUF_SIZE - ctor->output_size; } -static void pn_connector_write(pn_connector_t *ctor) +static void pn_connector_process_output(pn_connector_t *ctor) { - while (pn_connector_available(ctor) > 0) { + while (!ctor->output_done && pn_connector_available(ctor) > 0) { ssize_t n = ctor->process_output(ctor); if (n > 0) { ctor->output_size += n; } else if (n == 0) { break; } else { - if (n != PN_EOS) fprintf(stderr, "error in process_output: %zi\n", n); - pn_connector_close(ctor); - return; + if (n != PN_EOS) { + fprintf(stderr, "error in process_output: %zi\n", n); + } + ctor->output_done = true; + ctor->input_done = true; + break; } } + if (ctor->output_size) { + ctor->status |= PN_SEL_WR; + } +} + +static void pn_connector_write(pn_connector_t *ctor) +{ if (ctor->output_size > 0) { ssize_t n = send(ctor->fd, ctor->output, ctor->output_size, 0); if (n < 0) { // XXX perror("send"); - pn_connector_close(ctor); - return; + ctor->output_size = 0; + ctor->output_done = true; } else { ctor->output_size -= n; memmove(ctor->output, ctor->output + n, ctor->output_size); } - - if (ctor->output_size) - ctor->status |= PN_SEL_WR; - else - ctor->status &= ~PN_SEL_WR; } + + if (!ctor->output_size) + ctor->status &= ~PN_SEL_WR; } static ssize_t pn_connector_write_sasl_header(pn_connector_t *ctor) @@ -547,7 +584,8 @@ static time_t pn_connector_tick(pn_conne { // XXX: should probably have a function pointer for this and switch it with different layers time_t result = pn_tick(ctor->transport, now); - pn_connector_write(ctor); + pn_connector_process_input(ctor); + pn_connector_process_output(ctor); return result; } @@ -556,10 +594,19 @@ void pn_connector_process(pn_connector_t int idx = c->idx; if (!idx) return; pn_driver_t *d = c->driver; - if (d->fds[idx].revents & POLLIN) + if (d->fds[idx].revents & POLLIN) { c->read(c); - if (d->fds[idx].revents & POLLOUT) + d->fds[idx].revents &= ~POLLIN; + } + pn_connector_process_input(c); + pn_connector_process_output(c); + if (d->fds[idx].revents & POLLOUT) { c->write(c); + d->fds[idx].revents &= ~POLLOUT; + } + if (c->output_size == 0 && c->input_done && c->output_done) { + pn_connector_close(c); + } } } @@ -620,7 +667,6 @@ void pn_driver_wakeup(pn_driver_t *d) static void pn_driver_rebuild(pn_driver_t *d) { size_t size = d->listener_count + d->connector_count; - if (size == 0) return; while (d->capacity < size + 1) { d->capacity = d->capacity ? 2*d->capacity : 16; d->fds = realloc(d->fds, d->capacity*sizeof(struct pollfd)); @@ -653,17 +699,22 @@ static void pn_driver_rebuild(pn_driver_ } } -void pn_driver_wait(pn_driver_t *d) { +void pn_driver_wait(pn_driver_t *d, int timeout) { pn_driver_rebuild(d); pn_connector_t *c = d->connector_head; while (c) { - // XXX + // XXX: should do this in process + // XXX: should handle timing also c->tick(c, 0); c = c->next; } - DIE_IFE(poll(d->fds, 1 + d->listener_count + d->connector_count, -1)); + // XXX: double rebuild necessary now due to separating of read/write + // and processing + pn_driver_rebuild(d); + + DIE_IFE(poll(d->fds, 1 + d->listener_count + d->connector_count, timeout)); if (d->fds[0].revents & POLLIN) { //clear the pipe Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1331487&r1=1331486&r2=1331487&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original) +++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Fri Apr 27 16:12:46 2012 @@ -85,7 +85,9 @@ typedef struct { pn_delivery_buffer_t incoming; pn_delivery_buffer_t outgoing; pn_sequence_t incoming_transfer_count; + pn_sequence_t incoming_window; pn_sequence_t outgoing_transfer_count; + pn_sequence_t outgoing_window; pn_link_state_t *links; size_t link_capacity; pn_link_state_t **handles; @@ -147,6 +149,7 @@ struct pn_link_t { pn_delivery_t *current; pn_delivery_t *settled_head; pn_delivery_t *settled_tail; + size_t unsettled_count; pn_sequence_t credit; size_t id; }; @@ -175,10 +178,10 @@ struct pn_delivery_t { }; #define PN_SET_LOCAL(OLD, NEW) \ - (OLD) = ((OLD) & (PN_REMOTE_UNINIT | PN_REMOTE_ACTIVE | PN_REMOTE_CLOSED)) | (NEW) + (OLD) = ((OLD) & PN_REMOTE_MASK) | (NEW) #define PN_SET_REMOTE(OLD, NEW) \ - (OLD) = ((OLD) & (PN_LOCAL_UNINIT | PN_LOCAL_ACTIVE | PN_LOCAL_CLOSED)) | (NEW) + (OLD) = ((OLD) & PN_LOCAL_MASK) | (NEW) void pn_link_dump(pn_link_t *link); Modified: qpid/proton/trunk/proton-c/src/engine/engine.c URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1331487&r1=1331486&r2=1331487&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/engine/engine.c (original) +++ qpid/proton/trunk/proton-c/src/engine/engine.c Fri Apr 27 16:12:46 2012 @@ -485,7 +485,7 @@ void pn_work_update(pn_connection_t *con pn_add_work(connection, delivery); } else if (delivery == current) { if (link->endpoint.type == SENDER) { - if (link->credit > 0) { + if (pn_credit(link) > 0) { pn_add_work(connection, delivery); } else { pn_clear_work(connection, delivery); @@ -552,7 +552,13 @@ void pn_clear_modified(pn_connection_t * bool pn_matches(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state) { - return (endpoint->type == type) && (endpoint->state == state); + if (endpoint->type != type) return false; + + int st = endpoint->state; + if ((state & PN_REMOTE_MASK) == 0 || (state & PN_LOCAL_MASK) == 0) + return st & state; + else + return st == state; } pn_endpoint_t *pn_find(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state) @@ -606,6 +612,7 @@ pn_link_t *pn_link_next(pn_link_t *ssn, pn_session_t *pn_session(pn_connection_t *conn) { + if (!conn) return NULL; pn_session_t *ssn = malloc(sizeof(pn_session_t)); if (!ssn) return NULL; @@ -672,8 +679,7 @@ pn_session_state_t *pn_session_get_state { transport->sessions[i] = (pn_session_state_t) {.session=NULL, .local_channel=-1, - .remote_channel=-1, - .outgoing_transfer_count=0}; + .remote_channel=-1}; pn_delivery_buffer_init(&transport->sessions[i].incoming, 0, 1024); pn_delivery_buffer_init(&transport->sessions[i].outgoing, 0, 1024); } @@ -732,6 +738,7 @@ void pn_link_init(pn_link_t *link, int t link->remote_target = NULL; link->settled_head = link->settled_tail = NULL; link->head = link->tail = link->current = NULL; + link->unsettled_count = 0; link->credit = 0; } @@ -788,6 +795,7 @@ pn_link_state_t *pn_handle_state(pn_sess pn_link_t *pn_sender(pn_session_t *session, const wchar_t *name) { + if (!session) return NULL; pn_link_t *snd = malloc(sizeof(pn_link_t)); if (!snd) return NULL; pn_link_init(snd, SENDER, session, name); @@ -796,6 +804,7 @@ pn_link_t *pn_sender(pn_session_t *sessi pn_link_t *pn_receiver(pn_session_t *session, const wchar_t *name) { + if (!session) return NULL; pn_link_t *rcv = malloc(sizeof(pn_link_t)); if (!rcv) return NULL; pn_link_init(rcv, RECEIVER, session, name); @@ -829,6 +838,7 @@ pn_session_t *pn_get_session(pn_link_t * pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag) { + if (!link) return NULL; pn_delivery_t *delivery = link->settled_head; LL_POP_PFX(link->settled_head, link->settled_tail, link_); if (!delivery) delivery = malloc(sizeof(pn_delivery_t)); @@ -856,6 +866,8 @@ pn_delivery_t *pn_delivery(pn_link_t *li if (!link->current) link->current = delivery; + link->unsettled_count++; + pn_work_update(link->session->connection, delivery); return delivery; @@ -906,7 +918,7 @@ pn_delivery_t *pn_current(pn_link_t *lin void pn_advance_sender(pn_link_t *link) { - if (link->credit > 0) { + if (pn_credit(link) > 0) { link->current->done = true; link->credit--; pn_add_tpwork(link->current); @@ -939,7 +951,26 @@ bool pn_advance(pn_link_t *link) int pn_credit(pn_link_t *link) { - return link ? link->credit : 0; + if (!link) return 0; + + if (pn_is_receiver(link)) + return link->credit; + + pn_session_t *ssn = link->session; + pn_connection_t *conn = ssn->connection; + pn_transport_t *transport = conn->transport; + int available = 0; + if (transport) { + pn_session_state_t *ssn_state = pn_session_get_state(transport, ssn); + available = ssn_state->outgoing.capacity - link->unsettled_count; + if (ssn_state->outgoing_window < available) + available = ssn_state->outgoing_window; + } + + int credit = link->credit; + if (available < credit) credit = available; + + return credit; } void pn_real_settle(pn_delivery_t *delivery) @@ -950,6 +981,7 @@ void pn_real_settle(pn_delivery_t *deliv LL_ADD_PFX(link->settled_head, link->settled_tail, delivery, link_); pn_clear_tag(delivery); pn_clear_bytes(delivery); + link->unsettled_count--; } void pn_full_settle(pn_delivery_buffer_t *db, pn_delivery_t *delivery) @@ -968,6 +1000,16 @@ void pn_settle(pn_delivery_t *delivery) pn_work_update(delivery->link->session->connection, delivery); } +void pn_post_close(pn_transport_t *transport) +{ + pn_init_frame(transport->disp); + const char *condition = transport->endpoint.error.condition; + if (condition) + // XXX: description + pn_field(transport->disp, CLOSE_ERROR, pn_value("L([s])", ERROR, condition)); + pn_post_frame(transport->disp, 0, CLOSE); +} + void pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...) { va_list ap; @@ -976,9 +1018,12 @@ void pn_do_error(pn_transport_t *transpo // XXX: result vsnprintf(transport->endpoint.error.description, DESCRIPTION, fmt, ap); va_end(ap); - PN_SET_LOCAL(transport->endpoint.state, PN_LOCAL_CLOSED); fprintf(stderr, "ERROR %s %s\n", condition, transport->endpoint.error.description); - // XXX: need to write close frame if appropriate + if (!transport->close_sent) + pn_post_close(transport); + PN_SET_LOCAL(transport->endpoint.state, PN_LOCAL_CLOSED); + PN_SET_REMOTE(transport->endpoint.state, PN_REMOTE_CLOSED); + transport->disp->halt = true; } void pn_do_open(pn_dispatcher_t *disp) @@ -1112,6 +1157,7 @@ void pn_do_transfer(pn_dispatcher_t *dis delivery->done = !pn_to_bool(pn_list_get(args, TRANSFER_MORE)); ssn_state->incoming_transfer_count++; + ssn_state->incoming_window--; } void pn_do_flow(pn_dispatcher_t *disp) @@ -1120,6 +1166,17 @@ void pn_do_flow(pn_dispatcher_t *disp) pn_list_t *args = disp->args; pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel); + pn_sequence_t iwin = pn_to_uint32(pn_list_get(args, FLOW_INCOMING_WINDOW)); + //pn_sequence_t owin = pn_to_uint32(pn_list_get(args, FLOW_OUTGOING_WINDOW)); + //pn_sequence_t onext = pn_to_uint32(pn_list_get(args, FLOW_NEXT_OUTGOING_ID)); + pn_value_t vinext = pn_list_get(args, FLOW_NEXT_INCOMING_ID); + if (vinext.type == EMPTY) { + ssn_state->outgoing_window = iwin; + } else { + pn_sequence_t inext = pn_to_uint32(vinext); + ssn_state->outgoing_window = inext + iwin - ssn_state->outgoing_transfer_count; + } + pn_value_t vhandle = pn_list_get(args, FLOW_HANDLE); if (vhandle.type != EMPTY) { uint32_t handle = pn_to_uint32(vhandle); @@ -1153,21 +1210,22 @@ void pn_do_disposition(pn_dispatcher_t * pn_sequence_t last = lastv.type == EMPTY ? first : pn_to_int32(lastv); bool settled = pn_to_bool(pn_list_get(args, DISPOSITION_SETTLED)); pn_tag_t *dstate = pn_to_tag(pn_list_get(args, DISPOSITION_STATE)); - uint64_t code = pn_to_uint32(pn_tag_descriptor(dstate)); - pn_disposition_t dispo; - switch (code) - { - case ACCEPTED: - dispo = PN_ACCEPTED; - break; - case REJECTED: - dispo = PN_REJECTED; - break; - default: - // XXX - fprintf(stderr, "default %" PRIu64 "\n", code); - dispo = 0; - break; + pn_disposition_t dispo = 0; + if (dstate) { + uint64_t code = pn_to_uint32(pn_tag_descriptor(dstate)); + switch (code) + { + case ACCEPTED: + dispo = PN_ACCEPTED; + break; + case REJECTED: + dispo = PN_REJECTED; + break; + default: + // XXX + fprintf(stderr, "default %" PRIu64 "\n", code); + break; + } } pn_delivery_buffer_t *deliveries; @@ -1328,8 +1386,8 @@ void pn_post_flow(pn_transport_t *transp pn_init_frame(transport->disp); if ((int16_t) ssn_state->remote_channel >= 0) pn_field(transport->disp, FLOW_NEXT_INCOMING_ID, pn_value("I", ssn_state->incoming_transfer_count)); - pn_field(transport->disp, FLOW_INCOMING_WINDOW, - pn_value("I", pn_delivery_buffer_available(&ssn_state->incoming))); + ssn_state->incoming_window = pn_delivery_buffer_available(&ssn_state->incoming); + pn_field(transport->disp, FLOW_INCOMING_WINDOW, pn_value("I", ssn_state->incoming_window)); pn_field(transport->disp, FLOW_NEXT_OUTGOING_ID, pn_value("I", ssn_state->outgoing.next)); pn_field(transport->disp, FLOW_OUTGOING_WINDOW, pn_value("I", pn_delivery_buffer_available(&ssn_state->outgoing))); @@ -1402,7 +1460,6 @@ void pn_process_disp_receiver(pn_transpo pn_link_t *link = delivery->link; if (link->endpoint.type == RECEIVER) { // XXX: need to prevent duplicate disposition sending - fprintf(stderr, "settled=%u\n", delivery->local_settled); pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session); if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled) { pn_post_disp(transport, delivery); @@ -1411,8 +1468,8 @@ void pn_process_disp_receiver(pn_transpo if (delivery->local_settled) { size_t available = pn_delivery_buffer_available(&ssn_state->incoming); pn_full_settle(&ssn_state->incoming, delivery); - fprintf(stderr, "%zi, %zi\n", available, pn_delivery_buffer_available(&ssn_state->incoming)); - if (pn_delivery_buffer_available(&ssn_state->incoming) > available) { + if (!ssn_state->incoming_window && + pn_delivery_buffer_available(&ssn_state->incoming) > available) { pn_post_flow(transport, ssn_state, NULL); } } @@ -1453,6 +1510,7 @@ void pn_process_msg_data(pn_transport_t } pn_post_frame(transport->disp, ssn_state->local_channel, TRANSFER); ssn_state->outgoing_transfer_count++; + ssn_state->outgoing_window--; if (delivery->done) { state->sent = true; link_state->delivery_count++; @@ -1541,11 +1599,7 @@ void pn_process_conn_teardown(pn_transpo if (endpoint->type == CONNECTION) { if (endpoint->state & PN_LOCAL_CLOSED && !transport->close_sent) { - pn_init_frame(transport->disp); - /*if (condition) - // XXX: symbol - pn_field(eng, CLOSE_ERROR, pn_value("B([zS])", ERROR, condition, description));*/ - pn_post_frame(transport->disp, 0, CLOSE); + pn_post_close(transport); transport->close_sent = true; } } @@ -1691,7 +1745,7 @@ bool pn_writable(pn_delivery_t *delivery if (!delivery) return false; pn_link_t *link = delivery->link; - return pn_is_sender(link) && pn_is_current(delivery) && link->credit > 0; + return pn_is_sender(link) && pn_is_current(delivery) && pn_credit(link) > 0; } bool pn_readable(pn_delivery_t *delivery) Modified: qpid/proton/trunk/proton-c/src/proton.c URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/proton.c?rev=1331487&r1=1331486&r2=1331487&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/proton.c (original) +++ qpid/proton/trunk/proton-c/src/proton.c Fri Apr 27 16:12:46 2012 @@ -177,7 +177,7 @@ void server_callback(pn_connector_t *cto if (pn_updated(delivery)) { printf("disposition for %s: %u\n", tagstr, pn_remote_disp(delivery)); - pn_clear(delivery); + pn_settle(delivery); } delivery = pn_work_next(delivery); @@ -403,7 +403,7 @@ int main(int argc, char **argv) mbstowcs(ctx.address, address, 1024); if (!pn_connector(drv, host, port, &ctx)) pn_fatal("connector failed\n"); while (!ctx.done) { - pn_driver_wait(drv); + pn_driver_wait(drv, -1); pn_connector_t *c; while ((c = pn_driver_connector(drv))) { pn_connector_process(c); @@ -417,7 +417,7 @@ int main(int argc, char **argv) struct server_context ctx = {0}; if (!pn_listener(drv, host, port, &ctx)) pn_fatal("listener failed\n"); while (true) { - pn_driver_wait(drv); + pn_driver_wait(drv, -1); pn_listener_t *l; pn_connector_t *c; --------------------------------------------------------------------- To unsubscribe, e-mail: [hidden email] For additional commands, e-mail: [hidden email] |
| Powered by Nabble | Edit this page |
