From: Sean Hefty Date: Sat, 26 May 2012 07:02:47 +0000 (-0700) Subject: rsocket: Merge nonblock test with test() routine in rs_process_cq X-Git-Url: https://openfabrics.org/gitweb/?a=commitdiff_plain;h=ce0584ae542bdd1e1e0b3f1683c1a62b874af064;p=~shefty%2Flibrdmacm.git rsocket: Merge nonblock test with test() routine in rs_process_cq rs_process_cq takes the following 2 parameters: nonblock and test(). These are used to control the operation of rs_process_cq. If nonblock is true, rs_process_cq will exit without arming the CQ or waiting on a CQ event. rs_process_cq() will also exit if test() returns true. The only difference in the operation is the return value that rs_process_cq() returns. We can simplify the code by merging the nonblock test into the caller's provided test() routine. The test() routine simply needs to return the correct value for rs_process_cq(). This will also simplify fixing an issue where a caller may block indefinitely in send() or recv() after an rsocket has been disconnected. That fix is in a subsequent patch. Signed-off-by: Sean Hefty --- diff --git a/src/rsocket.c b/src/rsocket.c index 70d0c46e..8cffc667 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -871,24 +871,29 @@ static int rs_get_cq_event(struct rsocket *rs) * We handle this by using two locks. The cq_lock protects against polling * the CQ and processing completions. The cq_wait_lock serializes access to * waiting on the CQ. + * + * test() should return 0 on success, < 0 on error to abort, and > 0 to + * indicate that processing should continue. */ -static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs)) +static int rs_process_cq(struct rsocket *rs, int (*test)(struct rsocket *rs)) { - int ret; + int ret, err; fastlock_acquire(&rs->cq_lock); do { rs_update_credits(rs); - ret = rs_poll_cq(rs); - if (test(rs)) { - ret = 0; + err = rs_poll_cq(rs); + ret = test(rs); + if (ret <= 0) break; - } else if (ret) { + + if (err) { + ret = err; break; - } else if (nonblock) { - ret = ERR(EWOULDBLOCK); - } else if (!rs->cq_armed) { - ibv_req_notify_cq(rs->cm_id->recv_cq, 0); + } + + if (!rs->cq_armed) { + ret = ibv_req_notify_cq(rs->cm_id->recv_cq, 0); rs->cq_armed = 1; } else { rs_update_credits(rs); @@ -937,9 +942,19 @@ static int rs_can_send(struct rsocket *rs) (rs->target_sgl[rs->target_sge].length != 0); } +static int rs_cq_can_send(struct rsocket *rs) +{ + if (rs_can_send(rs)) + return 0; + else if (rs_nonblocking(rs)) + return ERR(EWOULDBLOCK); + else + return 1; +} + static int rs_can_send_ctrl(struct rsocket *rs) { - return rs->ctrl_avail; + return !rs->ctrl_avail; } static int rs_have_rdata(struct rsocket *rs) @@ -947,9 +962,19 @@ static int rs_have_rdata(struct rsocket *rs) return (rs->rmsg_head != rs->rmsg_tail); } +static int rs_cq_have_rdata(struct rsocket *rs) +{ + if (rs_have_rdata(rs)) + return 0; + else if (rs_nonblocking(rs)) + return ERR(EWOULDBLOCK); + else + return 1; +} + static int rs_all_sends_done(struct rsocket *rs) { - return (rs->sqe_avail + rs->ctrl_avail) == rs->sq_size; + return (rs->sqe_avail + rs->ctrl_avail) != rs->sq_size; } static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len) @@ -1007,7 +1032,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) } fastlock_acquire(&rs->rlock); if (!rs_have_rdata(rs)) { - ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_have_rdata); + ret = rs_process_cq(rs, rs_cq_have_rdata); if (ret && errno != ECONNRESET) goto out; } @@ -1111,7 +1136,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) fastlock_acquire(&rs->slock); for (left = len; left; left -= xfer_size, buf += xfer_size) { if (!rs_can_send(rs)) { - ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send); + ret = rs_process_cq(rs, rs_cq_can_send); if (ret) break; } @@ -1208,7 +1233,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags fastlock_acquire(&rs->slock); for (left = len; left; left -= xfer_size) { if (!rs_can_send(rs)) { - ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send); + ret = rs_process_cq(rs, rs_cq_can_send); if (ret) break; } @@ -1537,8 +1562,8 @@ int rshutdown(int socket, int how) if (rs->state == rs_connected) { rs->state = rs_disconnected; - if (!rs_can_send_ctrl(rs)) { - ret = rs_process_cq(rs, 0, rs_can_send_ctrl); + if (rs_can_send_ctrl(rs)) { + ret = rs_process_cq(rs, rs_can_send_ctrl); if (ret) return ret; } @@ -1549,8 +1574,8 @@ int rshutdown(int socket, int how) 0, 0, 0); } - if (!rs_all_sends_done(rs) && rs->state != rs_error) - rs_process_cq(rs, 0, rs_all_sends_done); + if (rs_all_sends_done(rs) && rs->state != rs_error) + rs_process_cq(rs, rs_all_sends_done); return 0; }