Bottom: 1fa07c62817ac4b6cb8d9c5e327ea2cdc75dbd21
-Top: 49030a049bcacc4789ad20b05e6a7a3ee28c5e0d
+Top: 232d6a57cc2f2d81d4457edeeef4cb9e418b9640
Author: Sean Hefty <sean.hefty@intel.com>
Date: 2012-11-09 10:26:38 -0800
{
errno = err;
diff --git a/src/rsocket.c b/src/rsocket.c
-index a060f66..04f00dd 100644
+index a060f66..aca705b 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
@@ -47,6 +47,8 @@
#define RS_QP_MAX_SIZE 0xFFFE
#define RS_QP_CTRL_SIZE 4
#define RS_CONN_RETRIES 6
-@@ -64,6 +66,28 @@
+@@ -64,6 +66,36 @@
static struct index_map idm;
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
+
+struct rsocket;
+
++
++#define PRINTADDR(a) \
++printf("%s port %x ip %x\n", __func__, \
++ ((struct sockaddr_in *)a)->sin_port, \
++ ((struct sockaddr_in *)a)->sin_addr.s_addr)
++
++
++
+struct rs_svc_msg {
+ uint32_t op;
+ uint32_t status;
static uint16_t def_iomap_size = 0;
static uint16_t def_inline = 64;
static uint16_t def_sqsize = 384;
-@@ -100,6 +124,14 @@ enum {
+@@ -100,6 +132,14 @@ enum {
#define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
#define rs_msg_op(imm_data) (imm_data >> 29)
#define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
enum {
RS_CTRL_DISCONNECT,
-@@ -111,6 +143,18 @@ struct rs_msg {
+@@ -111,6 +151,18 @@ struct rs_msg {
uint32_t data;
};
struct rs_sge {
uint64_t addr;
uint32_t key;
-@@ -145,8 +189,6 @@ struct rs_conn_data {
+@@ -145,8 +197,6 @@ struct rs_conn_data {
struct rs_sge data_buf;
};
/*
* rsocket states are ordered as passive, connecting, connected, disconnected.
*/
-@@ -160,9 +202,9 @@ enum rs_state {
+@@ -160,9 +210,9 @@ enum rs_state {
rs_connecting = rs_opening | 0x0040,
rs_accepting = rs_opening | 0x0080,
rs_connected = 0x0100,
rs_connect_error = 0x0800,
rs_disconnected = 0x1000,
rs_error = 0x2000,
-@@ -170,68 +212,249 @@ enum rs_state {
+@@ -170,68 +220,248 @@ enum rs_state {
#define RS_OPT_SWAP_SGL 1
- void *target_buffer_list;
- volatile struct rs_sge *target_sgl;
- struct rs_iomap *target_iomap;
--
++#define DS_UDP_TAG 0x55555555
+
- uint32_t rbuf_size;
- struct ibv_mr *rmr;
- uint8_t *rbuf;
-+#define DS_UDP_TAG 0x55555555
-
+-
- uint32_t sbuf_size;
- struct ibv_mr *smr;
- struct ibv_sge ssgl[2];
+
+ msg.op = RS_SVC_INSERT;
+ msg.status = EINVAL;
-+ printf("%s rs %p\n", __func__, rs);
+ msg.rs = rs;
+ write(svc_sock[0], &msg, sizeof msg);
+ read(svc_sock[0], &msg, sizeof msg);
static int rs_value_to_scale(int value, int bits)
{
return value <= (1 << (bits - 1)) ?
-@@ -307,10 +530,10 @@ out:
+@@ -307,10 +537,10 @@ out:
pthread_mutex_unlock(&mut);
}
pthread_mutex_unlock(&mut);
return rs->index;
}
-@@ -322,7 +545,7 @@ static void rs_remove(struct rsocket *rs)
+@@ -322,7 +552,7 @@ static void rs_remove(struct rsocket *rs)
pthread_mutex_unlock(&mut);
}
{
struct rsocket *rs;
-@@ -330,29 +553,39 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -330,29 +560,39 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
if (!rs)
return NULL;
dlist_init(&rs->iomap_list);
dlist_init(&rs->iomap_queue);
return rs;
-@@ -360,13 +593,29 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -360,13 +600,26 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
static int rs_set_nonblocking(struct rsocket *rs, long arg)
{
+ if (!ret && rs->state < rs_connected)
+ ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
+ } else {
-+ printf("%s set nonblock\n", __func__);
+ ret = fcntl(rs->epfd, F_SETFL, arg);
-+ printf("%s fcntl %d\n", __func__, ret);
-+
+ if (!ret && rs->qp_list) {
+ qp = rs->qp_list;
+ do {
return ret;
}
-@@ -390,17 +639,43 @@ static void rs_set_qp_size(struct rsocket *rs)
+@@ -390,17 +643,39 @@ static void rs_set_qp_size(struct rsocket *rs)
rs->rq_size = 2;
}
+{
+ uint16_t max_size;
+
-+ printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
-+ rs->rq_size, rs->rbuf_size);
+ max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE);
+
+ if (rs->sq_size > max_size)
+ rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
+ else
+ rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
-+ printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
-+ rs->rq_size, rs->rbuf_size);
+}
+
static int rs_init_bufs(struct rsocket *rs)
rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
if (!rs->rmr)
-@@ -440,37 +715,56 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -440,37 +715,57 @@ static int rs_init_bufs(struct rsocket *rs)
return 0;
}
-static int rs_create_cq(struct rsocket *rs)
+static int ds_init_bufs(struct ds_qp *qp)
+{
-+ qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf));
++ qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh),
++ sizeof(*qp->rbuf));
+ if (!qp->rbuf)
+ return ERR(ENOMEM);
+
+ if (!qp->smr)
+ return -1;
+
-+ qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size);
++ qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size +
++ sizeof(struct ibv_grh));
+ if (!qp->rmr)
+ return -1;
+
- rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs);
- if (!rs->cm_id->recv_cq_channel)
+ cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
-+ printf("%s create comp_channel %p\n", __func__, cm_id->recv_cq_channel);
+ if (!cm_id->recv_cq_channel)
return -1;
- if (!rs->cm_id->recv_cq)
+ cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
+ cm_id, cm_id->recv_cq_channel, 0);
-+ printf("%s create cq %p size %d\n", __func__, cm_id->recv_cq, rs->sq_size + rs->rq_size);
+ if (!cm_id->recv_cq)
goto err1;
if (rs->fd_flags & O_NONBLOCK) {
-+ printf("%s set nonblock\n", __func__);
- if (rs_set_nonblocking(rs, O_NONBLOCK))
+- if (rs_set_nonblocking(rs, O_NONBLOCK))
++ if (fcntl(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK))
goto err2;
++ } else {
++ ibv_req_notify_cq(cm_id->recv_cq, 0);
}
- rs->cm_id->send_cq_channel = rs->cm_id->recv_cq_channel;
{
struct ibv_recv_wr wr, *bad;
-@@ -482,6 +776,23 @@ rs_post_recv(struct rsocket *rs)
+@@ -482,6 +777,26 @@ rs_post_recv(struct rsocket *rs)
return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
}
-+static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf)
++static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, uint32_t offset)
+{
+ struct ibv_recv_wr wr, *bad;
-+ struct ibv_sge sge;
++ struct ibv_sge sge[2];
+
-+ sge.addr = (uintptr_t) buf;
-+ sge.length = RS_SNDLOWAT;
-+ sge.lkey = qp->rmr->lkey;
++ sge[0].addr = (uintptr_t) qp->rbuf + rs->rbuf_size;
++ sge[0].length = sizeof(struct ibv_grh);
++ sge[0].lkey = qp->rmr->lkey;
++ sge[1].addr = (uintptr_t) qp->rbuf + offset;
++ sge[1].length = RS_SNDLOWAT;
++ sge[1].lkey = qp->rmr->lkey;
+
-+ wr.wr_id = ds_recv_wr_id((uint32_t) ((uint8_t *) buf - rs->rbuf));
++ wr.wr_id = ds_recv_wr_id(offset);
+ wr.next = NULL;
-+ wr.sg_list = &sge;
-+ wr.num_sge = 1;
++ wr.sg_list = sge;
++ wr.num_sge = 2;
+
+ return rdma_seterrno(ibv_post_recv(qp->cm_id->qp, &wr, &bad));
+}
static int rs_create_ep(struct rsocket *rs)
{
struct ibv_qp_init_attr qp_attr;
-@@ -492,7 +803,7 @@ static int rs_create_ep(struct rsocket *rs)
+@@ -492,7 +807,7 @@ static int rs_create_ep(struct rsocket *rs)
if (ret)
return ret;
if (ret)
return ret;
-@@ -549,8 +860,73 @@ static void rs_free_iomappings(struct rsocket *rs)
+@@ -549,8 +864,73 @@ static void rs_free_iomappings(struct rsocket *rs)
}
}
if (rs->index >= 0)
rs_remove(rs);
-@@ -582,7 +958,7 @@ static void rs_free(struct rsocket *rs)
+@@ -582,7 +962,7 @@ static void rs_free(struct rsocket *rs)
rdma_destroy_id(rs->cm_id);
}
fastlock_destroy(&rs->cq_wait_lock);
fastlock_destroy(&rs->cq_lock);
fastlock_destroy(&rs->rlock);
-@@ -636,29 +1012,89 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+@@ -636,29 +1016,88 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
rs->sseq_comp = ntohs(conn->credits);
}
+ ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
+ if (ret)
+ goto err;
-+
+
+- ret = rs_insert(rs);
+ rs->cm_id->route.addr.src_addr.sa_family = domain;
+ index = rs->cm_id->channel->fd;
+ } else {
-+ printf("rsocket sq %d rq %d\n", rs->sq_size, rs->rq_size);
+ ret = ds_init(rs, domain);
+ if (ret)
+ goto err;
-
-- ret = rs_insert(rs);
++
+ index = rs->udp_sock;
+ }
+
return rs->index;
err:
-@@ -672,9 +1108,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+@@ -672,9 +1111,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
int ret;
rs = idm_at(&idm, socket);
return ret;
}
-@@ -710,7 +1155,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -710,7 +1158,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
int ret;
rs = idm_at(&idm, socket);
if (!new_rs)
return ERR(ENOMEM);
-@@ -718,7 +1163,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -718,7 +1166,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
if (ret)
goto err;
if (ret < 0)
goto err;
-@@ -855,13 +1300,256 @@ connected:
+@@ -729,7 +1177,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+ }
+
+ if (rs->fd_flags & O_NONBLOCK)
+- rs_set_nonblocking(new_rs, O_NONBLOCK);
++ fcntl(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
+
+ ret = rs_create_ep(new_rs);
+ if (ret)
+@@ -831,7 +1279,7 @@ connected:
+ break;
+ case rs_accepting:
+ if (!(rs->fd_flags & O_NONBLOCK))
+- rs_set_nonblocking(rs, 0);
++ fcntl(rs->cm_id->channel->fd, F_SETFL, 0);
+
+ ret = ucma_complete(rs->cm_id);
+ if (ret)
+@@ -855,13 +1303,251 @@ connected:
return ret;
}
+ int sock, ret;
+ uint16_t port;
+
++// printf("dest: "); PRINTADDR(dest_addr);
+ *src_len = sizeof src_addr;
+ ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
++// printf("src: "); PRINTADDR(src_addr);
+ if (ret || !rs_any_addr(src_addr))
+ return ret;
+
+ *src_len = sizeof src_addr;
+ ret = getsockname(sock, &src_addr->sa, src_len);
+ src_addr->sin.sin_port = port;
++// printf("selected src: ");
+out:
+ close(sock);
+ return ret;
+static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
+{
+ if (addr->sa.sa_family == AF_INET) {
++ PRINTADDR(addr);
+ hdr->version = 4;
+ hdr->length = DS_IPV4_HDR_LEN;
+ hdr->port = addr->sin.sin_port;
+ struct ibv_ah_attr attr;
+ int ret;
+
++// printf("%s\n", __func__);
+ memcpy(&qp->dest.addr, addr, addrlen);
+ qp->dest.qp = qp;
+ qp->dest.qpn = qp->cm_id->qp->qp_num;
+ attr.dlid = port_attr.lid;
+ attr.port_num = qp->cm_id->port_num;
+ qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr);
++// printf("%s ah %p lid %x port %d qpn %x\n", __func__, qp->dest.ah, attr.dlid,
++// attr.port_num, qp->dest.qpn);
+ if (!qp->dest.ah)
+ return ERR(ENOMEM);
+
+ struct epoll_event event;
+ int i, ret;
+
-+printf("%s\n", __func__);
++ PRINTADDR(src_addr);
+ qp = calloc(1, sizeof(*qp));
+ if (!qp)
+ return ERR(ENOMEM);
+
+ qp->rs = rs;
+ ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP);
-+ printf("%s rdma_create_id %d\n", __func__, ret);
+ if (ret)
+ goto err;
+
+ ds_format_hdr(&qp->hdr, src_addr);
+ ret = rdma_bind_addr(qp->cm_id, &src_addr->sa);
-+ printf("%s rdma_bind_addr %d\n", __func__, ret);
+ if (ret)
+ goto err;
+
+ ret = ds_init_bufs(qp);
-+ printf("%s ds_init_bufs %d\n", __func__, ret);
+ if (ret)
+ goto err;
+
+ ret = rs_create_cq(rs, qp->cm_id);
-+ printf("%s rs_create_cq %d\n", __func__, ret);
+ if (ret)
+ goto err;
+
+ qp_attr.sq_sig_all = 1;
+ qp_attr.cap.max_send_wr = rs->sq_size;
+ qp_attr.cap.max_recv_wr = rs->rq_size;
-+ qp_attr.cap.max_send_sge = 2;
-+ qp_attr.cap.max_recv_sge = 1;
++ qp_attr.cap.max_send_sge = 1;
++ qp_attr.cap.max_recv_sge = 2;
+ qp_attr.cap.max_inline_data = rs->sq_inline;
+ ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr);
-+ printf("%s rdma_create_qp %d\n", __func__, ret);
+ if (ret)
+ goto err;
+
+ ret = ds_add_qp_dest(qp, src_addr, addrlen);
-+ printf("%s ds_add_qp_dest %d\n", __func__, ret);
+ if (ret)
+ goto err;
+
+ event.data.ptr = qp;
+ ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD,
+ qp->cm_id->recv_cq_channel->fd, &event);
-+ printf("%s epoll_ctl %d\n", __func__, ret);
+ if (ret)
+ goto err;
+
+ for (i = 0; i < rs->rq_size; i++) {
-+ ret = ds_post_recv(rs, qp, qp->rbuf + i * RS_SNDLOWAT);
++ ret = ds_post_recv(rs, qp, i * RS_SNDLOWAT);
+ if (ret)
+ goto err;
+ }
+ struct ds_dest **tdest, *new_dest;
+ int ret = 0;
+
-+ printf("%s \n", __func__);
++ PRINTADDR(addr);
+ fastlock_acquire(&rs->map_lock);
+ tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
-+ printf("%s tfind %p\n", __func__, dest);
+ if (tdest)
+ goto found;
+
+ ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
-+ printf("%s ds_get_src_addr %d %s\n", __func__, ret, strerror(errno));
++// printf("get src: "); PRINTADDR(&src_addr);
+ if (ret)
+ goto out;
+
+ ret = ds_get_qp(rs, &src_addr, src_len, &qp);
-+ printf("%s ds_get_qp %d %s\n", __func__, ret, strerror(errno));
+ if (ret)
+ goto out;
+
+ tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
+ if (!tdest) {
-+ printf("%s adding dest into map\n", __func__);
+ new_dest = calloc(1, sizeof(*new_dest));
+ if (!new_dest) {
+ ret = ERR(ENOMEM);
+ memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
+ ret = rs_do_connect(rs);
+ } else {
-+ printf("%s\n", __func__);
+ if (rs->state == rs_init) {
+ ret = ds_init_ep(rs);
+ if (ret)
+ }
+
+ fastlock_acquire(&rs->slock);
++ PRINTADDR(addr);
+ ret = connect(rs->udp_sock, addr, addrlen);
-+ printf("%s connect %d %s\n", __func__, ret, strerror(errno));
+ if (!ret)
+ ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
-+ printf("%s ds_get_dest %d %s\n", __func__, ret, strerror(errno));
+ fastlock_release(&rs->slock);
+ }
+ return ret;
}
static int rs_post_write_msg(struct rsocket *rs,
-@@ -903,6 +1591,24 @@ static int rs_post_write(struct rsocket *rs,
+@@ -903,6 +1589,26 @@ static int rs_post_write(struct rsocket *rs,
return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
}
+ wr.wr.ud.ah = rs->conn_dest->ah;
+ wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
+ wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
++// printf("%s ah %p qpn %x\n", __func__, rs->conn_dest->ah,
++// rs->conn_dest->qpn);
+
+ return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad));
+}
}
break;
case RS_OP_WRITE:
-@@ -1133,46 +1839,217 @@ static int rs_get_cq_event(struct rsocket *rs)
- */
- static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
- {
-- int ret;
-+ int ret;
-+
-+ fastlock_acquire(&rs->cq_lock);
-+ do {
+@@ -1137,42 +1843,214 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs
+
+ fastlock_acquire(&rs->cq_lock);
+ do {
+- rs_update_credits(rs);
+- ret = rs_poll_cq(rs);
+ rs_update_credits(rs);
+ ret = rs_poll_cq(rs);
+ if (test(rs)) {
+ return ret;
+}
+
-+static int ds_valid_recv(void *buf, uint32_t len)
++static int ds_valid_recv(struct ds_qp *qp, struct ibv_wc *wc)
+{
-+ struct ds_header *hdr = (struct ds_header *) buf;
-+ return ((len >= sizeof(*hdr)) &&
++ struct ds_header *hdr;
++
++ hdr = (struct ds_header *) (qp->rbuf + ds_wr_offset(wc->wr_id));
++ return ((wc->byte_len >= sizeof(struct ibv_grh) + sizeof(*hdr)) &&
+ ((hdr->version == 4 && hdr->length == DS_IPV4_HDR_LEN) ||
+ (hdr->version == 6 && hdr->length == DS_IPV6_HDR_LEN)));
+}
+
+ if (ds_wr_is_recv(wc.wr_id)) {
+ if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS &&
-+ ds_valid_recv(qp->rbuf + ds_wr_offset(wc.wr_id),
-+ wc.byte_len)) {
++ ds_valid_recv(qp, &wc)) {
+ rs->rqe_avail--;
+ rmsg = &rs->dmsg[rs->rmsg_tail];
+ rmsg->qp = qp;
+ rmsg->offset = ds_wr_offset(wc.wr_id);
-+ rmsg->length = wc.byte_len;
++ rmsg->length = wc.byte_len - sizeof(struct ibv_grh);
+ if (++rs->rmsg_tail == rs->rq_size + 1)
+ rs->rmsg_tail = 0;
+ } else {
-+ ds_post_recv(rs, qp, qp->rbuf +
-+ ds_wr_offset(wc.wr_id));
++ printf("%s invalid recv\n", __func__);
++ ds_post_recv(rs, qp, ds_wr_offset(wc.wr_id));
+ }
+ } else {
+ smsg = (struct ds_smsg *)
+ (rs->sbuf + ds_wr_offset(wc.wr_id));
++ printf("%s send smsg %p free %p\n", __func__, smsg, rs->smsg_free);
+ smsg->next = rs->smsg_free;
+ rs->smsg_free = smsg;
+ rs->sqe_avail++;
+ void *context;
+ int ret;
+
-+ printf("%s \n", __func__);
+ if (!rs->cq_armed)
+ return 0;
+
++// printf("wait for epoll event\n");
+ ret = epoll_wait(rs->epfd, &event, 1, -1);
-+ printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno));
++// printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno));
+ if (ret <= 0)
+ return ret;
+
+ qp = event.data.ptr;
+ ret = ibv_get_cq_event(qp->cm_id->recv_cq_channel, &cq, &context);
-+ printf("%s get cq event ret %d errno %s\n", __func__, ret, strerror(errno));
+ if (!ret) {
+ ibv_ack_cq_events(qp->cm_id->recv_cq, 1);
+ qp->cq_armed = 0;
+static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
+{
+ int ret = 0;
-
- fastlock_acquire(&rs->cq_lock);
- do {
-- rs_update_credits(rs);
-- ret = rs_poll_cq(rs);
++
++ fastlock_acquire(&rs->cq_lock);
++ do {
+ ds_poll_cqs(rs);
if (test(rs)) {
+// printf("%s test succeeded\n", __func__);
if (!ret || nonblock || errno != EWOULDBLOCK)
return ret;
-@@ -1184,7 +2061,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
+@@ -1184,7 +2062,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
(e.tv_usec - s.tv_usec) + 1;
} while (poll_time <= polling_time);
return ret;
}
-@@ -1219,9 +2096,19 @@ static int rs_can_send(struct rsocket *rs)
+@@ -1219,9 +2097,19 @@ static int rs_can_send(struct rsocket *rs)
(rs->target_sgl[rs->target_sge].length != 0);
}
}
static int rs_conn_can_send_ctrl(struct rsocket *rs)
-@@ -1236,7 +2123,7 @@ static int rs_have_rdata(struct rsocket *rs)
+@@ -1236,7 +2124,7 @@ static int rs_have_rdata(struct rsocket *rs)
static int rs_conn_have_rdata(struct rsocket *rs)
{
}
static int rs_conn_all_sends_done(struct rsocket *rs)
-@@ -1245,6 +2132,70 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
+@@ -1245,6 +2133,73 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
!(rs->state & rs_connected);
}
+ struct ds_header *hdr;
+ int ret;
+
-+// printf("%s \n", __func__);
++ printf("%s \n", __func__);
+ if (!(rs->state & rs_readable))
+ return ERR(EINVAL);
+
+ if (!rs_have_rdata(rs)) {
-+// printf("%s need rdata \n", __func__);
++ printf("%s need rdata \n", __func__);
+ ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
+ rs_have_rdata);
-+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
++ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
+ if (ret)
+ return ret;
+ }
+
+ memcpy(buf, (void *) hdr + hdr->length, len);
+ if (addrlen)
++{
+ ds_set_src(src_addr, addrlen, hdr);
++PRINTADDR(src_addr);
++}
+
+ if (!(flags & MSG_PEEK)) {
-+ ds_post_recv(rs, rmsg->qp, hdr);
++ ds_post_recv(rs, rmsg->qp, rmsg->offset);
+ if (++rs->rmsg_head == rs->rq_size + 1)
+ rs->rmsg_head = 0;
+ }
static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
{
size_t left = len;
-@@ -1290,6 +2241,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1290,6 +2245,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
int ret;
rs = idm_at(&idm, socket);
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1339,7 +2297,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1339,7 +2301,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
rs->rbuf_bytes_avail += rsize;
}
fastlock_release(&rs->rlock);
return ret ? ret : len - left;
-@@ -1348,8 +2306,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1348,8 +2310,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen)
{
ret = rrecv(socket, buf, len, flags);
if (ret > 0 && src_addr)
rgetpeername(socket, src_addr, addrlen);
-@@ -1391,14 +2358,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1391,14 +2362,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
struct rs_iomap iom;
int ret;
ret = ERR(ECONNRESET);
break;
}
-@@ -1447,10 +2414,99 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1447,10 +2418,99 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
}
rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
+ }
+
+ miov[0].iov_base = &hdr;
-+ miov[0].iov_len = sizeof hdr;
++ miov[0].iov_len = hdr.length;
+ if (iov && iovcnt)
+ memcpy(&miov[1], iov, sizeof *iov * iovcnt);
+
+ msg.msg_iovlen = iovcnt + 1;
+// printf("%s iov cnt %d\n", __func__, msg.msg_iovlen);
+ ret = sendmsg(rs->udp_sock, &msg, flags);
-+ printf("%s ret %d %s\n", __func__, ret, strerror(errno));
+ return ret > 0 ? ret - sizeof hdr : ret;
+}
+
+ int flags, uint8_t op)
+{
+ struct iovec iov;
-+ printf("%s\n", __func__);
++// printf("%s\n", __func__);
+ if (buf && len) {
+// printf("%s have buffer\n", __func__);
+ iov.iov_base = (void *) buf;
+ sge.lkey = rs->conn_dest->qp->smr->lkey;
+ offset = (uint8_t *) msg - rs->sbuf;
+
++ printf("%s - sending over QP\n", __func__);
+ ret = ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
+ return ret ? ret : len;
+}
/*
* We overlap sending the data, by posting a small work request immediately,
* then increasing the size of the send on each iteration.
-@@ -1464,6 +2520,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1464,6 +2524,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
int ret = 0;
rs = idm_at(&idm, socket);
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1485,7 +2548,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1485,7 +2552,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1538,10 +2601,39 @@ out:
+@@ -1538,10 +2605,36 @@ out:
ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen)
{
- return ERR(EISCONN);
+ struct rsocket *rs;
+ int ret;
-+
-+ printf("%s\n", __func__);
+
+- return rsend(socket, buf, len, flags);
++ PRINTADDR(dest_addr);
++ printf("%s sendto data 0x%x\n", __func__, *((uint32_t*)buf));
+ rs = idm_at(&idm, socket);
+ if (rs->type == SOCK_STREAM) {
+ if (dest_addr || addrlen)
+ }
+
+ fastlock_acquire(&rs->slock);
-+ printf("%s check conn dest\n", __func__);
+ if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
-+ printf("%s need conn dest\n", __func__);
+ ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
+ if (ret)
+ goto out;
+ }
-+ else
-+ printf("%s connected\n", __func__);
-
-- return rsend(socket, buf, len, flags);
++
+ ret = dsend(rs, buf, len, flags);
+out:
+ fastlock_release(&rs->slock);
}
static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-@@ -1600,7 +2692,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1600,7 +2693,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1653,7 +2745,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+@@ -1653,7 +2746,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
if (msg->msg_control && msg->msg_controllen)
return ERR(ENOTSUP);
}
ssize_t rwrite(int socket, const void *buf, size_t count)
-@@ -1690,8 +2782,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
+@@ -1690,8 +2783,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
int ret;
check_cq:
rs_process_cq(rs, nonblock, test);
revents = 0;
-@@ -1707,6 +2799,16 @@ check_cq:
+@@ -1707,6 +2800,16 @@ check_cq:
}
return revents;
}
if (rs->state == rs_listening) {
-@@ -1766,11 +2868,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1766,11 +2869,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
if (fds[i].revents)
return 1;
rfds[i].events = POLLIN;
} else {
rfds[i].fd = fds[i].fd;
-@@ -1793,7 +2898,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1793,7 +2899,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
rs = idm_lookup(&idm, fds[i].fd);
if (rs) {
fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
} else {
fds[i].revents = rfds[i].revents;
-@@ -1949,7 +3057,7 @@ int rshutdown(int socket, int how)
+@@ -1949,7 +3058,7 @@ int rshutdown(int socket, int how)
rs = idm_at(&idm, socket);
if (how == SHUT_RD) {
return 0;
}
-@@ -1959,10 +3067,10 @@ int rshutdown(int socket, int how)
+@@ -1959,10 +3068,10 @@ int rshutdown(int socket, int how)
if (rs->state & rs_connected) {
if (how == SHUT_RDWR) {
ctrl = RS_CTRL_DISCONNECT;
RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
}
if (!rs->ctrl_avail) {
-@@ -1987,13 +3095,29 @@ int rshutdown(int socket, int how)
+@@ -1987,13 +3096,29 @@ int rshutdown(int socket, int how)
return 0;
}
rs_free(rs);
return 0;
-@@ -2018,8 +3142,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2018,8 +3143,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
}
int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-@@ -2027,8 +3155,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2027,8 +3156,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
}
int rsetsockopt(int socket, int level, int optname,
-@@ -2040,18 +3172,26 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2040,22 +3173,31 @@ int rsetsockopt(int socket, int level, int optname,
ret = ERR(ENOTSUP);
rs = idm_at(&idm, socket);
opt_on = *(int *) optval;
break;
case SO_RCVBUF:
-@@ -2101,9 +3241,11 @@ int rsetsockopt(int socket, int level, int optname,
+- if (!rs->rbuf)
++ if ((rs->type == SOCK_STREAM && !rs->rbuf) ||
++ (rs->type == SOCK_DGRAM && !rs->qp_list))
+ rs->rbuf_size = (*(uint32_t *) optval) << 1;
+ ret = 0;
+ break;
+@@ -2101,9 +3243,11 @@ int rsetsockopt(int socket, int level, int optname,
opts = &rs->ipv6_opts;
switch (optname) {
case IPV6_V6ONLY:
opt_on = *(int *) optval;
break;
default:
-@@ -2315,7 +3457,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2315,7 +3459,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
return ERR(EINVAL);
if (prot & PROT_WRITE) {
iomr = rs_get_iomap_mr(rs);
access |= IBV_ACCESS_REMOTE_WRITE;
-@@ -2349,7 +3491,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2349,7 +3493,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
dlist_insert_tail(&iomr->entry, &rs->iomap_list);
}
out:
return offset;
}
-@@ -2361,7 +3503,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2361,7 +3505,7 @@ int riounmap(int socket, void *buf, size_t len)
int ret = 0;
rs = idm_at(&idm, socket);
for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
entry = entry->next) {
-@@ -2382,7 +3524,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2382,7 +3526,7 @@ int riounmap(int socket, void *buf, size_t len)
}
ret = ERR(EINVAL);
out:
return ret;
}
-@@ -2426,7 +3568,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
+@@ -2426,7 +3570,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -2476,3 +3618,302 @@ out:
+@@ -2476,3 +3620,278 @@ out:
return (ret && left == count) ? ret : count - left;
}
+ }
+
+ svc_rss[++svc_cnt] = rs;
-+ printf("%s rs %p\n", __func__, rs);
+ svc_fds[svc_cnt].fd = rs->udp_sock;
+ svc_fds[svc_cnt].events = POLLIN;
+ svc_fds[svc_cnt].revents = 0;
-+ printf("add rs udp sock %d\n",rs->udp_sock);
+ return 0;
+}
+
+ struct rs_svc_msg msg;
+
+ read(svc_sock[1], &msg, sizeof msg);
-+ printf("%s op %d\n",__func__, msg.op);
+ switch (msg.op) {
+ case RS_SVC_INSERT:
+ msg.status = rs_svc_add_rs(msg.rs);
+ msg.status = ENOTSUP;
+ break;
+ }
-+ printf("%s status %d\n",__func__, msg.status);
+ write(svc_sock[1], &msg, sizeof msg);
+}
+
+ struct ibv_ah_attr attr;
+ int ret;
+
-+ printf("%s\n",__func__);
+ if (dest->ah) {
+ fastlock_acquire(&rs->slock);
+ ibv_destroy_ah(dest->ah);
+ }
+
+ ret = rdma_create_id(NULL, &id, NULL, dest->qp->cm_id->ps);
-+ printf("%s rdma_create_id %d %s\n",__func__, ret, strerror(errno));
+ if (ret)
+ return;
+
+ else
+ saddr.sin6.sin6_port = 0;
+ ret = rdma_resolve_addr(id, &saddr.sa, &dest->addr.sa, 2000);
-+ printf("%s rdma_resolve_addr %d %s\n",__func__, ret, strerror(errno));
+ if (ret)
+ goto out;
+
+ ret = rdma_resolve_route(id, 2000);
-+ printf("%s rdma_resolve_route %d %s\n",__func__, ret, strerror(errno));
+ if (ret)
+ goto out;
+
+ attr.static_rate = id->route.path_rec->rate;
+ attr.port_num = id->port_num;
+
-+ printf("%s getting slock \n",__func__);
+ fastlock_acquire(&rs->slock);
-+ printf("%s why am I not here? \n",__func__);
+ dest->qpn = qpn;
+ dest->ah = ibv_create_ah(dest->qp->cm_id->pd, &attr);
-+ printf("%s ibv_create_ah %p %s\n",__func__, dest->ah, strerror(errno));
+ fastlock_release(&rs->slock);
+out:
+ rdma_destroy_id(id);
+static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr,
+ union socket_addr *addr)
+{
-+printf("tag %x ver %d family %d (AF_INET %d) length %d\n", udp_hdr->tag,
-+ udp_hdr->version, addr->sa.sa_family, AF_INET, udp_hdr->length);
-+
-+printf("tag %d ver %d fam %d len %d ver %d fam %d len %d\n",
-+udp_hdr->tag == ntohl(DS_UDP_TAG),
-+ udp_hdr->version == 4, addr->sa.sa_family == AF_INET,
-+ udp_hdr->length == DS_UDP_IPV4_HDR_LEN,
-+ udp_hdr->version == 6, addr->sa.sa_family == AF_INET6,
-+ udp_hdr->length == DS_UDP_IPV6_HDR_LEN);
-+
-+
+ return (udp_hdr->tag == ntohl(DS_UDP_TAG)) &&
+ ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET &&
+ udp_hdr->length == DS_UDP_IPV4_HDR_LEN) ||
+ struct ibv_sge sge;
+ uint64_t offset;
+
-+ printf("%s\n",__func__);
++// PRINTADDR(src);
+ if (!ds_can_send(rs)) {
+ if (ds_get_comp(rs, 0, ds_can_send))
+ return;
+ rs->sqe_avail--;
+
+ ds_format_hdr(&hdr, src);
++// printf("%s hdr ver %d length %d port %x\n", __func__, hdr.version,
++// hdr.length, hdr.port);
+ memcpy((void *) msg, &hdr, hdr.length);
+ memcpy((void *) msg + hdr.length, buf, len);
++// printf("%s received data 0x%x\n", __func__, *((uint32_t*)buf));
+ sge.addr = (uintptr_t) msg;
+ sge.length = hdr.length + len;
+ sge.lkey = rs->conn_dest->qp->smr->lkey;
+ offset = (uint8_t *) msg - rs->sbuf;
+
++// printf("%s ver %d length %d port %x\n", __func__, ((struct ds_header *) msg)->version,
++// ((struct ds_header *) msg)->length, ((struct ds_header *) msg)->port);
+ ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
+}
+
+ socklen_t addrlen = sizeof addr;
+ int len, ret;
+
-+ printf("%s\n",__func__);
+ ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
-+ printf("%s recvfrom %d\n",__func__, ret);
++// PRINTADDR(&addr);
++// printf("%s received data 0x%x\n", __func__, *((uint32_t*)&svc_buf[8]));
+ if (ret < DS_UDP_IPV4_HDR_LEN)
+ return;
+
+ if (!rs_svc_valid_udp_hdr(udp_hdr, &addr))
+ return;
+
-+ printf("%s valid hdr\n",__func__);
+ len = ret - udp_hdr->length;
+ udp_hdr->tag = ntohl(udp_hdr->tag);
+ udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF;
+ ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
-+ printf("%s ds_get_dest %d\n",__func__, ret);
+ if (ret)
+ return;
+
+ rs_svc_create_ah(rs, dest, udp_hdr->qpn);
+
+ /* to do: handle when dest local ip address doesn't match udp ip */
++ if (udp_hdr->op != RS_OP_DATA)
++ return;
++
+ fastlock_acquire(&rs->slock);
+ cur_dest = rs->conn_dest;
-+ if (udp_hdr->op == RS_OP_DATA) {
-+ rs->conn_dest = &dest->qp->dest;
-+ printf("%s forwarding msg\n",__func__);
-+ rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
-+ }
++ rs->conn_dest = &dest->qp->dest;
++ rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
+
+ rs->conn_dest = dest;
-+ printf("%s sending resp\n",__func__);
+ ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL);
+ rs->conn_dest = cur_dest;
+ fastlock_release(&rs->slock);
+ struct rs_svc_msg msg;
+ int i, ret;
+
-+ printf("%s\n",__func__);
+ ret = rs_svc_grow_sets();
+ if (ret) {
+ msg.status = ret;
+ svc_fds[0].fd = svc_sock[1];
+ svc_fds[0].events = POLLIN;
+ do {
-+ printf("%s svc cnt %d\n",__func__, svc_cnt);
+ for (i = 0; i <= svc_cnt; i++)
+ svc_fds[i].revents = 0;
+
-+ printf("%s poll\n",__func__);
+ poll(svc_fds, svc_cnt + 1, -1);
-+ printf("%s poll done\n",__func__);
+ if (svc_fds[0].revents)
+ rs_svc_process_sock();
+
+++ /dev/null
-Bottom: 49030a049bcacc4789ad20b05e6a7a3ee28c5e0d
-Top: 232d6a57cc2f2d81d4457edeeef4cb9e418b9640
-Author: Sean Hefty <sean.hefty@intel.com>
-Date: 2012-12-15 00:15:42 -0800
-
-Refresh of dsocket
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index 04f00dd..aca705b 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -73,6 +73,14 @@ enum {
-
- struct rsocket;
-
-+
-+#define PRINTADDR(a) \
-+printf("%s port %x ip %x\n", __func__, \
-+ ((struct sockaddr_in *)a)->sin_port, \
-+ ((struct sockaddr_in *)a)->sin_addr.s_addr)
-+
-+
-+
- struct rs_svc_msg {
- uint32_t op;
- uint32_t status;
-@@ -399,7 +407,6 @@ static int rs_add_to_svc(struct rsocket *rs)
-
- msg.op = RS_SVC_INSERT;
- msg.status = EINVAL;
-- printf("%s rs %p\n", __func__, rs);
- msg.rs = rs;
- write(svc_sock[0], &msg, sizeof msg);
- read(svc_sock[0], &msg, sizeof msg);
-@@ -603,10 +610,7 @@ static int rs_set_nonblocking(struct rsocket *rs, long arg)
- if (!ret && rs->state < rs_connected)
- ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
- } else {
-- printf("%s set nonblock\n", __func__);
- ret = fcntl(rs->epfd, F_SETFL, arg);
-- printf("%s fcntl %d\n", __func__, ret);
--
- if (!ret && rs->qp_list) {
- qp = rs->qp_list;
- do {
-@@ -643,8 +647,6 @@ static void ds_set_qp_size(struct rsocket *rs)
- {
- uint16_t max_size;
-
-- printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
-- rs->rq_size, rs->rbuf_size);
- max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE);
-
- if (rs->sq_size > max_size)
-@@ -661,8 +663,6 @@ static void ds_set_qp_size(struct rsocket *rs)
- rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
- else
- rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
-- printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
-- rs->rq_size, rs->rbuf_size);
- }
-
- static int rs_init_bufs(struct rsocket *rs)
-@@ -717,7 +717,8 @@ static int rs_init_bufs(struct rsocket *rs)
-
- static int ds_init_bufs(struct ds_qp *qp)
- {
-- qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf));
-+ qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh),
-+ sizeof(*qp->rbuf));
- if (!qp->rbuf)
- return ERR(ENOMEM);
-
-@@ -725,7 +726,8 @@ static int ds_init_bufs(struct ds_qp *qp)
- if (!qp->smr)
- return -1;
-
-- qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size);
-+ qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size +
-+ sizeof(struct ibv_grh));
- if (!qp->rmr)
- return -1;
-
-@@ -735,20 +737,19 @@ static int ds_init_bufs(struct ds_qp *qp)
- static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
- {
- cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
-- printf("%s create comp_channel %p\n", __func__, cm_id->recv_cq_channel);
- if (!cm_id->recv_cq_channel)
- return -1;
-
- cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
- cm_id, cm_id->recv_cq_channel, 0);
-- printf("%s create cq %p size %d\n", __func__, cm_id->recv_cq, rs->sq_size + rs->rq_size);
- if (!cm_id->recv_cq)
- goto err1;
-
- if (rs->fd_flags & O_NONBLOCK) {
-- printf("%s set nonblock\n", __func__);
-- if (rs_set_nonblocking(rs, O_NONBLOCK))
-+ if (fcntl(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK))
- goto err2;
-+ } else {
-+ ibv_req_notify_cq(cm_id->recv_cq, 0);
- }
-
- cm_id->send_cq_channel = cm_id->recv_cq_channel;
-@@ -776,19 +777,22 @@ static inline int rs_post_recv(struct rsocket *rs)
- return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
- }
-
--static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf)
-+static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, uint32_t offset)
- {
- struct ibv_recv_wr wr, *bad;
-- struct ibv_sge sge;
-+ struct ibv_sge sge[2];
-
-- sge.addr = (uintptr_t) buf;
-- sge.length = RS_SNDLOWAT;
-- sge.lkey = qp->rmr->lkey;
-+ sge[0].addr = (uintptr_t) qp->rbuf + rs->rbuf_size;
-+ sge[0].length = sizeof(struct ibv_grh);
-+ sge[0].lkey = qp->rmr->lkey;
-+ sge[1].addr = (uintptr_t) qp->rbuf + offset;
-+ sge[1].length = RS_SNDLOWAT;
-+ sge[1].lkey = qp->rmr->lkey;
-
-- wr.wr_id = ds_recv_wr_id((uint32_t) ((uint8_t *) buf - rs->rbuf));
-+ wr.wr_id = ds_recv_wr_id(offset);
- wr.next = NULL;
-- wr.sg_list = &sge;
-- wr.num_sge = 1;
-+ wr.sg_list = sge;
-+ wr.num_sge = 2;
-
- return rdma_seterrno(ibv_post_recv(qp->cm_id->qp, &wr, &bad));
- }
-@@ -1083,7 +1087,6 @@ int rsocket(int domain, int type, int protocol)
- rs->cm_id->route.addr.src_addr.sa_family = domain;
- index = rs->cm_id->channel->fd;
- } else {
-- printf("rsocket sq %d rq %d\n", rs->sq_size, rs->rq_size);
- ret = ds_init(rs, domain);
- if (ret)
- goto err;
-@@ -1174,7 +1177,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
- }
-
- if (rs->fd_flags & O_NONBLOCK)
-- rs_set_nonblocking(new_rs, O_NONBLOCK);
-+ fcntl(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
-
- ret = rs_create_ep(new_rs);
- if (ret)
-@@ -1276,7 +1279,7 @@ connected:
- break;
- case rs_accepting:
- if (!(rs->fd_flags & O_NONBLOCK))
-- rs_set_nonblocking(rs, 0);
-+ fcntl(rs->cm_id->channel->fd, F_SETFL, 0);
-
- ret = ucma_complete(rs->cm_id);
- if (ret)
-@@ -1318,8 +1321,10 @@ static int ds_get_src_addr(struct rsocket *rs,
- int sock, ret;
- uint16_t port;
-
-+// printf("dest: "); PRINTADDR(dest_addr);
- *src_len = sizeof src_addr;
- ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
-+// printf("src: "); PRINTADDR(src_addr);
- if (ret || !rs_any_addr(src_addr))
- return ret;
-
-@@ -1335,6 +1340,7 @@ static int ds_get_src_addr(struct rsocket *rs,
- *src_len = sizeof src_addr;
- ret = getsockname(sock, &src_addr->sa, src_len);
- src_addr->sin.sin_port = port;
-+// printf("selected src: ");
- out:
- close(sock);
- return ret;
-@@ -1343,6 +1349,7 @@ out:
- static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
- {
- if (addr->sa.sa_family == AF_INET) {
-+ PRINTADDR(addr);
- hdr->version = 4;
- hdr->length = DS_IPV4_HDR_LEN;
- hdr->port = addr->sin.sin_port;
-@@ -1363,6 +1370,7 @@ static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
- struct ibv_ah_attr attr;
- int ret;
-
-+// printf("%s\n", __func__);
- memcpy(&qp->dest.addr, addr, addrlen);
- qp->dest.qp = qp;
- qp->dest.qpn = qp->cm_id->qp->qp_num;
-@@ -1375,6 +1383,8 @@ static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
- attr.dlid = port_attr.lid;
- attr.port_num = qp->cm_id->port_num;
- qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr);
-+// printf("%s ah %p lid %x port %d qpn %x\n", __func__, qp->dest.ah, attr.dlid,
-+// attr.port_num, qp->dest.qpn);
- if (!qp->dest.ah)
- return ERR(ENOMEM);
-
-@@ -1390,30 +1400,26 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
- struct epoll_event event;
- int i, ret;
-
--printf("%s\n", __func__);
-+ PRINTADDR(src_addr);
- qp = calloc(1, sizeof(*qp));
- if (!qp)
- return ERR(ENOMEM);
-
- qp->rs = rs;
- ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP);
-- printf("%s rdma_create_id %d\n", __func__, ret);
- if (ret)
- goto err;
-
- ds_format_hdr(&qp->hdr, src_addr);
- ret = rdma_bind_addr(qp->cm_id, &src_addr->sa);
-- printf("%s rdma_bind_addr %d\n", __func__, ret);
- if (ret)
- goto err;
-
- ret = ds_init_bufs(qp);
-- printf("%s ds_init_bufs %d\n", __func__, ret);
- if (ret)
- goto err;
-
- ret = rs_create_cq(rs, qp->cm_id);
-- printf("%s rs_create_cq %d\n", __func__, ret);
- if (ret)
- goto err;
-
-@@ -1425,16 +1431,14 @@ printf("%s\n", __func__);
- qp_attr.sq_sig_all = 1;
- qp_attr.cap.max_send_wr = rs->sq_size;
- qp_attr.cap.max_recv_wr = rs->rq_size;
-- qp_attr.cap.max_send_sge = 2;
-- qp_attr.cap.max_recv_sge = 1;
-+ qp_attr.cap.max_send_sge = 1;
-+ qp_attr.cap.max_recv_sge = 2;
- qp_attr.cap.max_inline_data = rs->sq_inline;
- ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr);
-- printf("%s rdma_create_qp %d\n", __func__, ret);
- if (ret)
- goto err;
-
- ret = ds_add_qp_dest(qp, src_addr, addrlen);
-- printf("%s ds_add_qp_dest %d\n", __func__, ret);
- if (ret)
- goto err;
-
-@@ -1442,12 +1446,11 @@ printf("%s\n", __func__);
- event.data.ptr = qp;
- ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD,
- qp->cm_id->recv_cq_channel->fd, &event);
-- printf("%s epoll_ctl %d\n", __func__, ret);
- if (ret)
- goto err;
-
- for (i = 0; i < rs->rq_size; i++) {
-- ret = ds_post_recv(rs, qp, qp->rbuf + i * RS_SNDLOWAT);
-+ ret = ds_post_recv(rs, qp, i * RS_SNDLOWAT);
- if (ret)
- goto err;
- }
-@@ -1486,26 +1489,23 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
- struct ds_dest **tdest, *new_dest;
- int ret = 0;
-
-- printf("%s \n", __func__);
-+ PRINTADDR(addr);
- fastlock_acquire(&rs->map_lock);
- tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
-- printf("%s tfind %p\n", __func__, dest);
- if (tdest)
- goto found;
-
- ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
-- printf("%s ds_get_src_addr %d %s\n", __func__, ret, strerror(errno));
-+// printf("get src: "); PRINTADDR(&src_addr);
- if (ret)
- goto out;
-
- ret = ds_get_qp(rs, &src_addr, src_len, &qp);
-- printf("%s ds_get_qp %d %s\n", __func__, ret, strerror(errno));
- if (ret)
- goto out;
-
- tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
- if (!tdest) {
-- printf("%s adding dest into map\n", __func__);
- new_dest = calloc(1, sizeof(*new_dest));
- if (!new_dest) {
- ret = ERR(ENOMEM);
-@@ -1534,7 +1534,6 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
- memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
- ret = rs_do_connect(rs);
- } else {
-- printf("%s\n", __func__);
- if (rs->state == rs_init) {
- ret = ds_init_ep(rs);
- if (ret)
-@@ -1542,11 +1541,10 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
- }
-
- fastlock_acquire(&rs->slock);
-+ PRINTADDR(addr);
- ret = connect(rs->udp_sock, addr, addrlen);
-- printf("%s connect %d %s\n", __func__, ret, strerror(errno));
- if (!ret)
- ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
-- printf("%s ds_get_dest %d %s\n", __func__, ret, strerror(errno));
- fastlock_release(&rs->slock);
- }
- return ret;
-@@ -1605,6 +1603,8 @@ static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge,
- wr.wr.ud.ah = rs->conn_dest->ah;
- wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
- wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
-+// printf("%s ah %p qpn %x\n", __func__, rs->conn_dest->ah,
-+// rs->conn_dest->qpn);
-
- return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad));
- }
-@@ -1894,10 +1894,12 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
- return ret;
- }
-
--static int ds_valid_recv(void *buf, uint32_t len)
-+static int ds_valid_recv(struct ds_qp *qp, struct ibv_wc *wc)
- {
-- struct ds_header *hdr = (struct ds_header *) buf;
-- return ((len >= sizeof(*hdr)) &&
-+ struct ds_header *hdr;
-+
-+ hdr = (struct ds_header *) (qp->rbuf + ds_wr_offset(wc->wr_id));
-+ return ((wc->byte_len >= sizeof(struct ibv_grh) + sizeof(*hdr)) &&
- ((hdr->version == 4 && hdr->length == DS_IPV4_HDR_LEN) ||
- (hdr->version == 6 && hdr->length == DS_IPV6_HDR_LEN)));
- }
-@@ -1931,22 +1933,22 @@ static void ds_poll_cqs(struct rsocket *rs)
-
- if (ds_wr_is_recv(wc.wr_id)) {
- if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS &&
-- ds_valid_recv(qp->rbuf + ds_wr_offset(wc.wr_id),
-- wc.byte_len)) {
-+ ds_valid_recv(qp, &wc)) {
- rs->rqe_avail--;
- rmsg = &rs->dmsg[rs->rmsg_tail];
- rmsg->qp = qp;
- rmsg->offset = ds_wr_offset(wc.wr_id);
-- rmsg->length = wc.byte_len;
-+ rmsg->length = wc.byte_len - sizeof(struct ibv_grh);
- if (++rs->rmsg_tail == rs->rq_size + 1)
- rs->rmsg_tail = 0;
- } else {
-- ds_post_recv(rs, qp, qp->rbuf +
-- ds_wr_offset(wc.wr_id));
-+ printf("%s invalid recv\n", __func__);
-+ ds_post_recv(rs, qp, ds_wr_offset(wc.wr_id));
- }
- } else {
- smsg = (struct ds_smsg *)
- (rs->sbuf + ds_wr_offset(wc.wr_id));
-+ printf("%s send smsg %p free %p\n", __func__, smsg, rs->smsg_free);
- smsg->next = rs->smsg_free;
- rs->smsg_free = smsg;
- rs->sqe_avail++;
-@@ -1986,18 +1988,17 @@ static int ds_get_cq_event(struct rsocket *rs)
- void *context;
- int ret;
-
-- printf("%s \n", __func__);
- if (!rs->cq_armed)
- return 0;
-
-+// printf("wait for epoll event\n");
- ret = epoll_wait(rs->epfd, &event, 1, -1);
-- printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno));
-+// printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno));
- if (ret <= 0)
- return ret;
-
- qp = event.data.ptr;
- ret = ibv_get_cq_event(qp->cm_id->recv_cq_channel, &cq, &context);
-- printf("%s get cq event ret %d errno %s\n", __func__, ret, strerror(errno));
- if (!ret) {
- ibv_ack_cq_events(qp->cm_id->recv_cq, 1);
- qp->cq_armed = 0;
-@@ -2164,15 +2165,15 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
- struct ds_header *hdr;
- int ret;
-
--// printf("%s \n", __func__);
-+ printf("%s \n", __func__);
- if (!(rs->state & rs_readable))
- return ERR(EINVAL);
-
- if (!rs_have_rdata(rs)) {
--// printf("%s need rdata \n", __func__);
-+ printf("%s need rdata \n", __func__);
- ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
- rs_have_rdata);
--// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
-+ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
- if (ret)
- return ret;
- }
-@@ -2184,10 +2185,13 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
-
- memcpy(buf, (void *) hdr + hdr->length, len);
- if (addrlen)
-+{
- ds_set_src(src_addr, addrlen, hdr);
-+PRINTADDR(src_addr);
-+}
-
- if (!(flags & MSG_PEEK)) {
-- ds_post_recv(rs, rmsg->qp, hdr);
-+ ds_post_recv(rs, rmsg->qp, rmsg->offset);
- if (++rs->rmsg_head == rs->rq_size + 1)
- rs->rmsg_head = 0;
- }
-@@ -2444,7 +2448,7 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
- }
-
- miov[0].iov_base = &hdr;
-- miov[0].iov_len = sizeof hdr;
-+ miov[0].iov_len = hdr.length;
- if (iov && iovcnt)
- memcpy(&miov[1], iov, sizeof *iov * iovcnt);
-
-@@ -2455,7 +2459,6 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
- msg.msg_iovlen = iovcnt + 1;
- // printf("%s iov cnt %d\n", __func__, msg.msg_iovlen);
- ret = sendmsg(rs->udp_sock, &msg, flags);
-- printf("%s ret %d %s\n", __func__, ret, strerror(errno));
- return ret > 0 ? ret - sizeof hdr : ret;
- }
-
-@@ -2463,7 +2466,7 @@ static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
- int flags, uint8_t op)
- {
- struct iovec iov;
-- printf("%s\n", __func__);
-+// printf("%s\n", __func__);
- if (buf && len) {
- // printf("%s have buffer\n", __func__);
- iov.iov_base = (void *) buf;
-@@ -2503,6 +2506,7 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
- sge.lkey = rs->conn_dest->qp->smr->lkey;
- offset = (uint8_t *) msg - rs->sbuf;
-
-+ printf("%s - sending over QP\n", __func__);
- ret = ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
- return ret ? ret : len;
- }
-@@ -2604,7 +2608,8 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
- struct rsocket *rs;
- int ret;
-
-- printf("%s\n", __func__);
-+ PRINTADDR(dest_addr);
-+ printf("%s sendto data 0x%x\n", __func__, *((uint32_t*)buf));
- rs = idm_at(&idm, socket);
- if (rs->type == SOCK_STREAM) {
- if (dest_addr || addrlen)
-@@ -2620,15 +2625,11 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
- }
-
- fastlock_acquire(&rs->slock);
-- printf("%s check conn dest\n", __func__);
- if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
-- printf("%s need conn dest\n", __func__);
- ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
- if (ret)
- goto out;
- }
-- else
-- printf("%s connected\n", __func__);
-
- ret = dsend(rs, buf, len, flags);
- out:
-@@ -3195,7 +3196,8 @@ int rsetsockopt(int socket, int level, int optname,
- opt_on = *(int *) optval;
- break;
- case SO_RCVBUF:
-- if (!rs->rbuf)
-+ if ((rs->type == SOCK_STREAM && !rs->rbuf) ||
-+ (rs->type == SOCK_DGRAM && !rs->qp_list))
- rs->rbuf_size = (*(uint32_t *) optval) << 1;
- ret = 0;
- break;
-@@ -3658,11 +3660,9 @@ static int rs_svc_add_rs(struct rsocket *rs)
- }
-
- svc_rss[++svc_cnt] = rs;
-- printf("%s rs %p\n", __func__, rs);
- svc_fds[svc_cnt].fd = rs->udp_sock;
- svc_fds[svc_cnt].events = POLLIN;
- svc_fds[svc_cnt].revents = 0;
-- printf("add rs udp sock %d\n",rs->udp_sock);
- return 0;
- }
-
-@@ -3686,7 +3686,6 @@ static void rs_svc_process_sock(void)
- struct rs_svc_msg msg;
-
- read(svc_sock[1], &msg, sizeof msg);
-- printf("%s op %d\n",__func__, msg.op);
- switch (msg.op) {
- case RS_SVC_INSERT:
- msg.status = rs_svc_add_rs(msg.rs);
-@@ -3698,7 +3697,6 @@ static void rs_svc_process_sock(void)
- msg.status = ENOTSUP;
- break;
- }
-- printf("%s status %d\n",__func__, msg.status);
- write(svc_sock[1], &msg, sizeof msg);
- }
-
-@@ -3732,7 +3730,6 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t
- struct ibv_ah_attr attr;
- int ret;
-
-- printf("%s\n",__func__);
- if (dest->ah) {
- fastlock_acquire(&rs->slock);
- ibv_destroy_ah(dest->ah);
-@@ -3741,7 +3738,6 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t
- }
-
- ret = rdma_create_id(NULL, &id, NULL, dest->qp->cm_id->ps);
-- printf("%s rdma_create_id %d %s\n",__func__, ret, strerror(errno));
- if (ret)
- return;
-
-@@ -3752,12 +3748,10 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t
- else
- saddr.sin6.sin6_port = 0;
- ret = rdma_resolve_addr(id, &saddr.sa, &dest->addr.sa, 2000);
-- printf("%s rdma_resolve_addr %d %s\n",__func__, ret, strerror(errno));
- if (ret)
- goto out;
-
- ret = rdma_resolve_route(id, 2000);
-- printf("%s rdma_resolve_route %d %s\n",__func__, ret, strerror(errno));
- if (ret)
- goto out;
-
-@@ -3776,12 +3770,9 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t
- attr.static_rate = id->route.path_rec->rate;
- attr.port_num = id->port_num;
-
-- printf("%s getting slock \n",__func__);
- fastlock_acquire(&rs->slock);
-- printf("%s why am I not here? \n",__func__);
- dest->qpn = qpn;
- dest->ah = ibv_create_ah(dest->qp->cm_id->pd, &attr);
-- printf("%s ibv_create_ah %p %s\n",__func__, dest->ah, strerror(errno));
- fastlock_release(&rs->slock);
- out:
- rdma_destroy_id(id);
-@@ -3790,17 +3781,6 @@ out:
- static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr,
- union socket_addr *addr)
- {
--printf("tag %x ver %d family %d (AF_INET %d) length %d\n", udp_hdr->tag,
-- udp_hdr->version, addr->sa.sa_family, AF_INET, udp_hdr->length);
--
--printf("tag %d ver %d fam %d len %d ver %d fam %d len %d\n",
--udp_hdr->tag == ntohl(DS_UDP_TAG),
-- udp_hdr->version == 4, addr->sa.sa_family == AF_INET,
-- udp_hdr->length == DS_UDP_IPV4_HDR_LEN,
-- udp_hdr->version == 6, addr->sa.sa_family == AF_INET6,
-- udp_hdr->length == DS_UDP_IPV6_HDR_LEN);
--
--
- return (udp_hdr->tag == ntohl(DS_UDP_TAG)) &&
- ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET &&
- udp_hdr->length == DS_UDP_IPV4_HDR_LEN) ||
-@@ -3816,7 +3796,7 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len,
- struct ibv_sge sge;
- uint64_t offset;
-
-- printf("%s\n",__func__);
-+// PRINTADDR(src);
- if (!ds_can_send(rs)) {
- if (ds_get_comp(rs, 0, ds_can_send))
- return;
-@@ -3827,13 +3807,18 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len,
- rs->sqe_avail--;
-
- ds_format_hdr(&hdr, src);
-+// printf("%s hdr ver %d length %d port %x\n", __func__, hdr.version,
-+// hdr.length, hdr.port);
- memcpy((void *) msg, &hdr, hdr.length);
- memcpy((void *) msg + hdr.length, buf, len);
-+// printf("%s received data 0x%x\n", __func__, *((uint32_t*)buf));
- sge.addr = (uintptr_t) msg;
- sge.length = hdr.length + len;
- sge.lkey = rs->conn_dest->qp->smr->lkey;
- offset = (uint8_t *) msg - rs->sbuf;
-
-+// printf("%s ver %d length %d port %x\n", __func__, ((struct ds_header *) msg)->version,
-+// ((struct ds_header *) msg)->length, ((struct ds_header *) msg)->port);
- ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
- }
-
-@@ -3845,9 +3830,9 @@ static void rs_svc_process_rs(struct rsocket *rs)
- socklen_t addrlen = sizeof addr;
- int len, ret;
-
-- printf("%s\n",__func__);
- ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
-- printf("%s recvfrom %d\n",__func__, ret);
-+// PRINTADDR(&addr);
-+// printf("%s received data 0x%x\n", __func__, *((uint32_t*)&svc_buf[8]));
- if (ret < DS_UDP_IPV4_HDR_LEN)
- return;
-
-@@ -3855,12 +3840,10 @@ static void rs_svc_process_rs(struct rsocket *rs)
- if (!rs_svc_valid_udp_hdr(udp_hdr, &addr))
- return;
-
-- printf("%s valid hdr\n",__func__);
- len = ret - udp_hdr->length;
- udp_hdr->tag = ntohl(udp_hdr->tag);
- udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF;
- ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
-- printf("%s ds_get_dest %d\n",__func__, ret);
- if (ret)
- return;
-
-@@ -3868,16 +3851,15 @@ static void rs_svc_process_rs(struct rsocket *rs)
- rs_svc_create_ah(rs, dest, udp_hdr->qpn);
-
- /* to do: handle when dest local ip address doesn't match udp ip */
-+ if (udp_hdr->op != RS_OP_DATA)
-+ return;
-+
- fastlock_acquire(&rs->slock);
- cur_dest = rs->conn_dest;
-- if (udp_hdr->op == RS_OP_DATA) {
-- rs->conn_dest = &dest->qp->dest;
-- printf("%s forwarding msg\n",__func__);
-- rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
-- }
-+ rs->conn_dest = &dest->qp->dest;
-+ rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
-
- rs->conn_dest = dest;
-- printf("%s sending resp\n",__func__);
- ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL);
- rs->conn_dest = cur_dest;
- fastlock_release(&rs->slock);
-@@ -3888,7 +3870,6 @@ static void *rs_svc_run(void *arg)
- struct rs_svc_msg msg;
- int i, ret;
-
-- printf("%s\n",__func__);
- ret = rs_svc_grow_sets();
- if (ret) {
- msg.status = ret;
-@@ -3899,13 +3880,10 @@ static void *rs_svc_run(void *arg)
- svc_fds[0].fd = svc_sock[1];
- svc_fds[0].events = POLLIN;
- do {
-- printf("%s svc cnt %d\n",__func__, svc_cnt);
- for (i = 0; i <= svc_cnt; i++)
- svc_fds[i].revents = 0;
-
-- printf("%s poll\n",__func__);
- poll(svc_fds, svc_cnt + 1, -1);
-- printf("%s poll done\n",__func__);
- if (svc_fds[0].revents)
- rs_svc_process_sock();