Bottom: 1fa07c62817ac4b6cb8d9c5e327ea2cdc75dbd21
-Top: f8258dbc93f14acdb67621e51c1696fca92c841a
+Top: 89a1bd159135ab5aae205202e62cdc91b9ab3b0d
Author: Sean Hefty <sean.hefty@intel.com>
Date: 2012-11-09 10:26:38 -0800
{
errno = err;
diff --git a/src/rsocket.c b/src/rsocket.c
-index a060f66..4631b1d 100644
+index a060f66..219aa4a 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
@@ -47,6 +47,8 @@
#define RS_QP_MAX_SIZE 0xFFFE
#define RS_QP_CTRL_SIZE 4
#define RS_CONN_RETRIES 6
-@@ -64,6 +66,36 @@
+@@ -64,6 +66,27 @@
static struct index_map idm;
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
-+enum {
-+ RS_SVC_INSERT,
-+ RS_SVC_REMOVE
-+};
-+
+struct rsocket;
+
-+
-+#define PRINTADDR(a) \
-+printf("%s port %x ip %x\n", __func__, \
-+ ((struct sockaddr_in *)a)->sin_port, \
-+ ((struct sockaddr_in *)a)->sin_addr.s_addr)
-+
-+
++enum {
++ RS_SVC_DGRAM = 1 << 0
++};
+
+struct rs_svc_msg {
-+ uint32_t op;
++ uint32_t svcs;
+ uint32_t status;
+ struct rsocket *rs;
+};
static uint16_t def_iomap_size = 0;
static uint16_t def_inline = 64;
static uint16_t def_sqsize = 384;
-@@ -100,6 +132,14 @@ enum {
+@@ -100,6 +123,14 @@ enum {
#define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
#define rs_msg_op(imm_data) (imm_data >> 29)
#define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
enum {
RS_CTRL_DISCONNECT,
-@@ -111,6 +151,18 @@ struct rs_msg {
+@@ -111,6 +142,18 @@ struct rs_msg {
uint32_t data;
};
struct rs_sge {
uint64_t addr;
uint32_t key;
-@@ -145,8 +197,6 @@ struct rs_conn_data {
+@@ -145,8 +188,6 @@ struct rs_conn_data {
struct rs_sge data_buf;
};
/*
* rsocket states are ordered as passive, connecting, connected, disconnected.
*/
-@@ -160,9 +210,9 @@ enum rs_state {
+@@ -160,9 +201,9 @@ enum rs_state {
rs_connecting = rs_opening | 0x0040,
rs_accepting = rs_opening | 0x0080,
rs_connected = 0x0100,
rs_connect_error = 0x0800,
rs_disconnected = 0x1000,
rs_error = 0x2000,
-@@ -170,68 +220,249 @@ enum rs_state {
+@@ -170,68 +211,251 @@ enum rs_state {
#define RS_OPT_SWAP_SGL 1
fastlock_t cq_lock;
fastlock_t cq_wait_lock;
- fastlock_t iomap_lock;
+-
+ fastlock_t map_lock; /* acquire slock first if needed */
+
+ union {
+ struct ds_smsg *smsg_free;
+ };
+ };
-
++
++ int svcs;
int opts;
long fd_flags;
uint64_t so_opts;
- void *target_buffer_list;
- volatile struct rs_sge *target_sgl;
- struct rs_iomap *target_iomap;
--
++#define DS_UDP_TAG 0x55555555
+
- uint32_t rbuf_size;
- struct ibv_mr *rmr;
- uint8_t *rbuf;
-+#define DS_UDP_TAG 0x55555555
-
+-
- uint32_t sbuf_size;
- struct ibv_mr *smr;
- struct ibv_sge ssgl[2];
+ }
+}
+
-+static int rs_add_to_svc(struct rsocket *rs)
++static int rs_modify_svcs(struct rsocket *rs, int svcs)
+{
+ struct rs_svc_msg msg;
+ int ret;
+ if (!svc_cnt) {
+ ret = socketpair(AF_UNIX, SOCK_STREAM, 0, svc_sock);
+ if (ret)
-+ goto err1;
++ goto unlock;
+
+ ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL);
+ if (ret) {
+ ret = ERR(ret);
-+ goto err2;
++ goto closepair;
+ }
+ }
+
-+ msg.op = RS_SVC_INSERT;
++ msg.svcs = svcs;
+ msg.status = EINVAL;
+ msg.rs = rs;
+ write(svc_sock[0], &msg, sizeof msg);
+ read(svc_sock[0], &msg, sizeof msg);
+ ret = rdma_seterrno(msg.status);
-+ if (ret && !svc_cnt)
-+ goto err3;
++ if (svc_cnt)
++ goto unlock;
++// if (ret && !svc_cnt)
++// goto join;
++//
++// pthread_mutex_unlock(&mut);
++// return ret;
+
-+ pthread_mutex_unlock(&mut);
-+ return ret;
-+
-+err3:
+ pthread_join(svc_id, NULL);
-+err2:
++closepair:
+ close(svc_sock[0]);
+ close(svc_sock[1]);
-+err1:
++unlock:
+ pthread_mutex_unlock(&mut);
+ return ret;
+}
+
-+static int rs_remove_from_svc(struct rsocket *rs)
-+{
-+ struct rs_svc_msg msg;
-+ int ret;
-+
-+ pthread_mutex_lock(&mut);
-+ msg.op = RS_SVC_REMOVE;
-+ msg.status = EINVAL;
-+ msg.rs = rs;
-+ write(svc_sock[0], &msg, sizeof msg);
-+ read(svc_sock[0], &msg, sizeof msg);
-+ ret = ERR(msg.status);
-+ if (!svc_cnt) {
-+ pthread_join(svc_id, NULL);
-+ close(svc_sock[0]);
-+ close(svc_sock[1]);
-+ }
-+
-+ pthread_mutex_unlock(&mut);
-+ return ret;
-+}
++//static void rs_remove_from_svc(struct rsocket *rs)
++//{
++// struct rs_svc_msg msg;
++// int ret;
++//
++// pthread_mutex_lock(&mut);
++// if (svc_cnt) {
++// msg.op = RS_SVC_REMOVE;
++// msg.status = EINVAL;
++// msg.rs = rs;
++// write(svc_sock[0], &msg, sizeof msg);
++// read(svc_sock[0], &msg, sizeof msg);
++// }
++//
++// if (!svc_cnt) {
++// pthread_join(svc_id, NULL);
++// close(svc_sock[0]);
++// close(svc_sock[1]);
++// }
++//
++// pthread_mutex_unlock(&mut);
++//}
+
+static int ds_compare_addr(const void *dst1, const void *dst2)
+{
+
+ len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ?
+ sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in);
-+// printf("%s len %d sizeof sin %d memcmp %d\n", __func__, len, sizeof(struct sockaddr_in), memcmp(dst1,dst2,len));
+ return memcmp(dst1, dst2, len);
+}
+
static int rs_value_to_scale(int value, int bits)
{
return value <= (1 << (bits - 1)) ?
-@@ -307,10 +538,10 @@ out:
+@@ -307,10 +531,10 @@ out:
pthread_mutex_unlock(&mut);
}
pthread_mutex_unlock(&mut);
return rs->index;
}
-@@ -322,7 +553,7 @@ static void rs_remove(struct rsocket *rs)
+@@ -322,7 +546,7 @@ static void rs_remove(struct rsocket *rs)
pthread_mutex_unlock(&mut);
}
{
struct rsocket *rs;
-@@ -330,29 +561,39 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -330,29 +554,39 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
if (!rs)
return NULL;
dlist_init(&rs->iomap_list);
dlist_init(&rs->iomap_queue);
return rs;
-@@ -360,13 +601,26 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -360,13 +594,26 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
static int rs_set_nonblocking(struct rsocket *rs, long arg)
{
return ret;
}
-@@ -390,17 +644,39 @@ static void rs_set_qp_size(struct rsocket *rs)
+@@ -390,17 +637,39 @@ static void rs_set_qp_size(struct rsocket *rs)
rs->rq_size = 2;
}
rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
if (!rs->smr)
-@@ -410,7 +686,7 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -410,7 +679,7 @@ static int rs_init_bufs(struct rsocket *rs)
sizeof(*rs->target_iomap) * rs->target_iomap_size;
rs->target_buffer_list = malloc(len);
if (!rs->target_buffer_list)
rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
if (!rs->target_mr)
-@@ -423,7 +699,7 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -423,7 +692,7 @@ static int rs_init_bufs(struct rsocket *rs)
rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
if (!rs->rbuf)
rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
if (!rs->rmr)
-@@ -440,37 +716,57 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -440,37 +709,57 @@ static int rs_init_bufs(struct rsocket *rs)
return 0;
}
{
struct ibv_recv_wr wr, *bad;
-@@ -482,6 +778,26 @@ rs_post_recv(struct rsocket *rs)
+@@ -482,6 +771,26 @@ rs_post_recv(struct rsocket *rs)
return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
}
static int rs_create_ep(struct rsocket *rs)
{
struct ibv_qp_init_attr qp_attr;
-@@ -492,7 +808,7 @@ static int rs_create_ep(struct rsocket *rs)
+@@ -492,7 +801,7 @@ static int rs_create_ep(struct rsocket *rs)
if (ret)
return ret;
if (ret)
return ret;
-@@ -549,8 +865,73 @@ static void rs_free_iomappings(struct rsocket *rs)
+@@ -549,8 +858,70 @@ static void rs_free_iomappings(struct rsocket *rs)
}
}
+{
+ struct ds_qp *qp;
+
-+ if (rs->state & (rs_readable | rs_writable))
-+ rs_remove_from_svc(rs);
-+
+ if (rs->udp_sock >= 0)
+ close(rs->udp_sock);
+
if (rs->index >= 0)
rs_remove(rs);
-@@ -582,7 +963,7 @@ static void rs_free(struct rsocket *rs)
+@@ -582,7 +953,7 @@ static void rs_free(struct rsocket *rs)
rdma_destroy_id(rs->cm_id);
}
fastlock_destroy(&rs->cq_wait_lock);
fastlock_destroy(&rs->cq_lock);
fastlock_destroy(&rs->rlock);
-@@ -636,29 +1017,88 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+@@ -636,29 +1007,88 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
rs->sseq_comp = ntohs(conn->credits);
}
+ }
+ msg->next = NULL;
+
-+ ret = rs_add_to_svc(rs);
++ ret = rs_modify_svcs(rs, RS_SVC_DGRAM);
+ if (ret)
+ return ret;
+
return rs->index;
err:
-@@ -672,9 +1112,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+@@ -672,9 +1102,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
int ret;
rs = idm_at(&idm, socket);
return ret;
}
-@@ -710,7 +1159,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -710,7 +1149,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
int ret;
rs = idm_at(&idm, socket);
if (!new_rs)
return ERR(ENOMEM);
-@@ -718,7 +1167,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -718,7 +1157,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
if (ret)
goto err;
if (ret < 0)
goto err;
-@@ -729,7 +1178,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -729,7 +1168,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
}
if (rs->fd_flags & O_NONBLOCK)
ret = rs_create_ep(new_rs);
if (ret)
-@@ -831,7 +1280,7 @@ connected:
+@@ -831,7 +1270,7 @@ connected:
break;
case rs_accepting:
if (!(rs->fd_flags & O_NONBLOCK))
ret = ucma_complete(rs->cm_id);
if (ret)
-@@ -855,13 +1304,251 @@ connected:
+@@ -855,13 +1294,240 @@ connected:
return ret;
}
+ int sock, ret;
+ uint16_t port;
+
-+// printf("dest: "); PRINTADDR(dest_addr);
-+ *src_len = sizeof src_addr;
++ *src_len = sizeof *src_addr;
+ ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
-+// printf("src: "); PRINTADDR(src_addr);
+ if (ret || !rs_any_addr(src_addr))
+ return ret;
+
+ if (ret)
+ goto out;
+
-+ *src_len = sizeof src_addr;
++ *src_len = sizeof *src_addr;
+ ret = getsockname(sock, &src_addr->sa, src_len);
+ src_addr->sin.sin_port = port;
-+// printf("selected src: ");
+out:
+ close(sock);
+ return ret;
+static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
+{
+ if (addr->sa.sa_family == AF_INET) {
-+// PRINTADDR(addr);
+ hdr->version = 4;
+ hdr->length = DS_IPV4_HDR_LEN;
+ hdr->port = addr->sin.sin_port;
+ struct ibv_ah_attr attr;
+ int ret;
+
-+// printf("%s\n", __func__);
+ memcpy(&qp->dest.addr, addr, addrlen);
+ qp->dest.qp = qp;
+ qp->dest.qpn = qp->cm_id->qp->qp_num;
+ attr.dlid = port_attr.lid;
+ attr.port_num = qp->cm_id->port_num;
+ qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr);
-+// printf("%s ah %p lid %x port %d qpn %x\n", __func__, qp->dest.ah, attr.dlid,
-+// attr.port_num, qp->dest.qpn);
+ if (!qp->dest.ah)
+ return ERR(ENOMEM);
+
+ struct epoll_event event;
+ int i, ret;
+
-+// PRINTADDR(src_addr);
+ qp = calloc(1, sizeof(*qp));
+ if (!qp)
+ return ERR(ENOMEM);
+ struct ds_dest **tdest, *new_dest;
+ int ret = 0;
+
-+// PRINTADDR(addr);
+ fastlock_acquire(&rs->map_lock);
+ tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
+ if (tdest)
+ goto found;
+
+ ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
-+// printf("get src: "); PRINTADDR(&src_addr);
+ if (ret)
+ goto out;
+
+ }
+
+ fastlock_acquire(&rs->slock);
-+// PRINTADDR(addr);
+ ret = connect(rs->udp_sock, addr, addrlen);
+ if (!ret)
+ ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
}
static int rs_post_write_msg(struct rsocket *rs,
-@@ -903,6 +1590,26 @@ static int rs_post_write(struct rsocket *rs,
+@@ -903,6 +1569,24 @@ static int rs_post_write(struct rsocket *rs,
return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
}
+ wr.wr.ud.ah = rs->conn_dest->ah;
+ wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
+ wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
-+// printf("%s ah %p qpn %x\n", __func__, rs->conn_dest->ah,
-+// rs->conn_dest->qpn);
+
+ return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad));
+}
/*
* Update target SGE before sending data. Otherwise the remote side may
* update the entry before we do.
-@@ -1046,7 +1753,7 @@ static int rs_poll_cq(struct rsocket *rs)
+@@ -1046,7 +1730,7 @@ static int rs_poll_cq(struct rsocket *rs)
rs->state = rs_disconnected;
return 0;
} else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
}
break;
case RS_OP_WRITE:
-@@ -1133,46 +1840,230 @@ static int rs_get_cq_event(struct rsocket *rs)
+@@ -1133,46 +1817,213 @@ static int rs_get_cq_event(struct rsocket *rs)
*/
static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
{
+ if (ds_wr_is_recv(wc.wr_id)) {
+ if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS &&
+ ds_valid_recv(qp, &wc)) {
-+// printf("%s recv over QP\n", __func__);
+ rs->rqe_avail--;
+ rmsg = &rs->dmsg[rs->rmsg_tail];
+ rmsg->qp = qp;
+ if (++rs->rmsg_tail == rs->rq_size + 1)
+ rs->rmsg_tail = 0;
+ } else {
-+// printf("%s invalid recv\n", __func__);
+ ds_post_recv(rs, qp, ds_wr_offset(wc.wr_id));
+ }
+ } else {
+ smsg = (struct ds_smsg *)
+ (rs->sbuf + ds_wr_offset(wc.wr_id));
-+// printf("%s send smsg %p free %p\n", __func__, smsg, rs->smsg_free);
+ smsg->next = rs->smsg_free;
+ rs->smsg_free = smsg;
+ rs->sqe_avail++;
+ if (!rs->cq_armed)
+ return 0;
+
-+// printf("wait for epoll event\n");
+ ret = epoll_wait(rs->epfd, &event, 1, -1);
-+// printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno));
+ if (ret <= 0)
+ return ret;
+
+
+static int rs_have_rdata(struct rsocket *rs);
+static int ds_can_send(struct rsocket *rs);
++static int rs_poll_all(struct rsocket *rs);
++static int ds_all_sends_done(struct rsocket *rs);
+
+static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
+{
+ int ret = 0;
-+
-+ if (test == rs_have_rdata)
-+ printf("%s test rs_have_rdata\n", __func__);
-+ else if (test == ds_can_send)
-+ printf("%s test ds_can_send\n", __func__);
-+ else
-+ printf("%s test ?\n", __func__);
fastlock_acquire(&rs->cq_lock);
do {
- ret = rs_poll_cq(rs);
+ ds_poll_cqs(rs);
if (test(rs)) {
-+ printf("%s test succeeded\n", __func__);
ret = 0;
break;
- } else if (ret) {
- break;
} else if (nonblock) {
ret = ERR(EWOULDBLOCK);
-+ printf("%s nonblocking \n", __func__);
} else if (!rs->cq_armed) {
- ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
-+ printf("%s req notify \n", __func__);
+ ds_req_notify_cqs(rs);
rs->cq_armed = 1;
} else {
fastlock_release(&rs->cq_lock);
- ret = rs_get_cq_event(rs);
-+ printf("%s wait for event \n", __func__);
+ ret = ds_get_cq_event(rs);
-+ printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
fastlock_release(&rs->cq_wait_lock);
fastlock_acquire(&rs->cq_lock);
}
- rs_update_credits(rs);
fastlock_release(&rs->cq_lock);
-+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
return ret;
}
do {
- ret = rs_process_cq(rs, 1, test);
+ ret = ds_process_cqs(rs, 1, test);
-+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
if (!ret || nonblock || errno != EWOULDBLOCK)
return ret;
-@@ -1184,7 +2075,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
+@@ -1184,7 +2035,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
(e.tv_usec - s.tv_usec) + 1;
} while (poll_time <= polling_time);
return ret;
}
-@@ -1219,9 +2110,19 @@ static int rs_can_send(struct rsocket *rs)
+@@ -1219,9 +2070,19 @@ static int rs_can_send(struct rsocket *rs)
(rs->target_sgl[rs->target_sge].length != 0);
}
}
static int rs_conn_can_send_ctrl(struct rsocket *rs)
-@@ -1236,7 +2137,7 @@ static int rs_have_rdata(struct rsocket *rs)
+@@ -1236,7 +2097,7 @@ static int rs_have_rdata(struct rsocket *rs)
static int rs_conn_have_rdata(struct rsocket *rs)
{
}
static int rs_conn_all_sends_done(struct rsocket *rs)
-@@ -1245,6 +2146,74 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
+@@ -1245,6 +2106,67 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
!(rs->state & rs_connected);
}
+ struct ds_header *hdr;
+ int ret;
+
-+// printf("%s \n", __func__);
+ if (!(rs->state & rs_readable))
+ return ERR(EINVAL);
+
+ if (!rs_have_rdata(rs)) {
-+// printf("%s need rdata \n", __func__);
+ ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
+ rs_have_rdata);
-+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
+ if (ret)
+ return ret;
+ }
+
+ memcpy(buf, (void *) hdr + hdr->length, len);
+ if (addrlen)
-+{
+ ds_set_src(src_addr, addrlen, hdr);
-+//PRINTADDR(src_addr);
-+}
+
+ if (!(flags & MSG_PEEK)) {
+ ds_post_recv(rs, rmsg->qp, rmsg->offset);
+ rs->rqe_avail++;
+ }
+
-+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
+ return len;
+}
+
static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
{
size_t left = len;
-@@ -1290,6 +2259,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1290,6 +2212,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
int ret;
rs = idm_at(&idm, socket);
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1339,7 +2315,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1339,7 +2268,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
rs->rbuf_bytes_avail += rsize;
}
fastlock_release(&rs->rlock);
return ret ? ret : len - left;
-@@ -1348,8 +2324,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1348,8 +2277,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen)
{
ret = rrecv(socket, buf, len, flags);
if (ret > 0 && src_addr)
rgetpeername(socket, src_addr, addrlen);
-@@ -1391,14 +2376,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1391,14 +2329,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
struct rs_iomap iom;
int ret;
ret = ERR(ECONNRESET);
break;
}
-@@ -1447,10 +2432,102 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1447,10 +2385,92 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
}
rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
+ struct iovec miov[8];
+ ssize_t ret;
+
-+// printf("%s\n", __func__);
+ if (iovcnt > 8)
+ return ERR(ENOTSUP);
+
+ msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
+ msg.msg_iov = miov;
+ msg.msg_iovlen = iovcnt + 1;
-+// printf("%s iov cnt %d\n", __func__, msg.msg_iovlen);
+ ret = sendmsg(rs->udp_sock, &msg, flags);
+ return ret > 0 ? ret - hdr.length : ret;
+}
+ int flags, uint8_t op)
+{
+ struct iovec iov;
-+// printf("%s\n", __func__);
+ if (buf && len) {
-+// printf("%s have buffer\n", __func__);
+ iov.iov_base = (void *) buf;
+ iov.iov_len = len;
+ return ds_sendv_udp(rs, &iov, 1, flags, op);
+ } else {
-+// printf("%s no buffer\n", __func__);
+ return ds_sendv_udp(rs, NULL, 0, flags, op);
+ }
+}
+ uint64_t offset;
+ int ret = 0;
+
-+// printf("%s\n", __func__);
+ if (!rs->conn_dest->ah)
+ return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
+
+ if (!ds_can_send(rs)) {
-+ printf("can't send\n");
+ ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
-+ printf("ds_get_comp %d\n", ret);
+ if (ret)
+ return ret;
+ }
+ sge.lkey = rs->conn_dest->qp->smr->lkey;
+ offset = (uint8_t *) msg - rs->sbuf;
+
-+// printf("%s - sending over QP\n", __func__);
+ ret = ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
-+// printf("%s - ds_post_send %d %s\n", __func__, ret, strerror(errno));
+ return ret ? ret : len;
+}
+
/*
* We overlap sending the data, by posting a small work request immediately,
* then increasing the size of the send on each iteration.
-@@ -1464,6 +2541,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1464,6 +2484,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
int ret = 0;
rs = idm_at(&idm, socket);
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1485,7 +2569,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1485,7 +2512,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1538,10 +2622,51 @@ out:
+@@ -1538,10 +2565,34 @@ out:
ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen)
{
+ int ret;
- return rsend(socket, buf, len, flags);
-+// PRINTADDR(dest_addr);
-+// printf("%s sendto data 0x%x\n", __func__, *((uint32_t*)buf));
+ rs = idm_at(&idm, socket);
+ if (rs->type == SOCK_STREAM) {
+ if (dest_addr || addrlen)
+ }
+
+ fastlock_acquire(&rs->slock);
-+// printf("%s - checking conn dest %p\n", __func__, rs->conn_dest);
-+// printf("dest addr: af %d", dest_addr->sa_family); PRINTADDR(dest_addr);
-+// if (rs->conn_dest) {
-+// int i;
-+// printf("conn addr: af %d", rs->conn_dest->addr.sa.sa_family); PRINTADDR(&rs->conn_dest->addr);
-+// for (i=0;i<16;i++)
-+// printf("%x", ((uint8_t *)dest_addr)[i]);
-+// printf("\n");
-+// for (i=0;i<16;i++)
-+// printf("%x", ((uint8_t *)&rs->conn_dest->addr)[i]);
-+// printf("\n");
-+// }
+ if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
-+// printf("%s - getting conn dest\n", __func__);
+ ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
-+// printf("%s - get conn dest %d %s\n", __func__, ret, strerror(errno));
+ if (ret)
+ goto out;
+ }
+
+ ret = dsend(rs, buf, len, flags);
-+// printf("%s - dsend %d %s\n", __func__, ret, strerror(errno));
+out:
+ fastlock_release(&rs->slock);
+ return ret;
}
static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-@@ -1600,7 +2725,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1600,7 +2651,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1653,7 +2778,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+@@ -1653,7 +2704,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
if (msg->msg_control && msg->msg_controllen)
return ERR(ENOTSUP);
}
ssize_t rwrite(int socket, const void *buf, size_t count)
-@@ -1690,8 +2815,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
+@@ -1690,8 +2741,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
int ret;
check_cq:
rs_process_cq(rs, nonblock, test);
revents = 0;
-@@ -1707,6 +2832,16 @@ check_cq:
+@@ -1707,6 +2758,16 @@ check_cq:
}
return revents;
}
if (rs->state == rs_listening) {
-@@ -1766,11 +2901,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1766,11 +2827,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
if (fds[i].revents)
return 1;
rfds[i].events = POLLIN;
} else {
rfds[i].fd = fds[i].fd;
-@@ -1793,7 +2931,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1793,7 +2857,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
rs = idm_lookup(&idm, fds[i].fd);
if (rs) {
fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
} else {
fds[i].revents = rfds[i].revents;
-@@ -1949,7 +3090,7 @@ int rshutdown(int socket, int how)
+@@ -1949,7 +3016,7 @@ int rshutdown(int socket, int how)
rs = idm_at(&idm, socket);
if (how == SHUT_RD) {
return 0;
}
-@@ -1959,10 +3100,10 @@ int rshutdown(int socket, int how)
+@@ -1959,10 +3026,10 @@ int rshutdown(int socket, int how)
if (rs->state & rs_connected) {
if (how == SHUT_RDWR) {
ctrl = RS_CTRL_DISCONNECT;
RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
}
if (!rs->ctrl_avail) {
-@@ -1987,13 +3128,29 @@ int rshutdown(int socket, int how)
+@@ -1987,13 +3054,32 @@ int rshutdown(int socket, int how)
return 0;
}
+static void ds_shutdown(struct rsocket *rs)
+{
++ if (rs->svcs)
++ rs_modify_svcs(rs, 0);
++
+ if (rs->fd_flags & O_NONBLOCK)
+ rs_set_nonblocking(rs, 0);
+
rs_free(rs);
return 0;
-@@ -2018,8 +3175,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2018,8 +3104,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
}
int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-@@ -2027,8 +3188,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2027,8 +3117,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
}
int rsetsockopt(int socket, int level, int optname,
-@@ -2040,22 +3205,31 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2040,22 +3134,31 @@ int rsetsockopt(int socket, int level, int optname,
ret = ERR(ENOTSUP);
rs = idm_at(&idm, socket);
rs->rbuf_size = (*(uint32_t *) optval) << 1;
ret = 0;
break;
-@@ -2101,9 +3275,11 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2101,9 +3204,11 @@ int rsetsockopt(int socket, int level, int optname,
opts = &rs->ipv6_opts;
switch (optname) {
case IPV6_V6ONLY:
opt_on = *(int *) optval;
break;
default:
-@@ -2315,7 +3491,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2315,7 +3420,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
return ERR(EINVAL);
if (prot & PROT_WRITE) {
iomr = rs_get_iomap_mr(rs);
access |= IBV_ACCESS_REMOTE_WRITE;
-@@ -2349,7 +3525,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2349,7 +3454,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
dlist_insert_tail(&iomr->entry, &rs->iomap_list);
}
out:
return offset;
}
-@@ -2361,7 +3537,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2361,7 +3466,7 @@ int riounmap(int socket, void *buf, size_t len)
int ret = 0;
rs = idm_at(&idm, socket);
for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
entry = entry->next) {
-@@ -2382,7 +3558,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2382,7 +3487,7 @@ int riounmap(int socket, void *buf, size_t len)
}
ret = ERR(EINVAL);
out:
return ret;
}
-@@ -2426,7 +3602,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
+@@ -2426,7 +3531,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -2476,3 +3652,278 @@ out:
+@@ -2476,3 +3581,272 @@ out:
return (ret && left == count) ? ret : count - left;
}
+ struct rs_svc_msg msg;
+
+ read(svc_sock[1], &msg, sizeof msg);
-+ switch (msg.op) {
-+ case RS_SVC_INSERT:
++ if (msg.svcs & RS_SVC_DGRAM) {
+ msg.status = rs_svc_add_rs(msg.rs);
-+ break;
-+ case RS_SVC_REMOVE:
++ } else if (!msg.svcs) {
+ msg.status = rs_svc_rm_rs(msg.rs);
-+ break;
-+ default:
-+ msg.status = ENOTSUP;
-+ break;
+ }
++
++ if (!msg.status)
++ msg.rs->svcs = msg.svcs;
+ write(svc_sock[1], &msg, sizeof msg);
+}
+
+ struct ibv_sge sge;
+ uint64_t offset;
+
-+// PRINTADDR(src);
+ if (!ds_can_send(rs)) {
+ if (ds_get_comp(rs, 0, ds_can_send))
+ return;
+ rs->sqe_avail--;
+
+ ds_format_hdr(&hdr, src);
-+// printf("%s hdr ver %d length %d port %x\n", __func__, hdr.version,
-+// hdr.length, hdr.port);
+ memcpy((void *) msg, &hdr, hdr.length);
+ memcpy((void *) msg + hdr.length, buf, len);
-+// printf("%s received data 0x%x\n", __func__, *((uint32_t*)buf));
+ sge.addr = (uintptr_t) msg;
+ sge.length = hdr.length + len;
+ sge.lkey = rs->conn_dest->qp->smr->lkey;
+ offset = (uint8_t *) msg - rs->sbuf;
+
-+// printf("%s ver %d length %d port %x\n", __func__, ((struct ds_header *) msg)->version,
-+// ((struct ds_header *) msg)->length, ((struct ds_header *) msg)->port);
+ ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
+}
+
+ int len, ret;
+
+ ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
-+// PRINTADDR(&addr);
-+// printf("%s received data 0x%x\n", __func__, *((uint32_t*)&svc_buf[8]));
+ if (ret < DS_UDP_IPV4_HDR_LEN)
+ return;
+
+ if (ret)
+ return;
+
++ if (udp_hdr->op == RS_OP_DATA) {
++ fastlock_acquire(&rs->slock);
++ cur_dest = rs->conn_dest;
++ rs->conn_dest = dest;
++ ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL);
++ rs->conn_dest = cur_dest;
++ fastlock_release(&rs->slock);
++ }
++
+ if (!dest->ah || (dest->qpn != udp_hdr->qpn))
+ rs_svc_create_ah(rs, dest, udp_hdr->qpn);
+
+ /* to do: handle when dest local ip address doesn't match udp ip */
-+ if (udp_hdr->op != RS_OP_DATA)
-+ return;
-+
-+ fastlock_acquire(&rs->slock);
-+ cur_dest = rs->conn_dest;
-+ rs->conn_dest = &dest->qp->dest;
-+ rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
-+
-+ rs->conn_dest = dest;
-+ ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL);
-+ rs->conn_dest = cur_dest;
-+ fastlock_release(&rs->slock);
++ if (udp_hdr->op == RS_OP_DATA) {
++ fastlock_acquire(&rs->slock);
++ cur_dest = rs->conn_dest;
++ rs->conn_dest = &dest->qp->dest;
++ rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
++ rs->conn_dest = cur_dest;
++ fastlock_release(&rs->slock);
++ }
+}
+
+static void *rs_svc_run(void *arg)
+++ /dev/null
-Bottom: f8258dbc93f14acdb67621e51c1696fca92c841a
-Top: 89a1bd159135ab5aae205202e62cdc91b9ab3b0d
-Author: Sean Hefty <sean.hefty@intel.com>
-Date: 2012-12-17 13:41:21 -0800
-
-Refresh of dsocket
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index 4631b1d..219aa4a 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -66,23 +66,14 @@
- static struct index_map idm;
- static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
-
--enum {
-- RS_SVC_INSERT,
-- RS_SVC_REMOVE
--};
--
- struct rsocket;
-
--
--#define PRINTADDR(a) \
--printf("%s port %x ip %x\n", __func__, \
-- ((struct sockaddr_in *)a)->sin_port, \
-- ((struct sockaddr_in *)a)->sin_addr.s_addr)
--
--
-+enum {
-+ RS_SVC_DGRAM = 1 << 0
-+};
-
- struct rs_svc_msg {
-- uint32_t op;
-+ uint32_t svcs;
- uint32_t status;
- struct rsocket *rs;
- };
-@@ -318,6 +309,7 @@ struct rsocket {
- };
- };
-
-+ int svcs;
- int opts;
- long fd_flags;
- uint64_t so_opts;
-@@ -387,7 +379,7 @@ static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
- }
- }
-
--static int rs_add_to_svc(struct rsocket *rs)
-+static int rs_modify_svcs(struct rsocket *rs, int svcs)
- {
- struct rs_svc_msg msg;
- int ret;
-@@ -396,58 +388,60 @@ static int rs_add_to_svc(struct rsocket *rs)
- if (!svc_cnt) {
- ret = socketpair(AF_UNIX, SOCK_STREAM, 0, svc_sock);
- if (ret)
-- goto err1;
-+ goto unlock;
-
- ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL);
- if (ret) {
- ret = ERR(ret);
-- goto err2;
-+ goto closepair;
- }
- }
-
-- msg.op = RS_SVC_INSERT;
-+ msg.svcs = svcs;
- msg.status = EINVAL;
- msg.rs = rs;
- write(svc_sock[0], &msg, sizeof msg);
- read(svc_sock[0], &msg, sizeof msg);
- ret = rdma_seterrno(msg.status);
-- if (ret && !svc_cnt)
-- goto err3;
-+ if (svc_cnt)
-+ goto unlock;
-+// if (ret && !svc_cnt)
-+// goto join;
-+//
-+// pthread_mutex_unlock(&mut);
-+// return ret;
-
-- pthread_mutex_unlock(&mut);
-- return ret;
--
--err3:
- pthread_join(svc_id, NULL);
--err2:
-+closepair:
- close(svc_sock[0]);
- close(svc_sock[1]);
--err1:
-+unlock:
- pthread_mutex_unlock(&mut);
- return ret;
- }
-
--static int rs_remove_from_svc(struct rsocket *rs)
--{
-- struct rs_svc_msg msg;
-- int ret;
--
-- pthread_mutex_lock(&mut);
-- msg.op = RS_SVC_REMOVE;
-- msg.status = EINVAL;
-- msg.rs = rs;
-- write(svc_sock[0], &msg, sizeof msg);
-- read(svc_sock[0], &msg, sizeof msg);
-- ret = ERR(msg.status);
-- if (!svc_cnt) {
-- pthread_join(svc_id, NULL);
-- close(svc_sock[0]);
-- close(svc_sock[1]);
-- }
--
-- pthread_mutex_unlock(&mut);
-- return ret;
--}
-+//static void rs_remove_from_svc(struct rsocket *rs)
-+//{
-+// struct rs_svc_msg msg;
-+// int ret;
-+//
-+// pthread_mutex_lock(&mut);
-+// if (svc_cnt) {
-+// msg.op = RS_SVC_REMOVE;
-+// msg.status = EINVAL;
-+// msg.rs = rs;
-+// write(svc_sock[0], &msg, sizeof msg);
-+// read(svc_sock[0], &msg, sizeof msg);
-+// }
-+//
-+// if (!svc_cnt) {
-+// pthread_join(svc_id, NULL);
-+// close(svc_sock[0]);
-+// close(svc_sock[1]);
-+// }
-+//
-+// pthread_mutex_unlock(&mut);
-+//}
-
- static int ds_compare_addr(const void *dst1, const void *dst2)
- {
-@@ -459,7 +453,6 @@ static int ds_compare_addr(const void *dst1, const void *dst2)
-
- len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ?
- sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in);
--// printf("%s len %d sizeof sin %d memcmp %d\n", __func__, len, sizeof(struct sockaddr_in), memcmp(dst1,dst2,len));
- return memcmp(dst1, dst2, len);
- }
-
-@@ -893,9 +886,6 @@ static void ds_free(struct rsocket *rs)
- {
- struct ds_qp *qp;
-
-- if (rs->state & (rs_readable | rs_writable))
-- rs_remove_from_svc(rs);
--
- if (rs->udp_sock >= 0)
- close(rs->udp_sock);
-
-@@ -1056,7 +1046,7 @@ static int ds_init_ep(struct rsocket *rs)
- }
- msg->next = NULL;
-
-- ret = rs_add_to_svc(rs);
-+ ret = rs_modify_svcs(rs, RS_SVC_DGRAM);
- if (ret)
- return ret;
-
-@@ -1322,10 +1312,8 @@ static int ds_get_src_addr(struct rsocket *rs,
- int sock, ret;
- uint16_t port;
-
--// printf("dest: "); PRINTADDR(dest_addr);
-- *src_len = sizeof src_addr;
-+ *src_len = sizeof *src_addr;
- ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
--// printf("src: "); PRINTADDR(src_addr);
- if (ret || !rs_any_addr(src_addr))
- return ret;
-
-@@ -1338,10 +1326,9 @@ static int ds_get_src_addr(struct rsocket *rs,
- if (ret)
- goto out;
-
-- *src_len = sizeof src_addr;
-+ *src_len = sizeof *src_addr;
- ret = getsockname(sock, &src_addr->sa, src_len);
- src_addr->sin.sin_port = port;
--// printf("selected src: ");
- out:
- close(sock);
- return ret;
-@@ -1350,7 +1337,6 @@ out:
- static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
- {
- if (addr->sa.sa_family == AF_INET) {
--// PRINTADDR(addr);
- hdr->version = 4;
- hdr->length = DS_IPV4_HDR_LEN;
- hdr->port = addr->sin.sin_port;
-@@ -1371,7 +1357,6 @@ static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
- struct ibv_ah_attr attr;
- int ret;
-
--// printf("%s\n", __func__);
- memcpy(&qp->dest.addr, addr, addrlen);
- qp->dest.qp = qp;
- qp->dest.qpn = qp->cm_id->qp->qp_num;
-@@ -1384,8 +1369,6 @@ static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
- attr.dlid = port_attr.lid;
- attr.port_num = qp->cm_id->port_num;
- qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr);
--// printf("%s ah %p lid %x port %d qpn %x\n", __func__, qp->dest.ah, attr.dlid,
--// attr.port_num, qp->dest.qpn);
- if (!qp->dest.ah)
- return ERR(ENOMEM);
-
-@@ -1401,7 +1384,6 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
- struct epoll_event event;
- int i, ret;
-
--// PRINTADDR(src_addr);
- qp = calloc(1, sizeof(*qp));
- if (!qp)
- return ERR(ENOMEM);
-@@ -1490,14 +1472,12 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
- struct ds_dest **tdest, *new_dest;
- int ret = 0;
-
--// PRINTADDR(addr);
- fastlock_acquire(&rs->map_lock);
- tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
- if (tdest)
- goto found;
-
- ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
--// printf("get src: "); PRINTADDR(&src_addr);
- if (ret)
- goto out;
-
-@@ -1542,7 +1522,6 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
- }
-
- fastlock_acquire(&rs->slock);
--// PRINTADDR(addr);
- ret = connect(rs->udp_sock, addr, addrlen);
- if (!ret)
- ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
-@@ -1604,8 +1583,6 @@ static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge,
- wr.wr.ud.ah = rs->conn_dest->ah;
- wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
- wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
--// printf("%s ah %p qpn %x\n", __func__, rs->conn_dest->ah,
--// rs->conn_dest->qpn);
-
- return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad));
- }
-@@ -1935,7 +1912,6 @@ static void ds_poll_cqs(struct rsocket *rs)
- if (ds_wr_is_recv(wc.wr_id)) {
- if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS &&
- ds_valid_recv(qp, &wc)) {
--// printf("%s recv over QP\n", __func__);
- rs->rqe_avail--;
- rmsg = &rs->dmsg[rs->rmsg_tail];
- rmsg->qp = qp;
-@@ -1944,13 +1920,11 @@ static void ds_poll_cqs(struct rsocket *rs)
- if (++rs->rmsg_tail == rs->rq_size + 1)
- rs->rmsg_tail = 0;
- } else {
--// printf("%s invalid recv\n", __func__);
- ds_post_recv(rs, qp, ds_wr_offset(wc.wr_id));
- }
- } else {
- smsg = (struct ds_smsg *)
- (rs->sbuf + ds_wr_offset(wc.wr_id));
--// printf("%s send smsg %p free %p\n", __func__, smsg, rs->smsg_free);
- smsg->next = rs->smsg_free;
- rs->smsg_free = smsg;
- rs->sqe_avail++;
-@@ -1993,9 +1967,7 @@ static int ds_get_cq_event(struct rsocket *rs)
- if (!rs->cq_armed)
- return 0;
-
--// printf("wait for epoll event\n");
- ret = epoll_wait(rs->epfd, &event, 1, -1);
--// printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno));
- if (ret <= 0)
- return ret;
-
-@@ -2012,46 +1984,35 @@ static int ds_get_cq_event(struct rsocket *rs)
-
- static int rs_have_rdata(struct rsocket *rs);
- static int ds_can_send(struct rsocket *rs);
-+static int rs_poll_all(struct rsocket *rs);
-+static int ds_all_sends_done(struct rsocket *rs);
-
- static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
- {
- int ret = 0;
-
-- if (test == rs_have_rdata)
-- printf("%s test rs_have_rdata\n", __func__);
-- else if (test == ds_can_send)
-- printf("%s test ds_can_send\n", __func__);
-- else
-- printf("%s test ?\n", __func__);
--
- fastlock_acquire(&rs->cq_lock);
- do {
- ds_poll_cqs(rs);
- if (test(rs)) {
-- printf("%s test succeeded\n", __func__);
- ret = 0;
- break;
- } else if (nonblock) {
- ret = ERR(EWOULDBLOCK);
-- printf("%s nonblocking \n", __func__);
- } else if (!rs->cq_armed) {
-- printf("%s req notify \n", __func__);
- ds_req_notify_cqs(rs);
- rs->cq_armed = 1;
- } else {
- fastlock_acquire(&rs->cq_wait_lock);
- fastlock_release(&rs->cq_lock);
-
-- printf("%s wait for event \n", __func__);
- ret = ds_get_cq_event(rs);
-- printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
- fastlock_release(&rs->cq_wait_lock);
- fastlock_acquire(&rs->cq_lock);
- }
- } while (!ret);
-
- fastlock_release(&rs->cq_lock);
--// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
- return ret;
- }
-
-@@ -2063,7 +2024,6 @@ static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
-
- do {
- ret = ds_process_cqs(rs, 1, test);
--// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
- if (!ret || nonblock || errno != EWOULDBLOCK)
- return ret;
-
-@@ -2178,15 +2138,12 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
- struct ds_header *hdr;
- int ret;
-
--// printf("%s \n", __func__);
- if (!(rs->state & rs_readable))
- return ERR(EINVAL);
-
- if (!rs_have_rdata(rs)) {
--// printf("%s need rdata \n", __func__);
- ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
- rs_have_rdata);
--// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
- if (ret)
- return ret;
- }
-@@ -2198,10 +2155,7 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
-
- memcpy(buf, (void *) hdr + hdr->length, len);
- if (addrlen)
--{
- ds_set_src(src_addr, addrlen, hdr);
--//PRINTADDR(src_addr);
--}
-
- if (!(flags & MSG_PEEK)) {
- ds_post_recv(rs, rmsg->qp, rmsg->offset);
-@@ -2210,7 +2164,6 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
- rs->rqe_avail++;
- }
-
--// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
- return len;
- }
-
-@@ -2444,7 +2397,6 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
- struct iovec miov[8];
- ssize_t ret;
-
--// printf("%s\n", __func__);
- if (iovcnt > 8)
- return ERR(ENOTSUP);
-
-@@ -2471,7 +2423,6 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
- msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
- msg.msg_iov = miov;
- msg.msg_iovlen = iovcnt + 1;
--// printf("%s iov cnt %d\n", __func__, msg.msg_iovlen);
- ret = sendmsg(rs->udp_sock, &msg, flags);
- return ret > 0 ? ret - hdr.length : ret;
- }
-@@ -2480,14 +2431,11 @@ static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
- int flags, uint8_t op)
- {
- struct iovec iov;
--// printf("%s\n", __func__);
- if (buf && len) {
--// printf("%s have buffer\n", __func__);
- iov.iov_base = (void *) buf;
- iov.iov_len = len;
- return ds_sendv_udp(rs, &iov, 1, flags, op);
- } else {
--// printf("%s no buffer\n", __func__);
- return ds_sendv_udp(rs, NULL, 0, flags, op);
- }
- }
-@@ -2499,14 +2447,11 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
- uint64_t offset;
- int ret = 0;
-
--// printf("%s\n", __func__);
- if (!rs->conn_dest->ah)
- return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
-
- if (!ds_can_send(rs)) {
-- printf("can't send\n");
- ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
-- printf("ds_get_comp %d\n", ret);
- if (ret)
- return ret;
- }
-@@ -2522,9 +2467,7 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
- sge.lkey = rs->conn_dest->qp->smr->lkey;
- offset = (uint8_t *) msg - rs->sbuf;
-
--// printf("%s - sending over QP\n", __func__);
- ret = ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
--// printf("%s - ds_post_send %d %s\n", __func__, ret, strerror(errno));
- return ret ? ret : len;
- }
-
-@@ -2625,8 +2568,6 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
- struct rsocket *rs;
- int ret;
-
--// PRINTADDR(dest_addr);
--// printf("%s sendto data 0x%x\n", __func__, *((uint32_t*)buf));
- rs = idm_at(&idm, socket);
- if (rs->type == SOCK_STREAM) {
- if (dest_addr || addrlen)
-@@ -2642,28 +2583,13 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
- }
-
- fastlock_acquire(&rs->slock);
--// printf("%s - checking conn dest %p\n", __func__, rs->conn_dest);
--// printf("dest addr: af %d", dest_addr->sa_family); PRINTADDR(dest_addr);
--// if (rs->conn_dest) {
--// int i;
--// printf("conn addr: af %d", rs->conn_dest->addr.sa.sa_family); PRINTADDR(&rs->conn_dest->addr);
--// for (i=0;i<16;i++)
--// printf("%x", ((uint8_t *)dest_addr)[i]);
--// printf("\n");
--// for (i=0;i<16;i++)
--// printf("%x", ((uint8_t *)&rs->conn_dest->addr)[i]);
--// printf("\n");
--// }
- if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
--// printf("%s - getting conn dest\n", __func__);
- ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
--// printf("%s - get conn dest %d %s\n", __func__, ret, strerror(errno));
- if (ret)
- goto out;
- }
-
- ret = dsend(rs, buf, len, flags);
--// printf("%s - dsend %d %s\n", __func__, ret, strerror(errno));
- out:
- fastlock_release(&rs->slock);
- return ret;
-@@ -3130,6 +3056,9 @@ int rshutdown(int socket, int how)
-
- static void ds_shutdown(struct rsocket *rs)
- {
-+ if (rs->svcs)
-+ rs_modify_svcs(rs, 0);
-+
- if (rs->fd_flags & O_NONBLOCK)
- rs_set_nonblocking(rs, 0);
-
-@@ -3718,17 +3647,14 @@ static void rs_svc_process_sock(void)
- struct rs_svc_msg msg;
-
- read(svc_sock[1], &msg, sizeof msg);
-- switch (msg.op) {
-- case RS_SVC_INSERT:
-+ if (msg.svcs & RS_SVC_DGRAM) {
- msg.status = rs_svc_add_rs(msg.rs);
-- break;
-- case RS_SVC_REMOVE:
-+ } else if (!msg.svcs) {
- msg.status = rs_svc_rm_rs(msg.rs);
-- break;
-- default:
-- msg.status = ENOTSUP;
-- break;
- }
-+
-+ if (!msg.status)
-+ msg.rs->svcs = msg.svcs;
- write(svc_sock[1], &msg, sizeof msg);
- }
-
-@@ -3828,7 +3754,6 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len,
- struct ibv_sge sge;
- uint64_t offset;
-
--// PRINTADDR(src);
- if (!ds_can_send(rs)) {
- if (ds_get_comp(rs, 0, ds_can_send))
- return;
-@@ -3839,18 +3764,13 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len,
- rs->sqe_avail--;
-
- ds_format_hdr(&hdr, src);
--// printf("%s hdr ver %d length %d port %x\n", __func__, hdr.version,
--// hdr.length, hdr.port);
- memcpy((void *) msg, &hdr, hdr.length);
- memcpy((void *) msg + hdr.length, buf, len);
--// printf("%s received data 0x%x\n", __func__, *((uint32_t*)buf));
- sge.addr = (uintptr_t) msg;
- sge.length = hdr.length + len;
- sge.lkey = rs->conn_dest->qp->smr->lkey;
- offset = (uint8_t *) msg - rs->sbuf;
-
--// printf("%s ver %d length %d port %x\n", __func__, ((struct ds_header *) msg)->version,
--// ((struct ds_header *) msg)->length, ((struct ds_header *) msg)->port);
- ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
- }
-
-@@ -3863,8 +3783,6 @@ static void rs_svc_process_rs(struct rsocket *rs)
- int len, ret;
-
- ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
--// PRINTADDR(&addr);
--// printf("%s received data 0x%x\n", __func__, *((uint32_t*)&svc_buf[8]));
- if (ret < DS_UDP_IPV4_HDR_LEN)
- return;
-
-@@ -3879,22 +3797,27 @@ static void rs_svc_process_rs(struct rsocket *rs)
- if (ret)
- return;
-
-+ if (udp_hdr->op == RS_OP_DATA) {
-+ fastlock_acquire(&rs->slock);
-+ cur_dest = rs->conn_dest;
-+ rs->conn_dest = dest;
-+ ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL);
-+ rs->conn_dest = cur_dest;
-+ fastlock_release(&rs->slock);
-+ }
-+
- if (!dest->ah || (dest->qpn != udp_hdr->qpn))
- rs_svc_create_ah(rs, dest, udp_hdr->qpn);
-
- /* to do: handle when dest local ip address doesn't match udp ip */
-- if (udp_hdr->op != RS_OP_DATA)
-- return;
--
-- fastlock_acquire(&rs->slock);
-- cur_dest = rs->conn_dest;
-- rs->conn_dest = &dest->qp->dest;
-- rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
--
-- rs->conn_dest = dest;
-- ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL);
-- rs->conn_dest = cur_dest;
-- fastlock_release(&rs->slock);
-+ if (udp_hdr->op == RS_OP_DATA) {
-+ fastlock_acquire(&rs->slock);
-+ cur_dest = rs->conn_dest;
-+ rs->conn_dest = &dest->qp->dest;
-+ rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
-+ rs->conn_dest = cur_dest;
-+ fastlock_release(&rs->slock);
-+ }
- }
-
- static void *rs_svc_run(void *arg)