Quantcast

svn commit: r1331487 - in /qpid/proton/trunk/proton-c: include/proton/driver.h include/proton/engine.h src/dispatcher/dispatcher.c src/dispatcher/dispatcher.h src/driver.c src/engine/engine-internal.h src/engine/engine.c src/proton.c

classic Classic list List threaded Threaded
1 message Options
rhs
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate
star

svn commit: r1331487 - in /qpid/proton/trunk/proton-c: include/proton/driver.h include/proton/engine.h src/dispatcher/dispatcher.c src/dispatcher/dispatcher.h src/driver.c src/engine/engine-internal.h src/engine/engine.c src/proton.c

rhs
Author: rhs
Date: Fri Apr 27 16:12:46 2012
New Revision: 1331487

URL: http://svn.apache.org/viewvc?rev=1331487&view=rev
Log:
improved state matching to allow querying of local and remote state independent of each other; added session flow control state; added timeout to pn_driver_wait; fixed bug in pn_driver_rebuild

Modified:
    qpid/proton/trunk/proton-c/include/proton/driver.h
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
    qpid/proton/trunk/proton-c/src/driver.c
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/proton.c

Modified: qpid/proton/trunk/proton-c/include/proton/driver.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/driver.h?rev=1331487&r1=1331486&r2=1331487&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/driver.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/driver.h Fri Apr 27 16:12:46 2012
@@ -33,7 +33,7 @@ typedef struct pn_connector_t pn_connect
 pn_driver_t *pn_driver(void);
 void pn_driver_trace(pn_driver_t *d, pn_trace_t trace);
 void pn_driver_wakeup(pn_driver_t *d);
-void pn_driver_wait(pn_driver_t *d);
+void pn_driver_wait(pn_driver_t *d, int timeout);
 pn_listener_t *pn_driver_listener(pn_driver_t *d);
 pn_connector_t *pn_driver_connector(pn_driver_t *d);
 void pn_driver_destroy(pn_driver_t *d);

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1331487&r1=1331486&r2=1331487&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Fri Apr 27 16:12:46 2012
@@ -50,6 +50,9 @@ typedef int pn_state_t;
 #define PN_REMOTE_ACTIVE (16)
 #define PN_REMOTE_CLOSED (32)
 
+#define PN_LOCAL_MASK (PN_LOCAL_UNINIT | PN_LOCAL_ACTIVE | PN_LOCAL_CLOSED)
+#define PN_REMOTE_MASK (PN_REMOTE_UNINIT | PN_REMOTE_ACTIVE | PN_REMOTE_CLOSED)
+
 typedef enum pn_disposition_t {
   PN_RECEIVED=1,
   PN_ACCEPTED=2,

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1331487&r1=1331486&r2=1331487&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Fri Apr 27 16:12:46 2012
@@ -47,6 +47,8 @@ pn_dispatcher_t *pn_dispatcher(uint8_t f
   disp->output = malloc(disp->capacity);
   disp->available = 0;
 
+  disp->halt = false;
+
   return disp;
 }
 
@@ -88,7 +90,7 @@ static void pn_do_trace(pn_dispatcher_t
 ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, char *bytes, size_t available)
 {
   size_t read = 0;
-  while (true) {
+  while (!disp->halt) {
     pn_frame_t frame;
     size_t n = pn_read_frame(&frame, bytes + read, available);
     if (n) {
@@ -124,6 +126,8 @@ ssize_t pn_dispatcher_input(pn_dispatche
 
       available -= n;
       read += n;
+
+      if (disp->halt) break;
     } else {
       break;
     }

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h?rev=1331487&r1=1331486&r2=1331487&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h Fri Apr 27 16:12:46 2012
@@ -49,6 +49,7 @@ struct pn_dispatcher_t {
   size_t available;
   char *output;
   void *context;
+  bool halt;
   char scratch[SCRATCH];
 };
 

Modified: qpid/proton/trunk/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1331487&r1=1331486&r2=1331487&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Fri Apr 27 16:12:46 2012
@@ -81,6 +81,7 @@ struct pn_connector_t {
   time_t (*tick)(pn_connector_t *sel, time_t now);
   size_t input_size;
   char input[IO_BUF_SIZE];
+  bool input_eos;
   size_t output_size;
   char output[IO_BUF_SIZE];
   pn_sasl_t *sasl;
@@ -88,6 +89,8 @@ struct pn_connector_t {
   pn_transport_t *transport;
   ssize_t (*process_input)(pn_connector_t *);
   ssize_t (*process_output)(pn_connector_t *);
+  bool input_done;
+  bool output_done;
   pn_listener_t *listener;
   void *context;
 };
@@ -294,12 +297,15 @@ pn_connector_t *pn_connector_fd(pn_drive
   c->write = pn_connector_write;
   c->tick = pn_connector_tick;
   c->input_size = 0;
+  c->input_eos = false;
   c->output_size = 0;
   c->sasl = pn_sasl();
   c->connection = pn_connection();
   c->transport = pn_transport(c->connection);
   c->process_input = pn_connector_read_sasl_header;
   c->process_output = pn_connector_write_sasl_header;
+  c->input_done = false;
+  c->output_done = false;
   c->context = context;
   c->listener = NULL;
   c->idx = 0;
@@ -375,34 +381,39 @@ void pn_connector_destroy(pn_connector_t
   free(ctor);
 }
 
-static void pn_connector_consume(pn_connector_t *ctor, int n)
-{
-  ctor->input_size -= n;
-  memmove(ctor->input, ctor->input + n, ctor->input_size);
-}
-
 static void pn_connector_read(pn_connector_t *ctor)
 {
   ssize_t n = recv(ctor->fd, ctor->input + ctor->input_size, IO_BUF_SIZE - ctor->input_size, 0);
-
   if (n <= 0) {
     printf("disconnected: %zi\n", n);
-    pn_connector_close(ctor);
-    return;
+    ctor->status &= ~PN_SEL_RD;
+    ctor->input_eos = true;
   } else {
     ctor->input_size += n;
   }
+}
 
-  while (ctor->input_size > 0) {
-    n = ctor->process_input(ctor);
+static void pn_connector_consume(pn_connector_t *ctor, int n)
+{
+  ctor->input_size -= n;
+  memmove(ctor->input, ctor->input + n, ctor->input_size);
+}
+
+static void pn_connector_process_input(pn_connector_t *ctor)
+{
+  while (!ctor->input_done && (ctor->input_size > 0 || ctor->input_eos)) {
+    ssize_t n = ctor->process_input(ctor);
     if (n > 0) {
       pn_connector_consume(ctor, n);
     } else if (n == 0) {
-      return;
+      break;
     } else {
-      if (n != PN_EOS) printf("error in process_input: %zi\n", n);
-      pn_connector_close(ctor);
-      return;
+      if (n != PN_EOS) {
+        printf("error in process_input: %zi\n", n);
+      }
+      ctor->input_done = true;
+      ctor->output_done = true;
+      break;
     }
   }
 }
@@ -411,13 +422,20 @@ static ssize_t pn_connector_read_sasl_he
 {
   if (ctor->input_size >= 8) {
     if (memcmp(ctor->input, "AMQP\x03\x01\x00\x00", 8)) {
-      fprintf(stderr, "sasl header missmatch\n");
+      fprintf(stderr, "sasl header missmatch: ");
+      pn_fprint_data(stderr, ctor->input, ctor->input_size);
+      fprintf(stderr, "\n");
       return PN_ERR;
     } else {
       fprintf(stderr, "    <- AMQP SASL 1.0\n");
       ctor->process_input = pn_connector_read_sasl;
       return 8;
     }
+  } else if (ctor->input_eos) {
+    fprintf(stderr, "sasl header missmatch: ");
+    pn_fprint_data(stderr, ctor->input, ctor->input_size);
+    fprintf(stderr, "\n");
+    return PN_ERR;
   }
 
   return 0;
@@ -448,6 +466,11 @@ static ssize_t pn_connector_read_amqp_he
       ctor->process_input = pn_connector_read_amqp;
       return 8;
     }
+  } else if (ctor->input_eos) {
+    fprintf(stderr, "amqp header missmatch: ");
+    pn_fprint_data(stderr, ctor->input, ctor->input_size);
+    fprintf(stderr, "\n");
+    return PN_ERR;
   }
 
   return 0;
@@ -456,7 +479,13 @@ static ssize_t pn_connector_read_amqp_he
 static ssize_t pn_connector_read_amqp(pn_connector_t *ctor)
 {
   pn_transport_t *transport = ctor->transport;
-  return pn_input(transport, ctor->input, ctor->input_size);
+  size_t n = 0;
+  if (ctor->input_size) {
+    n = pn_input(transport, ctor->input, ctor->input_size);
+  } else if (ctor->input_eos) {
+    ctor->input_done = true;
+  }
+  return n;
 }
 
 static char *pn_connector_output(pn_connector_t *ctor)
@@ -469,38 +498,46 @@ static size_t pn_connector_available(pn_
   return IO_BUF_SIZE - ctor->output_size;
 }
 
-static void pn_connector_write(pn_connector_t *ctor)
+static void pn_connector_process_output(pn_connector_t *ctor)
 {
-  while (pn_connector_available(ctor) > 0) {
+  while (!ctor->output_done && pn_connector_available(ctor) > 0) {
     ssize_t n = ctor->process_output(ctor);
     if (n > 0) {
       ctor->output_size += n;
     } else if (n == 0) {
       break;
     } else {
-      if (n != PN_EOS) fprintf(stderr, "error in process_output: %zi\n", n);
-      pn_connector_close(ctor);
-      return;
+      if (n != PN_EOS) {
+        fprintf(stderr, "error in process_output: %zi\n", n);
+      }
+      ctor->output_done = true;
+      ctor->input_done = true;
+      break;
     }
   }
 
+  if (ctor->output_size) {
+    ctor->status |= PN_SEL_WR;
+  }
+}
+
+static void pn_connector_write(pn_connector_t *ctor)
+{
   if (ctor->output_size > 0) {
     ssize_t n = send(ctor->fd, ctor->output, ctor->output_size, 0);
     if (n < 0) {
       // XXX
       perror("send");
-      pn_connector_close(ctor);
-      return;
+      ctor->output_size = 0;
+      ctor->output_done = true;
     } else {
       ctor->output_size -= n;
       memmove(ctor->output, ctor->output + n, ctor->output_size);
     }
-
-    if (ctor->output_size)
-      ctor->status |= PN_SEL_WR;
-    else
-      ctor->status &= ~PN_SEL_WR;
   }
+
+  if (!ctor->output_size)
+    ctor->status &= ~PN_SEL_WR;
 }
 
 static ssize_t pn_connector_write_sasl_header(pn_connector_t *ctor)
@@ -547,7 +584,8 @@ static time_t pn_connector_tick(pn_conne
 {
   // XXX: should probably have a function pointer for this and switch it with different layers
   time_t result = pn_tick(ctor->transport, now);
-  pn_connector_write(ctor);
+  pn_connector_process_input(ctor);
+  pn_connector_process_output(ctor);
   return result;
 }
 
@@ -556,10 +594,19 @@ void pn_connector_process(pn_connector_t
     int idx = c->idx;
     if (!idx) return;
     pn_driver_t *d = c->driver;
-    if (d->fds[idx].revents & POLLIN)
+    if (d->fds[idx].revents & POLLIN) {
       c->read(c);
-    if (d->fds[idx].revents & POLLOUT)
+      d->fds[idx].revents &= ~POLLIN;
+    }
+    pn_connector_process_input(c);
+    pn_connector_process_output(c);
+    if (d->fds[idx].revents & POLLOUT) {
       c->write(c);
+      d->fds[idx].revents &= ~POLLOUT;
+    }
+    if (c->output_size == 0 && c->input_done && c->output_done) {
+      pn_connector_close(c);
+    }
   }
 }
 
@@ -620,7 +667,6 @@ void pn_driver_wakeup(pn_driver_t *d)
 static void pn_driver_rebuild(pn_driver_t *d)
 {
   size_t size = d->listener_count + d->connector_count;
-  if (size == 0) return;
   while (d->capacity < size + 1) {
     d->capacity = d->capacity ? 2*d->capacity : 16;
     d->fds = realloc(d->fds, d->capacity*sizeof(struct pollfd));
@@ -653,17 +699,22 @@ static void pn_driver_rebuild(pn_driver_
   }
 }
 
-void pn_driver_wait(pn_driver_t *d) {
+void pn_driver_wait(pn_driver_t *d, int timeout) {
   pn_driver_rebuild(d);
 
   pn_connector_t *c = d->connector_head;
   while (c) {
-    // XXX
+    // XXX: should do this in process
+    // XXX: should handle timing also
     c->tick(c, 0);
     c = c->next;
   }
 
-  DIE_IFE(poll(d->fds, 1 + d->listener_count + d->connector_count, -1));
+  // XXX: double rebuild necessary now due to separating of read/write
+  // and processing
+  pn_driver_rebuild(d);
+
+  DIE_IFE(poll(d->fds, 1 + d->listener_count + d->connector_count, timeout));
 
   if (d->fds[0].revents & POLLIN) {
     //clear the pipe

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1331487&r1=1331486&r2=1331487&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Fri Apr 27 16:12:46 2012
@@ -85,7 +85,9 @@ typedef struct {
   pn_delivery_buffer_t incoming;
   pn_delivery_buffer_t outgoing;
   pn_sequence_t incoming_transfer_count;
+  pn_sequence_t incoming_window;
   pn_sequence_t outgoing_transfer_count;
+  pn_sequence_t outgoing_window;
   pn_link_state_t *links;
   size_t link_capacity;
   pn_link_state_t **handles;
@@ -147,6 +149,7 @@ struct pn_link_t {
   pn_delivery_t *current;
   pn_delivery_t *settled_head;
   pn_delivery_t *settled_tail;
+  size_t unsettled_count;
   pn_sequence_t credit;
   size_t id;
 };
@@ -175,10 +178,10 @@ struct pn_delivery_t {
 };
 
 #define PN_SET_LOCAL(OLD, NEW)                                          \
-  (OLD) = ((OLD) & (PN_REMOTE_UNINIT | PN_REMOTE_ACTIVE | PN_REMOTE_CLOSED)) | (NEW)
+  (OLD) = ((OLD) & PN_REMOTE_MASK) | (NEW)
 
 #define PN_SET_REMOTE(OLD, NEW)                                         \
-  (OLD) = ((OLD) & (PN_LOCAL_UNINIT | PN_LOCAL_ACTIVE | PN_LOCAL_CLOSED)) | (NEW)
+  (OLD) = ((OLD) & PN_LOCAL_MASK) | (NEW)
 
 void pn_link_dump(pn_link_t *link);
 

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1331487&r1=1331486&r2=1331487&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Fri Apr 27 16:12:46 2012
@@ -485,7 +485,7 @@ void pn_work_update(pn_connection_t *con
     pn_add_work(connection, delivery);
   } else if (delivery == current) {
     if (link->endpoint.type == SENDER) {
-      if (link->credit > 0) {
+      if (pn_credit(link) > 0) {
         pn_add_work(connection, delivery);
       } else {
         pn_clear_work(connection, delivery);
@@ -552,7 +552,13 @@ void pn_clear_modified(pn_connection_t *
 
 bool pn_matches(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state)
 {
-  return (endpoint->type == type) && (endpoint->state == state);
+  if (endpoint->type != type) return false;
+
+  int st = endpoint->state;
+  if ((state & PN_REMOTE_MASK) == 0 || (state & PN_LOCAL_MASK) == 0)
+    return st & state;
+  else
+    return st == state;
 }
 
 pn_endpoint_t *pn_find(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state)
@@ -606,6 +612,7 @@ pn_link_t *pn_link_next(pn_link_t *ssn,
 
 pn_session_t *pn_session(pn_connection_t *conn)
 {
+  if (!conn) return NULL;
   pn_session_t *ssn = malloc(sizeof(pn_session_t));
   if (!ssn) return NULL;
 
@@ -672,8 +679,7 @@ pn_session_state_t *pn_session_get_state
   {
     transport->sessions[i] = (pn_session_state_t) {.session=NULL,
                                                    .local_channel=-1,
-                                                   .remote_channel=-1,
-                                                   .outgoing_transfer_count=0};
+                                                   .remote_channel=-1};
     pn_delivery_buffer_init(&transport->sessions[i].incoming, 0, 1024);
     pn_delivery_buffer_init(&transport->sessions[i].outgoing, 0, 1024);
   }
@@ -732,6 +738,7 @@ void pn_link_init(pn_link_t *link, int t
   link->remote_target = NULL;
   link->settled_head = link->settled_tail = NULL;
   link->head = link->tail = link->current = NULL;
+  link->unsettled_count = 0;
   link->credit = 0;
 }
 
@@ -788,6 +795,7 @@ pn_link_state_t *pn_handle_state(pn_sess
 
 pn_link_t *pn_sender(pn_session_t *session, const wchar_t *name)
 {
+  if (!session) return NULL;
   pn_link_t *snd = malloc(sizeof(pn_link_t));
   if (!snd) return NULL;
   pn_link_init(snd, SENDER, session, name);
@@ -796,6 +804,7 @@ pn_link_t *pn_sender(pn_session_t *sessi
 
 pn_link_t *pn_receiver(pn_session_t *session, const wchar_t *name)
 {
+  if (!session) return NULL;
   pn_link_t *rcv = malloc(sizeof(pn_link_t));
   if (!rcv) return NULL;
   pn_link_init(rcv, RECEIVER, session, name);
@@ -829,6 +838,7 @@ pn_session_t *pn_get_session(pn_link_t *
 
 pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
 {
+  if (!link) return NULL;
   pn_delivery_t *delivery = link->settled_head;
   LL_POP_PFX(link->settled_head, link->settled_tail, link_);
   if (!delivery) delivery = malloc(sizeof(pn_delivery_t));
@@ -856,6 +866,8 @@ pn_delivery_t *pn_delivery(pn_link_t *li
   if (!link->current)
     link->current = delivery;
 
+  link->unsettled_count++;
+
   pn_work_update(link->session->connection, delivery);
 
   return delivery;
@@ -906,7 +918,7 @@ pn_delivery_t *pn_current(pn_link_t *lin
 
 void pn_advance_sender(pn_link_t *link)
 {
-  if (link->credit > 0) {
+  if (pn_credit(link) > 0) {
     link->current->done = true;
     link->credit--;
     pn_add_tpwork(link->current);
@@ -939,7 +951,26 @@ bool pn_advance(pn_link_t *link)
 
 int pn_credit(pn_link_t *link)
 {
-  return link ? link->credit : 0;
+  if (!link) return 0;
+
+  if (pn_is_receiver(link))
+    return link->credit;
+
+  pn_session_t *ssn = link->session;
+  pn_connection_t *conn = ssn->connection;
+  pn_transport_t *transport = conn->transport;
+  int available = 0;
+  if (transport) {
+    pn_session_state_t *ssn_state = pn_session_get_state(transport, ssn);
+    available = ssn_state->outgoing.capacity - link->unsettled_count;
+    if (ssn_state->outgoing_window < available)
+      available = ssn_state->outgoing_window;
+  }
+
+  int credit = link->credit;
+  if (available < credit) credit = available;
+
+  return credit;
 }
 
 void pn_real_settle(pn_delivery_t *delivery)
@@ -950,6 +981,7 @@ void pn_real_settle(pn_delivery_t *deliv
   LL_ADD_PFX(link->settled_head, link->settled_tail, delivery, link_);
   pn_clear_tag(delivery);
   pn_clear_bytes(delivery);
+  link->unsettled_count--;
 }
 
 void pn_full_settle(pn_delivery_buffer_t *db, pn_delivery_t *delivery)
@@ -968,6 +1000,16 @@ void pn_settle(pn_delivery_t *delivery)
   pn_work_update(delivery->link->session->connection, delivery);
 }
 
+void pn_post_close(pn_transport_t *transport)
+{
+  pn_init_frame(transport->disp);
+  const char *condition = transport->endpoint.error.condition;
+  if (condition)
+    // XXX: description
+    pn_field(transport->disp, CLOSE_ERROR, pn_value("L([s])", ERROR, condition));
+  pn_post_frame(transport->disp, 0, CLOSE);
+}
+
 void pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...)
 {
   va_list ap;
@@ -976,9 +1018,12 @@ void pn_do_error(pn_transport_t *transpo
   // XXX: result
   vsnprintf(transport->endpoint.error.description, DESCRIPTION, fmt, ap);
   va_end(ap);
-  PN_SET_LOCAL(transport->endpoint.state, PN_LOCAL_CLOSED);
   fprintf(stderr, "ERROR %s %s\n", condition, transport->endpoint.error.description);
-  // XXX: need to write close frame if appropriate
+  if (!transport->close_sent)
+    pn_post_close(transport);
+  PN_SET_LOCAL(transport->endpoint.state, PN_LOCAL_CLOSED);
+  PN_SET_REMOTE(transport->endpoint.state, PN_REMOTE_CLOSED);
+  transport->disp->halt = true;
 }
 
 void pn_do_open(pn_dispatcher_t *disp)
@@ -1112,6 +1157,7 @@ void pn_do_transfer(pn_dispatcher_t *dis
   delivery->done = !pn_to_bool(pn_list_get(args, TRANSFER_MORE));
 
   ssn_state->incoming_transfer_count++;
+  ssn_state->incoming_window--;
 }
 
 void pn_do_flow(pn_dispatcher_t *disp)
@@ -1120,6 +1166,17 @@ void pn_do_flow(pn_dispatcher_t *disp)
   pn_list_t *args = disp->args;
   pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
 
+  pn_sequence_t iwin = pn_to_uint32(pn_list_get(args, FLOW_INCOMING_WINDOW));
+  //pn_sequence_t owin = pn_to_uint32(pn_list_get(args, FLOW_OUTGOING_WINDOW));
+  //pn_sequence_t onext = pn_to_uint32(pn_list_get(args, FLOW_NEXT_OUTGOING_ID));
+  pn_value_t vinext = pn_list_get(args, FLOW_NEXT_INCOMING_ID);
+  if (vinext.type == EMPTY) {
+    ssn_state->outgoing_window = iwin;
+  } else {
+    pn_sequence_t inext = pn_to_uint32(vinext);
+    ssn_state->outgoing_window = inext + iwin - ssn_state->outgoing_transfer_count;
+  }
+
   pn_value_t vhandle = pn_list_get(args, FLOW_HANDLE);
   if (vhandle.type != EMPTY) {
     uint32_t handle = pn_to_uint32(vhandle);
@@ -1153,21 +1210,22 @@ void pn_do_disposition(pn_dispatcher_t *
   pn_sequence_t last = lastv.type == EMPTY ? first : pn_to_int32(lastv);
   bool settled = pn_to_bool(pn_list_get(args, DISPOSITION_SETTLED));
   pn_tag_t *dstate = pn_to_tag(pn_list_get(args, DISPOSITION_STATE));
-  uint64_t code = pn_to_uint32(pn_tag_descriptor(dstate));
-  pn_disposition_t dispo;
-  switch (code)
-  {
-  case ACCEPTED:
-    dispo = PN_ACCEPTED;
-    break;
-  case REJECTED:
-    dispo = PN_REJECTED;
-    break;
-  default:
-    // XXX
-    fprintf(stderr, "default %" PRIu64 "\n", code);
-    dispo = 0;
-    break;
+  pn_disposition_t dispo = 0;
+  if (dstate) {
+    uint64_t code = pn_to_uint32(pn_tag_descriptor(dstate));
+    switch (code)
+    {
+    case ACCEPTED:
+      dispo = PN_ACCEPTED;
+      break;
+    case REJECTED:
+      dispo = PN_REJECTED;
+      break;
+    default:
+      // XXX
+      fprintf(stderr, "default %" PRIu64 "\n", code);
+      break;
+    }
   }
 
   pn_delivery_buffer_t *deliveries;
@@ -1328,8 +1386,8 @@ void pn_post_flow(pn_transport_t *transp
   pn_init_frame(transport->disp);
   if ((int16_t) ssn_state->remote_channel >= 0)
     pn_field(transport->disp, FLOW_NEXT_INCOMING_ID, pn_value("I", ssn_state->incoming_transfer_count));
-  pn_field(transport->disp, FLOW_INCOMING_WINDOW,
-           pn_value("I", pn_delivery_buffer_available(&ssn_state->incoming)));
+  ssn_state->incoming_window = pn_delivery_buffer_available(&ssn_state->incoming);
+  pn_field(transport->disp, FLOW_INCOMING_WINDOW, pn_value("I", ssn_state->incoming_window));
   pn_field(transport->disp, FLOW_NEXT_OUTGOING_ID, pn_value("I", ssn_state->outgoing.next));
   pn_field(transport->disp, FLOW_OUTGOING_WINDOW,
            pn_value("I", pn_delivery_buffer_available(&ssn_state->outgoing)));
@@ -1402,7 +1460,6 @@ void pn_process_disp_receiver(pn_transpo
       pn_link_t *link = delivery->link;
       if (link->endpoint.type == RECEIVER) {
         // XXX: need to prevent duplicate disposition sending
-        fprintf(stderr, "settled=%u\n", delivery->local_settled);
         pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
         if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled) {
           pn_post_disp(transport, delivery);
@@ -1411,8 +1468,8 @@ void pn_process_disp_receiver(pn_transpo
         if (delivery->local_settled) {
           size_t available = pn_delivery_buffer_available(&ssn_state->incoming);
           pn_full_settle(&ssn_state->incoming, delivery);
-          fprintf(stderr, "%zi, %zi\n", available, pn_delivery_buffer_available(&ssn_state->incoming));
-          if (pn_delivery_buffer_available(&ssn_state->incoming) > available) {
+          if (!ssn_state->incoming_window &&
+              pn_delivery_buffer_available(&ssn_state->incoming) > available) {
             pn_post_flow(transport, ssn_state, NULL);
           }
         }
@@ -1453,6 +1510,7 @@ void pn_process_msg_data(pn_transport_t
             }
             pn_post_frame(transport->disp, ssn_state->local_channel, TRANSFER);
             ssn_state->outgoing_transfer_count++;
+            ssn_state->outgoing_window--;
             if (delivery->done) {
               state->sent = true;
               link_state->delivery_count++;
@@ -1541,11 +1599,7 @@ void pn_process_conn_teardown(pn_transpo
   if (endpoint->type == CONNECTION)
   {
     if (endpoint->state & PN_LOCAL_CLOSED && !transport->close_sent) {
-      pn_init_frame(transport->disp);
-      /*if (condition)
-      // XXX: symbol
-      pn_field(eng, CLOSE_ERROR, pn_value("B([zS])", ERROR, condition, description));*/
-      pn_post_frame(transport->disp, 0, CLOSE);
+      pn_post_close(transport);
       transport->close_sent = true;
     }
   }
@@ -1691,7 +1745,7 @@ bool pn_writable(pn_delivery_t *delivery
   if (!delivery) return false;
 
   pn_link_t *link = delivery->link;
-  return pn_is_sender(link) && pn_is_current(delivery) && link->credit > 0;
+  return pn_is_sender(link) && pn_is_current(delivery) && pn_credit(link) > 0;
 }
 
 bool pn_readable(pn_delivery_t *delivery)

Modified: qpid/proton/trunk/proton-c/src/proton.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/proton.c?rev=1331487&r1=1331486&r2=1331487&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/proton.c (original)
+++ qpid/proton/trunk/proton-c/src/proton.c Fri Apr 27 16:12:46 2012
@@ -177,7 +177,7 @@ void server_callback(pn_connector_t *cto
 
     if (pn_updated(delivery)) {
       printf("disposition for %s: %u\n", tagstr, pn_remote_disp(delivery));
-      pn_clear(delivery);
+      pn_settle(delivery);
     }
 
     delivery = pn_work_next(delivery);
@@ -403,7 +403,7 @@ int main(int argc, char **argv)
     mbstowcs(ctx.address, address, 1024);
     if (!pn_connector(drv, host, port, &ctx)) pn_fatal("connector failed\n");
     while (!ctx.done) {
-      pn_driver_wait(drv);
+      pn_driver_wait(drv, -1);
       pn_connector_t *c;
       while ((c = pn_driver_connector(drv))) {
         pn_connector_process(c);
@@ -417,7 +417,7 @@ int main(int argc, char **argv)
     struct server_context ctx = {0};
     if (!pn_listener(drv, host, port, &ctx)) pn_fatal("listener failed\n");
     while (true) {
-      pn_driver_wait(drv);
+      pn_driver_wait(drv, -1);
       pn_listener_t *l;
       pn_connector_t *c;
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Loading...