]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
rsocket: Merge nonblock test with test() routine in rs_process_cq
authorSean Hefty <sean.hefty@intel.com>
Sat, 26 May 2012 07:02:47 +0000 (00:02 -0700)
committerSean Hefty <sean.hefty@intel.com>
Sat, 26 May 2012 07:02:47 +0000 (00:02 -0700)
rs_process_cq takes the following 2 parameters: nonblock and test().
These are used to control the operation of rs_process_cq.  If
nonblock is true, rs_process_cq will exit without arming the CQ or
waiting on a CQ event.  rs_process_cq() will also exit if test()
returns true.  The only difference in the operation is the return
value that rs_process_cq() returns.

We can simplify the code by merging the nonblock test into the
caller's provided test() routine.  The test() routine simply needs
to return the correct value for rs_process_cq().  This will also
simplify fixing an issue where a caller may block indefinitely
in send() or recv() after an rsocket has been disconnected.  That
fix is in a subsequent patch.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
src/rsocket.c

index 70d0c46efd3ebb20b0c64531e11e00b8430be936..8cffc667e9105aa6655830735f85760600cfa25a 100644 (file)
@@ -871,24 +871,29 @@ static int rs_get_cq_event(struct rsocket *rs)
  * We handle this by using two locks.  The cq_lock protects against polling
  * the CQ and processing completions.  The cq_wait_lock serializes access to
  * waiting on the CQ.
+ *
+ * test() should return 0 on success, < 0 on error to abort, and > 0 to
+ * indicate that processing should continue.
  */
-static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
+static int rs_process_cq(struct rsocket *rs, int (*test)(struct rsocket *rs))
 {
-       int ret;
+       int ret, err;
 
        fastlock_acquire(&rs->cq_lock);
        do {
                rs_update_credits(rs);
-               ret = rs_poll_cq(rs);
-               if (test(rs)) {
-                       ret = 0;
+               err = rs_poll_cq(rs);
+               ret = test(rs);
+               if (ret <= 0)
                        break;
-               } else if (ret) {
+
+               if (err) {
+                       ret = err;
                        break;
-               } else if (nonblock) {
-                       ret = ERR(EWOULDBLOCK);
-               } else if (!rs->cq_armed) {
-                       ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
+               }
+
+               if (!rs->cq_armed) {
+                       ret = ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
                        rs->cq_armed = 1;
                } else {
                        rs_update_credits(rs);
@@ -937,9 +942,19 @@ static int rs_can_send(struct rsocket *rs)
               (rs->target_sgl[rs->target_sge].length != 0);
 }
 
+static int rs_cq_can_send(struct rsocket *rs)
+{
+       if (rs_can_send(rs))
+               return 0;
+       else if (rs_nonblocking(rs))
+               return ERR(EWOULDBLOCK);
+       else
+               return 1;
+}
+
 static int rs_can_send_ctrl(struct rsocket *rs)
 {
-       return rs->ctrl_avail;
+       return !rs->ctrl_avail;
 }
 
 static int rs_have_rdata(struct rsocket *rs)
@@ -947,9 +962,19 @@ static int rs_have_rdata(struct rsocket *rs)
        return (rs->rmsg_head != rs->rmsg_tail);
 }
 
+static int rs_cq_have_rdata(struct rsocket *rs)
+{
+       if (rs_have_rdata(rs))
+               return 0;
+       else if (rs_nonblocking(rs))
+               return ERR(EWOULDBLOCK);
+       else
+               return 1;
+}
+
 static int rs_all_sends_done(struct rsocket *rs)
 {
-       return (rs->sqe_avail + rs->ctrl_avail) == rs->sq_size;
+       return (rs->sqe_avail + rs->ctrl_avail) != rs->sq_size;
 }
 
 static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
@@ -1007,7 +1032,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
        }
        fastlock_acquire(&rs->rlock);
        if (!rs_have_rdata(rs)) {
-               ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_have_rdata);
+               ret = rs_process_cq(rs, rs_cq_have_rdata);
                if (ret && errno != ECONNRESET)
                        goto out;
        }
@@ -1111,7 +1136,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
        fastlock_acquire(&rs->slock);
        for (left = len; left; left -= xfer_size, buf += xfer_size) {
                if (!rs_can_send(rs)) {
-                       ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
+                       ret = rs_process_cq(rs, rs_cq_can_send);
                        if (ret)
                                break;
                }
@@ -1208,7 +1233,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
        fastlock_acquire(&rs->slock);
        for (left = len; left; left -= xfer_size) {
                if (!rs_can_send(rs)) {
-                       ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
+                       ret = rs_process_cq(rs, rs_cq_can_send);
                        if (ret)
                                break;
                }
@@ -1537,8 +1562,8 @@ int rshutdown(int socket, int how)
 
        if (rs->state == rs_connected) {
                rs->state = rs_disconnected;
-               if (!rs_can_send_ctrl(rs)) {
-                       ret = rs_process_cq(rs, 0, rs_can_send_ctrl);
+               if (rs_can_send_ctrl(rs)) {
+                       ret = rs_process_cq(rs, rs_can_send_ctrl);
                        if (ret)
                                return ret;
                }
@@ -1549,8 +1574,8 @@ int rshutdown(int socket, int how)
                                    0, 0, 0);
        }
 
-       if (!rs_all_sends_done(rs) && rs->state != rs_error)
-               rs_process_cq(rs, 0, rs_all_sends_done);
+       if (rs_all_sends_done(rs) && rs->state != rs_error)
+               rs_process_cq(rs, rs_all_sends_done);
 
        return 0;
 }