!(rs->state & rs_connected);
}
-static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
+static ssize_t rs_peek(struct rsocket *rs, void **buf, size_t len)
{
size_t left = len;
uint32_t end_size, rsize;
end_size = rs->rbuf_size - rbuf_offset;
if (rsize > end_size) {
- memcpy(buf, &rs->rbuf[rbuf_offset], end_size);
+ memcpy(*buf, &rs->rbuf[rbuf_offset], end_size);
rbuf_offset = 0;
- buf += end_size;
+ *buf += end_size;
rsize -= end_size;
left -= end_size;
}
- memcpy(buf, &rs->rbuf[rbuf_offset], rsize);
+ memcpy(*buf, &rs->rbuf[rbuf_offset], rsize);
rbuf_offset += rsize;
- buf += rsize;
+ *buf += rsize;
}
return len - left;
}
}
fastlock_acquire(&rs->rlock);
- if (!rs_have_rdata(rs)) {
- ret = rs_get_comp(rs, rs_nonblocking(rs, flags), rs_conn_have_rdata);
- if (ret)
- goto out;
- }
-
- ret = 0;
- if (flags & MSG_PEEK) {
- left = len - rs_peek(rs, buf, len);
- goto out;
- }
+ do {
+ if (!rs_have_rdata(rs)) {
+ ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+ rs_conn_have_rdata);
+ if (ret)
+ break;
+ }
- for (; left && rs_have_rdata(rs); left -= rsize) {
- if (left < rs->rmsg[rs->rmsg_head].data) {
- rsize = left;
- rs->rmsg[rs->rmsg_head].data -= left;
- } else {
- rs->rseq_no++;
- rsize = rs->rmsg[rs->rmsg_head].data;
- if (++rs->rmsg_head == rs->rq_size + 1)
- rs->rmsg_head = 0;
+ ret = 0;
+ if (flags & MSG_PEEK) {
+ // fixme or will peek same data in waitall loop
+ left -= rs_peek(rs, &buf, left);
+ continue;
}
- end_size = rs->rbuf_size - rs->rbuf_offset;
- if (rsize > end_size) {
- memcpy(buf, &rs->rbuf[rs->rbuf_offset], end_size);
- rs->rbuf_offset = 0;
- buf += end_size;
- rsize -= end_size;
- left -= end_size;
+ for (; left && rs_have_rdata(rs); left -= rsize) {
+ if (left < rs->rmsg[rs->rmsg_head].data) {
+ rsize = left;
+ rs->rmsg[rs->rmsg_head].data -= left;
+ } else {
+ rs->rseq_no++;
+ rsize = rs->rmsg[rs->rmsg_head].data;
+ if (++rs->rmsg_head == rs->rq_size + 1)
+ rs->rmsg_head = 0;
+ }
+
+ end_size = rs->rbuf_size - rs->rbuf_offset;
+ if (rsize > end_size) {
+ memcpy(buf, &rs->rbuf[rs->rbuf_offset], end_size);
+ rs->rbuf_offset = 0;
+ buf += end_size;
+ rsize -= end_size;
+ left -= end_size;
+ }
+ memcpy(buf, &rs->rbuf[rs->rbuf_offset], rsize);
+ rs->rbuf_offset += rsize;
+ buf += rsize;
}
- memcpy(buf, &rs->rbuf[rs->rbuf_offset], rsize);
- rs->rbuf_offset += rsize;
- buf += rsize;
- }
- rs->rbuf_bytes_avail += len - left;
-out:
+ rs->rbuf_bytes_avail += len - left; // <- fixme in waitall loop
+
+ } while ((flags & MSG_WAITALL) && (rs->state & rs_connect_rd) && left);
+
fastlock_release(&rs->rlock);
return ret ? ret : len - left;
}