* We handle this by using two locks. The cq_lock protects against polling
* the CQ and processing completions. The cq_wait_lock serializes access to
* waiting on the CQ.
+ *
+ * test() should return 0 on success, < 0 on error to abort, and > 0 to
+ * indicate that processing should continue.
*/
-static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
+static int rs_process_cq(struct rsocket *rs, int (*test)(struct rsocket *rs))
{
- int ret;
+ int ret, err;
fastlock_acquire(&rs->cq_lock);
do {
rs_update_credits(rs);
- ret = rs_poll_cq(rs);
- if (test(rs)) {
- ret = 0;
+ err = rs_poll_cq(rs);
+ ret = test(rs);
+ if (ret <= 0)
break;
- } else if (ret) {
+
+ if (err) {
+ ret = err;
break;
- } else if (nonblock) {
- ret = ERR(EWOULDBLOCK);
- } else if (!rs->cq_armed) {
- ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
+ }
+
+ if (!rs->cq_armed) {
+ ret = ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
rs->cq_armed = 1;
} else {
rs_update_credits(rs);
static int rs_can_send_ctrl(struct rsocket *rs)
{
- return rs->ctrl_avail;
+ return !rs->ctrl_avail;
}
static int rs_have_rdata(struct rsocket *rs)
if (rs->state == rs_connected) {
rs->state = rs_disconnected;
- if (!rs_can_send_ctrl(rs)) {
- ret = rs_process_cq(rs, 0, rs_can_send_ctrl);
+ if (rs_can_send_ctrl(rs)) {
+ ret = rs_process_cq(rs, rs_can_send_ctrl);
if (ret)
return ret;
}