]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
refresh
authorSean Hefty <sean.hefty@intel.com>
Wed, 28 Nov 2012 00:27:40 +0000 (16:27 -0800)
committerSean Hefty <sean.hefty@intel.com>
Wed, 28 Nov 2012 00:27:40 +0000 (16:27 -0800)
meta
patches/dsocket
patches/refresh-temp [deleted file]

diff --git a/meta b/meta
index 048cd05ec5654daa8c72f4d9fbdf8b5c04ae2cf9..13d2516b29a04742413d902a287003403267fe3a 100644 (file)
--- a/meta
+++ b/meta
@@ -1,9 +1,8 @@
 Version: 1
-Previous: 5109f23e0a314c472f89e0192cbf83611cc5c432
-Head: c57b602d27ad4e916883dce975d819b52d2f902e
+Previous: 220d614ae98908b3705d160664af61230843e90c
+Head: b43272823d9e4e48a7daea9bafb1345043d615cf
 Applied:
-  dsocket: 59a409d3d22b50727262a8e057761e048d59db4b
-  refresh-temp: c57b602d27ad4e916883dce975d819b52d2f902e
+  dsocket: b43272823d9e4e48a7daea9bafb1345043d615cf
 Unapplied:
   test-udp: f6c78ad2a26f452cf166aff1baa7b76160bd8bf7
   iom-dbg: 88434072d07f8edc58f454ac954d78bd39441eed
index f870b78b96ec03aaa49401ffe12d9a0086951363..afced889e61eb702f12376d7ebedb59efe691760 100644 (file)
@@ -1,5 +1,5 @@
 Bottom: 92d2aab8615c3d1003fee963587c4078b732e465
-Top:    faabde8e748d27fc0f733e702ed7f3c3902d8f58
+Top:    0ddd3b5ffa7ef87e8179ff9779cbd2f76c69ac89
 Author: Sean Hefty <sean.hefty@intel.com>
 Date:   2012-11-09 10:26:38 -0800
 
@@ -86,7 +86,7 @@ index 91bf108..2c6b032 100755
  
  uint16_t ucma_get_port(struct sockaddr *addr)
 diff --git a/src/rsocket.c b/src/rsocket.c
-index 58fcb8e..a81b8f3 100644
+index 58fcb8e..d7c3163 100644
 --- a/src/rsocket.c
 +++ b/src/rsocket.c
 @@ -46,6 +46,7 @@
@@ -186,7 +186,7 @@ index 58fcb8e..a81b8f3 100644
        rs_connect_error   =                0x0800,
        rs_disconnected    =                0x1000,
        rs_error           =                0x2000,
-@@ -169,68 +205,203 @@ enum rs_state {
+@@ -169,68 +205,219 @@ enum rs_state {
  
  #define RS_OPT_SWAP_SGL 1
  
@@ -197,10 +197,27 @@ index 58fcb8e..a81b8f3 100644
 +      struct sockaddr_in6     sin6;
 +};
 +
++struct ds_header {
++      uint8_t           version;
++      uint8_t           length;
++      uint16_t          port;
++      union {
++              uint32_t  ipv4;
++              struct {
++                      uint32_t flowinfo;
++                      uint8_t  addr[16];
++              } ipv6;
++      } addr;
++};
++
++#define DS_IPV4_HDR_LEN  8
++#define DS_IPV6_HDR_LEN 24
++
 +struct ds_qp {
 +      dlist_t           list;
 +      struct rsocket    *rs;
        struct rdma_cm_id *cm_id;
++      struct ds_header  hdr;
 +
 +      struct ibv_mr     *smr;
 +      struct ibv_mr     *rmr;
@@ -341,7 +358,6 @@ index 58fcb8e..a81b8f3 100644
 +      uint32_t          qpn;  /* upper 8-bits reserved */
  };
  
-+
 +#define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list)
 +
 +static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp)
@@ -422,7 +438,7 @@ index 58fcb8e..a81b8f3 100644
  static int rs_value_to_scale(int value, int bits)
  {
        return value <= (1 << (bits - 1)) ?
-@@ -306,10 +477,10 @@ out:
+@@ -306,10 +493,10 @@ out:
        pthread_mutex_unlock(&mut);
  }
  
@@ -435,7 +451,7 @@ index 58fcb8e..a81b8f3 100644
        pthread_mutex_unlock(&mut);
        return rs->index;
  }
-@@ -321,7 +492,7 @@ static void rs_remove(struct rsocket *rs)
+@@ -321,7 +508,7 @@ static void rs_remove(struct rsocket *rs)
        pthread_mutex_unlock(&mut);
  }
  
@@ -444,7 +460,7 @@ index 58fcb8e..a81b8f3 100644
  {
        struct rsocket *rs;
  
-@@ -329,7 +500,11 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -329,7 +516,11 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
        if (!rs)
                return NULL;
  
@@ -456,7 +472,7 @@ index 58fcb8e..a81b8f3 100644
        if (inherited_rs) {
                rs->sbuf_size = inherited_rs->sbuf_size;
                rs->rbuf_size = inherited_rs->rbuf_size;
-@@ -351,7 +526,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -351,7 +542,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
        fastlock_init(&rs->rlock);
        fastlock_init(&rs->cq_lock);
        fastlock_init(&rs->cq_wait_lock);
@@ -465,7 +481,7 @@ index 58fcb8e..a81b8f3 100644
        dlist_init(&rs->iomap_list);
        dlist_init(&rs->iomap_queue);
        return rs;
-@@ -359,13 +534,27 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -359,13 +550,27 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
  
  static int rs_set_nonblocking(struct rsocket *rs, long arg)
  {
@@ -497,7 +513,7 @@ index 58fcb8e..a81b8f3 100644
  
        return ret;
  }
-@@ -389,17 +578,39 @@ static void rs_set_qp_size(struct rsocket *rs)
+@@ -389,17 +594,39 @@ static void rs_set_qp_size(struct rsocket *rs)
                rs->rq_size = 2;
  }
  
@@ -539,7 +555,7 @@ index 58fcb8e..a81b8f3 100644
  
        rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
        if (!rs->smr)
-@@ -409,7 +620,7 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -409,7 +636,7 @@ static int rs_init_bufs(struct rsocket *rs)
              sizeof(*rs->target_iomap) * rs->target_iomap_size;
        rs->target_buffer_list = malloc(len);
        if (!rs->target_buffer_list)
@@ -548,7 +564,7 @@ index 58fcb8e..a81b8f3 100644
  
        rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
        if (!rs->target_mr)
-@@ -422,7 +633,7 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -422,7 +649,7 @@ static int rs_init_bufs(struct rsocket *rs)
  
        rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
        if (!rs->rbuf)
@@ -557,7 +573,7 @@ index 58fcb8e..a81b8f3 100644
  
        rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
        if (!rs->rmr)
-@@ -439,15 +650,32 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -439,15 +666,32 @@ static int rs_init_bufs(struct rsocket *rs)
        return 0;
  }
  
@@ -596,7 +612,7 @@ index 58fcb8e..a81b8f3 100644
                goto err1;
  
        if (rs->fd_flags & O_NONBLOCK) {
-@@ -455,21 +683,20 @@ static int rs_create_cq(struct rsocket *rs)
+@@ -455,21 +699,20 @@ static int rs_create_cq(struct rsocket *rs)
                        goto err2;
        }
  
@@ -625,7 +641,7 @@ index 58fcb8e..a81b8f3 100644
  {
        struct ibv_recv_wr wr, *bad;
  
-@@ -481,6 +708,23 @@ rs_post_recv(struct rsocket *rs)
+@@ -481,6 +724,23 @@ rs_post_recv(struct rsocket *rs)
        return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
  }
  
@@ -649,7 +665,7 @@ index 58fcb8e..a81b8f3 100644
  static int rs_create_ep(struct rsocket *rs)
  {
        struct ibv_qp_init_attr qp_attr;
-@@ -491,7 +735,7 @@ static int rs_create_ep(struct rsocket *rs)
+@@ -491,7 +751,7 @@ static int rs_create_ep(struct rsocket *rs)
        if (ret)
                return ret;
  
@@ -658,7 +674,7 @@ index 58fcb8e..a81b8f3 100644
        if (ret)
                return ret;
  
-@@ -548,8 +792,73 @@ static void rs_free_iomappings(struct rsocket *rs)
+@@ -548,8 +808,73 @@ static void rs_free_iomappings(struct rsocket *rs)
        }
  }
  
@@ -732,7 +748,7 @@ index 58fcb8e..a81b8f3 100644
        if (rs->index >= 0)
                rs_remove(rs);
  
-@@ -581,7 +890,7 @@ static void rs_free(struct rsocket *rs)
+@@ -581,7 +906,7 @@ static void rs_free(struct rsocket *rs)
                rdma_destroy_id(rs->cm_id);
        }
  
@@ -741,7 +757,7 @@ index 58fcb8e..a81b8f3 100644
        fastlock_destroy(&rs->cq_wait_lock);
        fastlock_destroy(&rs->cq_lock);
        fastlock_destroy(&rs->rlock);
-@@ -635,29 +944,54 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+@@ -635,29 +960,54 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
        rs->sseq_comp = ntohs(conn->credits);
  }
  
@@ -791,11 +807,11 @@ index 58fcb8e..a81b8f3 100644
 +              ret = ds_init(rs, domain);
 +              if (ret)
 +                      goto err;
-+
-+              index = rs->udp_sock;
-+      }
  
 -      ret = rs_insert(rs);
++              index = rs->udp_sock;
++      }
++
 +      ret = rs_insert(rs, index);
        if (ret < 0)
                goto err;
@@ -804,7 +820,7 @@ index 58fcb8e..a81b8f3 100644
        return rs->index;
  
  err:
-@@ -671,9 +1005,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+@@ -671,9 +1021,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
        int ret;
  
        rs = idm_at(&idm, socket);
@@ -826,7 +842,7 @@ index 58fcb8e..a81b8f3 100644
        return ret;
  }
  
-@@ -709,7 +1052,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -709,7 +1068,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
        int ret;
  
        rs = idm_at(&idm, socket);
@@ -835,7 +851,7 @@ index 58fcb8e..a81b8f3 100644
        if (!new_rs)
                return ERR(ENOMEM);
  
-@@ -717,7 +1060,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -717,7 +1076,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
        if (ret)
                goto err;
  
@@ -844,7 +860,7 @@ index 58fcb8e..a81b8f3 100644
        if (ret < 0)
                goto err;
  
-@@ -854,13 +1197,231 @@ connected:
+@@ -854,13 +1213,248 @@ connected:
        return ret;
  }
  
@@ -936,6 +952,22 @@ index 58fcb8e..a81b8f3 100644
 +      return ret;
 +}
 +
++static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
++{
++      if (addr->sa.sa_family == AF_INET) {
++              hdr->version = 4;
++              hdr->length = DS_IPV4_HDR_LEN;
++              hdr->port = addr->sin.sin_port;
++              hdr->addr.ipv4 = addr->sin.sin_addr;
++      } else {
++              hdr->version = 6;
++              hdr->length = DS_IPV6_HDR_LEN;
++              hdr->port = addr->sin6.sin6_port;
++              hdr->addr.ipv6.flowinfo= addr->sin6.sin6_flowinfo;
++              memcpy(&hdr->addr.ipv6.addr, &addr->sin6.sin6_addr, 16);
++      }
++}
++
 +static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
 +                      socklen_t addrlen, struct ds_qp **qp)
 +{
@@ -952,6 +984,7 @@ index 58fcb8e..a81b8f3 100644
 +      if (ret)
 +              goto err;
 +
++      ds_format_hdr(&(*qp)->hdr, src_addr);
 +      ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa);
 +      if (ret)
 +              goto err;
@@ -1078,22 +1111,21 @@ index 58fcb8e..a81b8f3 100644
  }
  
  static int rs_post_write_msg(struct rsocket *rs,
-@@ -902,6 +1463,25 @@ static int rs_post_write(struct rsocket *rs,
+@@ -902,6 +1496,24 @@ static int rs_post_write(struct rsocket *rs,
        return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
  }
  
-+static int ds_post_send(struct rsocket *rs,
-+                      struct ibv_sge *sgl, int nsge,
-+                      uint64_t wr_id, int flags)
++static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge,
++                      uint64_t wr_id)
 +{
 +      struct ibv_send_wr wr, *bad;
 +
 +      wr.wr_id = wr_id;
 +      wr.next = NULL;
-+      wr.sg_list = sgl;
-+      wr.num_sge = nsge;
++      wr.sg_list = sge;
++      wr.num_sge = 1;
 +      wr.opcode = IBV_WR_SEND;
-+      wr.send_flags = flags;
++      wr.send_flags = (sge.length <= rs->sq_inline) ? IBV_SEND_INLINE : 0;
 +      wr.wr.ud.ah = rs->conn_dest->ah;
 +      wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
 +      wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
@@ -1104,26 +1136,7 @@ index 58fcb8e..a81b8f3 100644
  /*
   * Update target SGE before sending data.  Otherwise the remote side may
   * update the entry before we do.
-@@ -932,6 +1512,18 @@ static int rs_write_data(struct rsocket *rs,
-                                flags, addr, rkey);
- }
-+static int ds_send_data(struct rsocket *rs,
-+                      struct ibv_sge *sgl, int nsge,
-+                      uint32_t length, int flags)
-+{
-+      uint64_t offset;
-+
-+      rs->sqe_avail--;
-+      rs->sbuf_bytes_avail -= length;
-+      offset = sgl->addr - (uintptr_t) rs->sbuf;
-+      return ds_post_send(rs, sgl, nsge, ds_send_wr_id(offset, length), flags);
-+}
-+
- static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
-                          struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
- {
-@@ -1045,7 +1637,7 @@ static int rs_poll_cq(struct rsocket *rs)
+@@ -1045,7 +1657,7 @@ static int rs_poll_cq(struct rsocket *rs)
                                        rs->state = rs_disconnected;
                                        return 0;
                                } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
@@ -1132,10 +1145,18 @@ index 58fcb8e..a81b8f3 100644
                                }
                                break;
                        case RS_OP_WRITE:
-@@ -1187,6 +1779,153 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
+@@ -1187,6 +1799,165 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
        return ret;
  }
  
++static int ds_valid_recv(void *buf, uint32_t len)
++{
++      struct ds_header *hdr = (struct ds_header *) buf;
++      return ((len >= sizeof(*hdr)) &&
++              ((hdr->version == 4 && hdr->length == DS_IPV4_HDR_LEN) ||
++               (hdr->version == 6 && hdr->length == DS_IPV6_HDR_LEN)));
++}
++
 +/*
 + * Poll all CQs associated with a datagram rsocket.  We need to drop any
 + * received messages that we do not have room to store.  To limit drops,
@@ -1146,7 +1167,8 @@ index 58fcb8e..a81b8f3 100644
 +static void ds_poll_cqs(struct rsocket *rs)
 +{
 +      struct ds_qp *qp;
-+      struct ds_smsg *msg;
++      struct ds_smsg *smsg;
++      struct ds_rmsg *rmsg;
 +      struct ibv_wc wc;
 +      int ret, cnt;
 +
@@ -1161,9 +1183,14 @@ index 58fcb8e..a81b8f3 100644
 +                      }
 +
 +                      if (ds_wr_is_recv(wc.wr_id)) {
-+                              if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS) {
++                              if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS &&
++                                  ds_valid_recv(qp->rbuf + ds_wr_offset(wc.wr_id),
++                                                wc.byte_len)) {
 +                                      rs->rqe_avail--;
-+                                      rs->dmsg[rs->rmsg_tail].qp = qp;
++                                      rmsg = &rs->dmsg[rs->rmsg_tail];
++                                      rmsg->qp = qp;
++                                      rmsg->offset = ds_wr_offset(wc.wr_id);
++                                      rmsg->length = wc.byte_len;
 +                                      if (++rs->rmsg_tail == rs->rq_size + 1)
 +                                              rs->rmsg_tail = 0;
 +                              } else {
@@ -1171,12 +1198,10 @@ index 58fcb8e..a81b8f3 100644
 +                                                           ds_wr_offset(wc.wr_id));
 +                              }
 +                      } else {
-+                              if (ds_wr_length(wc.wr_id) > rs->sq_inline) {
-+                                      msg = (struct ds_smsg *)
-+                                            (rs->sbuf + ds_wr_offset(wc.wr_id));
-+                                      msg->next = rs->smsg_free;
-+                                      rs->smsg_free = msg;
-+                              }
++                              smsg = (struct ds_smsg *)
++                                     (rs->sbuf + ds_wr_offset(wc.wr_id));
++                              smsg->next = rs->smsg_free;
++                              rs->smsg_free = smsg;
 +                              rs->sqe_avail++;
 +                      }
 +
@@ -1286,13 +1311,18 @@ index 58fcb8e..a81b8f3 100644
  static int rs_nonblocking(struct rsocket *rs, int flags)
  {
        return (rs->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT);
-@@ -1218,9 +1957,14 @@ static int rs_can_send(struct rsocket *rs)
+@@ -1218,9 +1989,19 @@ static int rs_can_send(struct rsocket *rs)
               (rs->target_sgl[rs->target_sge].length != 0);
  }
  
 +static int ds_can_send(struct rsocket *rs)
 +{
-+      return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT);
++      return rs->sqe_avail;
++}
++
++static int ds_all_sends_done(struct rsocket *rs)
++{
++      return rs->sqe_avail == rs->sq_size;
 +}
 +
  static int rs_conn_can_send(struct rsocket *rs)
@@ -1302,7 +1332,7 @@ index 58fcb8e..a81b8f3 100644
  }
  
  static int rs_conn_can_send_ctrl(struct rsocket *rs)
-@@ -1235,7 +1979,7 @@ static int rs_have_rdata(struct rsocket *rs)
+@@ -1235,7 +2016,7 @@ static int rs_have_rdata(struct rsocket *rs)
  
  static int rs_conn_have_rdata(struct rsocket *rs)
  {
@@ -1311,7 +1341,88 @@ index 58fcb8e..a81b8f3 100644
  }
  
  static int rs_conn_all_sends_done(struct rsocket *rs)
-@@ -1338,7 +2082,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1244,6 +2025,66 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
+              !(rs->state & rs_connected);
+ }
++static void ds_set_src(struct sockaddr *addr, socklen_t *addrlen,
++                     struct ds_header *hdr)
++{
++      union socket_addr sa;
++
++      if (hdr->version == 4) {
++              if (*addrlen > sizeof(sa.sin))
++                      *addrlen = sizeof(sa.sin);
++
++              sa.sin.sin_family = AF_INET;
++              sa.sin.sin_port = hdr->port;
++              sa.sin.sin_addr.s_addr =  hdr->addr.ipv4;
++      } else {
++              if (*addrlen > sizeof(sa.sin6))
++                      *addrlen = sizeof(sa.sin6);
++
++              sa.sin6.sin6_family = AF_INET6;
++              sa.sin6.sin6_port = hdr->port;
++              sa.sin6.sin6_flowinfo = hdr->addr.ipv6.flowinfo;
++              memcpy(&sa.sin6.sin6_addr, &hdr->addr.ipv6.addr, 16);
++              sa.sin6.sin6_scope_id = 0;
++      }
++      memcpy(addr, &sa, *addrlen);
++}
++
++static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
++                         struct sockaddr *src_addr, socklen_t *addrlen)
++{
++      struct ds_rmsg *rmsg;
++      struct ds_header *hdr;
++      int ret;
++
++      if (!(rs->state & rs_readable))
++              return ERR(EINVAL);
++
++      if (!rs_have_rdata(rs)) {
++              ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
++                                rs_have_rdata);
++              if (ret)
++                      return ret;
++      }
++
++      rmsg = &rs->dmsg[rs->rmsg_head];
++      hdr = (struct ds_header *) (rmsg->qp->rbuf + rmsg->offset);
++      if (len > rmsg->length - hdr->length)
++              len = rmsg->length - hdr->length;
++
++      memcpy(buf, (void *) hdr + hdr->length, len);
++      if (addrlen)
++              ds_set_src(src_addr, addrlen, hdr);
++
++      if (!(flags & MSG_PEEK)) {
++              ds_post_recv(rs, rmsg->qp, hdr);
++              if (++rs->rmsg_head == rs->rq_size + 1)
++                      rs->rmsg_head = 0;
++      }
++
++      return len;
++}
++
+ static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
+ {
+       size_t left = len;
+@@ -1289,6 +2130,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+       int ret;
+       rs = idm_at(&idm, socket);
++      if (rs->type == SOCK_DGRAM) {
++              fastlock_acquire(&rs->slock);
++              ret = ds_recvfrom(rs, buf, len, flags, src_addr, addrlen);
++              fastlock_release(&rs->slock);
++              return ret;
++      }
++
+       if (rs->state & rs_opening) {
+               ret = rs_do_connect(rs);
+               if (ret) {
+@@ -1338,7 +2186,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
                        rs->rbuf_bytes_avail += rsize;
                }
  
@@ -1320,7 +1431,22 @@ index 58fcb8e..a81b8f3 100644
  
        fastlock_release(&rs->rlock);
        return ret ? ret : len - left;
-@@ -1390,14 +2134,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1349,6 +2197,14 @@ ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
+ {
+       int ret;
++      rs = idm_at(&idm, socket);
++      if (rs->type == SOCK_DGRAM) {
++              fastlock_acquire(&rs->slock);
++              ret = ds_recvfrom(rs, buf, len, flags, src_addr, addrlen);
++              fastlock_release(&rs->slock);
++              return ret;
++      }
++
+       ret = rrecv(socket, buf, len, flags);
+       if (ret > 0 && src_addr)
+               rgetpeername(socket, src_addr, addrlen);
+@@ -1390,14 +2246,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
        struct rs_iomap iom;
        int ret;
  
@@ -1337,7 +1463,7 @@ index 58fcb8e..a81b8f3 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1446,10 +2190,84 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1446,10 +2302,75 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
        }
  
        rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
@@ -1382,8 +1508,10 @@ index 58fcb8e..a81b8f3 100644
 +
 +static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
 +{
++      struct ds_smsg *msg;
 +      struct ibv_sge sge;
-+      int ret = 0;
++      uint64_t offset;
++      int flags, ret = 0;
 +
 +      if (!rs->conn_dest->ah)
 +              return ds_send_udp(rs, buf, len, flags);
@@ -1394,36 +1522,25 @@ index 58fcb8e..a81b8f3 100644
 +                      return ret;
 +      }
 +
-+      if (len <= rs->sq_inline) {
-+              sge.addr = (uintptr_t) buf;
-+              sge.length = len;
-+              sge.lkey = 0;
-+              ret = ds_send_data(rs, &sge, 1, len, IBV_SEND_INLINE);
-+      } else if (len <= rs_sbuf_left(rs)) {
-+              memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, len);
-+              rs->ssgl[0].length = len;
-+              ret = ds_send_data(rs, rs->ssgl, 1, len, 0);
-+              if (len < rs_sbuf_left(rs))
-+                      rs->ssgl[0].addr += len;
-+              else
-+                      rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
-+      } else {
-+              rs->ssgl[0].length = rs_sbuf_left(rs);
-+              memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
-+                      rs->ssgl[0].length);
-+              rs->ssgl[1].length = len - rs->ssgl[0].length;
-+              memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
-+              ret = ds_send_data(rs, rs->ssgl, 2, len, 0);
-+              rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
-+      }
++      msg = rs->smsg_free;
++      rs->smsg_free = msg->next;
++      rs->sqe_avail--;
++
++      memcpy((void *) msg, rs->conn_dest->qp->hdr, rs->conn_dest->qp->hdr.length);
++      memcpy((void *) msg + rs->conn_dest->qp->hdr.length, buf, len);
++      sge.addr = (uintptr_t) msg;
++      sge.length = rs->conn_dest->qp->hdr.length + len;
++      sge.lkey = rs->smr->lkey;
++      offset = (uint8_t *) msg - rs->sbuf;
 +
++      ret = ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
 +      return ret ? ret : len;
 +}
 +
  /*
   * We overlap sending the data, by posting a small work request immediately,
   * then increasing the size of the send on each iteration.
-@@ -1463,6 +2281,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1463,6 +2384,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
        int ret = 0;
  
        rs = idm_at(&idm, socket);
@@ -1437,7 +1554,7 @@ index 58fcb8e..a81b8f3 100644
        if (rs->state & rs_opening) {
                ret = rs_do_connect(rs);
                if (ret) {
-@@ -1484,7 +2309,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1484,7 +2412,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                                          rs_conn_can_send);
                        if (ret)
                                break;
@@ -1446,14 +1563,15 @@ index 58fcb8e..a81b8f3 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1537,10 +2362,26 @@ out:
+@@ -1537,10 +2465,26 @@ out:
  ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
                const struct sockaddr *dest_addr, socklen_t addrlen)
  {
 -      if (dest_addr || addrlen)
 -              return ERR(EISCONN);
 +      struct rsocket *rs;
-+
+-      return rsend(socket, buf, len, flags);
 +      rs = idm_at(&idm, socket);
 +      if (rs->type == SOCK_STREAM) {
 +              if (dest_addr || addrlen)
@@ -1461,8 +1579,7 @@ index 58fcb8e..a81b8f3 100644
 +
 +              return rsend(socket, buf, len, flags);
 +      }
--      return rsend(socket, buf, len, flags);
++
 +      fastlock_acquire(&rs->slock);
 +      if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
 +              ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
@@ -1476,7 +1593,7 @@ index 58fcb8e..a81b8f3 100644
  }
  
  static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-@@ -1599,7 +2440,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1599,7 +2543,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
                                          rs_conn_can_send);
                        if (ret)
                                break;
@@ -1485,7 +1602,7 @@ index 58fcb8e..a81b8f3 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1652,7 +2493,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+@@ -1652,7 +2596,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
        if (msg->msg_control && msg->msg_controllen)
                return ERR(ENOTSUP);
  
@@ -1494,7 +1611,7 @@ index 58fcb8e..a81b8f3 100644
  }
  
  ssize_t rwrite(int socket, const void *buf, size_t count)
-@@ -1948,7 +2789,7 @@ int rshutdown(int socket, int how)
+@@ -1948,7 +2892,7 @@ int rshutdown(int socket, int how)
  
        rs = idm_at(&idm, socket);
        if (how == SHUT_RD) {
@@ -1503,7 +1620,7 @@ index 58fcb8e..a81b8f3 100644
                return 0;
        }
  
-@@ -1958,10 +2799,10 @@ int rshutdown(int socket, int how)
+@@ -1958,10 +2902,10 @@ int rshutdown(int socket, int how)
        if (rs->state & rs_connected) {
                if (how == SHUT_RDWR) {
                        ctrl = RS_CTRL_DISCONNECT;
@@ -1517,7 +1634,41 @@ index 58fcb8e..a81b8f3 100644
                                RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
                }
                if (!rs->ctrl_avail) {
-@@ -2017,8 +2858,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -1986,13 +2930,31 @@ int rshutdown(int socket, int how)
+       return 0;
+ }
++static void ds_shutdown(struct rsocket *rs)
++{
++      int ret = 0;
++
++      if (rs->fd_flags & O_NONBLOCK)
++              rs_set_nonblocking(rs, 0);
++
++      rs->state &= ~(rs_readable | rs_writable);
++      ds_process_cqs(rs, 0, ds_all_sends_done);
++
++      if (rs->fd_flags & O_NONBLOCK)
++              rs_set_nonblocking(rs, 1);
++}
++
+ int rclose(int socket)
+ {
+       struct rsocket *rs;
+       rs = idm_at(&idm, socket);
+-      if (rs->state & rs_connected)
+-              rshutdown(socket, SHUT_RDWR);
++      if (rs->type == SOCK_STREAM) {
++              if (rs->state & rs_connected)
++                      rshutdown(socket, SHUT_RDWR);
++      } else {
++              ds_shutdown(rs);
++      }
+       rs_free(rs);
+       return 0;
+@@ -2017,8 +2979,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
  
        rs = idm_at(&idm, socket);
@@ -1532,7 +1683,7 @@ index 58fcb8e..a81b8f3 100644
  }
  
  int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-@@ -2026,8 +2871,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2026,8 +2992,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
  
        rs = idm_at(&idm, socket);
@@ -1547,7 +1698,7 @@ index 58fcb8e..a81b8f3 100644
  }
  
  int rsetsockopt(int socket, int level, int optname,
-@@ -2039,18 +2888,26 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2039,18 +3009,26 @@ int rsetsockopt(int socket, int level, int optname,
  
        ret = ERR(ENOTSUP);
        rs = idm_at(&idm, socket);
@@ -1581,7 +1732,7 @@ index 58fcb8e..a81b8f3 100644
                        opt_on = *(int *) optval;
                        break;
                case SO_RCVBUF:
-@@ -2100,9 +2957,11 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2100,9 +3078,11 @@ int rsetsockopt(int socket, int level, int optname,
                opts = &rs->ipv6_opts;
                switch (optname) {
                case IPV6_V6ONLY:
@@ -1596,7 +1747,7 @@ index 58fcb8e..a81b8f3 100644
                        opt_on = *(int *) optval;
                        break;
                default:
-@@ -2314,7 +3173,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2314,7 +3294,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
        if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
                return ERR(EINVAL);
  
@@ -1605,7 +1756,7 @@ index 58fcb8e..a81b8f3 100644
        if (prot & PROT_WRITE) {
                iomr = rs_get_iomap_mr(rs);
                access |= IBV_ACCESS_REMOTE_WRITE;
-@@ -2348,7 +3207,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2348,7 +3328,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
                dlist_insert_tail(&iomr->entry, &rs->iomap_list);
        }
  out:
@@ -1614,7 +1765,7 @@ index 58fcb8e..a81b8f3 100644
        return offset;
  }
  
-@@ -2360,7 +3219,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2360,7 +3340,7 @@ int riounmap(int socket, void *buf, size_t len)
        int ret = 0;
  
        rs = idm_at(&idm, socket);
@@ -1623,7 +1774,7 @@ index 58fcb8e..a81b8f3 100644
  
        for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
             entry = entry->next) {
-@@ -2381,7 +3240,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2381,7 +3361,7 @@ int riounmap(int socket, void *buf, size_t len)
        }
        ret = ERR(EINVAL);
  out:
@@ -1632,7 +1783,7 @@ index 58fcb8e..a81b8f3 100644
        return ret;
  }
  
-@@ -2425,7 +3284,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
+@@ -2425,7 +3405,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
                                          rs_conn_can_send);
                        if (ret)
                                break;
diff --git a/patches/refresh-temp b/patches/refresh-temp
deleted file mode 100644 (file)
index 2616d85..0000000
+++ /dev/null
@@ -1,379 +0,0 @@
-Bottom: faabde8e748d27fc0f733e702ed7f3c3902d8f58
-Top:    0ddd3b5ffa7ef87e8179ff9779cbd2f76c69ac89
-Author: Sean Hefty <sean.hefty@intel.com>
-Date:   2012-11-27 16:27:39 -0800
-
-Refresh of dsocket
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index a81b8f3..d7c3163 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -211,10 +211,27 @@ union socket_addr {
-       struct sockaddr_in6     sin6;
- };
-+struct ds_header {
-+      uint8_t           version;
-+      uint8_t           length;
-+      uint16_t          port;
-+      union {
-+              uint32_t  ipv4;
-+              struct {
-+                      uint32_t flowinfo;
-+                      uint8_t  addr[16];
-+              } ipv6;
-+      } addr;
-+};
-+
-+#define DS_IPV4_HDR_LEN  8
-+#define DS_IPV6_HDR_LEN 24
-+
- struct ds_qp {
-       dlist_t           list;
-       struct rsocket    *rs;
-       struct rdma_cm_id *cm_id;
-+      struct ds_header  hdr;
-       struct ibv_mr     *smr;
-       struct ibv_mr     *rmr;
-@@ -324,7 +341,6 @@ struct ds_udp_header {
-       uint32_t          qpn;  /* upper 8-bits reserved */
- };
--
- #define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list)
- static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp)
-@@ -1285,6 +1301,22 @@ out:
-       return ret;
- }
-+static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
-+{
-+      if (addr->sa.sa_family == AF_INET) {
-+              hdr->version = 4;
-+              hdr->length = DS_IPV4_HDR_LEN;
-+              hdr->port = addr->sin.sin_port;
-+              hdr->addr.ipv4 = addr->sin.sin_addr;
-+      } else {
-+              hdr->version = 6;
-+              hdr->length = DS_IPV6_HDR_LEN;
-+              hdr->port = addr->sin6.sin6_port;
-+              hdr->addr.ipv6.flowinfo= addr->sin6.sin6_flowinfo;
-+              memcpy(&hdr->addr.ipv6.addr, &addr->sin6.sin6_addr, 16);
-+      }
-+}
-+
- static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
-                       socklen_t addrlen, struct ds_qp **qp)
- {
-@@ -1301,6 +1333,7 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
-       if (ret)
-               goto err;
-+      ds_format_hdr(&(*qp)->hdr, src_addr);
-       ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa);
-       if (ret)
-               goto err;
-@@ -1463,18 +1496,17 @@ static int rs_post_write(struct rsocket *rs,
-       return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
- }
--static int ds_post_send(struct rsocket *rs,
--                      struct ibv_sge *sgl, int nsge,
--                      uint64_t wr_id, int flags)
-+static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge,
-+                      uint64_t wr_id)
- {
-       struct ibv_send_wr wr, *bad;
-       wr.wr_id = wr_id;
-       wr.next = NULL;
--      wr.sg_list = sgl;
--      wr.num_sge = nsge;
-+      wr.sg_list = sge;
-+      wr.num_sge = 1;
-       wr.opcode = IBV_WR_SEND;
--      wr.send_flags = flags;
-+      wr.send_flags = (sge.length <= rs->sq_inline) ? IBV_SEND_INLINE : 0;
-       wr.wr.ud.ah = rs->conn_dest->ah;
-       wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
-       wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
-@@ -1512,18 +1544,6 @@ static int rs_write_data(struct rsocket *rs,
-                                flags, addr, rkey);
- }
--static int ds_send_data(struct rsocket *rs,
--                      struct ibv_sge *sgl, int nsge,
--                      uint32_t length, int flags)
--{
--      uint64_t offset;
--
--      rs->sqe_avail--;
--      rs->sbuf_bytes_avail -= length;
--      offset = sgl->addr - (uintptr_t) rs->sbuf;
--      return ds_post_send(rs, sgl, nsge, ds_send_wr_id(offset, length), flags);
--}
--
- static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
-                          struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
- {
-@@ -1779,6 +1799,14 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
-       return ret;
- }
-+static int ds_valid_recv(void *buf, uint32_t len)
-+{
-+      struct ds_header *hdr = (struct ds_header *) buf;
-+      return ((len >= sizeof(*hdr)) &&
-+              ((hdr->version == 4 && hdr->length == DS_IPV4_HDR_LEN) ||
-+               (hdr->version == 6 && hdr->length == DS_IPV6_HDR_LEN)));
-+}
-+
- /*
-  * Poll all CQs associated with a datagram rsocket.  We need to drop any
-  * received messages that we do not have room to store.  To limit drops,
-@@ -1789,7 +1817,8 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
- static void ds_poll_cqs(struct rsocket *rs)
- {
-       struct ds_qp *qp;
--      struct ds_smsg *msg;
-+      struct ds_smsg *smsg;
-+      struct ds_rmsg *rmsg;
-       struct ibv_wc wc;
-       int ret, cnt;
-@@ -1804,9 +1833,14 @@ static void ds_poll_cqs(struct rsocket *rs)
-                       }
-                       if (ds_wr_is_recv(wc.wr_id)) {
--                              if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS) {
-+                              if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS &&
-+                                  ds_valid_recv(qp->rbuf + ds_wr_offset(wc.wr_id),
-+                                                wc.byte_len)) {
-                                       rs->rqe_avail--;
--                                      rs->dmsg[rs->rmsg_tail].qp = qp;
-+                                      rmsg = &rs->dmsg[rs->rmsg_tail];
-+                                      rmsg->qp = qp;
-+                                      rmsg->offset = ds_wr_offset(wc.wr_id);
-+                                      rmsg->length = wc.byte_len;
-                                       if (++rs->rmsg_tail == rs->rq_size + 1)
-                                               rs->rmsg_tail = 0;
-                               } else {
-@@ -1814,12 +1848,10 @@ static void ds_poll_cqs(struct rsocket *rs)
-                                                            ds_wr_offset(wc.wr_id));
-                               }
-                       } else {
--                              if (ds_wr_length(wc.wr_id) > rs->sq_inline) {
--                                      msg = (struct ds_smsg *)
--                                            (rs->sbuf + ds_wr_offset(wc.wr_id));
--                                      msg->next = rs->smsg_free;
--                                      rs->smsg_free = msg;
--                              }
-+                              smsg = (struct ds_smsg *)
-+                                     (rs->sbuf + ds_wr_offset(wc.wr_id));
-+                              smsg->next = rs->smsg_free;
-+                              rs->smsg_free = smsg;
-                               rs->sqe_avail++;
-                       }
-@@ -1959,7 +1991,12 @@ static int rs_can_send(struct rsocket *rs)
- static int ds_can_send(struct rsocket *rs)
- {
--      return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT);
-+      return rs->sqe_avail;
-+}
-+
-+static int ds_all_sends_done(struct rsocket *rs)
-+{
-+      return rs->sqe_avail == rs->sq_size;
- }
- static int rs_conn_can_send(struct rsocket *rs)
-@@ -1988,6 +2025,66 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
-              !(rs->state & rs_connected);
- }
-+static void ds_set_src(struct sockaddr *addr, socklen_t *addrlen,
-+                     struct ds_header *hdr)
-+{
-+      union socket_addr sa;
-+
-+      if (hdr->version == 4) {
-+              if (*addrlen > sizeof(sa.sin))
-+                      *addrlen = sizeof(sa.sin);
-+
-+              sa.sin.sin_family = AF_INET;
-+              sa.sin.sin_port = hdr->port;
-+              sa.sin.sin_addr.s_addr =  hdr->addr.ipv4;
-+      } else {
-+              if (*addrlen > sizeof(sa.sin6))
-+                      *addrlen = sizeof(sa.sin6);
-+
-+              sa.sin6.sin6_family = AF_INET6;
-+              sa.sin6.sin6_port = hdr->port;
-+              sa.sin6.sin6_flowinfo = hdr->addr.ipv6.flowinfo;
-+              memcpy(&sa.sin6.sin6_addr, &hdr->addr.ipv6.addr, 16);
-+              sa.sin6.sin6_scope_id = 0;
-+      }
-+      memcpy(addr, &sa, *addrlen);
-+}
-+
-+static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
-+                         struct sockaddr *src_addr, socklen_t *addrlen)
-+{
-+      struct ds_rmsg *rmsg;
-+      struct ds_header *hdr;
-+      int ret;
-+
-+      if (!(rs->state & rs_readable))
-+              return ERR(EINVAL);
-+
-+      if (!rs_have_rdata(rs)) {
-+              ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
-+                                rs_have_rdata);
-+              if (ret)
-+                      return ret;
-+      }
-+
-+      rmsg = &rs->dmsg[rs->rmsg_head];
-+      hdr = (struct ds_header *) (rmsg->qp->rbuf + rmsg->offset);
-+      if (len > rmsg->length - hdr->length)
-+              len = rmsg->length - hdr->length;
-+
-+      memcpy(buf, (void *) hdr + hdr->length, len);
-+      if (addrlen)
-+              ds_set_src(src_addr, addrlen, hdr);
-+
-+      if (!(flags & MSG_PEEK)) {
-+              ds_post_recv(rs, rmsg->qp, hdr);
-+              if (++rs->rmsg_head == rs->rq_size + 1)
-+                      rs->rmsg_head = 0;
-+      }
-+
-+      return len;
-+}
-+
- static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
- {
-       size_t left = len;
-@@ -2033,6 +2130,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
-       int ret;
-       rs = idm_at(&idm, socket);
-+      if (rs->type == SOCK_DGRAM) {
-+              fastlock_acquire(&rs->slock);
-+              ret = ds_recvfrom(rs, buf, len, flags, src_addr, addrlen);
-+              fastlock_release(&rs->slock);
-+              return ret;
-+      }
-+
-       if (rs->state & rs_opening) {
-               ret = rs_do_connect(rs);
-               if (ret) {
-@@ -2093,6 +2197,14 @@ ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
- {
-       int ret;
-+      rs = idm_at(&idm, socket);
-+      if (rs->type == SOCK_DGRAM) {
-+              fastlock_acquire(&rs->slock);
-+              ret = ds_recvfrom(rs, buf, len, flags, src_addr, addrlen);
-+              fastlock_release(&rs->slock);
-+              return ret;
-+      }
-+
-       ret = rrecv(socket, buf, len, flags);
-       if (ret > 0 && src_addr)
-               rgetpeername(socket, src_addr, addrlen);
-@@ -2230,8 +2342,10 @@ static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, int
- static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
- {
-+      struct ds_smsg *msg;
-       struct ibv_sge sge;
--      int ret = 0;
-+      uint64_t offset;
-+      int flags, ret = 0;
-       if (!rs->conn_dest->ah)
-               return ds_send_udp(rs, buf, len, flags);
-@@ -2242,29 +2356,18 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
-                       return ret;
-       }
--      if (len <= rs->sq_inline) {
--              sge.addr = (uintptr_t) buf;
--              sge.length = len;
--              sge.lkey = 0;
--              ret = ds_send_data(rs, &sge, 1, len, IBV_SEND_INLINE);
--      } else if (len <= rs_sbuf_left(rs)) {
--              memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, len);
--              rs->ssgl[0].length = len;
--              ret = ds_send_data(rs, rs->ssgl, 1, len, 0);
--              if (len < rs_sbuf_left(rs))
--                      rs->ssgl[0].addr += len;
--              else
--                      rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
--      } else {
--              rs->ssgl[0].length = rs_sbuf_left(rs);
--              memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
--                      rs->ssgl[0].length);
--              rs->ssgl[1].length = len - rs->ssgl[0].length;
--              memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
--              ret = ds_send_data(rs, rs->ssgl, 2, len, 0);
--              rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
--      }
-+      msg = rs->smsg_free;
-+      rs->smsg_free = msg->next;
-+      rs->sqe_avail--;
-+
-+      memcpy((void *) msg, rs->conn_dest->qp->hdr, rs->conn_dest->qp->hdr.length);
-+      memcpy((void *) msg + rs->conn_dest->qp->hdr.length, buf, len);
-+      sge.addr = (uintptr_t) msg;
-+      sge.length = rs->conn_dest->qp->hdr.length + len;
-+      sge.lkey = rs->smr->lkey;
-+      offset = (uint8_t *) msg - rs->sbuf;
-+      ret = ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
-       return ret ? ret : len;
- }
-@@ -2827,13 +2930,31 @@ int rshutdown(int socket, int how)
-       return 0;
- }
-+static void ds_shutdown(struct rsocket *rs)
-+{
-+      int ret = 0;
-+
-+      if (rs->fd_flags & O_NONBLOCK)
-+              rs_set_nonblocking(rs, 0);
-+
-+      rs->state &= ~(rs_readable | rs_writable);
-+      ds_process_cqs(rs, 0, ds_all_sends_done);
-+
-+      if (rs->fd_flags & O_NONBLOCK)
-+              rs_set_nonblocking(rs, 1);
-+}
-+
- int rclose(int socket)
- {
-       struct rsocket *rs;
-       rs = idm_at(&idm, socket);
--      if (rs->state & rs_connected)
--              rshutdown(socket, SHUT_RDWR);
-+      if (rs->type == SOCK_STREAM) {
-+              if (rs->state & rs_connected)
-+                      rshutdown(socket, SHUT_RDWR);
-+      } else {
-+              ds_shutdown(rs);
-+      }
-       rs_free(rs);
-       return 0;