]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
rsockets: Add support for MSG_WAITALL rrecv() flag
authorSean Hefty <sean.hefty@intel.com>
Thu, 28 Jun 2012 18:34:38 +0000 (11:34 -0700)
committerSean Hefty <sean.hefty@intel.com>
Thu, 16 Aug 2012 20:01:56 +0000 (13:01 -0700)
Signed-off-by: Sean Hefty <sean.hefty@intel.com>
src/rsocket.c

index 996cb2fdc4dce5f3667f06a036fd47cb92496ca2..07c13555cb892a0ced776e8bf21117ff0b0e09f8 100644 (file)
@@ -1077,7 +1077,7 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
               !(rs->state & rs_connected);
 }
 
-static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
+static ssize_t rs_peek(struct rsocket *rs, void **buf, size_t len)
 {
        size_t left = len;
        uint32_t end_size, rsize;
@@ -1097,15 +1097,15 @@ static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
 
                end_size = rs->rbuf_size - rbuf_offset;
                if (rsize > end_size) {
-                       memcpy(buf, &rs->rbuf[rbuf_offset], end_size);
+                       memcpy(*buf, &rs->rbuf[rbuf_offset], end_size);
                        rbuf_offset = 0;
-                       buf += end_size;
+                       *buf += end_size;
                        rsize -= end_size;
                        left -= end_size;
                }
-               memcpy(buf, &rs->rbuf[rbuf_offset], rsize);
+               memcpy(*buf, &rs->rbuf[rbuf_offset], rsize);
                rbuf_offset += rsize;
-               buf += rsize;
+               *buf += rsize;
        }
 
        return len - left;
@@ -1131,43 +1131,48 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
                }
        }
        fastlock_acquire(&rs->rlock);
-       if (!rs_have_rdata(rs)) {
-               ret = rs_get_comp(rs, rs_nonblocking(rs, flags), rs_conn_have_rdata);
-               if (ret)
-                       goto out;
-       }
-
-       ret = 0;
-       if (flags & MSG_PEEK) {
-               left = len - rs_peek(rs, buf, len);
-               goto out;
-       }
+       do {
+               if (!rs_have_rdata(rs)) {
+                       ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+                                         rs_conn_have_rdata);
+                       if (ret)
+                               break;
+               }
 
-       for (; left && rs_have_rdata(rs); left -= rsize) {
-               if (left < rs->rmsg[rs->rmsg_head].data) {
-                       rsize = left;
-                       rs->rmsg[rs->rmsg_head].data -= left;
-               } else {
-                       rs->rseq_no++;
-                       rsize = rs->rmsg[rs->rmsg_head].data;
-                       if (++rs->rmsg_head == rs->rq_size + 1)
-                               rs->rmsg_head = 0;
+               ret = 0;
+               if (flags & MSG_PEEK) {
+                       // fixme or will peek same data in waitall loop
+                       left -= rs_peek(rs, &buf, left);
+                       continue;
                }
 
-               end_size = rs->rbuf_size - rs->rbuf_offset;
-               if (rsize > end_size) {
-                       memcpy(buf, &rs->rbuf[rs->rbuf_offset], end_size);
-                       rs->rbuf_offset = 0;
-                       buf += end_size;
-                       rsize -= end_size;
-                       left -= end_size;
+               for (; left && rs_have_rdata(rs); left -= rsize) {
+                       if (left < rs->rmsg[rs->rmsg_head].data) {
+                               rsize = left;
+                               rs->rmsg[rs->rmsg_head].data -= left;
+                       } else {
+                               rs->rseq_no++;
+                               rsize = rs->rmsg[rs->rmsg_head].data;
+                               if (++rs->rmsg_head == rs->rq_size + 1)
+                                       rs->rmsg_head = 0;
+                       }
+
+                       end_size = rs->rbuf_size - rs->rbuf_offset;
+                       if (rsize > end_size) {
+                               memcpy(buf, &rs->rbuf[rs->rbuf_offset], end_size);
+                               rs->rbuf_offset = 0;
+                               buf += end_size;
+                               rsize -= end_size;
+                               left -= end_size;
+                       }
+                       memcpy(buf, &rs->rbuf[rs->rbuf_offset], rsize);
+                       rs->rbuf_offset += rsize;
+                       buf += rsize;
                }
-               memcpy(buf, &rs->rbuf[rs->rbuf_offset], rsize);
-               rs->rbuf_offset += rsize;
-               buf += rsize;
-       }
-       rs->rbuf_bytes_avail += len - left;
-out:
+               rs->rbuf_bytes_avail += len - left; // <- fixme in waitall loop
+
+       } while ((flags & MSG_WAITALL) && (rs->state & rs_connect_rd) && left);
+
        fastlock_release(&rs->rlock);
        return ret ? ret : len - left;
 }