From aa1793e1b70c856f450fb173bf32cfa47f0f7690 Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Thu, 28 Jun 2012 11:34:38 -0700 Subject: [PATCH] rsockets: Add support for MSG_WAITALL rrecv() flag Signed-off-by: Sean Hefty --- src/rsocket.c | 81 +++++++++++++++++++++++++++------------------------ 1 file changed, 43 insertions(+), 38 deletions(-) diff --git a/src/rsocket.c b/src/rsocket.c index 996cb2fd..07c13555 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -1077,7 +1077,7 @@ static int rs_conn_all_sends_done(struct rsocket *rs) !(rs->state & rs_connected); } -static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len) +static ssize_t rs_peek(struct rsocket *rs, void **buf, size_t len) { size_t left = len; uint32_t end_size, rsize; @@ -1097,15 +1097,15 @@ static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len) end_size = rs->rbuf_size - rbuf_offset; if (rsize > end_size) { - memcpy(buf, &rs->rbuf[rbuf_offset], end_size); + memcpy(*buf, &rs->rbuf[rbuf_offset], end_size); rbuf_offset = 0; - buf += end_size; + *buf += end_size; rsize -= end_size; left -= end_size; } - memcpy(buf, &rs->rbuf[rbuf_offset], rsize); + memcpy(*buf, &rs->rbuf[rbuf_offset], rsize); rbuf_offset += rsize; - buf += rsize; + *buf += rsize; } return len - left; @@ -1131,43 +1131,48 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) } } fastlock_acquire(&rs->rlock); - if (!rs_have_rdata(rs)) { - ret = rs_get_comp(rs, rs_nonblocking(rs, flags), rs_conn_have_rdata); - if (ret) - goto out; - } - - ret = 0; - if (flags & MSG_PEEK) { - left = len - rs_peek(rs, buf, len); - goto out; - } + do { + if (!rs_have_rdata(rs)) { + ret = rs_get_comp(rs, rs_nonblocking(rs, flags), + rs_conn_have_rdata); + if (ret) + break; + } - for (; left && rs_have_rdata(rs); left -= rsize) { - if (left < rs->rmsg[rs->rmsg_head].data) { - rsize = left; - rs->rmsg[rs->rmsg_head].data -= left; - } else { - rs->rseq_no++; - rsize = rs->rmsg[rs->rmsg_head].data; - if (++rs->rmsg_head == rs->rq_size + 1) - rs->rmsg_head = 0; + ret = 0; + if (flags & MSG_PEEK) { + // fixme or will peek same data in waitall loop + left -= rs_peek(rs, &buf, left); + continue; } - end_size = rs->rbuf_size - rs->rbuf_offset; - if (rsize > end_size) { - memcpy(buf, &rs->rbuf[rs->rbuf_offset], end_size); - rs->rbuf_offset = 0; - buf += end_size; - rsize -= end_size; - left -= end_size; + for (; left && rs_have_rdata(rs); left -= rsize) { + if (left < rs->rmsg[rs->rmsg_head].data) { + rsize = left; + rs->rmsg[rs->rmsg_head].data -= left; + } else { + rs->rseq_no++; + rsize = rs->rmsg[rs->rmsg_head].data; + if (++rs->rmsg_head == rs->rq_size + 1) + rs->rmsg_head = 0; + } + + end_size = rs->rbuf_size - rs->rbuf_offset; + if (rsize > end_size) { + memcpy(buf, &rs->rbuf[rs->rbuf_offset], end_size); + rs->rbuf_offset = 0; + buf += end_size; + rsize -= end_size; + left -= end_size; + } + memcpy(buf, &rs->rbuf[rs->rbuf_offset], rsize); + rs->rbuf_offset += rsize; + buf += rsize; } - memcpy(buf, &rs->rbuf[rs->rbuf_offset], rsize); - rs->rbuf_offset += rsize; - buf += rsize; - } - rs->rbuf_bytes_avail += len - left; -out: + rs->rbuf_bytes_avail += len - left; // <- fixme in waitall loop + + } while ((flags & MSG_WAITALL) && (rs->state & rs_connect_rd) && left); + fastlock_release(&rs->rlock); return ret ? ret : len - left; } -- 2.45.2