Bottom: 92d2aab8615c3d1003fee963587c4078b732e465
-Top: 0ddd3b5ffa7ef87e8179ff9779cbd2f76c69ac89
+Top: bd15dfc9adaf449efbd6fcccaac92ea3ed7ad81b
Author: Sean Hefty <sean.hefty@intel.com>
Date: 2012-11-09 10:26:38 -0800
uint16_t ucma_get_port(struct sockaddr *addr)
diff --git a/src/rsocket.c b/src/rsocket.c
-index 58fcb8e..d7c3163 100644
+index 58fcb8e..07cf31d 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
@@ -46,6 +46,7 @@
#define RS_QP_MAX_SIZE 0xFFFE
#define RS_QP_CTRL_SIZE 4
#define RS_CONN_RETRIES 6
-@@ -63,6 +64,23 @@
+@@ -63,6 +64,26 @@
static struct index_map idm;
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
+};
+
+static pthread_t svc_id;
++static int svc_sock[2];
+static int svc_cnt;
-+static int svc_fds[2];
++static int svc_size;
++static struct rsocket **svc_rss;
++static struct pollfd *svc_fds;
+
static uint16_t def_iomap_size = 0;
static uint16_t def_inline = 64;
static uint16_t def_sqsize = 384;
-@@ -99,6 +117,14 @@ enum {
+@@ -99,6 +120,14 @@ enum {
#define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
#define rs_msg_op(imm_data) (imm_data >> 29)
#define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
enum {
RS_CTRL_DISCONNECT,
-@@ -110,6 +136,18 @@ struct rs_msg {
+@@ -110,6 +139,18 @@ struct rs_msg {
uint32_t data;
};
struct rs_sge {
uint64_t addr;
uint32_t key;
-@@ -144,8 +182,6 @@ struct rs_conn_data {
+@@ -144,8 +185,6 @@ struct rs_conn_data {
struct rs_sge data_buf;
};
/*
* rsocket states are ordered as passive, connecting, connected, disconnected.
*/
-@@ -159,9 +195,9 @@ enum rs_state {
+@@ -159,9 +198,9 @@ enum rs_state {
rs_connecting = rs_opening | 0x0040,
rs_accepting = rs_opening | 0x0080,
rs_connected = 0x0100,
rs_connect_error = 0x0800,
rs_disconnected = 0x1000,
rs_error = 0x2000,
-@@ -169,68 +205,219 @@ enum rs_state {
+@@ -169,68 +208,349 @@ enum rs_state {
#define RS_OPT_SWAP_SGL 1
+#define DS_IPV4_HDR_LEN 8
+#define DS_IPV6_HDR_LEN 24
+
++struct ds_dest {
++ union socket_addr addr; /* must be first */
++ struct ds_qp *qp;
++ struct ibv_ah *ah;
++ uint32_t qpn;
++};
++
+struct ds_qp {
+ dlist_t list;
+ struct rsocket *rs;
struct rdma_cm_id *cm_id;
+ struct ds_header hdr;
++ struct ds_dest dest;
+
+ struct ibv_mr *smr;
+ struct ibv_mr *rmr;
+ int cq_armed;
+};
+
-+struct ds_dest {
-+ union socket_addr addr; /* must be first */
-+ struct ds_qp *qp;
-+ struct ibv_ah *ah;
-+ uint32_t qpn;
-+};
-+
+struct rsocket {
+ int type;
+ int index;
- uint32_t rbuf_size;
- struct ibv_mr *rmr;
- uint8_t *rbuf;
-+#define DS_UDP_TAG 0x5555555555555555ULL
++#define DS_UDP_TAG 0x55555555
- uint32_t sbuf_size;
- struct ibv_mr *smr;
- struct ibv_sge ssgl[2];
- uint8_t *sbuf;
+struct ds_udp_header {
-+ uint64_t tag;
++ uint32_t tag;
+ uint8_t version;
-+ uint8_t reserved[3];
-+ uint32_t qpn; /* upper 8-bits reserved */
++ uint8_t op;
++ uint8_t length;
++ uint8_t reserved;
++ uint32_t qpn; /* lower 8-bits reserved */
++ union {
++ uint32_t ipv4;
++ struct {
++ uint8_t addr[16];
++ } ipv6;
++ } addr;
};
+#define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list)
+ }
+}
+
++static int rs_svc_grow_sets(void)
++{
++ struct rsocket **rss;
++ struct pollfd *fds;
++ void *set;
++
++ set = calloc(svc_size + 2, sizeof(*rss) + sizeof(*fds));
++ if (!set)
++ return ENOMEM;
++
++ svc_size += 2;
++ rss = set;
++ fds = set + sizeof(*rss) * svc_size;
++ if (svc_cnt) {
++ memcpy(rss, svc_rss, sizeof(*rss) * svc_cnt);
++ memcpy(fds, svc_fds, sizeof(*fds) * svc_cnt);
++ }
++
++ free(svc_rss);
++ free(svc_fds);
++ svc_rss = rss;
++ svc_fds = fds;
++ return 0;
++}
++
++/*
++ * Index 0 is reserved for the service's communication socket.
++ */
++static int rs_svc_add_rs(struct rsocket *rs)
++{
++ int ret;
++
++ if (svc_cnt >= svc_size - 1) {
++ ret = rs_svc_grow_sets();
++ if (ret)
++ return ret;
++ }
++
++ svc_rss[++svc_cnt] = rs;
++ svc_fds[svc_cnt].fd = rs->udp_sock;
++ svc_fds[svc_cnt].events = POLLIN;
++ svc_fds[svc_cnt].revents = 0;
++ return 0;
++}
++
++static int rs_svc_rm_rs(struct rsocket *rs)
++{
++ int i;
++
++ for (i = 1; i <= svc_cnt; i++) {
++ if (svc_rss[i] == rs) {
++ svc_cnt--;
++ svc_rss[i] = svc_rss[svc_cnt];
++ svc_fds[i] = svc_fds[svc_cnt];
++ return 0;
++ }
++ }
++ return EBADF;
++}
++
++static void rs_svc_process_sock(void)
++{
++ struct rs_svc_msg msg;
++
++ read(svc_sock[1], &msg, sizeof msg);
++ switch (msg.op) {
++ case RS_SVC_INSERT:
++ msg.status = rs_svc_add_rs(msg.rs);
++ break;
++ case RS_SVC_REMOVE:
++ msg.status = rs_svc_rm_rs(msg.rs);
++ break;
++ default:
++ msg.status = ENOTSUP;
++ break;
++ }
++ write(svc_sock[1], &msg, sizeof msg);
++}
++
++static void rs_svc_process_rs(struct rsocket *rs)
++{
++
++}
++
+static int rs_svc_run(void *arg)
+{
++ struct rs_svc_msg msg;
++ int i, ret;
++
++ ret = rs_svc_grow_sets();
++ if (ret) {
++ msg.status = ret;
++ write(svc_sock[1] &msg, sizeof msg);
++ return ret;
++ }
++
++ svc_fds[0].fd = svc_sock[1];
++ svc_fds[0].events = POLLIN;
++ do {
++ for (i = 0; i <= svc_cnt; i++)
++ svc_fds[i].revents = 0;
++
++ poll(svc_fds, svc_cnt + 1, -1);
++ if (svc_fds[0].revents)
++ rs_svc_process_sock();
++
++ for (i = 1; i <= svc_cnt; i++) {
++ if (svc_fds[i].revents)
++ rs_svc_process_rs(svc_rss[i]);
++ }
++ } while (svc_cnt > 1);
++
+ return 0;
+}
+
+
+ pthread_mutex_lock(&mut);
+ if (!svc_cnt) {
-+ ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_fds);
++ ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_sock);
+ if (ret)
-+ goto out;
++ goto err1;
+
+ ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL);
+ if (ret) {
-+ close(svc_fds[0]);
-+ close(svc_fds[1]);
+ ret = ERR(ret);
-+ goto out;
++ goto err2;
+ }
+ }
+
+ msg.op = RS_SVC_INSERT;
+ msg.status = EINVAL;
+ msg.rs = rs;
-+ svc_cnt++;
-+ write(svc_fds[0], &msg, sizeof msg);
-+ read(svc_fds[0], &msg, sizeof msg);
++ write(svc_sock[0], &msg, sizeof msg);
++ read(svc_sock[0], &msg, sizeof msg);
+ ret = ERR(msg.status);
-+out:
++ if (ret && !svn_cnt)
++ goto err3;
++
++ pthread_mutex_unlock(&mut);
++ return ret;
++
++err3:
++ pthread_join(svc_id, NULL);
++err2:
++ close(svc_sock[0]);
++ close(svc_sock[1]);
++err1:
+ pthread_mutex_unlock(&mut);
+ return ret;
+}
+ msg.op = RS_SVC_REMOVE;
+ msg.status = EINVAL;
+ msg.rs = rs;
-+ write(svc_fds[0], &msg, sizeof msg);
-+ read(svc_fds[0], &msg, sizeof msg);
++ write(svc_sock[0], &msg, sizeof msg);
++ read(svc_sock[0], &msg, sizeof msg);
+ ret = ERR(msg.status);
-+ if (!ret && !--svn_cnt)
++ if (!svn_cnt) {
+ pthread_join(svc_id, NULL);
++ close(svc_sock[0]);
++ close(svc_sock[1]);
++ }
+
+ pthread_mutex_unlock(&mut);
+ return ret;
static int rs_value_to_scale(int value, int bits)
{
return value <= (1 << (bits - 1)) ?
-@@ -306,10 +493,10 @@ out:
+@@ -306,10 +626,10 @@ out:
pthread_mutex_unlock(&mut);
}
pthread_mutex_unlock(&mut);
return rs->index;
}
-@@ -321,7 +508,7 @@ static void rs_remove(struct rsocket *rs)
+@@ -321,7 +641,7 @@ static void rs_remove(struct rsocket *rs)
pthread_mutex_unlock(&mut);
}
{
struct rsocket *rs;
-@@ -329,7 +516,11 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -329,7 +649,11 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
if (!rs)
return NULL;
if (inherited_rs) {
rs->sbuf_size = inherited_rs->sbuf_size;
rs->rbuf_size = inherited_rs->rbuf_size;
-@@ -351,7 +542,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -351,7 +675,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
fastlock_init(&rs->rlock);
fastlock_init(&rs->cq_lock);
fastlock_init(&rs->cq_wait_lock);
dlist_init(&rs->iomap_list);
dlist_init(&rs->iomap_queue);
return rs;
-@@ -359,13 +550,27 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -359,13 +683,27 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
static int rs_set_nonblocking(struct rsocket *rs, long arg)
{
return ret;
}
-@@ -389,17 +594,39 @@ static void rs_set_qp_size(struct rsocket *rs)
+@@ -389,17 +727,39 @@ static void rs_set_qp_size(struct rsocket *rs)
rs->rq_size = 2;
}
rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
if (!rs->smr)
-@@ -409,7 +636,7 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -409,7 +769,7 @@ static int rs_init_bufs(struct rsocket *rs)
sizeof(*rs->target_iomap) * rs->target_iomap_size;
rs->target_buffer_list = malloc(len);
if (!rs->target_buffer_list)
rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
if (!rs->target_mr)
-@@ -422,7 +649,7 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -422,7 +782,7 @@ static int rs_init_bufs(struct rsocket *rs)
rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
if (!rs->rbuf)
rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
if (!rs->rmr)
-@@ -439,15 +666,32 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -439,15 +799,32 @@ static int rs_init_bufs(struct rsocket *rs)
return 0;
}
+
+ qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size);
+ if (!qp->rmr)
- return -1;
-
-- rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size,
-- rs->cm_id, rs->cm_id->recv_cq_channel, 0);
-- if (!rs->cm_id->recv_cq)
++ return -1;
++
+ return 0;
+}
+
+{
+ cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
+ if (!cm_id->recv_cq_channel)
-+ return -1;
-+
+ return -1;
+
+- rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size,
+- rs->cm_id, rs->cm_id->recv_cq_channel, 0);
+- if (!rs->cm_id->recv_cq)
+ cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
+ cm_id, cm_id->recv_cq_channel, 0);
+ if (!cm_id->recv_cq)
goto err1;
if (rs->fd_flags & O_NONBLOCK) {
-@@ -455,21 +699,20 @@ static int rs_create_cq(struct rsocket *rs)
+@@ -455,21 +832,20 @@ static int rs_create_cq(struct rsocket *rs)
goto err2;
}
{
struct ibv_recv_wr wr, *bad;
-@@ -481,6 +724,23 @@ rs_post_recv(struct rsocket *rs)
+@@ -481,6 +857,23 @@ rs_post_recv(struct rsocket *rs)
return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
}
static int rs_create_ep(struct rsocket *rs)
{
struct ibv_qp_init_attr qp_attr;
-@@ -491,7 +751,7 @@ static int rs_create_ep(struct rsocket *rs)
+@@ -491,7 +884,7 @@ static int rs_create_ep(struct rsocket *rs)
if (ret)
return ret;
if (ret)
return ret;
-@@ -548,8 +808,73 @@ static void rs_free_iomappings(struct rsocket *rs)
+@@ -548,8 +941,76 @@ static void rs_free_iomappings(struct rsocket *rs)
}
}
+
+ if (qp->cm_id) {
+ if (qp->cm_id->qp) {
++ tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_dest);
+ epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL,
+ qp->cm_id->recv_cq_channel->fd, NULL);
+ rdma_destroy_qp(qp->cm_id);
+ }
+ rdma_destroy_id(qp->cm_id);
+ }
++
+ free(qp);
+}
+
+ if (rs->sbuf)
+ free(rs->sbuf);
+
++ tdestroy(rs->dest_map, free);
+ fastlock_destroy(&rs->map_lock);
+ fastlock_destroy(&rs->cq_wait_lock);
+ fastlock_destroy(&rs->cq_lock);
if (rs->index >= 0)
rs_remove(rs);
-@@ -581,7 +906,7 @@ static void rs_free(struct rsocket *rs)
+@@ -581,7 +1042,7 @@ static void rs_free(struct rsocket *rs)
rdma_destroy_id(rs->cm_id);
}
fastlock_destroy(&rs->cq_wait_lock);
fastlock_destroy(&rs->cq_lock);
fastlock_destroy(&rs->rlock);
-@@ -635,29 +960,54 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+@@ -635,29 +1096,54 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
rs->sseq_comp = ntohs(conn->credits);
}
return rs->index;
err:
-@@ -671,9 +1021,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+@@ -671,9 +1157,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
int ret;
rs = idm_at(&idm, socket);
return ret;
}
-@@ -709,7 +1068,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -709,7 +1204,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
int ret;
rs = idm_at(&idm, socket);
if (!new_rs)
return ERR(ENOMEM);
-@@ -717,7 +1076,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -717,7 +1212,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
if (ret)
goto err;
if (ret < 0)
goto err;
-@@ -854,13 +1213,248 @@ connected:
- return ret;
- }
+@@ -825,42 +1320,309 @@ connected:
+ break;
+ }
+- rs_save_conn_data(rs, cresp);
+- rs->state = rs_connect_rdwr;
+- break;
+- case rs_accepting:
+- if (!(rs->fd_flags & O_NONBLOCK))
+- rs_set_nonblocking(rs, 0);
++ rs_save_conn_data(rs, cresp);
++ rs->state = rs_connect_rdwr;
++ break;
++ case rs_accepting:
++ if (!(rs->fd_flags & O_NONBLOCK))
++ rs_set_nonblocking(rs, 0);
++
++ ret = ucma_complete(rs->cm_id);
++ if (ret)
++ break;
++
++ rs->state = rs_connect_rdwr;
++ break;
++ default:
++ ret = ERR(EINVAL);
++ break;
++ }
++
++ if (ret) {
++ if (errno == EAGAIN || errno == EWOULDBLOCK) {
++ errno = EINPROGRESS;
++ } else {
++ rs->state = rs_connect_error;
++ rs->err = errno;
++ }
++ }
++ return ret;
++}
++
+static int ds_init_ep(struct rsocket *rs)
+{
+ struct ds_smsg *msg;
+ }
+}
+
++static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
++ socklen_t addrlen)
++{
++ struct ibv_port_attr port_attr;
++ struct ibv_ah_attr attr;
++ int ret;
++
++ memcpy(&qp->dest.addr, addr, addrlen);
++ qp->dest.qp = qp;
++ qp->dest.qpn = qp->cm_id->qp->qp_num;
++
++ ret = ibv_query_port(qp->cm_id->verbs, qp->cm_id->port_num, &port_attr);
++ if (ret)
++ return ret;
++
++ memset(&attr, 0, sizeof attr);
++ attr.dlid = port_attr.lid;
++ attr.port_num = qp->cm_id->port_num;
++ qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr);
++ if (!qp->dest.ah)
++ return ERR(ENOMEM);
++
++ tsearch(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr);
++ return 0;
++}
++
+static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
+ socklen_t addrlen, struct ds_qp **qp)
+{
+ if (ret)
+ goto err;
+
-+ event.events = EPOLLIN | EPOLLOUT;
++ ret = ds_add_qp_dest(*qp, src_addr, addrlen);
++ if (ret)
++ goto err;
++
++ event.events = EPOLLIN;
+ event.data.ptr = *qp;
+ ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD,
+ (*qp)->cm_id->recv_cq_channel->fd, &event);
+ socklen_t src_len;
+ struct ds_qp *qp;
+ int ret = 0;
-+
+
+- ret = ucma_complete(rs->cm_id);
+- if (ret)
+- break;
+ fastlock_acquire(&rs->map_lock);
+ dest = tfind(addr, &rs->dest_map, ds_compare_addr);
+ if (dest)
+ goto out;
-+
+
+- rs->state = rs_connect_rdwr;
+- break;
+- default:
+- ret = ERR(EINVAL);
+- break;
+ if (rs->state == rs_init) {
+ ret = ds_init_ep(rs);
+ if (ret)
+ goto out;
-+ }
-+
+ }
+
+- if (ret) {
+- if (errno == EAGAIN || errno == EWOULDBLOCK) {
+- errno = EINPROGRESS;
+- } else {
+- rs->state = rs_connect_error;
+- rs->err = errno;
+ ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
+ if (ret)
+ goto out;
+ if (ret)
+ goto out;
+
-+ *dest = calloc(1, sizeof(struct ds_dest));
-+ if (!*dest) {
-+ ret = ERR(ENOMEM);
-+ goto out;
-+ }
++ if ((addrlen != src_len) || memcmp(addr, src_addr, addrlen)) {
++ *dest = calloc(1, sizeof(struct ds_dest));
++ if (!*dest) {
++ ret = ERR(ENOMEM);
++ goto out;
+ }
+
-+ memcpy(&(*dest)->addr, addr, addrlen);
-+ (*dest)->qp = qp;
-+ tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr);
++ memcpy(&(*dest)->addr, addr, addrlen);
++ (*dest)->qp = qp;
++ tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr);
+ }
+out:
+ fastlock_release(&rs->map_lock);
-+ return ret;
-+}
-+
+ return ret;
+ }
+
int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
{
struct rsocket *rs;
}
static int rs_post_write_msg(struct rsocket *rs,
-@@ -902,6 +1496,24 @@ static int rs_post_write(struct rsocket *rs,
+@@ -902,6 +1664,24 @@ static int rs_post_write(struct rsocket *rs,
return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
}
/*
* Update target SGE before sending data. Otherwise the remote side may
* update the entry before we do.
-@@ -1045,7 +1657,7 @@ static int rs_poll_cq(struct rsocket *rs)
+@@ -1045,7 +1825,7 @@ static int rs_poll_cq(struct rsocket *rs)
rs->state = rs_disconnected;
return 0;
} else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
}
break;
case RS_OP_WRITE:
-@@ -1187,6 +1799,165 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
+@@ -1187,6 +1967,165 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
return ret;
}
static int rs_nonblocking(struct rsocket *rs, int flags)
{
return (rs->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT);
-@@ -1218,9 +1989,19 @@ static int rs_can_send(struct rsocket *rs)
+@@ -1218,9 +2157,19 @@ static int rs_can_send(struct rsocket *rs)
(rs->target_sgl[rs->target_sge].length != 0);
}
}
static int rs_conn_can_send_ctrl(struct rsocket *rs)
-@@ -1235,7 +2016,7 @@ static int rs_have_rdata(struct rsocket *rs)
+@@ -1235,7 +2184,7 @@ static int rs_have_rdata(struct rsocket *rs)
static int rs_conn_have_rdata(struct rsocket *rs)
{
}
static int rs_conn_all_sends_done(struct rsocket *rs)
-@@ -1244,6 +2025,66 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
+@@ -1244,6 +2193,66 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
!(rs->state & rs_connected);
}
static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
{
size_t left = len;
-@@ -1289,6 +2130,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1289,6 +2298,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
int ret;
rs = idm_at(&idm, socket);
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1338,7 +2186,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1338,7 +2354,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
rs->rbuf_bytes_avail += rsize;
}
fastlock_release(&rs->rlock);
return ret ? ret : len - left;
-@@ -1349,6 +2197,14 @@ ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
+@@ -1349,6 +2365,14 @@ ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
{
int ret;
ret = rrecv(socket, buf, len, flags);
if (ret > 0 && src_addr)
rgetpeername(socket, src_addr, addrlen);
-@@ -1390,14 +2246,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1390,14 +2414,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
struct rs_iomap iom;
int ret;
ret = ERR(ECONNRESET);
break;
}
-@@ -1446,10 +2302,75 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1446,10 +2470,81 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
}
rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
+
+ hdr.tag = htonll(DS_UDP_TAG);
+ hdr.version = 1;
++ hdr.op = op;
+ memset(&hdr->reserved, 0, sizeof hdr->reserved);
+ hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
+
+ memset(&msg, 0, sizeof msg);
+ msg.msg_iov = miov;
+ msg.msg_iovlen = iovcnt + 1;
-+ return sendmsg(rs->fd, msg, flags);
++ return sendmsg(rs->udp_sock, msg, flags);
+}
+
-+static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, int flags)
++static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
++ int flags, uint8_t op)
+{
+ struct iovec iov;
-+ iov.iov_base = buf;
-+ iov_iov_len = len;
-+ return ds_sendv_udp(s, &iov, 1, flags);
++ if (buf && len) {
++ iov.iov_base = buf;
++ iov_iov_len = len;
++ return ds_sendv_udp(rs, &iov, 1, flags, op);
++ } else {
++ return ds_sendv_udp(rs, NULL, 0, flags, op);
++ }
+}
+
+static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
+ int flags, ret = 0;
+
+ if (!rs->conn_dest->ah)
-+ return ds_send_udp(rs, buf, len, flags);
++ return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
+
+ if (!ds_can_send(rs)) {
+ ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
/*
* We overlap sending the data, by posting a small work request immediately,
* then increasing the size of the send on each iteration.
-@@ -1463,6 +2384,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1463,6 +2558,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
int ret = 0;
rs = idm_at(&idm, socket);
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1484,7 +2412,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1484,7 +2586,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1537,10 +2465,26 @@ out:
+@@ -1537,10 +2639,26 @@ out:
ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen)
{
- if (dest_addr || addrlen)
- return ERR(EISCONN);
+ struct rsocket *rs;
-
-- return rsend(socket, buf, len, flags);
++
+ rs = idm_at(&idm, socket);
+ if (rs->type == SOCK_STREAM) {
+ if (dest_addr || addrlen)
+ return ERR(EISCONN);
-+
+
+- return rsend(socket, buf, len, flags);
+ return rsend(socket, buf, len, flags);
+ }
+
}
static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-@@ -1599,7 +2543,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1599,7 +2717,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1652,7 +2596,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+@@ -1652,7 +2770,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
if (msg->msg_control && msg->msg_controllen)
return ERR(ENOTSUP);
}
ssize_t rwrite(int socket, const void *buf, size_t count)
-@@ -1948,7 +2892,7 @@ int rshutdown(int socket, int how)
+@@ -1689,8 +2807,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
+ int ret;
+
+ check_cq:
+- if ((rs->state & rs_connected) || (rs->state == rs_disconnected) ||
+- (rs->state & rs_error)) {
++ if ((rs->type == SOCK_STREAM) && ((rs->state & rs_connected) ||
++ (rs->state == rs_disconnected) || (rs->state & rs_error))) {
+ rs_process_cq(rs, nonblock, test);
+
+ revents = 0;
+@@ -1706,6 +2824,16 @@ check_cq:
+ }
+
+ return revents;
++ } else if (rs->type == SOCK_DGRAM) {
++ ds_process_cqs(rs, nonblock, test);
++
++ revents = 0;
++ if ((events & POLLIN) && rs_have_rdata(rs))
++ revents |= POLLIN;
++ if ((events & POLLOUT) && ds_can_send(rs))
++ revents |= POLLOUT;
++
++ return revents;
+ }
+
+ if (rs->state == rs_listening) {
+@@ -1765,11 +2893,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+ if (fds[i].revents)
+ return 1;
+
+- if (rs->state >= rs_connected)
+- rfds[i].fd = rs->cm_id->recv_cq_channel->fd;
+- else
+- rfds[i].fd = rs->cm_id->channel->fd;
+-
++ if (rs->type == SOCK_STREAM) {
++ if (rs->state >= rs_connected)
++ rfds[i].fd = rs->cm_id->recv_cq_channel->fd;
++ else
++ rfds[i].fd = rs->cm_id->channel->fd;
++ } else {
++ rfds[i].fd = rs->epfd;
++ }
+ rfds[i].events = POLLIN;
+ } else {
+ rfds[i].fd = fds[i].fd;
+@@ -1792,7 +2923,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+
+ rs = idm_lookup(&idm, fds[i].fd);
+ if (rs) {
+- rs_get_cq_event(rs);
++ if (rs->type == SOCK_STREAM)
++ rs_get_cq_event(rs);
++ else
++ ds_get_cq_event(rs);
+ fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
+ } else {
+ fds[i].revents = rfds[i].revents;
+@@ -1948,7 +3082,7 @@ int rshutdown(int socket, int how)
rs = idm_at(&idm, socket);
if (how == SHUT_RD) {
return 0;
}
-@@ -1958,10 +2902,10 @@ int rshutdown(int socket, int how)
+@@ -1958,10 +3092,10 @@ int rshutdown(int socket, int how)
if (rs->state & rs_connected) {
if (how == SHUT_RDWR) {
ctrl = RS_CTRL_DISCONNECT;
RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
}
if (!rs->ctrl_avail) {
-@@ -1986,13 +2930,31 @@ int rshutdown(int socket, int how)
+@@ -1986,13 +3120,31 @@ int rshutdown(int socket, int how)
return 0;
}
rs_free(rs);
return 0;
-@@ -2017,8 +2979,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2017,8 +3169,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
}
int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-@@ -2026,8 +2992,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2026,8 +3182,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
}
int rsetsockopt(int socket, int level, int optname,
-@@ -2039,18 +3009,26 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2039,18 +3199,26 @@ int rsetsockopt(int socket, int level, int optname,
ret = ERR(ENOTSUP);
rs = idm_at(&idm, socket);
opt_on = *(int *) optval;
break;
case SO_RCVBUF:
-@@ -2100,9 +3078,11 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2100,9 +3268,11 @@ int rsetsockopt(int socket, int level, int optname,
opts = &rs->ipv6_opts;
switch (optname) {
case IPV6_V6ONLY:
opt_on = *(int *) optval;
break;
default:
-@@ -2314,7 +3294,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2314,7 +3484,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
return ERR(EINVAL);
if (prot & PROT_WRITE) {
iomr = rs_get_iomap_mr(rs);
access |= IBV_ACCESS_REMOTE_WRITE;
-@@ -2348,7 +3328,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2348,7 +3518,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
dlist_insert_tail(&iomr->entry, &rs->iomap_list);
}
out:
return offset;
}
-@@ -2360,7 +3340,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2360,7 +3530,7 @@ int riounmap(int socket, void *buf, size_t len)
int ret = 0;
rs = idm_at(&idm, socket);
for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
entry = entry->next) {
-@@ -2381,7 +3361,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2381,7 +3551,7 @@ int riounmap(int socket, void *buf, size_t len)
}
ret = ERR(EINVAL);
out:
return ret;
}
-@@ -2425,7 +3405,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
+@@ -2425,7 +3595,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
rs_conn_can_send);
if (ret)
break;
+++ /dev/null
-Bottom: 0ddd3b5ffa7ef87e8179ff9779cbd2f76c69ac89
-Top: bd15dfc9adaf449efbd6fcccaac92ea3ed7ad81b
-Author: Sean Hefty <sean.hefty@intel.com>
-Date: 2012-12-03 11:22:59 -0800
-
-Refresh of dsocket
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index d7c3163..07cf31d 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -78,8 +78,11 @@ struct rs_svc_msg {
- };
-
- static pthread_t svc_id;
-+static int svc_sock[2];
- static int svc_cnt;
--static int svc_fds[2];
-+static int svc_size;
-+static struct rsocket **svc_rss;
-+static struct pollfd *svc_fds;
-
- static uint16_t def_iomap_size = 0;
- static uint16_t def_inline = 64;
-@@ -227,11 +230,19 @@ struct ds_header {
- #define DS_IPV4_HDR_LEN 8
- #define DS_IPV6_HDR_LEN 24
-
-+struct ds_dest {
-+ union socket_addr addr; /* must be first */
-+ struct ds_qp *qp;
-+ struct ibv_ah *ah;
-+ uint32_t qpn;
-+};
-+
- struct ds_qp {
- dlist_t list;
- struct rsocket *rs;
- struct rdma_cm_id *cm_id;
- struct ds_header hdr;
-+ struct ds_dest dest;
-
- struct ibv_mr *smr;
- struct ibv_mr *rmr;
-@@ -240,13 +251,6 @@ struct ds_qp {
- int cq_armed;
- };
-
--struct ds_dest {
-- union socket_addr addr; /* must be first */
-- struct ds_qp *qp;
-- struct ibv_ah *ah;
-- uint32_t qpn;
--};
--
- struct rsocket {
- int type;
- int index;
-@@ -332,13 +336,21 @@ struct rsocket {
- int iomap_pending;
- };
-
--#define DS_UDP_TAG 0x5555555555555555ULL
-+#define DS_UDP_TAG 0x55555555
-
- struct ds_udp_header {
-- uint64_t tag;
-+ uint32_t tag;
- uint8_t version;
-- uint8_t reserved[3];
-- uint32_t qpn; /* upper 8-bits reserved */
-+ uint8_t op;
-+ uint8_t length;
-+ uint8_t reserved;
-+ uint32_t qpn; /* lower 8-bits reserved */
-+ union {
-+ uint32_t ipv4;
-+ struct {
-+ uint8_t addr[16];
-+ } ipv6;
-+ } addr;
- };
-
- #define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list)
-@@ -362,8 +374,118 @@ static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
- }
- }
-
-+static int rs_svc_grow_sets(void)
-+{
-+ struct rsocket **rss;
-+ struct pollfd *fds;
-+ void *set;
-+
-+ set = calloc(svc_size + 2, sizeof(*rss) + sizeof(*fds));
-+ if (!set)
-+ return ENOMEM;
-+
-+ svc_size += 2;
-+ rss = set;
-+ fds = set + sizeof(*rss) * svc_size;
-+ if (svc_cnt) {
-+ memcpy(rss, svc_rss, sizeof(*rss) * svc_cnt);
-+ memcpy(fds, svc_fds, sizeof(*fds) * svc_cnt);
-+ }
-+
-+ free(svc_rss);
-+ free(svc_fds);
-+ svc_rss = rss;
-+ svc_fds = fds;
-+ return 0;
-+}
-+
-+/*
-+ * Index 0 is reserved for the service's communication socket.
-+ */
-+static int rs_svc_add_rs(struct rsocket *rs)
-+{
-+ int ret;
-+
-+ if (svc_cnt >= svc_size - 1) {
-+ ret = rs_svc_grow_sets();
-+ if (ret)
-+ return ret;
-+ }
-+
-+ svc_rss[++svc_cnt] = rs;
-+ svc_fds[svc_cnt].fd = rs->udp_sock;
-+ svc_fds[svc_cnt].events = POLLIN;
-+ svc_fds[svc_cnt].revents = 0;
-+ return 0;
-+}
-+
-+static int rs_svc_rm_rs(struct rsocket *rs)
-+{
-+ int i;
-+
-+ for (i = 1; i <= svc_cnt; i++) {
-+ if (svc_rss[i] == rs) {
-+ svc_cnt--;
-+ svc_rss[i] = svc_rss[svc_cnt];
-+ svc_fds[i] = svc_fds[svc_cnt];
-+ return 0;
-+ }
-+ }
-+ return EBADF;
-+}
-+
-+static void rs_svc_process_sock(void)
-+{
-+ struct rs_svc_msg msg;
-+
-+ read(svc_sock[1], &msg, sizeof msg);
-+ switch (msg.op) {
-+ case RS_SVC_INSERT:
-+ msg.status = rs_svc_add_rs(msg.rs);
-+ break;
-+ case RS_SVC_REMOVE:
-+ msg.status = rs_svc_rm_rs(msg.rs);
-+ break;
-+ default:
-+ msg.status = ENOTSUP;
-+ break;
-+ }
-+ write(svc_sock[1], &msg, sizeof msg);
-+}
-+
-+static void rs_svc_process_rs(struct rsocket *rs)
-+{
-+
-+}
-+
- static int rs_svc_run(void *arg)
- {
-+ struct rs_svc_msg msg;
-+ int i, ret;
-+
-+ ret = rs_svc_grow_sets();
-+ if (ret) {
-+ msg.status = ret;
-+ write(svc_sock[1] &msg, sizeof msg);
-+ return ret;
-+ }
-+
-+ svc_fds[0].fd = svc_sock[1];
-+ svc_fds[0].events = POLLIN;
-+ do {
-+ for (i = 0; i <= svc_cnt; i++)
-+ svc_fds[i].revents = 0;
-+
-+ poll(svc_fds, svc_cnt + 1, -1);
-+ if (svc_fds[0].revents)
-+ rs_svc_process_sock();
-+
-+ for (i = 1; i <= svc_cnt; i++) {
-+ if (svc_fds[i].revents)
-+ rs_svc_process_rs(svc_rss[i]);
-+ }
-+ } while (svc_cnt > 1);
-+
- return 0;
- }
-
-@@ -374,27 +496,35 @@ static int rs_svc_insert(struct rsocket *rs)
-
- pthread_mutex_lock(&mut);
- if (!svc_cnt) {
-- ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_fds);
-+ ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_sock);
- if (ret)
-- goto out;
-+ goto err1;
-
- ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL);
- if (ret) {
-- close(svc_fds[0]);
-- close(svc_fds[1]);
- ret = ERR(ret);
-- goto out;
-+ goto err2;
- }
- }
-
- msg.op = RS_SVC_INSERT;
- msg.status = EINVAL;
- msg.rs = rs;
-- svc_cnt++;
-- write(svc_fds[0], &msg, sizeof msg);
-- read(svc_fds[0], &msg, sizeof msg);
-+ write(svc_sock[0], &msg, sizeof msg);
-+ read(svc_sock[0], &msg, sizeof msg);
- ret = ERR(msg.status);
--out:
-+ if (ret && !svn_cnt)
-+ goto err3;
-+
-+ pthread_mutex_unlock(&mut);
-+ return ret;
-+
-+err3:
-+ pthread_join(svc_id, NULL);
-+err2:
-+ close(svc_sock[0]);
-+ close(svc_sock[1]);
-+err1:
- pthread_mutex_unlock(&mut);
- return ret;
- }
-@@ -408,11 +538,14 @@ static int rs_svc_remove(struct rsocket *rs)
- msg.op = RS_SVC_REMOVE;
- msg.status = EINVAL;
- msg.rs = rs;
-- write(svc_fds[0], &msg, sizeof msg);
-- read(svc_fds[0], &msg, sizeof msg);
-+ write(svc_sock[0], &msg, sizeof msg);
-+ read(svc_sock[0], &msg, sizeof msg);
- ret = ERR(msg.status);
-- if (!ret && !--svn_cnt)
-+ if (!svn_cnt) {
- pthread_join(svc_id, NULL);
-+ close(svc_sock[0]);
-+ close(svc_sock[1]);
-+ }
-
- pthread_mutex_unlock(&mut);
- return ret;
-@@ -821,12 +954,14 @@ static void ds_free_qp(struct ds_qp *qp)
-
- if (qp->cm_id) {
- if (qp->cm_id->qp) {
-+ tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_dest);
- epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL,
- qp->cm_id->recv_cq_channel->fd, NULL);
- rdma_destroy_qp(qp->cm_id);
- }
- rdma_destroy_id(qp->cm_id);
- }
-+
- free(qp);
- }
-
-@@ -860,6 +995,7 @@ static void ds_free(struct rsocket *rs)
- if (rs->sbuf)
- free(rs->sbuf);
-
-+ tdestroy(rs->dest_map, free);
- fastlock_destroy(&rs->map_lock);
- fastlock_destroy(&rs->cq_wait_lock);
- fastlock_destroy(&rs->cq_lock);
-@@ -1317,6 +1453,32 @@ static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
- }
- }
-
-+static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
-+ socklen_t addrlen)
-+{
-+ struct ibv_port_attr port_attr;
-+ struct ibv_ah_attr attr;
-+ int ret;
-+
-+ memcpy(&qp->dest.addr, addr, addrlen);
-+ qp->dest.qp = qp;
-+ qp->dest.qpn = qp->cm_id->qp->qp_num;
-+
-+ ret = ibv_query_port(qp->cm_id->verbs, qp->cm_id->port_num, &port_attr);
-+ if (ret)
-+ return ret;
-+
-+ memset(&attr, 0, sizeof attr);
-+ attr.dlid = port_attr.lid;
-+ attr.port_num = qp->cm_id->port_num;
-+ qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr);
-+ if (!qp->dest.ah)
-+ return ERR(ENOMEM);
-+
-+ tsearch(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr);
-+ return 0;
-+}
-+
- static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
- socklen_t addrlen, struct ds_qp **qp)
- {
-@@ -1361,7 +1523,11 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
- if (ret)
- goto err;
-
-- event.events = EPOLLIN | EPOLLOUT;
-+ ret = ds_add_qp_dest(*qp, src_addr, addrlen);
-+ if (ret)
-+ goto err;
-+
-+ event.events = EPOLLIN;
- event.data.ptr = *qp;
- ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD,
- (*qp)->cm_id->recv_cq_channel->fd, &event);
-@@ -1424,15 +1590,17 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
- if (ret)
- goto out;
-
-- *dest = calloc(1, sizeof(struct ds_dest));
-- if (!*dest) {
-- ret = ERR(ENOMEM);
-- goto out;
-- }
-+ if ((addrlen != src_len) || memcmp(addr, src_addr, addrlen)) {
-+ *dest = calloc(1, sizeof(struct ds_dest));
-+ if (!*dest) {
-+ ret = ERR(ENOMEM);
-+ goto out;
-+ }
-
-- memcpy(&(*dest)->addr, addr, addrlen);
-- (*dest)->qp = qp;
-- tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr);
-+ memcpy(&(*dest)->addr, addr, addrlen);
-+ (*dest)->qp = qp;
-+ tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr);
-+ }
- out:
- fastlock_release(&rs->map_lock);
- return ret;
-@@ -2319,6 +2487,7 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
-
- hdr.tag = htonll(DS_UDP_TAG);
- hdr.version = 1;
-+ hdr.op = op;
- memset(&hdr->reserved, 0, sizeof hdr->reserved);
- hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
-
-@@ -2329,15 +2498,20 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
- memset(&msg, 0, sizeof msg);
- msg.msg_iov = miov;
- msg.msg_iovlen = iovcnt + 1;
-- return sendmsg(rs->fd, msg, flags);
-+ return sendmsg(rs->udp_sock, msg, flags);
- }
-
--static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, int flags)
-+static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
-+ int flags, uint8_t op)
- {
- struct iovec iov;
-- iov.iov_base = buf;
-- iov_iov_len = len;
-- return ds_sendv_udp(s, &iov, 1, flags);
-+ if (buf && len) {
-+ iov.iov_base = buf;
-+ iov_iov_len = len;
-+ return ds_sendv_udp(rs, &iov, 1, flags, op);
-+ } else {
-+ return ds_sendv_udp(rs, NULL, 0, flags, op);
-+ }
- }
-
- static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
-@@ -2348,7 +2522,7 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
- int flags, ret = 0;
-
- if (!rs->conn_dest->ah)
-- return ds_send_udp(rs, buf, len, flags);
-+ return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
-
- if (!ds_can_send(rs)) {
- ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
-@@ -2633,8 +2807,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
- int ret;
-
- check_cq:
-- if ((rs->state & rs_connected) || (rs->state == rs_disconnected) ||
-- (rs->state & rs_error)) {
-+ if ((rs->type == SOCK_STREAM) && ((rs->state & rs_connected) ||
-+ (rs->state == rs_disconnected) || (rs->state & rs_error))) {
- rs_process_cq(rs, nonblock, test);
-
- revents = 0;
-@@ -2650,6 +2824,16 @@ check_cq:
- }
-
- return revents;
-+ } else if (rs->type == SOCK_DGRAM) {
-+ ds_process_cqs(rs, nonblock, test);
-+
-+ revents = 0;
-+ if ((events & POLLIN) && rs_have_rdata(rs))
-+ revents |= POLLIN;
-+ if ((events & POLLOUT) && ds_can_send(rs))
-+ revents |= POLLOUT;
-+
-+ return revents;
- }
-
- if (rs->state == rs_listening) {
-@@ -2709,11 +2893,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
- if (fds[i].revents)
- return 1;
-
-- if (rs->state >= rs_connected)
-- rfds[i].fd = rs->cm_id->recv_cq_channel->fd;
-- else
-- rfds[i].fd = rs->cm_id->channel->fd;
--
-+ if (rs->type == SOCK_STREAM) {
-+ if (rs->state >= rs_connected)
-+ rfds[i].fd = rs->cm_id->recv_cq_channel->fd;
-+ else
-+ rfds[i].fd = rs->cm_id->channel->fd;
-+ } else {
-+ rfds[i].fd = rs->epfd;
-+ }
- rfds[i].events = POLLIN;
- } else {
- rfds[i].fd = fds[i].fd;
-@@ -2736,7 +2923,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
-
- rs = idm_lookup(&idm, fds[i].fd);
- if (rs) {
-- rs_get_cq_event(rs);
-+ if (rs->type == SOCK_STREAM)
-+ rs_get_cq_event(rs);
-+ else
-+ ds_get_cq_event(rs);
- fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
- } else {
- fds[i].revents = rfds[i].revents;