qpid-proton git commit: PROTON-1504: epoll proactor: no PN_LISTENER_ACCEPT events if no FDs

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

qpid-proton git commit: PROTON-1504: epoll proactor: no PN_LISTENER_ACCEPT events if no FDs

aconway-2
Repository: qpid-proton
Updated Branches:
  refs/heads/master 48e75e304 -> e504ce12f


PROTON-1504: epoll proactor: no PN_LISTENER_ACCEPT events if no FDs

Epoll proactor now generates PN_LISTENER_ACCEPT events only if a socket accept()
succeeds. Simplifies the code and makes it consistent with the libuv proactor.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e504ce12
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e504ce12
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e504ce12

Branch: refs/heads/master
Commit: e504ce12f148842e43c8584ff96baf2e984e4d10
Parents: 48e75e3
Author: Alan Conway <[hidden email]>
Authored: Thu Jun 15 17:28:32 2017 -0400
Committer: Alan Conway <[hidden email]>
Committed: Thu Jun 15 17:38:31 2017 -0400

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 72 ++++++++++++++++++--------------------
 1 file changed, 34 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e504ce12/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index c4d0c73..a173086 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -528,7 +528,8 @@ struct pn_listener_t {
   pn_record_t *attachments;
   void *listener_context;
   size_t backlog;
-  psocket_t *acceptable, *accepted;
+  int accepted_fd;              /* fd accepted but not yet handled by pn_listener_accept() */
+  psocket_t *accepted;          /* psocket from which we accepted accepted_fd */
   bool close_dispatched;
   bool armed;
   pn_listener_t *overflow;       /* Next overflowed listener */
@@ -710,7 +711,7 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo
   pc->read_blocked = true;
   pc->write_blocked = true;
   pc->disconnected = false;
-  pc->hog_count = 0;;
+  pc->hog_count = 0;
   pc->batch.next_event = pconnection_batch_next;
 
   if (server) {
@@ -1269,6 +1270,8 @@ pn_listener_t *pn_event_listener(pn_event_t *e) {
 pn_listener_t *pn_listener() {
   pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
   if (l) {
+    l->accepted_fd = -1;
+    l->accepted = NULL;
     l->batch.next_event = listener_batch_next;
     l->collector = pn_collector();
     l->condition = pn_condition();
@@ -1394,7 +1397,6 @@ static void listener_begin_close(pn_listener_t* l) {
       }
     }
     pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
-    l->acceptable = l->accepted = NULL;
   }
 }
 
@@ -1419,6 +1421,24 @@ static void listener_forced_shutdown(pn_listener_t *l) {
   pn_listener_free(l);
 }
 
+/* Accept a connection as part of listener_process(). Called with listener context lock held. */
+static void listener_accept_lh(psocket_t *ps) {
+  pn_listener_t *l = psocket_listener(ps);
+  assert(l->accepted_fd < 0); /* Shouldn't already have an accepted_fd */
+  l->accepted_fd = accept(ps->sockfd, NULL, 0);
+  l->accepted = ps;
+  if (l->accepted_fd >= 0) {
+    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+  } else {
+    int err = errno;
+    if (err == ENFILE || err == EMFILE) {
+      listener_set_overflow(l);
+    } else {
+      psocket_error(ps, err, "accept");
+    }
+  }
+}
+
 /* Process a listening socket */
 static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) {
   // TODO: some parallelization of the accept mechanism.
@@ -1430,8 +1450,7 @@ static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) {
       /* Calls listener_begin_close which closes all the listener's sockets */
       psocket_error(ps, errno, "listener epoll");
     } else if (!l->context.closing && events & EPOLLIN) {
-      l->acceptable = ps;
-      pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+      listener_accept_lh(ps);
     }
   } else {
     wake_done(&l->context); // callback accounting
@@ -1472,7 +1491,7 @@ static void listener_done(pn_listener_t *l) {
   } else if (listener_has_event(l)) {
     notify = wake(&l->context);
   } else if (l->overflow == NO_OVERFLOW &&
-             !l->context.closing && !l->armed && !l->acceptable && l->accepted)
+             !l->context.closing && !l->armed && l->accepted_fd < 0 && l->accepted)
   {
     /* Don't rearm until the current socket is accepted */
     rearm(l->accepted->proactor, &l->accepted->epoll_io);
@@ -1507,42 +1526,19 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
   // TODO: fuller sanity check on input args
   pconnection_t *pc = new_pconnection_t(l->psockets[0].proactor, c, true, "");
   assert(pc);  // TODO: memory safety
-  int err = 0;
-  int newfd = -1;
-  bool need_done = false;
 
   lock(&l->context.mutex);
+  int fd = l->accepted_fd;
+  l->accepted_fd = -1;
   proactor_add(&pc->context);
-  if (l->context.closing) {
-    err = EBADF;
-  } else if (l->acceptable == 0) {
-    err = EAGAIN;
-  } else {
-    l->accepted = l->acceptable;
-    l->acceptable = 0;
-    newfd = accept(l->accepted->sockfd, NULL, 0);
-    if (newfd < 0) err = errno;
-  }
-  if (err) {
-    lock(&pc->context.mutex);
-    psocket_error(&pc->psocket, err, "accepting from"); /* Always signal error on the connection */
-    pconnection_begin_close(pc);
-    need_done = true;
-    unlock(&pc->context.mutex);
-    if (err == EMFILE || err == ENFILE) { /* Out of FDs does not close the listener */
-      listener_set_overflow(l);
-    } else {
-      psocket_error(l->accepted, err, "accepting from");
-    }
-  } else {                      /* No errors */
-    lock(&pc->context.mutex);
-    configure_socket(newfd);
-    pc->psocket.sockfd = newfd;
-    pconnection_start(pc);
-    unlock(&pc->context.mutex);
-  }
+
+  lock(&pc->context.mutex);
+  configure_socket(fd);
+  pc->psocket.sockfd = fd;
+  pconnection_start(pc);
+  unlock(&pc->context.mutex);
+
   unlock(&l->context.mutex);
-  if (need_done) pconnection_done(pc);
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]