--- /dev/null
+Bottom: 97a52629c221cba1033082bbd308ecfc4d4b6082
+Top: da0048097eea01b21df587e85b3f7ac44a2582c8
+Author: Sean Hefty <sean.hefty@intel.com>
+Date: 2012-11-20 23:43:13 -0800
+
+Refresh of dsocket
+
+---
+
+diff --git a/src/cma.c b/src/cma.c
+index 91bf108..2c6b032 100755
+--- a/src/cma.c
++++ b/src/cma.c
+@@ -2237,9 +2237,18 @@ void rdma_destroy_ep(struct rdma_cm_id *id)
+ int ucma_max_qpsize(struct rdma_cm_id *id)
+ {
+ struct cma_id_private *id_priv;
++ int i, max_size = 0;
+
+ id_priv = container_of(id, struct cma_id_private, id);
+- return id_priv->cma_dev->max_qpsize;
++ if (id && id_priv->cma_dev) {
++ max_size = id_priv->cma_dev->max_qpsize;
++ } else {
++ for (i = 0; i < cma_dev_cnt; i++) {
++ if (!max_size || max_size > cma_dev_array[i].max_qpsize)
++ max_size = cma_dev_array[i].max_qpsize;
++ }
++ }
++ return max_size;
+ }
+
+ uint16_t ucma_get_port(struct sockaddr *addr)
+diff --git a/src/rsocket.c b/src/rsocket.c
+index 99e638c..0695d12 100644
+--- a/src/rsocket.c
++++ b/src/rsocket.c
+@@ -55,7 +55,7 @@
+
+ #define RS_OLAP_START_SIZE 2048
+ #define RS_MAX_TRANSFER 65536
+-#define RS_SNDLOWAT 64
++#define RS_SNDLOWAT 2048
+ #define RS_QP_MAX_SIZE 0xFFFE
+ #define RS_QP_CTRL_SIZE 4
+ #define RS_CONN_RETRIES 6
+@@ -63,6 +63,23 @@
+ static struct index_map idm;
+ static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
+
++enum {
++ RS_SVC_INSERT,
++ RS_SVC_REMOVE
++};
++
++struct rsocket;
++
++struct rs_svc_msg {
++ uint32_t op;
++ uint32_t status;
++ struct rsocket *rs;
++};
++
++static pthread_t svc_id;
++static int svc_cnt;
++static int svc_fds[2];
++
+ static uint16_t def_iomap_size = 0;
+ static uint16_t def_inline = 64;
+ static uint16_t def_sqsize = 384;
+@@ -165,9 +182,9 @@ enum rs_state {
+ rs_connecting = rs_opening | 0x0040,
+ rs_accepting = rs_opening | 0x0080,
+ rs_connected = 0x0100,
+- rs_connect_wr = 0x0200,
+- rs_connect_rd = 0x0400,
+- rs_connect_rdwr = rs_connected | rs_connect_rd | rs_connect_wr,
++ rs_writable = 0x0200,
++ rs_readable = 0x0400,
++ rs_connect_rdwr = rs_connected | rs_readable | rs_writable,
+ rs_connect_error = 0x0800,
+ rs_disconnected = 0x1000,
+ rs_error = 0x2000,
+@@ -182,18 +199,23 @@ union socket_addr {
+ };
+
+ struct ds_qp {
++ dlist_t list;
++ struct rsocket *rs;
+ struct rdma_cm_id *cm_id;
+- int rbuf_cnt;
++
++ struct ibv_mr *rmr;
++ uint8_t *rbuf;
++
++ struct ibv_mr *smr;
+ uint16_t lid;
+ uint8_t sl;
+ };
+
+ struct ds_dest {
+- union socket_addr addr;
++ union socket_addr addr; /* must be first */
+ struct ds_qp *qp;
+ struct ibv_ah *ah;
+ uint32_t qpn;
+- atomic_t refcnt;
+ };
+
+ struct rsocket {
+@@ -223,12 +245,12 @@ struct rsocket {
+ };
+ /* datagram */
+ struct {
+- struct ds_qp *qp_array;
++ dlist_t qp_list;
+ void *dest_map;
+ struct ds_dest *conn_dest;
+ struct pollfd *fds;
+ nfds_t nfds;
+- int fd;
++ int dsock;
+ int sbytes_needed;
+ };
+ };
+@@ -290,6 +312,62 @@ struct ds_udp_header {
+ };
+
+
++static int rs_svc_run(void *arg)
++{
++ return 0;
++}
++
++static int rs_svc_insert(struct rsocket *rs)
++{
++ struct rs_svc_msg msg;
++ int ret;
++
++ pthread_mutex_lock(&mut);
++ if (!svc_cnt) {
++ ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_fds);
++ if (ret)
++ goto out;
++
++ ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL);
++ if (ret) {
++ close(svc_fds[0]);
++ close(svc_fds[1]);
++ ret = ERR(ret);
++ goto out;
++ }
++ }
++
++ msg.op = RS_SVC_INSERT;
++ msg.status = EINVAL;
++ msg.rs = rs;
++ svc_cnt++;
++ write(svc_fds[0], &msg, sizeof msg);
++ read(svc_fds[0], &msg, sizeof msg);
++ ret = ERR(msg.status);
++out:
++ pthread_mutex_unlock(&mut);
++ return ret;
++}
++
++static int rs_svc_remove(struct rsocket *rs)
++{
++ struct rs_svc_msg msg;
++ int ret;
++
++ pthread_mutex_lock(&mut);
++ msg.op = RS_SVC_REMOVE;
++ msg.status = EINVAL;
++ msg.rs = rs;
++ write(svc_fds[0], &msg, sizeof msg);
++ read(svc_fds[0], &msg, sizeof msg);
++ ret = ERR(msg.status);
++ if (!ret && !--svn_cnt)
++ pthread_join(svc_id, NULL);
++
++ pthread_mutex_unlock(&mut);
++ return ret;
++}
++
+ static int rs_value_to_scale(int value, int bits)
+ {
+ return value <= (1 << (bits - 1)) ?
+@@ -390,7 +468,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
+
+ rs->type = type;
+ rs->index = -1;
+- rs->fd = -1;
++ rs->dsock = -1;
+ if (inherited_rs) {
+ rs->sbuf_size = inherited_rs->sbuf_size;
+ rs->rbuf_size = inherited_rs->rbuf_size;
+@@ -418,6 +496,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
+ return rs;
+ }
+
++/*
++ * TODO: Support datagram rsockets
++ */
+ static int rs_set_nonblocking(struct rsocket *rs, long arg)
+ {
+ int ret = 0;
+@@ -500,15 +581,32 @@ static int rs_init_bufs(struct rsocket *rs)
+ return 0;
+ }
+
+-static int rs_create_cq(struct rsocket *rs)
++static int ds_init_bufs(struct ds_qp *qp)
+ {
+- rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs);
+- if (!rs->cm_id->recv_cq_channel)
++ qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf));
++ if (!qp->rbuf)
++ return ERR(ENOMEM);
++
++ qp->smr = rdma_reg_msgs(qp->cm_id, qp->rs->sbuf, qp->rs->sbuf_size);
++ if (!qp->smr)
++ return -1;
++
++ qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size);
++ if (!qp->rmr)
+ return -1;
+
+- rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size,
+- rs->cm_id, rs->cm_id->recv_cq_channel, 0);
+- if (!rs->cm_id->recv_cq)
++ return 0;
++}
++
++static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
++{
++ cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
++ if (!cm_id->recv_cq_channel)
++ return -1;
++
++ cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
++ cm_id, cm_id->recv_cq_channel, 0);
++ if (!cm_id->recv_cq)
+ goto err1;
+
+ if (rs->fd_flags & O_NONBLOCK) {
+@@ -516,21 +614,20 @@ static int rs_create_cq(struct rsocket *rs)
+ goto err2;
+ }
+
+- rs->cm_id->send_cq_channel = rs->cm_id->recv_cq_channel;
+- rs->cm_id->send_cq = rs->cm_id->recv_cq;
++ cm_id->send_cq_channel = cm_id->recv_cq_channel;
++ cm_id->send_cq = cm_id->recv_cq;
+ return 0;
+
+ err2:
+- ibv_destroy_cq(rs->cm_id->recv_cq);
+- rs->cm_id->recv_cq = NULL;
++ ibv_destroy_cq(cm_id->recv_cq);
++ cm_id->recv_cq = NULL;
+ err1:
+- ibv_destroy_comp_channel(rs->cm_id->recv_cq_channel);
+- rs->cm_id->recv_cq_channel = NULL;
++ ibv_destroy_comp_channel(cm_id->recv_cq_channel);
++ cm_id->recv_cq_channel = NULL;
+ return -1;
+ }
+
+-static inline int
+-rs_post_recv(struct rsocket *rs)
++static inline int rs_post_recv(struct rsocket *rs)
+ {
+ struct ibv_recv_wr wr, *bad;
+
+@@ -542,6 +639,23 @@ rs_post_recv(struct rsocket *rs)
+ return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
+ }
+
++static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf)
++{
++ struct ibv_recv_wr wr, *bad;
++ struct ibv_sge sge;
++
++ sge.addr = (uintptr_t) buf;
++ sge.length = RS_SNDLOWAT;
++ sge.lkey = qp->rmr;
++
++ wr.wr_id = RS_RECV_WR_ID;
++ wr.next = NULL;
++ wr.sg_list = &sge;
++ wr.num_sge = 1;
++
++ return rdma_seterrno(ibv_post_recv(qp->cm_id->qp, &wr, &bad));
++}
++
+ static int rs_create_ep(struct rsocket *rs)
+ {
+ struct ibv_qp_init_attr qp_attr;
+@@ -552,7 +666,7 @@ static int rs_create_ep(struct rsocket *rs)
+ if (ret)
+ return ret;
+
+- ret = rs_create_cq(rs);
++ ret = rs_create_cq(rs, rs->cm_id);
+ if (ret)
+ return ret;
+
+@@ -609,8 +723,71 @@ static void rs_free_iomappings(struct rsocket *rs)
+ }
+ }
+
++static void ds_free_qp(struct ds_qp *qp)
++{
++ if (qp->smr)
++ rdma_dereg_mr(qp->smr);
++
++ if (qp->rbuf) {
++ if (qp->rmr)
++ rdma_dereg_mr(qp->rmr);
++ free(qp->rbuf);
++ }
++
++ if (qp->cm_id) {
++ if (qp->cm_id->qp) {
++ dlist_remove(&qp->list);
++ rdma_destroy_qp(qp->cm_id);
++ }
++ rdma_destroy_id(qp->cm_id);
++ }
++ free(qp);
++}
++
++static void ds_free_qps(struct rsocket *rs)
++{
++ struct ds_qp *qp;
++ dlist_t *entry;
++
++ while (!dlist_empty(&rs->qp_list)) {
++ qp = container_of(rs->qp_list.next, struct ds_qp, list);
++ ds_free_qp(qp);
++ }
++}
++
++static void ds_free(struct rsocket *rs)
++{
++ if (rs->state & (rs_readable | rs_writable))
++ rs_svc_remove(rs);
++
++ if (rs->dsock >= 0)
++ close(rs->dsock);
++
++ if (rs->index >= 0)
++ rs_remove(rs);
++
++ ds_free_qps(rs);
++ if (rs->fds)
++ free(rs->fds);
++
++ if (rs->sbuf)
++ free(rs->sbuf);
++
++ fastlock_destroy(&rs->map_lock);
++ fastlock_destroy(&rs->cq_wait_lock);
++ fastlock_destroy(&rs->cq_lock);
++ fastlock_destroy(&rs->rlock);
++ fastlock_destroy(&rs->slock);
++ free(rs);
++}
++
+ static void rs_free(struct rsocket *rs)
+ {
++ if (rs->type == SOCK_DGRAM) {
++ ds_free(rs);
++ return;
++ }
++
+ if (rs->index >= 0)
+ rs_remove(rs);
+
+@@ -642,11 +819,6 @@ static void rs_free(struct rsocket *rs)
+ rdma_destroy_id(rs->cm_id);
+ }
+
+- if (rs->fd >= 0)
+- close(rs->fd);
+- if (rs->fds)
+- free(rs->fds);
+-
+ fastlock_destroy(&rs->map_lock);
+ fastlock_destroy(&rs->cq_wait_lock);
+ fastlock_destroy(&rs->cq_lock);
+@@ -703,14 +875,15 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+
+ static int ds_init(struct rsocket *rs, int domain)
+ {
+- rs->fd = socket(domain, SOCK_DGRAM, 0);
+- if (rs->fd < 0)
+- return rs->fd;
++ rs->dsock = socket(domain, SOCK_DGRAM, 0);
++ if (rs->dsock < 0)
++ return rs->dsock;
+
+ rs->fds = calloc(1, sizeof *fds);
+ if (!rs->fds)
+ return ERR(ENOMEM);
+ rs->nfds = 1;
++ dlist_init(&rs->qp_list);
+
+ return 0;
+ }
+@@ -743,7 +916,7 @@ int rsocket(int domain, int type, int protocol)
+ if (ret)
+ goto err;
+
+- index = rs->fd;
++ index = rs->dsock;
+ }
+
+ ret = rs_insert(rs, index);
+@@ -768,7 +941,12 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+ if (!ret)
+ rs->state = rs_bound;
+ } else {
+- ret = bind(rs->fd, addr, addrlen);
++ ret = bind(rs->dsock, addr, addrlen);
++ if (!ret) {
++ ret = rs_svc_insert(rs);
++ if (!ret)
++ rs->state = rs_readable | rs_writable;
++ }
+ }
+ return ret;
+ }
+@@ -950,7 +1128,33 @@ connected:
+ return ret;
+ }
+
+-static int ds_compare_dest(const void *dst1, const void *dst2)
++static int ds_init_ep(struct rsocket *rs)
++{
++ int ret;
++
++ rs_set_qp_size(rs);
++ if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
++ rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
++ else
++ rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
++
++ rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
++ if (!rs->sbuf)
++ return ERR(ENOMEM);
++
++ rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf;
++ rs->sbuf_bytes_avail = rs->sbuf_size;
++ rs->sqe_avail = rs->sq_size;
++
++ ret = rs_svc_insert(rs);
++ if (ret)
++ return ret;
++
++ rs->state = rs_readable | rs_writable;
++ return 0;
++}
++
++static int ds_compare_addr(const void *dst1, const void *dst2)
+ {
+ const struct sockaddr *sa1, *sa2;
+ size_t len;
+@@ -963,60 +1167,173 @@ static int ds_compare_dest(const void *dst1, const void *dst2)
+ return memcmp(dst1, dst2, len);
+ }
+
+-/* Caller must hold map_lock around accessing source address */
+-static union socket_addr *ds_get_src_addr(const struct sockaddr *dst_addr,
+- socklen_t dst_len, socklen_t *src_len)
++static int rs_any_addr(const union socket_addr *addr)
++{
++ if (addr->sa.sa_family == AF_INET) {
++ return (addr->sin.sin_addr == INADDR_ANY ||
++ addr->sin.sin_addr == INADDR_LOOPBACK);
++ } else {
++ return (addr->sin6.sin6_addr == in6addr_any ||
++ addr->sin6.sin6_addr == in6addr_loopback);
++ }
++}
++
++static int ds_get_src_addr(struct rsocket *rs,
++ const struct sockaddr *dest_addr, socklen_t dest_len,
++ union socket_addr *src_addr, socklen_t *src_len)
+ {
+- static union socket_addr src_addr;
+ int sock, ret;
++ uint16_t port;
+
+- sock = socket(dst_addr->sa.sa_family, SOCK_DGRAM, 0);
++ *src_len = sizeof src_addr;
++ ret = getsockname(rs->dsock, &src_addr->sa, src_len);
++ if (ret || !rs_any_addr(src_addr))
++ return ret;
++
++ port = src_addr->sin.sin_port;
++ sock = socket(dest_addr->sa_family, SOCK_DGRAM, 0);
+ if (sock < 0)
+- return NULL;
++ return sock;
+
+- ret = connect(sock, &dst_addr->sa, dst_len);
++ ret = connect(sock, dest_addr, dest_len);
+ if (ret)
+ goto out;
+
+ *src_len = sizeof src_addr;
+- ret = getsockname(sock, &src_addr.sa, src_len);
+-
++ ret = getsockname(sock, &src_addr->sa, src_len);
++ src_addr->sin.sin_port = port;
+ out:
+ close(sock);
+- return ret ? NULL : &src_addr;
++ return ret;
++}
++
++static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
++ socklen_t addrlen, struct ds_qp **qp)
++{
++ struct ibv_qp_init_attr qp_attr;
++ int ret;
++
++ *qp = calloc(1, sizeof(struct ds_qp));
++ if (!*qp)
++ return ERR(ENOMEM);
++
++ (*qp)->rs = rs;
++ ret = rdma_create_id(NULL, &(*qp)->cm_id, *qp, RDMA_PS_UDP);
++ if (ret)
++ goto err;
++
++ ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa);
++ if (ret)
++ goto err;
++
++ ret = ds_init_bufs(*qp);
++ if (ret)
++ goto err;
++
++ ret = rs_create_cq(rs, (*qp)->cm_id);
++ if (ret)
++ goto err;
++
++ memset(&qp_attr, 0, sizeof qp_attr);
++ qp_attr.qp_context = qp;
++ qp_attr.send_cq = rs->cm_id->send_cq;
++ qp_attr.recv_cq = rs->cm_id->recv_cq;
++ qp_attr.qp_type = IBV_QPT_UD;
++ qp_attr.sq_sig_all = 1;
++ qp_attr.cap.max_send_wr = rs->sq_size;
++ qp_attr.cap.max_recv_wr = rs->rq_size;
++ qp_attr.cap.max_send_sge = 2;
++ qp_attr.cap.max_recv_sge = 1;
++ qp_attr.cap.max_inline_data = rs->sq_inline;
++
++ ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr);
++ if (ret)
++ return ret;
++
++ for (i = 0; i < rs->rq_size; i++) {
++ ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT);
++ if (ret)
++ goto err;
++ }
++ list_insert_head(&(*qp)->list, &rs->qp_list);
++ return 0;
++err:
++ ds_free_qp(*qp);
++ return ret;
+ }
+
+-static struct ds_qp *ds_get_qp(struct rsocket *rs,
+- const struct sockaddr *src_addr, socklen_t addrlen)
++static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr,
++ socklen_t addrlen, struct ds_qp **qp)
+ {
+- union socket_addr *addr;
+- socklen_t len;
++ dlist_t *entry;
+
++ for (entry = rs->qp_list.next; entry != &rs->qp_list; entry = entry->next) {
++ *qp = container_of(entry, struct ds_qp, list);
++ if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
++ return 0;
++ }
++
++ return ds_create_qp(rs, src_addr, addrlen, qp);
+ }
+
+-static int ds_connect(struct rsocket *rs,
+- const struct sockaddr *dest_addr, socklen_t addrlen)
++static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
++ socklen_t addrlen, struct ds_dest **dest)
+ {
+- struct ds_dest **dest;
++ union socket_addr src_addr;
++ socklen_t src_len;
++ struct ds_qp *qp;
++ int ret = 0;
+
+ fastlock_acquire(&rs->map_lock);
+- dest = tfind(dest_addr, rs->dest_map, ds_compare_dest);
+- if (dest) {
+- rs->conn_dest = *dest;
++ dest = tfind(addr, &rs->dest_map, ds_compare_addr);
++ if (dest)
++ goto out;
++
++ if (rs->state == rs_init) {
++ ret = ds_init_ep(rs);
++ if (ret)
++ goto out;
++ }
++
++ ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
++ if (ret)
++ goto out;
++
++ ret = ds_get_qp(rs, src_addr, src_len, &qp);
++ if (ret)
++ goto out;
++
++ *dest = calloc(1, sizeof(struct ds_dest));
++ if (!*dest) {
++ ret = ERR(ENOMEM);
+ goto out;
+ }
++
++ memcpy(&(*dest)->addr, addr, addrlen);
++ (*dest)->qp = qp;
++ tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr);
+ out:
+ fastlock_release(&rs->map_lock);
+- return 0;
++ return ret;
+ }
+
+ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
+ {
+ struct rsocket *rs;
++ int ret;
+
+ rs = idm_at(&idm, socket);
+- memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
+- return rs_do_connect(rs);
++ if (rs->type == SOCK_STREAM) {
++ memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
++ ret = rs_do_connect(rs);
++ } else {
++ fastlock_acquire(&rs->slock);
++ ret = connect(rs->dsock, addr, addrlen);
++ if (!ret)
++ ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
++ fastlock_release(&rs->slock);
++ }
++ return ret;
+ }
+
+ static int rs_post_write_msg(struct rsocket *rs,
+@@ -1229,7 +1546,7 @@ static int rs_poll_cq(struct rsocket *rs)
+ rs->state = rs_disconnected;
+ return 0;
+ } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
+- rs->state &= ~rs_connect_rd;
++ rs->state &= ~rs_readable;
+ }
+ break;
+ case RS_OP_WRITE:
+@@ -1409,7 +1726,7 @@ static int ds_can_send(struct rsocket *rs)
+
+ static int rs_conn_can_send(struct rsocket *rs)
+ {
+- return rs_can_send(rs) || !(rs->state & rs_connect_wr);
++ return rs_can_send(rs) || !(rs->state & rs_writable);
+ }
+
+ static int rs_conn_can_send_ctrl(struct rsocket *rs)
+@@ -1424,7 +1741,7 @@ static int rs_have_rdata(struct rsocket *rs)
+
+ static int rs_conn_have_rdata(struct rsocket *rs)
+ {
+- return rs_have_rdata(rs) || !(rs->state & rs_connect_rd);
++ return rs_have_rdata(rs) || !(rs->state & rs_readable);
+ }
+
+ static int rs_conn_all_sends_done(struct rsocket *rs)
+@@ -1527,7 +1844,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+ rs->rbuf_bytes_avail += rsize;
+ }
+
+- } while (left && (flags & MSG_WAITALL) && (rs->state & rs_connect_rd));
++ } while (left && (flags & MSG_WAITALL) && (rs->state & rs_readable));
+
+ fastlock_release(&rs->rlock);
+ return ret ? ret : len - left;
+@@ -1586,7 +1903,7 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+ rs_conn_can_send);
+ if (ret)
+ break;
+- if (!(rs->state & rs_connect_wr)) {
++ if (!(rs->state & rs_writable)) {
+ ret = ERR(ECONNRESET);
+ break;
+ }
+@@ -1764,7 +2081,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+ rs_conn_can_send);
+ if (ret)
+ break;
+- if (!(rs->state & rs_connect_wr)) {
++ if (!(rs->state & rs_writable)) {
+ ret = ERR(ECONNRESET);
+ break;
+ }
+@@ -1890,7 +2207,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+ rs_conn_can_send);
+ if (ret)
+ break;
+- if (!(rs->state & rs_connect_wr)) {
++ if (!(rs->state & rs_writable)) {
+ ret = ERR(ECONNRESET);
+ break;
+ }
+@@ -2239,7 +2556,7 @@ int rshutdown(int socket, int how)
+
+ rs = idm_at(&idm, socket);
+ if (how == SHUT_RD) {
+- rs->state &= ~rs_connect_rd;
++ rs->state &= ~rs_readable;
+ return 0;
+ }
+
+@@ -2249,10 +2566,10 @@ int rshutdown(int socket, int how)
+ if (rs->state & rs_connected) {
+ if (how == SHUT_RDWR) {
+ ctrl = RS_CTRL_DISCONNECT;
+- rs->state &= ~(rs_connect_rd | rs_connect_wr);
++ rs->state &= ~(rs_readable | rs_writable);
+ } else {
+- rs->state &= ~rs_connect_wr;
+- ctrl = (rs->state & rs_connect_rd) ?
++ rs->state &= ~rs_writable;
++ ctrl = (rs->state & rs_readable) ?
+ RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
+ }
+ if (!rs->ctrl_avail) {
+@@ -2733,7 +3050,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
+ rs_conn_can_send);
+ if (ret)
+ break;
+- if (!(rs->state & rs_connect_wr)) {
++ if (!(rs->state & rs_writable)) {
+ ret = ERR(ECONNRESET);
+ break;
+ }