Bottom: 1fa07c62817ac4b6cb8d9c5e327ea2cdc75dbd21
-Top: f1822f3bbe2c9b92b5e2ca8b4e5c3cece427c5ff
+Top: 136936c0a82503ee0da9daccd8b948cd09e58b64
Author: Sean Hefty <sean.hefty@intel.com>
Date: 2012-11-09 10:26:38 -0800
+rsocket QP.
\ No newline at end of file
diff --git a/src/cma.c b/src/cma.c
-index 388be61..0f58966 100755
+index 388be61..ff9b426 100755
--- a/src/cma.c
+++ b/src/cma.c
@@ -513,7 +513,7 @@ int rdma_destroy_id(struct rdma_cm_id *id)
{
if (!addr)
return 0;
-@@ -2232,9 +2232,18 @@ void rdma_destroy_ep(struct rdma_cm_id *id)
+@@ -2232,9 +2232,19 @@ void rdma_destroy_ep(struct rdma_cm_id *id)
int ucma_max_qpsize(struct rdma_cm_id *id)
{
struct cma_id_private *id_priv;
+ if (id && id_priv->cma_dev) {
+ max_size = id_priv->cma_dev->max_qpsize;
+ } else {
++ ucma_init();
+ for (i = 0; i < cma_dev_cnt; i++) {
+ if (!max_size || max_size > cma_dev_array[i].max_qpsize)
+ max_size = cma_dev_array[i].max_qpsize;
{
errno = err;
diff --git a/src/rsocket.c b/src/rsocket.c
-index a060f66..c61d689 100644
+index a060f66..6fa4c68 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
@@ -47,6 +47,8 @@
rs_connect_error = 0x0800,
rs_disconnected = 0x1000,
rs_error = 0x2000,
-@@ -170,68 +212,248 @@ enum rs_state {
+@@ -170,68 +212,249 @@ enum rs_state {
#define RS_OPT_SWAP_SGL 1
+
+ msg.op = RS_SVC_INSERT;
+ msg.status = EINVAL;
++ printf("%s rs %p\n", __func__, rs);
+ msg.rs = rs;
+ write(svc_sock[0], &msg, sizeof msg);
+ read(svc_sock[0], &msg, sizeof msg);
static int rs_value_to_scale(int value, int bits)
{
return value <= (1 << (bits - 1)) ?
-@@ -307,10 +529,10 @@ out:
+@@ -307,10 +530,10 @@ out:
pthread_mutex_unlock(&mut);
}
pthread_mutex_unlock(&mut);
return rs->index;
}
-@@ -322,7 +544,7 @@ static void rs_remove(struct rsocket *rs)
+@@ -322,7 +545,7 @@ static void rs_remove(struct rsocket *rs)
pthread_mutex_unlock(&mut);
}
{
struct rsocket *rs;
-@@ -330,29 +552,39 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -330,29 +553,39 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
if (!rs)
return NULL;
dlist_init(&rs->iomap_list);
dlist_init(&rs->iomap_queue);
return rs;
-@@ -360,13 +592,27 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -360,13 +593,29 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
static int rs_set_nonblocking(struct rsocket *rs, long arg)
{
+ if (!ret && rs->state < rs_connected)
+ ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
+ } else {
++ printf("%s set nonblock\n", __func__);
+ ret = fcntl(rs->epfd, F_SETFL, arg);
++ printf("%s fcntl %d\n", __func__, ret);
+
+ if (!ret && rs->qp_list) {
+ qp = rs->qp_list;
return ret;
}
-@@ -390,17 +636,39 @@ static void rs_set_qp_size(struct rsocket *rs)
+@@ -390,17 +639,43 @@ static void rs_set_qp_size(struct rsocket *rs)
rs->rq_size = 2;
}
+{
+ uint16_t max_size;
+
++ printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
++ rs->rq_size, rs->rbuf_size);
+ max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE);
+
+ if (rs->sq_size > max_size)
+ rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
+ else
+ rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
++ printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
++ rs->rq_size, rs->rbuf_size);
+}
+
static int rs_init_bufs(struct rsocket *rs)
rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
if (!rs->smr)
-@@ -410,7 +678,7 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -410,7 +685,7 @@ static int rs_init_bufs(struct rsocket *rs)
sizeof(*rs->target_iomap) * rs->target_iomap_size;
rs->target_buffer_list = malloc(len);
if (!rs->target_buffer_list)
rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
if (!rs->target_mr)
-@@ -423,7 +691,7 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -423,7 +698,7 @@ static int rs_init_bufs(struct rsocket *rs)
rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
if (!rs->rbuf)
rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
if (!rs->rmr)
-@@ -440,15 +708,32 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -440,37 +715,56 @@ static int rs_init_bufs(struct rsocket *rs)
return 0;
}
-static int rs_create_cq(struct rsocket *rs)
+static int ds_init_bufs(struct ds_qp *qp)
- {
-- rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs);
-- if (!rs->cm_id->recv_cq_channel)
++{
+ qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf));
+ if (!qp->rbuf)
+ return ERR(ENOMEM);
+
+ qp->smr = rdma_reg_msgs(qp->cm_id, qp->rs->sbuf, qp->rs->sbuf_size);
+ if (!qp->smr)
- return -1;
-
-- rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size,
-- rs->cm_id, rs->cm_id->recv_cq_channel, 0);
-- if (!rs->cm_id->recv_cq)
++ return -1;
++
+ qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size);
+ if (!qp->rmr)
+ return -1;
+}
+
+static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
-+{
+ {
+- rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs);
+- if (!rs->cm_id->recv_cq_channel)
+ cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
++ printf("%s create comp_channel %p\n", __func__, cm_id->recv_cq_channel);
+ if (!cm_id->recv_cq_channel)
-+ return -1;
-+
+ return -1;
+
+- rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size,
+- rs->cm_id, rs->cm_id->recv_cq_channel, 0);
+- if (!rs->cm_id->recv_cq)
+ cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
+ cm_id, cm_id->recv_cq_channel, 0);
++ printf("%s create cq %p size %d\n", __func__, cm_id->recv_cq, rs->sq_size + rs->rq_size);
+ if (!cm_id->recv_cq)
goto err1;
if (rs->fd_flags & O_NONBLOCK) {
-@@ -456,21 +741,20 @@ static int rs_create_cq(struct rsocket *rs)
++ printf("%s set nonblock\n", __func__);
+ if (rs_set_nonblocking(rs, O_NONBLOCK))
goto err2;
}
{
struct ibv_recv_wr wr, *bad;
-@@ -482,6 +766,23 @@ rs_post_recv(struct rsocket *rs)
+@@ -482,6 +776,23 @@ rs_post_recv(struct rsocket *rs)
return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
}
static int rs_create_ep(struct rsocket *rs)
{
struct ibv_qp_init_attr qp_attr;
-@@ -492,7 +793,7 @@ static int rs_create_ep(struct rsocket *rs)
+@@ -492,7 +803,7 @@ static int rs_create_ep(struct rsocket *rs)
if (ret)
return ret;
if (ret)
return ret;
-@@ -549,8 +850,74 @@ static void rs_free_iomappings(struct rsocket *rs)
+@@ -549,8 +860,73 @@ static void rs_free_iomappings(struct rsocket *rs)
}
}
+
+static void ds_free(struct rsocket *rs)
+{
++ struct ds_qp *qp;
++
+ if (rs->state & (rs_readable | rs_writable))
+ rs_remove_from_svc(rs);
+
+ if (rs->dmsg)
+ free(rs->dmsg);
+
-+ if (rs->smsg_free)
-+ free(rs->smsg_free);
-+
-+ while (rs->qp_list) {
-+ ds_remove_qp(rs, rs->qp_list);
-+ ds_free_qp(rs->qp_list);
++ while ((qp = rs->qp_list)) {
++ ds_remove_qp(rs, qp);
++ ds_free_qp(qp);
+ }
+
+ if (rs->epfd >= 0)
if (rs->index >= 0)
rs_remove(rs);
-@@ -582,7 +949,7 @@ static void rs_free(struct rsocket *rs)
+@@ -582,7 +958,7 @@ static void rs_free(struct rsocket *rs)
rdma_destroy_id(rs->cm_id);
}
fastlock_destroy(&rs->cq_wait_lock);
fastlock_destroy(&rs->cq_lock);
fastlock_destroy(&rs->rlock);
-@@ -636,29 +1003,54 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+@@ -636,29 +1012,89 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
rs->sseq_comp = ntohs(conn->credits);
}
+
+ return 0;
+}
++
++static int ds_init_ep(struct rsocket *rs)
++{
++ struct ds_smsg *msg;
++ int i, ret;
++
++ ds_set_qp_size(rs);
++
++ rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
++ if (!rs->sbuf)
++ return ERR(ENOMEM);
++
++ rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
++ if (!rs->dmsg)
++ return ERR(ENOMEM);
++
++ rs->sqe_avail = rs->sq_size;
++ rs->rqe_avail = rs->rq_size;
++
++ rs->smsg_free = (struct ds_smsg *) rs->sbuf;
++ msg = rs->smsg_free;
++ for (i = 0; i < rs->sq_size - 1; i++) {
++ msg->next = (void *) msg + RS_SNDLOWAT;
++ msg = msg->next;
++ }
++ msg->next = NULL;
++
++ ret = rs_add_to_svc(rs);
++ if (ret)
++ return ret;
++
++ rs->state = rs_readable | rs_writable;
++ return 0;
++}
+
int rsocket(int domain, int type, int protocol)
{
+ ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
+ if (ret)
+ goto err;
-
-- ret = rs_insert(rs);
++
+ rs->cm_id->route.addr.src_addr.sa_family = domain;
+ index = rs->cm_id->channel->fd;
+ } else {
++ printf("rsocket sq %d rq %d\n", rs->sq_size, rs->rq_size);
+ ret = ds_init(rs, domain);
+ if (ret)
+ goto err;
-+
+
+- ret = rs_insert(rs);
+ index = rs->udp_sock;
+ }
+
return rs->index;
err:
-@@ -672,9 +1064,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+@@ -672,9 +1108,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
int ret;
rs = idm_at(&idm, socket);
+ if (!ret)
+ rs->state = rs_bound;
+ } else {
-+ ret = bind(rs->udp_sock, addr, addrlen);
-+ if (!ret) {
-+ ret = rs_add_to_svc(rs);
-+ if (!ret)
-+ rs->state = rs_readable | rs_writable;
++ if (rs->state == rs_init) {
++ ret = ds_init_ep(rs);
++ if (ret)
++ return ret;
+ }
++ ret = bind(rs->udp_sock, addr, addrlen);
+ }
return ret;
}
-@@ -710,7 +1111,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -710,7 +1155,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
int ret;
rs = idm_at(&idm, socket);
if (!new_rs)
return ERR(ENOMEM);
-@@ -718,7 +1119,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -718,7 +1163,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
if (ret)
goto err;
if (ret < 0)
goto err;
-@@ -855,13 +1256,268 @@ connected:
+@@ -855,13 +1300,256 @@ connected:
return ret;
}
-+static int ds_init_ep(struct rsocket *rs)
-+{
-+ struct ds_smsg *msg;
-+ int i, ret;
-+
-+ ds_set_qp_size(rs);
-+
-+ rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
-+ if (!rs->sbuf)
-+ return ERR(ENOMEM);
-+
-+ rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
-+ if (!rs->dmsg)
-+ return ERR(ENOMEM);
-+
-+ rs->sbuf_bytes_avail = rs->sbuf_size;
-+ rs->sqe_avail = rs->sq_size;
-+ rs->rqe_avail = rs->rq_size;
-+
-+ rs->smsg_free = (struct ds_smsg *) rs->sbuf;
-+ msg = rs->smsg_free;
-+ for (i = 0; i < rs->sq_size - 1; i++) {
-+ msg->next = (void *) msg + i * RS_SNDLOWAT;
-+ msg = msg->next;
-+ }
-+ msg->next = NULL;
-+
-+ ret = rs_add_to_svc(rs);
-+ if (ret)
-+ return ret;
-+
-+ rs->state = rs_readable | rs_writable;
-+ return 0;
-+}
-+
+static int rs_any_addr(const union socket_addr *addr)
+{
+ if (addr->sa.sa_family == AF_INET) {
+}
+
+static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
-+ socklen_t addrlen, struct ds_qp **qp)
++ socklen_t addrlen, struct ds_qp **new_qp)
+{
++ struct ds_qp *qp;
+ struct ibv_qp_init_attr qp_attr;
+ struct epoll_event event;
+ int i, ret;
+
-+ *qp = calloc(1, sizeof(struct ds_qp));
-+ if (!*qp)
++printf("%s\n", __func__);
++ qp = calloc(1, sizeof(*qp));
++ if (!qp)
+ return ERR(ENOMEM);
+
-+ (*qp)->rs = rs;
-+ ret = rdma_create_id(NULL, &(*qp)->cm_id, *qp, RDMA_PS_UDP);
++ qp->rs = rs;
++ ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP);
++ printf("%s rdma_create_id %d\n", __func__, ret);
+ if (ret)
+ goto err;
+
-+ ds_format_hdr(&(*qp)->hdr, src_addr);
-+ ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa);
++ ds_format_hdr(&qp->hdr, src_addr);
++ ret = rdma_bind_addr(qp->cm_id, &src_addr->sa);
++ printf("%s rdma_bind_addr %d\n", __func__, ret);
+ if (ret)
+ goto err;
+
-+ ret = ds_init_bufs(*qp);
++ ret = ds_init_bufs(qp);
++ printf("%s ds_init_bufs %d\n", __func__, ret);
+ if (ret)
+ goto err;
+
-+ ret = rs_create_cq(rs, (*qp)->cm_id);
++ ret = rs_create_cq(rs, qp->cm_id);
++ printf("%s rs_create_cq %d\n", __func__, ret);
+ if (ret)
+ goto err;
+
+ memset(&qp_attr, 0, sizeof qp_attr);
+ qp_attr.qp_context = qp;
-+ qp_attr.send_cq = rs->cm_id->send_cq;
-+ qp_attr.recv_cq = rs->cm_id->recv_cq;
++ qp_attr.send_cq = qp->cm_id->send_cq;
++ qp_attr.recv_cq = qp->cm_id->recv_cq;
+ qp_attr.qp_type = IBV_QPT_UD;
+ qp_attr.sq_sig_all = 1;
+ qp_attr.cap.max_send_wr = rs->sq_size;
+ qp_attr.cap.max_send_sge = 2;
+ qp_attr.cap.max_recv_sge = 1;
+ qp_attr.cap.max_inline_data = rs->sq_inline;
-+ ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr);
++ ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr);
++ printf("%s rdma_create_qp %d\n", __func__, ret);
+ if (ret)
+ goto err;
+
-+ ret = ds_add_qp_dest(*qp, src_addr, addrlen);
++ ret = ds_add_qp_dest(qp, src_addr, addrlen);
++ printf("%s ds_add_qp_dest %d\n", __func__, ret);
+ if (ret)
+ goto err;
+
+ event.events = EPOLLIN;
-+ event.data.ptr = *qp;
++ event.data.ptr = qp;
+ ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD,
-+ (*qp)->cm_id->recv_cq_channel->fd, &event);
++ qp->cm_id->recv_cq_channel->fd, &event);
++ printf("%s epoll_ctl %d\n", __func__, ret);
+ if (ret)
+ goto err;
+
+ for (i = 0; i < rs->rq_size; i++) {
-+ ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT);
++ ret = ds_post_recv(rs, qp, qp->rbuf + i * RS_SNDLOWAT);
+ if (ret)
+ goto err;
+ }
+
-+ ds_insert_qp(rs, *qp);
++ ds_insert_qp(rs, qp);
++ *new_qp = qp;
+ return 0;
+err:
-+ ds_free_qp(*qp);
++ ds_free_qp(qp);
+ return ret;
+}
+
+ union socket_addr src_addr;
+ socklen_t src_len;
+ struct ds_qp *qp;
++ struct ds_dest **tdest, *new_dest;
+ int ret = 0;
+
++ printf("%s \n", __func__);
+ fastlock_acquire(&rs->map_lock);
-+ dest = tfind(addr, &rs->dest_map, ds_compare_addr);
-+ if (dest)
-+ goto out;
-+
-+ if (rs->state == rs_init) {
-+ ret = ds_init_ep(rs);
-+ if (ret)
-+ goto out;
-+ }
++ tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
++ printf("%s tfind %p\n", __func__, dest);
++ if (tdest)
++ goto found;
+
+ ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
++ printf("%s ds_get_src_addr %d %s\n", __func__, ret, strerror(errno));
+ if (ret)
+ goto out;
+
+ ret = ds_get_qp(rs, &src_addr, src_len, &qp);
++ printf("%s ds_get_qp %d %s\n", __func__, ret, strerror(errno));
+ if (ret)
+ goto out;
+
-+ if ((addrlen != src_len) || memcmp(addr, &src_addr, addrlen)) {
-+ *dest = calloc(1, sizeof(struct ds_dest));
-+ if (!*dest) {
++ tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
++ if (!tdest) {
++ printf("%s adding dest into map\n", __func__);
++ new_dest = calloc(1, sizeof(*new_dest));
++ if (!new_dest) {
+ ret = ERR(ENOMEM);
+ goto out;
+ }
+
-+ memcpy(&(*dest)->addr, addr, addrlen);
-+ (*dest)->qp = qp;
-+ tsearch(&(*dest)->addr, &rs->dest_map, ds_compare_addr);
++ memcpy(&new_dest->addr, addr, addrlen);
++ new_dest->qp = qp;
++ tdest = tsearch(&new_dest->addr, &rs->dest_map, ds_compare_addr);
+ }
++
++found:
++ *dest = *tdest;
+out:
+ fastlock_release(&rs->map_lock);
+ return ret;
+ memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
+ ret = rs_do_connect(rs);
+ } else {
++ printf("%s\n", __func__);
++ if (rs->state == rs_init) {
++ ret = ds_init_ep(rs);
++ if (ret)
++ return ret;
++ }
++
+ fastlock_acquire(&rs->slock);
+ ret = connect(rs->udp_sock, addr, addrlen);
++ printf("%s connect %d %s\n", __func__, ret, strerror(errno));
+ if (!ret)
+ ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
++ printf("%s ds_get_dest %d %s\n", __func__, ret, strerror(errno));
+ fastlock_release(&rs->slock);
+ }
+ return ret;
}
static int rs_post_write_msg(struct rsocket *rs,
-@@ -903,6 +1559,24 @@ static int rs_post_write(struct rsocket *rs,
+@@ -903,6 +1591,24 @@ static int rs_post_write(struct rsocket *rs,
return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
}
/*
* Update target SGE before sending data. Otherwise the remote side may
* update the entry before we do.
-@@ -1046,7 +1720,7 @@ static int rs_poll_cq(struct rsocket *rs)
+@@ -1046,7 +1752,7 @@ static int rs_poll_cq(struct rsocket *rs)
rs->state = rs_disconnected;
return 0;
} else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
}
break;
case RS_OP_WRITE:
-@@ -1137,42 +1811,213 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs
-
- fastlock_acquire(&rs->cq_lock);
- do {
-- rs_update_credits(rs);
-- ret = rs_poll_cq(rs);
+@@ -1133,46 +1839,217 @@ static int rs_get_cq_event(struct rsocket *rs)
+ */
+ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
+ {
+- int ret;
++ int ret;
++
++ fastlock_acquire(&rs->cq_lock);
++ do {
+ rs_update_credits(rs);
+ ret = rs_poll_cq(rs);
+ if (test(rs)) {
+static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
+{
+ int ret = 0;
-+
-+ fastlock_acquire(&rs->cq_lock);
-+ do {
+
+ fastlock_acquire(&rs->cq_lock);
+ do {
+- rs_update_credits(rs);
+- ret = rs_poll_cq(rs);
+ ds_poll_cqs(rs);
if (test(rs)) {
-+ printf("%s test succeeded\n", __func__);
++// printf("%s test succeeded\n", __func__);
ret = 0;
break;
- } else if (ret) {
- break;
} else if (nonblock) {
ret = ERR(EWOULDBLOCK);
-+ printf("%s nonblocking \n", __func__);
++// printf("%s nonblocking \n", __func__);
} else if (!rs->cq_armed) {
- ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
-+ printf("%s req notify \n", __func__);
++// printf("%s req notify \n", __func__);
+ ds_req_notify_cqs(rs);
rs->cq_armed = 1;
} else {
- ret = rs_get_cq_event(rs);
+ ret = ds_get_cq_event(rs);
-+ printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
++// printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
fastlock_release(&rs->cq_wait_lock);
fastlock_acquire(&rs->cq_lock);
}
- rs_update_credits(rs);
fastlock_release(&rs->cq_lock);
-+ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
++// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
return ret;
}
do {
- ret = rs_process_cq(rs, 1, test);
+ ret = ds_process_cqs(rs, 1, test);
-+ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
++// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
if (!ret || nonblock || errno != EWOULDBLOCK)
return ret;
-@@ -1184,7 +2029,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
+@@ -1184,7 +2061,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
(e.tv_usec - s.tv_usec) + 1;
} while (poll_time <= polling_time);
return ret;
}
-@@ -1219,9 +2064,19 @@ static int rs_can_send(struct rsocket *rs)
+@@ -1219,9 +2096,19 @@ static int rs_can_send(struct rsocket *rs)
(rs->target_sgl[rs->target_sge].length != 0);
}
}
static int rs_conn_can_send_ctrl(struct rsocket *rs)
-@@ -1236,7 +2091,7 @@ static int rs_have_rdata(struct rsocket *rs)
+@@ -1236,7 +2123,7 @@ static int rs_have_rdata(struct rsocket *rs)
static int rs_conn_have_rdata(struct rsocket *rs)
{
}
static int rs_conn_all_sends_done(struct rsocket *rs)
-@@ -1245,6 +2100,70 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
+@@ -1245,6 +2132,70 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
!(rs->state & rs_connected);
}
+ struct ds_header *hdr;
+ int ret;
+
-+ret = 0;
-+ printf("%s \n", __func__);
++// printf("%s \n", __func__);
+ if (!(rs->state & rs_readable))
+ return ERR(EINVAL);
+
+ if (!rs_have_rdata(rs)) {
-+ printf("%s need rdata \n", __func__);
++// printf("%s need rdata \n", __func__);
+ ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
+ rs_have_rdata);
-+ printf("%s ds_get_comp ret %d errno %s\n", __func__, ret, strerror(errno));
++// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
+ if (ret)
+ return ret;
+ }
+ rs->rmsg_head = 0;
+ }
+
++ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
+ return len;
+}
+
static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
{
size_t left = len;
-@@ -1290,6 +2209,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1290,6 +2241,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
int ret;
rs = idm_at(&idm, socket);
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1339,7 +2265,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1339,7 +2297,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
rs->rbuf_bytes_avail += rsize;
}
fastlock_release(&rs->rlock);
return ret ? ret : len - left;
-@@ -1348,8 +2274,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1348,8 +2306,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen)
{
ret = rrecv(socket, buf, len, flags);
if (ret > 0 && src_addr)
rgetpeername(socket, src_addr, addrlen);
-@@ -1391,14 +2326,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1391,14 +2358,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
struct rs_iomap iom;
int ret;
ret = ERR(ECONNRESET);
break;
}
-@@ -1447,10 +2382,90 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1447,10 +2414,99 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
}
rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
+ struct ds_udp_header hdr;
+ struct msghdr msg;
+ struct iovec miov[8];
++ ssize_t ret;
+
++// printf("%s\n", __func__);
+ if (iovcnt > 8)
+ return ERR(ENOTSUP);
+
+ hdr.tag = htonl(DS_UDP_TAG);
-+ hdr.version = 1;
++ hdr.version = rs->conn_dest->qp->hdr.version;
+ hdr.op = op;
+ hdr.reserved = 0;
+ hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
+ msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
+ msg.msg_iov = miov;
+ msg.msg_iovlen = iovcnt + 1;
-+ return sendmsg(rs->udp_sock, &msg, flags);
++// printf("%s iov cnt %d\n", __func__, msg.msg_iovlen);
++ ret = sendmsg(rs->udp_sock, &msg, flags);
++ printf("%s ret %d %s\n", __func__, ret, strerror(errno));
++ return ret > 0 ? ret - sizeof hdr : ret;
+}
+
+static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
+ int flags, uint8_t op)
+{
+ struct iovec iov;
++ printf("%s\n", __func__);
+ if (buf && len) {
++// printf("%s have buffer\n", __func__);
+ iov.iov_base = (void *) buf;
+ iov.iov_len = len;
+ return ds_sendv_udp(rs, &iov, 1, flags, op);
+ } else {
++// printf("%s no buffer\n", __func__);
+ return ds_sendv_udp(rs, NULL, 0, flags, op);
+ }
+}
+ uint64_t offset;
+ int ret = 0;
+
++ printf("%s\n", __func__);
+ if (!rs->conn_dest->ah)
+ return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
+
/*
* We overlap sending the data, by posting a small work request immediately,
* then increasing the size of the send on each iteration.
-@@ -1464,6 +2479,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1464,6 +2520,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
int ret = 0;
rs = idm_at(&idm, socket);
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1485,7 +2507,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1485,7 +2548,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1538,10 +2560,27 @@ out:
+@@ -1538,10 +2601,39 @@ out:
ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen)
{
+ struct rsocket *rs;
+ int ret;
+
++ printf("%s\n", __func__);
+ rs = idm_at(&idm, socket);
+ if (rs->type == SOCK_STREAM) {
+ if (dest_addr || addrlen)
+
+ return rsend(socket, buf, len, flags);
+ }
-
-- return rsend(socket, buf, len, flags);
++
++ if (rs->state == rs_init) {
++ ret = ds_init_ep(rs);
++ if (ret)
++ return ret;
++ }
++
+ fastlock_acquire(&rs->slock);
++ printf("%s check conn dest\n", __func__);
+ if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
++ printf("%s need conn dest\n", __func__);
+ ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
+ if (ret)
+ goto out;
+ }
++ else
++ printf("%s connected\n", __func__);
+
+- return rsend(socket, buf, len, flags);
+ ret = dsend(rs, buf, len, flags);
+out:
+ fastlock_release(&rs->slock);
}
static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-@@ -1600,7 +2639,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1600,7 +2692,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1653,7 +2692,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+@@ -1653,7 +2745,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
if (msg->msg_control && msg->msg_controllen)
return ERR(ENOTSUP);
}
ssize_t rwrite(int socket, const void *buf, size_t count)
-@@ -1690,8 +2729,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
+@@ -1690,8 +2782,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
int ret;
check_cq:
rs_process_cq(rs, nonblock, test);
revents = 0;
-@@ -1707,6 +2746,16 @@ check_cq:
+@@ -1707,6 +2799,16 @@ check_cq:
}
return revents;
}
if (rs->state == rs_listening) {
-@@ -1766,11 +2815,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1766,11 +2868,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
if (fds[i].revents)
return 1;
rfds[i].events = POLLIN;
} else {
rfds[i].fd = fds[i].fd;
-@@ -1793,7 +2845,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1793,7 +2898,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
rs = idm_lookup(&idm, fds[i].fd);
if (rs) {
fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
} else {
fds[i].revents = rfds[i].revents;
-@@ -1949,7 +3004,7 @@ int rshutdown(int socket, int how)
+@@ -1949,7 +3057,7 @@ int rshutdown(int socket, int how)
rs = idm_at(&idm, socket);
if (how == SHUT_RD) {
return 0;
}
-@@ -1959,10 +3014,10 @@ int rshutdown(int socket, int how)
+@@ -1959,10 +3067,10 @@ int rshutdown(int socket, int how)
if (rs->state & rs_connected) {
if (how == SHUT_RDWR) {
ctrl = RS_CTRL_DISCONNECT;
RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
}
if (!rs->ctrl_avail) {
-@@ -1987,13 +3042,29 @@ int rshutdown(int socket, int how)
+@@ -1987,13 +3095,29 @@ int rshutdown(int socket, int how)
return 0;
}
rs_free(rs);
return 0;
-@@ -2018,8 +3089,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2018,8 +3142,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
}
int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-@@ -2027,8 +3102,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2027,8 +3155,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
}
int rsetsockopt(int socket, int level, int optname,
-@@ -2040,18 +3119,26 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2040,18 +3172,26 @@ int rsetsockopt(int socket, int level, int optname,
ret = ERR(ENOTSUP);
rs = idm_at(&idm, socket);
opt_on = *(int *) optval;
break;
case SO_RCVBUF:
-@@ -2101,9 +3188,11 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2101,9 +3241,11 @@ int rsetsockopt(int socket, int level, int optname,
opts = &rs->ipv6_opts;
switch (optname) {
case IPV6_V6ONLY:
opt_on = *(int *) optval;
break;
default:
-@@ -2315,7 +3404,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2315,7 +3457,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
return ERR(EINVAL);
if (prot & PROT_WRITE) {
iomr = rs_get_iomap_mr(rs);
access |= IBV_ACCESS_REMOTE_WRITE;
-@@ -2349,7 +3438,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2349,7 +3491,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
dlist_insert_tail(&iomr->entry, &rs->iomap_list);
}
out:
return offset;
}
-@@ -2361,7 +3450,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2361,7 +3503,7 @@ int riounmap(int socket, void *buf, size_t len)
int ret = 0;
rs = idm_at(&idm, socket);
for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
entry = entry->next) {
-@@ -2382,7 +3471,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2382,7 +3524,7 @@ int riounmap(int socket, void *buf, size_t len)
}
ret = ERR(EINVAL);
out:
return ret;
}
-@@ -2426,7 +3515,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
+@@ -2426,7 +3568,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -2476,3 +3565,269 @@ out:
+@@ -2476,3 +3618,296 @@ out:
return (ret && left == count) ? ret : count - left;
}
+ }
+
+ svc_rss[++svc_cnt] = rs;
++ printf("%s rs %p\n", __func__, rs);
+ svc_fds[svc_cnt].fd = rs->udp_sock;
+ svc_fds[svc_cnt].events = POLLIN;
+ svc_fds[svc_cnt].revents = 0;
++ printf("add rs udp sock %d\n",rs->udp_sock);
+ return 0;
+}
+
+ struct rs_svc_msg msg;
+
+ read(svc_sock[1], &msg, sizeof msg);
++ printf("%s op %d\n",__func__, msg.op);
+ switch (msg.op) {
+ case RS_SVC_INSERT:
+ msg.status = rs_svc_add_rs(msg.rs);
+ msg.status = ENOTSUP;
+ break;
+ }
++ printf("%s status %d\n",__func__, msg.status);
+ write(svc_sock[1], &msg, sizeof msg);
+}
+
+ struct ibv_ah_attr attr;
+ int ret;
+
++ printf("%s\n",__func__);
+ if (dest->ah) {
+ fastlock_acquire(&rs->slock);
+ ibv_destroy_ah(dest->ah);
+static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr,
+ union socket_addr *addr)
+{
-+ return (udp_hdr->tag == DS_UDP_TAG) &&
++printf("tag %x ver %d family %d (AF_INET %d) length %d\n", udp_hdr->tag,
++ udp_hdr->version, addr->sa.sa_family, AF_INET, udp_hdr->length);
++
++printf("tag %d ver %d fam %d len %d ver %d fam %d len %d\n",
++udp_hdr->tag == ntohl(DS_UDP_TAG),
++ udp_hdr->version == 4, addr->sa.sa_family == AF_INET,
++ udp_hdr->length == DS_UDP_IPV4_HDR_LEN,
++ udp_hdr->version == 6, addr->sa.sa_family == AF_INET6,
++ udp_hdr->length == DS_UDP_IPV6_HDR_LEN);
++
++
++ return (udp_hdr->tag == ntohl(DS_UDP_TAG)) &&
+ ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET &&
+ udp_hdr->length == DS_UDP_IPV4_HDR_LEN) ||
+ (udp_hdr->version == 6 && addr->sa.sa_family == AF_INET6 &&
+ struct ibv_sge sge;
+ uint64_t offset;
+
++ printf("%s\n",__func__);
+ if (!ds_can_send(rs)) {
+ if (ds_get_comp(rs, 0, ds_can_send))
+ return;
+ socklen_t addrlen = sizeof addr;
+ int len, ret;
+
++ printf("%s\n",__func__);
+ ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
++ printf("%s recvfrom %d\n",__func__, ret);
+ if (ret < DS_UDP_IPV4_HDR_LEN)
+ return;
+
+ if (!rs_svc_valid_udp_hdr(udp_hdr, &addr))
+ return;
+
++ printf("%s valid hdr\n",__func__);
+ len = ret - udp_hdr->length;
+ udp_hdr->tag = ntohl(udp_hdr->tag);
+ udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF;
+ ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
++ printf("%s ds_get_dest %d\n",__func__, ret);
+ if (ret)
+ return;
+
+ cur_dest = rs->conn_dest;
+ if (udp_hdr->op == RS_OP_DATA) {
+ rs->conn_dest = &dest->qp->dest;
++ printf("%s forwarding msg\n",__func__);
+ rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
+ }
+
+ rs->conn_dest = dest;
++ printf("%s sending resp\n",__func__);
+ ds_send_udp(rs, svc_buf + udp_hdr->length, len, 0, RS_OP_CTRL);
+ rs->conn_dest = cur_dest;
+ fastlock_release(&rs->slock);
+ struct rs_svc_msg msg;
+ int i, ret;
+
++ printf("%s\n",__func__);
+ ret = rs_svc_grow_sets();
+ if (ret) {
+ msg.status = ret;
+ svc_fds[0].fd = svc_sock[1];
+ svc_fds[0].events = POLLIN;
+ do {
++ printf("%s svc cnt %d\n",__func__, svc_cnt);
+ for (i = 0; i <= svc_cnt; i++)
+ svc_fds[i].revents = 0;
+
++ printf("%s poll\n",__func__);
+ poll(svc_fds, svc_cnt + 1, -1);
++ printf("%s poll done\n",__func__);
+ if (svc_fds[0].revents)
+ rs_svc_process_sock();
+
+ if (svc_fds[i].revents)
+ rs_svc_process_rs(svc_rss[i]);
+ }
-+ } while (svc_cnt > 1);
++ } while (svc_cnt >= 1);
+
+ return NULL;
+}
+++ /dev/null
-Bottom: f1822f3bbe2c9b92b5e2ca8b4e5c3cece427c5ff
-Top: 136936c0a82503ee0da9daccd8b948cd09e58b64
-Author: Sean Hefty <sean.hefty@intel.com>
-Date: 2012-12-12 16:49:44 -0800
-
-Refresh of dsocket
-
----
-
-diff --git a/src/cma.c b/src/cma.c
-index 0f58966..ff9b426 100755
---- a/src/cma.c
-+++ b/src/cma.c
-@@ -2238,6 +2238,7 @@ int ucma_max_qpsize(struct rdma_cm_id *id)
- if (id && id_priv->cma_dev) {
- max_size = id_priv->cma_dev->max_qpsize;
- } else {
-+ ucma_init();
- for (i = 0; i < cma_dev_cnt; i++) {
- if (!max_size || max_size > cma_dev_array[i].max_qpsize)
- max_size = cma_dev_array[i].max_qpsize;
-diff --git a/src/rsocket.c b/src/rsocket.c
-index c61d689..6fa4c68 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -399,6 +399,7 @@ static int rs_add_to_svc(struct rsocket *rs)
-
- msg.op = RS_SVC_INSERT;
- msg.status = EINVAL;
-+ printf("%s rs %p\n", __func__, rs);
- msg.rs = rs;
- write(svc_sock[0], &msg, sizeof msg);
- read(svc_sock[0], &msg, sizeof msg);
-@@ -602,7 +603,9 @@ static int rs_set_nonblocking(struct rsocket *rs, long arg)
- if (!ret && rs->state < rs_connected)
- ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
- } else {
-+ printf("%s set nonblock\n", __func__);
- ret = fcntl(rs->epfd, F_SETFL, arg);
-+ printf("%s fcntl %d\n", __func__, ret);
-
- if (!ret && rs->qp_list) {
- qp = rs->qp_list;
-@@ -640,6 +643,8 @@ static void ds_set_qp_size(struct rsocket *rs)
- {
- uint16_t max_size;
-
-+ printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
-+ rs->rq_size, rs->rbuf_size);
- max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE);
-
- if (rs->sq_size > max_size)
-@@ -656,6 +661,8 @@ static void ds_set_qp_size(struct rsocket *rs)
- rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
- else
- rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
-+ printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
-+ rs->rq_size, rs->rbuf_size);
- }
-
- static int rs_init_bufs(struct rsocket *rs)
-@@ -728,15 +735,18 @@ static int ds_init_bufs(struct ds_qp *qp)
- static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
- {
- cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
-+ printf("%s create comp_channel %p\n", __func__, cm_id->recv_cq_channel);
- if (!cm_id->recv_cq_channel)
- return -1;
-
- cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
- cm_id, cm_id->recv_cq_channel, 0);
-+ printf("%s create cq %p size %d\n", __func__, cm_id->recv_cq, rs->sq_size + rs->rq_size);
- if (!cm_id->recv_cq)
- goto err1;
-
- if (rs->fd_flags & O_NONBLOCK) {
-+ printf("%s set nonblock\n", __func__);
- if (rs_set_nonblocking(rs, O_NONBLOCK))
- goto err2;
- }
-@@ -876,6 +886,8 @@ static void ds_free_qp(struct ds_qp *qp)
-
- static void ds_free(struct rsocket *rs)
- {
-+ struct ds_qp *qp;
-+
- if (rs->state & (rs_readable | rs_writable))
- rs_remove_from_svc(rs);
-
-@@ -888,12 +900,9 @@ static void ds_free(struct rsocket *rs)
- if (rs->dmsg)
- free(rs->dmsg);
-
-- if (rs->smsg_free)
-- free(rs->smsg_free);
--
-- while (rs->qp_list) {
-- ds_remove_qp(rs, rs->qp_list);
-- ds_free_qp(rs->qp_list);
-+ while ((qp = rs->qp_list)) {
-+ ds_remove_qp(rs, qp);
-+ ds_free_qp(qp);
- }
-
- if (rs->epfd >= 0)
-@@ -1016,6 +1025,40 @@ static int ds_init(struct rsocket *rs, int domain)
- return 0;
- }
-
-+static int ds_init_ep(struct rsocket *rs)
-+{
-+ struct ds_smsg *msg;
-+ int i, ret;
-+
-+ ds_set_qp_size(rs);
-+
-+ rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
-+ if (!rs->sbuf)
-+ return ERR(ENOMEM);
-+
-+ rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
-+ if (!rs->dmsg)
-+ return ERR(ENOMEM);
-+
-+ rs->sqe_avail = rs->sq_size;
-+ rs->rqe_avail = rs->rq_size;
-+
-+ rs->smsg_free = (struct ds_smsg *) rs->sbuf;
-+ msg = rs->smsg_free;
-+ for (i = 0; i < rs->sq_size - 1; i++) {
-+ msg->next = (void *) msg + RS_SNDLOWAT;
-+ msg = msg->next;
-+ }
-+ msg->next = NULL;
-+
-+ ret = rs_add_to_svc(rs);
-+ if (ret)
-+ return ret;
-+
-+ rs->state = rs_readable | rs_writable;
-+ return 0;
-+}
-+
- int rsocket(int domain, int type, int protocol)
- {
- struct rsocket *rs;
-@@ -1040,6 +1083,7 @@ int rsocket(int domain, int type, int protocol)
- rs->cm_id->route.addr.src_addr.sa_family = domain;
- index = rs->cm_id->channel->fd;
- } else {
-+ printf("rsocket sq %d rq %d\n", rs->sq_size, rs->rq_size);
- ret = ds_init(rs, domain);
- if (ret)
- goto err;
-@@ -1069,12 +1113,12 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
- if (!ret)
- rs->state = rs_bound;
- } else {
-- ret = bind(rs->udp_sock, addr, addrlen);
-- if (!ret) {
-- ret = rs_add_to_svc(rs);
-- if (!ret)
-- rs->state = rs_readable | rs_writable;
-+ if (rs->state == rs_init) {
-+ ret = ds_init_ep(rs);
-+ if (ret)
-+ return ret;
- }
-+ ret = bind(rs->udp_sock, addr, addrlen);
- }
- return ret;
- }
-@@ -1256,41 +1300,6 @@ connected:
- return ret;
- }
-
--static int ds_init_ep(struct rsocket *rs)
--{
-- struct ds_smsg *msg;
-- int i, ret;
--
-- ds_set_qp_size(rs);
--
-- rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
-- if (!rs->sbuf)
-- return ERR(ENOMEM);
--
-- rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
-- if (!rs->dmsg)
-- return ERR(ENOMEM);
--
-- rs->sbuf_bytes_avail = rs->sbuf_size;
-- rs->sqe_avail = rs->sq_size;
-- rs->rqe_avail = rs->rq_size;
--
-- rs->smsg_free = (struct ds_smsg *) rs->sbuf;
-- msg = rs->smsg_free;
-- for (i = 0; i < rs->sq_size - 1; i++) {
-- msg->next = (void *) msg + i * RS_SNDLOWAT;
-- msg = msg->next;
-- }
-- msg->next = NULL;
--
-- ret = rs_add_to_svc(rs);
-- if (ret)
-- return ret;
--
-- rs->state = rs_readable | rs_writable;
-- return 0;
--}
--
- static int rs_any_addr(const union socket_addr *addr)
- {
- if (addr->sa.sa_family == AF_INET) {
-@@ -1374,38 +1383,44 @@ static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
- }
-
- static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
-- socklen_t addrlen, struct ds_qp **qp)
-+ socklen_t addrlen, struct ds_qp **new_qp)
- {
-+ struct ds_qp *qp;
- struct ibv_qp_init_attr qp_attr;
- struct epoll_event event;
- int i, ret;
-
-- *qp = calloc(1, sizeof(struct ds_qp));
-- if (!*qp)
-+printf("%s\n", __func__);
-+ qp = calloc(1, sizeof(*qp));
-+ if (!qp)
- return ERR(ENOMEM);
-
-- (*qp)->rs = rs;
-- ret = rdma_create_id(NULL, &(*qp)->cm_id, *qp, RDMA_PS_UDP);
-+ qp->rs = rs;
-+ ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP);
-+ printf("%s rdma_create_id %d\n", __func__, ret);
- if (ret)
- goto err;
-
-- ds_format_hdr(&(*qp)->hdr, src_addr);
-- ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa);
-+ ds_format_hdr(&qp->hdr, src_addr);
-+ ret = rdma_bind_addr(qp->cm_id, &src_addr->sa);
-+ printf("%s rdma_bind_addr %d\n", __func__, ret);
- if (ret)
- goto err;
-
-- ret = ds_init_bufs(*qp);
-+ ret = ds_init_bufs(qp);
-+ printf("%s ds_init_bufs %d\n", __func__, ret);
- if (ret)
- goto err;
-
-- ret = rs_create_cq(rs, (*qp)->cm_id);
-+ ret = rs_create_cq(rs, qp->cm_id);
-+ printf("%s rs_create_cq %d\n", __func__, ret);
- if (ret)
- goto err;
-
- memset(&qp_attr, 0, sizeof qp_attr);
- qp_attr.qp_context = qp;
-- qp_attr.send_cq = rs->cm_id->send_cq;
-- qp_attr.recv_cq = rs->cm_id->recv_cq;
-+ qp_attr.send_cq = qp->cm_id->send_cq;
-+ qp_attr.recv_cq = qp->cm_id->recv_cq;
- qp_attr.qp_type = IBV_QPT_UD;
- qp_attr.sq_sig_all = 1;
- qp_attr.cap.max_send_wr = rs->sq_size;
-@@ -1413,31 +1428,35 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
- qp_attr.cap.max_send_sge = 2;
- qp_attr.cap.max_recv_sge = 1;
- qp_attr.cap.max_inline_data = rs->sq_inline;
-- ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr);
-+ ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr);
-+ printf("%s rdma_create_qp %d\n", __func__, ret);
- if (ret)
- goto err;
-
-- ret = ds_add_qp_dest(*qp, src_addr, addrlen);
-+ ret = ds_add_qp_dest(qp, src_addr, addrlen);
-+ printf("%s ds_add_qp_dest %d\n", __func__, ret);
- if (ret)
- goto err;
-
- event.events = EPOLLIN;
-- event.data.ptr = *qp;
-+ event.data.ptr = qp;
- ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD,
-- (*qp)->cm_id->recv_cq_channel->fd, &event);
-+ qp->cm_id->recv_cq_channel->fd, &event);
-+ printf("%s epoll_ctl %d\n", __func__, ret);
- if (ret)
- goto err;
-
- for (i = 0; i < rs->rq_size; i++) {
-- ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT);
-+ ret = ds_post_recv(rs, qp, qp->rbuf + i * RS_SNDLOWAT);
- if (ret)
- goto err;
- }
-
-- ds_insert_qp(rs, *qp);
-+ ds_insert_qp(rs, qp);
-+ *new_qp = qp;
- return 0;
- err:
-- ds_free_qp(*qp);
-+ ds_free_qp(qp);
- return ret;
- }
-
-@@ -1464,38 +1483,42 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
- union socket_addr src_addr;
- socklen_t src_len;
- struct ds_qp *qp;
-+ struct ds_dest **tdest, *new_dest;
- int ret = 0;
-
-+ printf("%s \n", __func__);
- fastlock_acquire(&rs->map_lock);
-- dest = tfind(addr, &rs->dest_map, ds_compare_addr);
-- if (dest)
-- goto out;
--
-- if (rs->state == rs_init) {
-- ret = ds_init_ep(rs);
-- if (ret)
-- goto out;
-- }
-+ tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
-+ printf("%s tfind %p\n", __func__, dest);
-+ if (tdest)
-+ goto found;
-
- ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
-+ printf("%s ds_get_src_addr %d %s\n", __func__, ret, strerror(errno));
- if (ret)
- goto out;
-
- ret = ds_get_qp(rs, &src_addr, src_len, &qp);
-+ printf("%s ds_get_qp %d %s\n", __func__, ret, strerror(errno));
- if (ret)
- goto out;
-
-- if ((addrlen != src_len) || memcmp(addr, &src_addr, addrlen)) {
-- *dest = calloc(1, sizeof(struct ds_dest));
-- if (!*dest) {
-+ tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
-+ if (!tdest) {
-+ printf("%s adding dest into map\n", __func__);
-+ new_dest = calloc(1, sizeof(*new_dest));
-+ if (!new_dest) {
- ret = ERR(ENOMEM);
- goto out;
- }
-
-- memcpy(&(*dest)->addr, addr, addrlen);
-- (*dest)->qp = qp;
-- tsearch(&(*dest)->addr, &rs->dest_map, ds_compare_addr);
-+ memcpy(&new_dest->addr, addr, addrlen);
-+ new_dest->qp = qp;
-+ tdest = tsearch(&new_dest->addr, &rs->dest_map, ds_compare_addr);
- }
-+
-+found:
-+ *dest = *tdest;
- out:
- fastlock_release(&rs->map_lock);
- return ret;
-@@ -1511,10 +1534,19 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
- memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
- ret = rs_do_connect(rs);
- } else {
-+ printf("%s\n", __func__);
-+ if (rs->state == rs_init) {
-+ ret = ds_init_ep(rs);
-+ if (ret)
-+ return ret;
-+ }
-+
- fastlock_acquire(&rs->slock);
- ret = connect(rs->udp_sock, addr, addrlen);
-+ printf("%s connect %d %s\n", __func__, ret, strerror(errno));
- if (!ret)
- ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
-+ printf("%s ds_get_dest %d %s\n", __func__, ret, strerror(errno));
- fastlock_release(&rs->slock);
- }
- return ret;
-@@ -1983,14 +2015,14 @@ static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct r
- do {
- ds_poll_cqs(rs);
- if (test(rs)) {
-- printf("%s test succeeded\n", __func__);
-+// printf("%s test succeeded\n", __func__);
- ret = 0;
- break;
- } else if (nonblock) {
- ret = ERR(EWOULDBLOCK);
-- printf("%s nonblocking \n", __func__);
-+// printf("%s nonblocking \n", __func__);
- } else if (!rs->cq_armed) {
-- printf("%s req notify \n", __func__);
-+// printf("%s req notify \n", __func__);
- ds_req_notify_cqs(rs);
- rs->cq_armed = 1;
- } else {
-@@ -1998,14 +2030,14 @@ static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct r
- fastlock_release(&rs->cq_lock);
-
- ret = ds_get_cq_event(rs);
-- printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
-+// printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
- fastlock_release(&rs->cq_wait_lock);
- fastlock_acquire(&rs->cq_lock);
- }
- } while (!ret);
-
- fastlock_release(&rs->cq_lock);
-- printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
-+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
- return ret;
- }
-
-@@ -2017,7 +2049,7 @@ static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
-
- do {
- ret = ds_process_cqs(rs, 1, test);
-- printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
-+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
- if (!ret || nonblock || errno != EWOULDBLOCK)
- return ret;
-
-@@ -2132,16 +2164,15 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
- struct ds_header *hdr;
- int ret;
-
--ret = 0;
-- printf("%s \n", __func__);
-+// printf("%s \n", __func__);
- if (!(rs->state & rs_readable))
- return ERR(EINVAL);
-
- if (!rs_have_rdata(rs)) {
-- printf("%s need rdata \n", __func__);
-+// printf("%s need rdata \n", __func__);
- ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
- rs_have_rdata);
-- printf("%s ds_get_comp ret %d errno %s\n", __func__, ret, strerror(errno));
-+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
- if (ret)
- return ret;
- }
-@@ -2161,6 +2192,7 @@ ret = 0;
- rs->rmsg_head = 0;
- }
-
-+ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
- return len;
- }
-
-@@ -2392,12 +2424,14 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
- struct ds_udp_header hdr;
- struct msghdr msg;
- struct iovec miov[8];
-+ ssize_t ret;
-
-+// printf("%s\n", __func__);
- if (iovcnt > 8)
- return ERR(ENOTSUP);
-
- hdr.tag = htonl(DS_UDP_TAG);
-- hdr.version = 1;
-+ hdr.version = rs->conn_dest->qp->hdr.version;
- hdr.op = op;
- hdr.reserved = 0;
- hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
-@@ -2419,18 +2453,24 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
- msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
- msg.msg_iov = miov;
- msg.msg_iovlen = iovcnt + 1;
-- return sendmsg(rs->udp_sock, &msg, flags);
-+// printf("%s iov cnt %d\n", __func__, msg.msg_iovlen);
-+ ret = sendmsg(rs->udp_sock, &msg, flags);
-+ printf("%s ret %d %s\n", __func__, ret, strerror(errno));
-+ return ret > 0 ? ret - sizeof hdr : ret;
- }
-
- static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
- int flags, uint8_t op)
- {
- struct iovec iov;
-+ printf("%s\n", __func__);
- if (buf && len) {
-+// printf("%s have buffer\n", __func__);
- iov.iov_base = (void *) buf;
- iov.iov_len = len;
- return ds_sendv_udp(rs, &iov, 1, flags, op);
- } else {
-+// printf("%s no buffer\n", __func__);
- return ds_sendv_udp(rs, NULL, 0, flags, op);
- }
- }
-@@ -2442,6 +2482,7 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
- uint64_t offset;
- int ret = 0;
-
-+ printf("%s\n", __func__);
- if (!rs->conn_dest->ah)
- return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
-
-@@ -2563,6 +2604,7 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
- struct rsocket *rs;
- int ret;
-
-+ printf("%s\n", __func__);
- rs = idm_at(&idm, socket);
- if (rs->type == SOCK_STREAM) {
- if (dest_addr || addrlen)
-@@ -2571,12 +2613,23 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
- return rsend(socket, buf, len, flags);
- }
-
-+ if (rs->state == rs_init) {
-+ ret = ds_init_ep(rs);
-+ if (ret)
-+ return ret;
-+ }
-+
- fastlock_acquire(&rs->slock);
-+ printf("%s check conn dest\n", __func__);
- if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
-+ printf("%s need conn dest\n", __func__);
- ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
- if (ret)
- goto out;
- }
-+ else
-+ printf("%s connected\n", __func__);
-+
- ret = dsend(rs, buf, len, flags);
- out:
- fastlock_release(&rs->slock);
-@@ -3605,9 +3658,11 @@ static int rs_svc_add_rs(struct rsocket *rs)
- }
-
- svc_rss[++svc_cnt] = rs;
-+ printf("%s rs %p\n", __func__, rs);
- svc_fds[svc_cnt].fd = rs->udp_sock;
- svc_fds[svc_cnt].events = POLLIN;
- svc_fds[svc_cnt].revents = 0;
-+ printf("add rs udp sock %d\n",rs->udp_sock);
- return 0;
- }
-
-@@ -3631,6 +3686,7 @@ static void rs_svc_process_sock(void)
- struct rs_svc_msg msg;
-
- read(svc_sock[1], &msg, sizeof msg);
-+ printf("%s op %d\n",__func__, msg.op);
- switch (msg.op) {
- case RS_SVC_INSERT:
- msg.status = rs_svc_add_rs(msg.rs);
-@@ -3642,6 +3698,7 @@ static void rs_svc_process_sock(void)
- msg.status = ENOTSUP;
- break;
- }
-+ printf("%s status %d\n",__func__, msg.status);
- write(svc_sock[1], &msg, sizeof msg);
- }
-
-@@ -3675,6 +3732,7 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t
- struct ibv_ah_attr attr;
- int ret;
-
-+ printf("%s\n",__func__);
- if (dest->ah) {
- fastlock_acquire(&rs->slock);
- ibv_destroy_ah(dest->ah);
-@@ -3726,7 +3784,18 @@ out:
- static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr,
- union socket_addr *addr)
- {
-- return (udp_hdr->tag == DS_UDP_TAG) &&
-+printf("tag %x ver %d family %d (AF_INET %d) length %d\n", udp_hdr->tag,
-+ udp_hdr->version, addr->sa.sa_family, AF_INET, udp_hdr->length);
-+
-+printf("tag %d ver %d fam %d len %d ver %d fam %d len %d\n",
-+udp_hdr->tag == ntohl(DS_UDP_TAG),
-+ udp_hdr->version == 4, addr->sa.sa_family == AF_INET,
-+ udp_hdr->length == DS_UDP_IPV4_HDR_LEN,
-+ udp_hdr->version == 6, addr->sa.sa_family == AF_INET6,
-+ udp_hdr->length == DS_UDP_IPV6_HDR_LEN);
-+
-+
-+ return (udp_hdr->tag == ntohl(DS_UDP_TAG)) &&
- ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET &&
- udp_hdr->length == DS_UDP_IPV4_HDR_LEN) ||
- (udp_hdr->version == 6 && addr->sa.sa_family == AF_INET6 &&
-@@ -3741,6 +3810,7 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len,
- struct ibv_sge sge;
- uint64_t offset;
-
-+ printf("%s\n",__func__);
- if (!ds_can_send(rs)) {
- if (ds_get_comp(rs, 0, ds_can_send))
- return;
-@@ -3769,7 +3839,9 @@ static void rs_svc_process_rs(struct rsocket *rs)
- socklen_t addrlen = sizeof addr;
- int len, ret;
-
-+ printf("%s\n",__func__);
- ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
-+ printf("%s recvfrom %d\n",__func__, ret);
- if (ret < DS_UDP_IPV4_HDR_LEN)
- return;
-
-@@ -3777,10 +3849,12 @@ static void rs_svc_process_rs(struct rsocket *rs)
- if (!rs_svc_valid_udp_hdr(udp_hdr, &addr))
- return;
-
-+ printf("%s valid hdr\n",__func__);
- len = ret - udp_hdr->length;
- udp_hdr->tag = ntohl(udp_hdr->tag);
- udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF;
- ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
-+ printf("%s ds_get_dest %d\n",__func__, ret);
- if (ret)
- return;
-
-@@ -3792,10 +3866,12 @@ static void rs_svc_process_rs(struct rsocket *rs)
- cur_dest = rs->conn_dest;
- if (udp_hdr->op == RS_OP_DATA) {
- rs->conn_dest = &dest->qp->dest;
-+ printf("%s forwarding msg\n",__func__);
- rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
- }
-
- rs->conn_dest = dest;
-+ printf("%s sending resp\n",__func__);
- ds_send_udp(rs, svc_buf + udp_hdr->length, len, 0, RS_OP_CTRL);
- rs->conn_dest = cur_dest;
- fastlock_release(&rs->slock);
-@@ -3806,6 +3882,7 @@ static void *rs_svc_run(void *arg)
- struct rs_svc_msg msg;
- int i, ret;
-
-+ printf("%s\n",__func__);
- ret = rs_svc_grow_sets();
- if (ret) {
- msg.status = ret;
-@@ -3816,10 +3893,13 @@ static void *rs_svc_run(void *arg)
- svc_fds[0].fd = svc_sock[1];
- svc_fds[0].events = POLLIN;
- do {
-+ printf("%s svc cnt %d\n",__func__, svc_cnt);
- for (i = 0; i <= svc_cnt; i++)
- svc_fds[i].revents = 0;
-
-+ printf("%s poll\n",__func__);
- poll(svc_fds, svc_cnt + 1, -1);
-+ printf("%s poll done\n",__func__);
- if (svc_fds[0].revents)
- rs_svc_process_sock();
-
-@@ -3827,7 +3907,7 @@ static void *rs_svc_run(void *arg)
- if (svc_fds[i].revents)
- rs_svc_process_rs(svc_rss[i]);
- }
-- } while (svc_cnt > 1);
-+ } while (svc_cnt >= 1);
-
- return NULL;
- }