(rs->target_sgl[rs->target_sge].length != 0);
}
+static int rs_conn_can_send(struct rsocket *rs)
+{
+ return rs_can_send(rs) || (rs->state != rs_connected);
+}
+
static int rs_can_send_ctrl(struct rsocket *rs)
{
return rs->ctrl_avail;
return (rs->rmsg_head != rs->rmsg_tail);
}
+static int rs_conn_have_rdata(struct rsocket *rs)
+{
+ return rs_have_rdata(rs) || (rs->state != rs_connected);
+}
+
static int rs_all_sends_done(struct rsocket *rs)
{
return (rs->sqe_avail + rs->ctrl_avail) == rs->sq_size;
}
fastlock_acquire(&rs->rlock);
if (!rs_have_rdata(rs)) {
- ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_have_rdata);
+ ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_conn_have_rdata);
if (ret && errno != ECONNRESET)
goto out;
}
fastlock_acquire(&rs->slock);
for (left = len; left; left -= xfer_size, buf += xfer_size) {
if (!rs_can_send(rs)) {
- ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
+ ret = rs_process_cq(rs, rs_nonblocking(rs, flags),
+ rs_conn_can_send);
if (ret)
break;
+ if (rs->state != rs_connected) {
+ ret = ERR(ECONNRESET);
+ break;
+ }
}
if (olen < left) {
fastlock_acquire(&rs->slock);
for (left = len; left; left -= xfer_size) {
if (!rs_can_send(rs)) {
- ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
+ ret = rs_process_cq(rs, rs_nonblocking(rs, flags),
+ rs_conn_can_send);
if (ret)
break;
+ if (rs->state != rs_connected) {
+ ret = ERR(ECONNRESET);
+ break;
+ }
}
if (olen < left) {