(rs->target_sgl[rs->target_sge].length != 0);
}
+/*
+ * rs_process_cq() wait predicate for the send path (see callsites below
+ * where it replaces the old (nonblock, rs_can_send) argument pair):
+ * returns 0 once send space is available (stop polling), EWOULDBLOCK via
+ * ERR() for non-blocking sockets, and 1 to keep polling the CQ.
+ */
+static int rs_cq_can_send(struct rsocket *rs)
+{
+ if (rs_can_send(rs))
+ return 0;
+ else if (rs_nonblocking(rs))
+ return ERR(EWOULDBLOCK);
+ else
+ return 1;
+}
+
/*
 * NOTE(review): the two consecutive return statements below make the
 * second one unreachable. The second return tests rmsg_head != rmsg_tail,
 * which is the rs_have_rdata condition used elsewhere in this patch —
 * presumably a function header was lost when this hunk was extracted.
 * Confirm against the upstream rsocket.c before applying.
 */
static int rs_can_send_ctrl(struct rsocket *rs)
{
return !rs->ctrl_avail;
return (rs->rmsg_head != rs->rmsg_tail);
}
+/*
+ * rs_process_cq() wait predicate for the receive path, mirroring
+ * rs_cq_can_send(): returns 0 once receive data is available (stop
+ * polling), EWOULDBLOCK via ERR() for non-blocking sockets, and 1 to
+ * keep polling the CQ.
+ */
+static int rs_cq_have_rdata(struct rsocket *rs)
+{
+ if (rs_have_rdata(rs))
+ return 0;
+ else if (rs_nonblocking(rs))
+ return ERR(EWOULDBLOCK);
+ else
+ return 1;
+}
+
/*
 * Inverted from '==' to '!=' so the nonzero result now means "sends still
 * outstanding, keep polling" — matching the new predicate convention
 * (0 = condition met) and the matching callsite inversion later in this
 * patch, where the '!' is dropped and the predicate is passed without the
 * removed nonblock argument.
 */
static int rs_all_sends_done(struct rsocket *rs)
{
- return (rs->sqe_avail + rs->ctrl_avail) == rs->sq_size;
+ return (rs->sqe_avail + rs->ctrl_avail) != rs->sq_size;
}
static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
}
fastlock_acquire(&rs->rlock);
if (!rs_have_rdata(rs)) {
- ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_have_rdata);
+ ret = rs_process_cq(rs, rs_cq_have_rdata);
if (ret && errno != ECONNRESET)
goto out;
}
fastlock_acquire(&rs->slock);
for (left = len; left; left -= xfer_size, buf += xfer_size) {
if (!rs_can_send(rs)) {
- ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
+ ret = rs_process_cq(rs, rs_cq_can_send);
if (ret)
break;
}
fastlock_acquire(&rs->slock);
for (left = len; left; left -= xfer_size) {
if (!rs_can_send(rs)) {
- ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
+ ret = rs_process_cq(rs, rs_cq_can_send);
if (ret)
break;
}
0, 0, 0);
}
- if (!rs_all_sends_done(rs) && rs->state != rs_error)
- rs_process_cq(rs, 0, rs_all_sends_done);
+ if (rs_all_sends_done(rs) && rs->state != rs_error)
+ rs_process_cq(rs, rs_all_sends_done);
return 0;
}