From 7b3d40c0c07353233c3eefc081e477d928e5765a Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Wed, 12 Dec 2012 16:49:45 -0800 Subject: [PATCH] refresh --- meta | 7 +- patches/dsocket | 434 ++++++++++++++++----------- patches/refresh-temp | 676 ------------------------------------------- 3 files changed, 263 insertions(+), 854 deletions(-) delete mode 100644 patches/refresh-temp diff --git a/meta b/meta index 9645b984..80927284 100644 --- a/meta +++ b/meta @@ -1,9 +1,8 @@ Version: 1 -Previous: f9129f248e598f6d687a22e32bafc7e8164d6a4b -Head: 39d8533b49387b416ce7c88b8f665fe1e3b784cc +Previous: 4942c4abe0f1953b07ee7dffaf9d79be8983f692 +Head: b2ae615cc6c0119babd24414c657cb1962392e02 Applied: - dsocket: 13732c8a437be83b664fe683516ec300a145f76a - refresh-temp: 39d8533b49387b416ce7c88b8f665fe1e3b784cc + dsocket: b2ae615cc6c0119babd24414c657cb1962392e02 Unapplied: udpong: a42957509acbde99a7d8469e0819b7d75af51289 test-udp: f6c78ad2a26f452cf166aff1baa7b76160bd8bf7 diff --git a/patches/dsocket b/patches/dsocket index 7ce47b80..25e35aa6 100644 --- a/patches/dsocket +++ b/patches/dsocket @@ -1,5 +1,5 @@ Bottom: 1fa07c62817ac4b6cb8d9c5e327ea2cdc75dbd21 -Top: f1822f3bbe2c9b92b5e2ca8b4e5c3cece427c5ff +Top: 136936c0a82503ee0da9daccd8b948cd09e58b64 Author: Sean Hefty Date: 2012-11-09 10:26:38 -0800 @@ -62,7 +62,7 @@ index 1484f65..a660208 100644 +rsocket QP. \ No newline at end of file diff --git a/src/cma.c b/src/cma.c -index 388be61..0f58966 100755 +index 388be61..ff9b426 100755 --- a/src/cma.c +++ b/src/cma.c @@ -513,7 +513,7 @@ int rdma_destroy_id(struct rdma_cm_id *id) @@ -74,7 +74,7 @@ index 388be61..0f58966 100755 { if (!addr) return 0; -@@ -2232,9 +2232,18 @@ void rdma_destroy_ep(struct rdma_cm_id *id) +@@ -2232,9 +2232,19 @@ void rdma_destroy_ep(struct rdma_cm_id *id) int ucma_max_qpsize(struct rdma_cm_id *id) { struct cma_id_private *id_priv; @@ -85,6 +85,7 @@ index 388be61..0f58966 100755 + if (id && id_priv->cma_dev) { + max_size = id_priv->cma_dev->max_qpsize; + } else { ++ ucma_init(); + for (i = 0; i < cma_dev_cnt; i++) { + if (!max_size || max_size > cma_dev_array[i].max_qpsize) + max_size = cma_dev_array[i].max_qpsize; @@ -112,7 +113,7 @@ index 0a0370e..7135a61 100644 { errno = err; diff --git a/src/rsocket.c b/src/rsocket.c -index a060f66..c61d689 100644 +index a060f66..6fa4c68 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -47,6 +47,8 @@ @@ -218,7 +219,7 @@ index a060f66..c61d689 100644 rs_connect_error = 0x0800, rs_disconnected = 0x1000, rs_error = 0x2000, -@@ -170,68 +212,248 @@ enum rs_state { +@@ -170,68 +212,249 @@ enum rs_state { #define RS_OPT_SWAP_SGL 1 @@ -441,6 +442,7 @@ index a060f66..c61d689 100644 + + msg.op = RS_SVC_INSERT; + msg.status = EINVAL; ++ printf("%s rs %p\n", __func__, rs); + msg.rs = rs; + write(svc_sock[0], &msg, sizeof msg); + read(svc_sock[0], &msg, sizeof msg); @@ -499,7 +501,7 @@ index a060f66..c61d689 100644 static int rs_value_to_scale(int value, int bits) { return value <= (1 << (bits - 1)) ? -@@ -307,10 +529,10 @@ out: +@@ -307,10 +530,10 @@ out: pthread_mutex_unlock(&mut); } @@ -512,7 +514,7 @@ index a060f66..c61d689 100644 pthread_mutex_unlock(&mut); return rs->index; } -@@ -322,7 +544,7 @@ static void rs_remove(struct rsocket *rs) +@@ -322,7 +545,7 @@ static void rs_remove(struct rsocket *rs) pthread_mutex_unlock(&mut); } @@ -521,7 +523,7 @@ index a060f66..c61d689 100644 { struct rsocket *rs; -@@ -330,29 +552,39 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) +@@ -330,29 +553,39 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) if (!rs) return NULL; @@ -566,7 +568,7 @@ index a060f66..c61d689 100644 dlist_init(&rs->iomap_list); dlist_init(&rs->iomap_queue); return rs; -@@ -360,13 +592,27 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) +@@ -360,13 +593,29 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) static int rs_set_nonblocking(struct rsocket *rs, long arg) { @@ -584,7 +586,9 @@ index a060f66..c61d689 100644 + if (!ret && rs->state < rs_connected) + ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg); + } else { ++ printf("%s set nonblock\n", __func__); + ret = fcntl(rs->epfd, F_SETFL, arg); ++ printf("%s fcntl %d\n", __func__, ret); + + if (!ret && rs->qp_list) { + qp = rs->qp_list; @@ -598,7 +602,7 @@ index a060f66..c61d689 100644 return ret; } -@@ -390,17 +636,39 @@ static void rs_set_qp_size(struct rsocket *rs) +@@ -390,17 +639,43 @@ static void rs_set_qp_size(struct rsocket *rs) rs->rq_size = 2; } @@ -606,6 +610,8 @@ index a060f66..c61d689 100644 +{ + uint16_t max_size; + ++ printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size, ++ rs->rq_size, rs->rbuf_size); + max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE); + + if (rs->sq_size > max_size) @@ -622,6 +628,8 @@ index a060f66..c61d689 100644 + rs->sq_size = rs->sbuf_size / RS_SNDLOWAT; + else + rs->sbuf_size = rs->sq_size * RS_SNDLOWAT; ++ printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size, ++ rs->rq_size, rs->rbuf_size); +} + static int rs_init_bufs(struct rsocket *rs) @@ -640,7 +648,7 @@ index a060f66..c61d689 100644 rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size); if (!rs->smr) -@@ -410,7 +678,7 @@ static int rs_init_bufs(struct rsocket *rs) +@@ -410,7 +685,7 @@ static int rs_init_bufs(struct rsocket *rs) sizeof(*rs->target_iomap) * rs->target_iomap_size; rs->target_buffer_list = malloc(len); if (!rs->target_buffer_list) @@ -649,7 +657,7 @@ index a060f66..c61d689 100644 rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len); if (!rs->target_mr) -@@ -423,7 +691,7 @@ static int rs_init_bufs(struct rsocket *rs) +@@ -423,7 +698,7 @@ static int rs_init_bufs(struct rsocket *rs) rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf)); if (!rs->rbuf) @@ -658,26 +666,21 @@ index a060f66..c61d689 100644 rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size); if (!rs->rmr) -@@ -440,15 +708,32 @@ static int rs_init_bufs(struct rsocket *rs) +@@ -440,37 +715,56 @@ static int rs_init_bufs(struct rsocket *rs) return 0; } -static int rs_create_cq(struct rsocket *rs) +static int ds_init_bufs(struct ds_qp *qp) - { -- rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs); -- if (!rs->cm_id->recv_cq_channel) ++{ + qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf)); + if (!qp->rbuf) + return ERR(ENOMEM); + + qp->smr = rdma_reg_msgs(qp->cm_id, qp->rs->sbuf, qp->rs->sbuf_size); + if (!qp->smr) - return -1; - -- rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size, -- rs->cm_id, rs->cm_id->recv_cq_channel, 0); -- if (!rs->cm_id->recv_cq) ++ return -1; ++ + qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size); + if (!qp->rmr) + return -1; @@ -686,18 +689,26 @@ index a060f66..c61d689 100644 +} + +static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id) -+{ + { +- rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs); +- if (!rs->cm_id->recv_cq_channel) + cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs); ++ printf("%s create comp_channel %p\n", __func__, cm_id->recv_cq_channel); + if (!cm_id->recv_cq_channel) -+ return -1; -+ + return -1; + +- rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size, +- rs->cm_id, rs->cm_id->recv_cq_channel, 0); +- if (!rs->cm_id->recv_cq) + cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size, + cm_id, cm_id->recv_cq_channel, 0); ++ printf("%s create cq %p size %d\n", __func__, cm_id->recv_cq, rs->sq_size + rs->rq_size); + if (!cm_id->recv_cq) goto err1; if (rs->fd_flags & O_NONBLOCK) { -@@ -456,21 +741,20 @@ static int rs_create_cq(struct rsocket *rs) ++ printf("%s set nonblock\n", __func__); + if (rs_set_nonblocking(rs, O_NONBLOCK)) goto err2; } @@ -726,7 +737,7 @@ index a060f66..c61d689 100644 { struct ibv_recv_wr wr, *bad; -@@ -482,6 +766,23 @@ rs_post_recv(struct rsocket *rs) +@@ -482,6 +776,23 @@ rs_post_recv(struct rsocket *rs) return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad)); } @@ -750,7 +761,7 @@ index a060f66..c61d689 100644 static int rs_create_ep(struct rsocket *rs) { struct ibv_qp_init_attr qp_attr; -@@ -492,7 +793,7 @@ static int rs_create_ep(struct rsocket *rs) +@@ -492,7 +803,7 @@ static int rs_create_ep(struct rsocket *rs) if (ret) return ret; @@ -759,7 +770,7 @@ index a060f66..c61d689 100644 if (ret) return ret; -@@ -549,8 +850,74 @@ static void rs_free_iomappings(struct rsocket *rs) +@@ -549,8 +860,73 @@ static void rs_free_iomappings(struct rsocket *rs) } } @@ -789,6 +800,8 @@ index a060f66..c61d689 100644 + +static void ds_free(struct rsocket *rs) +{ ++ struct ds_qp *qp; ++ + if (rs->state & (rs_readable | rs_writable)) + rs_remove_from_svc(rs); + @@ -801,12 +814,9 @@ index a060f66..c61d689 100644 + if (rs->dmsg) + free(rs->dmsg); + -+ if (rs->smsg_free) -+ free(rs->smsg_free); -+ -+ while (rs->qp_list) { -+ ds_remove_qp(rs, rs->qp_list); -+ ds_free_qp(rs->qp_list); ++ while ((qp = rs->qp_list)) { ++ ds_remove_qp(rs, qp); ++ ds_free_qp(qp); + } + + if (rs->epfd >= 0) @@ -834,7 +844,7 @@ index a060f66..c61d689 100644 if (rs->index >= 0) rs_remove(rs); -@@ -582,7 +949,7 @@ static void rs_free(struct rsocket *rs) +@@ -582,7 +958,7 @@ static void rs_free(struct rsocket *rs) rdma_destroy_id(rs->cm_id); } @@ -843,7 +853,7 @@ index a060f66..c61d689 100644 fastlock_destroy(&rs->cq_wait_lock); fastlock_destroy(&rs->cq_lock); fastlock_destroy(&rs->rlock); -@@ -636,29 +1003,54 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn) +@@ -636,29 +1012,89 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn) rs->sseq_comp = ntohs(conn->credits); } @@ -859,6 +869,40 @@ index a060f66..c61d689 100644 + + return 0; +} ++ ++static int ds_init_ep(struct rsocket *rs) ++{ ++ struct ds_smsg *msg; ++ int i, ret; ++ ++ ds_set_qp_size(rs); ++ ++ rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT); ++ if (!rs->sbuf) ++ return ERR(ENOMEM); ++ ++ rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg)); ++ if (!rs->dmsg) ++ return ERR(ENOMEM); ++ ++ rs->sqe_avail = rs->sq_size; ++ rs->rqe_avail = rs->rq_size; ++ ++ rs->smsg_free = (struct ds_smsg *) rs->sbuf; ++ msg = rs->smsg_free; ++ for (i = 0; i < rs->sq_size - 1; i++) { ++ msg->next = (void *) msg + RS_SNDLOWAT; ++ msg = msg->next; ++ } ++ msg->next = NULL; ++ ++ ret = rs_add_to_svc(rs); ++ if (ret) ++ return ret; ++ ++ rs->state = rs_readable | rs_writable; ++ return 0; ++} + int rsocket(int domain, int type, int protocol) { @@ -886,15 +930,16 @@ index a060f66..c61d689 100644 + ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP); + if (ret) + goto err; - -- ret = rs_insert(rs); ++ + rs->cm_id->route.addr.src_addr.sa_family = domain; + index = rs->cm_id->channel->fd; + } else { ++ printf("rsocket sq %d rq %d\n", rs->sq_size, rs->rq_size); + ret = ds_init(rs, domain); + if (ret) + goto err; -+ + +- ret = rs_insert(rs); + index = rs->udp_sock; + } + @@ -906,7 +951,7 @@ index a060f66..c61d689 100644 return rs->index; err: -@@ -672,9 +1064,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen) +@@ -672,9 +1108,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen) int ret; rs = idm_at(&idm, socket); @@ -918,17 +963,17 @@ index a060f66..c61d689 100644 + if (!ret) + rs->state = rs_bound; + } else { -+ ret = bind(rs->udp_sock, addr, addrlen); -+ if (!ret) { -+ ret = rs_add_to_svc(rs); -+ if (!ret) -+ rs->state = rs_readable | rs_writable; ++ if (rs->state == rs_init) { ++ ret = ds_init_ep(rs); ++ if (ret) ++ return ret; + } ++ ret = bind(rs->udp_sock, addr, addrlen); + } return ret; } -@@ -710,7 +1111,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) +@@ -710,7 +1155,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) int ret; rs = idm_at(&idm, socket); @@ -937,7 +982,7 @@ index a060f66..c61d689 100644 if (!new_rs) return ERR(ENOMEM); -@@ -718,7 +1119,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) +@@ -718,7 +1163,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) if (ret) goto err; @@ -946,45 +991,10 @@ index a060f66..c61d689 100644 if (ret < 0) goto err; -@@ -855,13 +1256,268 @@ connected: +@@ -855,13 +1300,256 @@ connected: return ret; } -+static int ds_init_ep(struct rsocket *rs) -+{ -+ struct ds_smsg *msg; -+ int i, ret; -+ -+ ds_set_qp_size(rs); -+ -+ rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT); -+ if (!rs->sbuf) -+ return ERR(ENOMEM); -+ -+ rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg)); -+ if (!rs->dmsg) -+ return ERR(ENOMEM); -+ -+ rs->sbuf_bytes_avail = rs->sbuf_size; -+ rs->sqe_avail = rs->sq_size; -+ rs->rqe_avail = rs->rq_size; -+ -+ rs->smsg_free = (struct ds_smsg *) rs->sbuf; -+ msg = rs->smsg_free; -+ for (i = 0; i < rs->sq_size - 1; i++) { -+ msg->next = (void *) msg + i * RS_SNDLOWAT; -+ msg = msg->next; -+ } -+ msg->next = NULL; -+ -+ ret = rs_add_to_svc(rs); -+ if (ret) -+ return ret; -+ -+ rs->state = rs_readable | rs_writable; -+ return 0; -+} -+ +static int rs_any_addr(const union socket_addr *addr) +{ + if (addr->sa.sa_family == AF_INET) { @@ -1068,38 +1078,44 @@ index a060f66..c61d689 100644 +} + +static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr, -+ socklen_t addrlen, struct ds_qp **qp) ++ socklen_t addrlen, struct ds_qp **new_qp) +{ ++ struct ds_qp *qp; + struct ibv_qp_init_attr qp_attr; + struct epoll_event event; + int i, ret; + -+ *qp = calloc(1, sizeof(struct ds_qp)); -+ if (!*qp) ++printf("%s\n", __func__); ++ qp = calloc(1, sizeof(*qp)); ++ if (!qp) + return ERR(ENOMEM); + -+ (*qp)->rs = rs; -+ ret = rdma_create_id(NULL, &(*qp)->cm_id, *qp, RDMA_PS_UDP); ++ qp->rs = rs; ++ ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP); ++ printf("%s rdma_create_id %d\n", __func__, ret); + if (ret) + goto err; + -+ ds_format_hdr(&(*qp)->hdr, src_addr); -+ ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa); ++ ds_format_hdr(&qp->hdr, src_addr); ++ ret = rdma_bind_addr(qp->cm_id, &src_addr->sa); ++ printf("%s rdma_bind_addr %d\n", __func__, ret); + if (ret) + goto err; + -+ ret = ds_init_bufs(*qp); ++ ret = ds_init_bufs(qp); ++ printf("%s ds_init_bufs %d\n", __func__, ret); + if (ret) + goto err; + -+ ret = rs_create_cq(rs, (*qp)->cm_id); ++ ret = rs_create_cq(rs, qp->cm_id); ++ printf("%s rs_create_cq %d\n", __func__, ret); + if (ret) + goto err; + + memset(&qp_attr, 0, sizeof qp_attr); + qp_attr.qp_context = qp; -+ qp_attr.send_cq = rs->cm_id->send_cq; -+ qp_attr.recv_cq = rs->cm_id->recv_cq; ++ qp_attr.send_cq = qp->cm_id->send_cq; ++ qp_attr.recv_cq = qp->cm_id->recv_cq; + qp_attr.qp_type = IBV_QPT_UD; + qp_attr.sq_sig_all = 1; + qp_attr.cap.max_send_wr = rs->sq_size; @@ -1107,31 +1123,35 @@ index a060f66..c61d689 100644 + qp_attr.cap.max_send_sge = 2; + qp_attr.cap.max_recv_sge = 1; + qp_attr.cap.max_inline_data = rs->sq_inline; -+ ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr); ++ ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr); ++ printf("%s rdma_create_qp %d\n", __func__, ret); + if (ret) + goto err; + -+ ret = ds_add_qp_dest(*qp, src_addr, addrlen); ++ ret = ds_add_qp_dest(qp, src_addr, addrlen); ++ printf("%s ds_add_qp_dest %d\n", __func__, ret); + if (ret) + goto err; + + event.events = EPOLLIN; -+ event.data.ptr = *qp; ++ event.data.ptr = qp; + ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD, -+ (*qp)->cm_id->recv_cq_channel->fd, &event); ++ qp->cm_id->recv_cq_channel->fd, &event); ++ printf("%s epoll_ctl %d\n", __func__, ret); + if (ret) + goto err; + + for (i = 0; i < rs->rq_size; i++) { -+ ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT); ++ ret = ds_post_recv(rs, qp, qp->rbuf + i * RS_SNDLOWAT); + if (ret) + goto err; + } + -+ ds_insert_qp(rs, *qp); ++ ds_insert_qp(rs, qp); ++ *new_qp = qp; + return 0; +err: -+ ds_free_qp(*qp); ++ ds_free_qp(qp); + return ret; +} + @@ -1158,38 +1178,42 @@ index a060f66..c61d689 100644 + union socket_addr src_addr; + socklen_t src_len; + struct ds_qp *qp; ++ struct ds_dest **tdest, *new_dest; + int ret = 0; + ++ printf("%s \n", __func__); + fastlock_acquire(&rs->map_lock); -+ dest = tfind(addr, &rs->dest_map, ds_compare_addr); -+ if (dest) -+ goto out; -+ -+ if (rs->state == rs_init) { -+ ret = ds_init_ep(rs); -+ if (ret) -+ goto out; -+ } ++ tdest = tfind(addr, &rs->dest_map, ds_compare_addr); ++ printf("%s tfind %p\n", __func__, dest); ++ if (tdest) ++ goto found; + + ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len); ++ printf("%s ds_get_src_addr %d %s\n", __func__, ret, strerror(errno)); + if (ret) + goto out; + + ret = ds_get_qp(rs, &src_addr, src_len, &qp); ++ printf("%s ds_get_qp %d %s\n", __func__, ret, strerror(errno)); + if (ret) + goto out; + -+ if ((addrlen != src_len) || memcmp(addr, &src_addr, addrlen)) { -+ *dest = calloc(1, sizeof(struct ds_dest)); -+ if (!*dest) { ++ tdest = tfind(addr, &rs->dest_map, ds_compare_addr); ++ if (!tdest) { ++ printf("%s adding dest into map\n", __func__); ++ new_dest = calloc(1, sizeof(*new_dest)); ++ if (!new_dest) { + ret = ERR(ENOMEM); + goto out; + } + -+ memcpy(&(*dest)->addr, addr, addrlen); -+ (*dest)->qp = qp; -+ tsearch(&(*dest)->addr, &rs->dest_map, ds_compare_addr); ++ memcpy(&new_dest->addr, addr, addrlen); ++ new_dest->qp = qp; ++ tdest = tsearch(&new_dest->addr, &rs->dest_map, ds_compare_addr); + } ++ ++found: ++ *dest = *tdest; +out: + fastlock_release(&rs->map_lock); + return ret; @@ -1207,17 +1231,26 @@ index a060f66..c61d689 100644 + memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen); + ret = rs_do_connect(rs); + } else { ++ printf("%s\n", __func__); ++ if (rs->state == rs_init) { ++ ret = ds_init_ep(rs); ++ if (ret) ++ return ret; ++ } ++ + fastlock_acquire(&rs->slock); + ret = connect(rs->udp_sock, addr, addrlen); ++ printf("%s connect %d %s\n", __func__, ret, strerror(errno)); + if (!ret) + ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest); ++ printf("%s ds_get_dest %d %s\n", __func__, ret, strerror(errno)); + fastlock_release(&rs->slock); + } + return ret; } static int rs_post_write_msg(struct rsocket *rs, -@@ -903,6 +1559,24 @@ static int rs_post_write(struct rsocket *rs, +@@ -903,6 +1591,24 @@ static int rs_post_write(struct rsocket *rs, return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); } @@ -1242,7 +1275,7 @@ index a060f66..c61d689 100644 /* * Update target SGE before sending data. Otherwise the remote side may * update the entry before we do. -@@ -1046,7 +1720,7 @@ static int rs_poll_cq(struct rsocket *rs) +@@ -1046,7 +1752,7 @@ static int rs_poll_cq(struct rsocket *rs) rs->state = rs_disconnected; return 0; } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) { @@ -1251,12 +1284,15 @@ index a060f66..c61d689 100644 } break; case RS_OP_WRITE: -@@ -1137,42 +1811,213 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs - - fastlock_acquire(&rs->cq_lock); - do { -- rs_update_credits(rs); -- ret = rs_poll_cq(rs); +@@ -1133,46 +1839,217 @@ static int rs_get_cq_event(struct rsocket *rs) + */ + static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs)) + { +- int ret; ++ int ret; ++ ++ fastlock_acquire(&rs->cq_lock); ++ do { + rs_update_credits(rs); + ret = rs_poll_cq(rs); + if (test(rs)) { @@ -1424,22 +1460,24 @@ index a060f66..c61d689 100644 +static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs)) +{ + int ret = 0; -+ -+ fastlock_acquire(&rs->cq_lock); -+ do { + + fastlock_acquire(&rs->cq_lock); + do { +- rs_update_credits(rs); +- ret = rs_poll_cq(rs); + ds_poll_cqs(rs); if (test(rs)) { -+ printf("%s test succeeded\n", __func__); ++// printf("%s test succeeded\n", __func__); ret = 0; break; - } else if (ret) { - break; } else if (nonblock) { ret = ERR(EWOULDBLOCK); -+ printf("%s nonblocking \n", __func__); ++// printf("%s nonblocking \n", __func__); } else if (!rs->cq_armed) { - ibv_req_notify_cq(rs->cm_id->recv_cq, 0); -+ printf("%s req notify \n", __func__); ++// printf("%s req notify \n", __func__); + ds_req_notify_cqs(rs); rs->cq_armed = 1; } else { @@ -1449,7 +1487,7 @@ index a060f66..c61d689 100644 - ret = rs_get_cq_event(rs); + ret = ds_get_cq_event(rs); -+ printf("%s get event ret %d %s\n", __func__, ret, strerror(errno)); ++// printf("%s get event ret %d %s\n", __func__, ret, strerror(errno)); fastlock_release(&rs->cq_wait_lock); fastlock_acquire(&rs->cq_lock); } @@ -1457,7 +1495,7 @@ index a060f66..c61d689 100644 - rs_update_credits(rs); fastlock_release(&rs->cq_lock); -+ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); ++// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); return ret; } @@ -1471,11 +1509,11 @@ index a060f66..c61d689 100644 do { - ret = rs_process_cq(rs, 1, test); + ret = ds_process_cqs(rs, 1, test); -+ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); ++// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); if (!ret || nonblock || errno != EWOULDBLOCK) return ret; -@@ -1184,7 +2029,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc +@@ -1184,7 +2061,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc (e.tv_usec - s.tv_usec) + 1; } while (poll_time <= polling_time); @@ -1484,7 +1522,7 @@ index a060f66..c61d689 100644 return ret; } -@@ -1219,9 +2064,19 @@ static int rs_can_send(struct rsocket *rs) +@@ -1219,9 +2096,19 @@ static int rs_can_send(struct rsocket *rs) (rs->target_sgl[rs->target_sge].length != 0); } @@ -1505,7 +1543,7 @@ index a060f66..c61d689 100644 } static int rs_conn_can_send_ctrl(struct rsocket *rs) -@@ -1236,7 +2091,7 @@ static int rs_have_rdata(struct rsocket *rs) +@@ -1236,7 +2123,7 @@ static int rs_have_rdata(struct rsocket *rs) static int rs_conn_have_rdata(struct rsocket *rs) { @@ -1514,7 +1552,7 @@ index a060f66..c61d689 100644 } static int rs_conn_all_sends_done(struct rsocket *rs) -@@ -1245,6 +2100,70 @@ static int rs_conn_all_sends_done(struct rsocket *rs) +@@ -1245,6 +2132,70 @@ static int rs_conn_all_sends_done(struct rsocket *rs) !(rs->state & rs_connected); } @@ -1550,16 +1588,15 @@ index a060f66..c61d689 100644 + struct ds_header *hdr; + int ret; + -+ret = 0; -+ printf("%s \n", __func__); ++// printf("%s \n", __func__); + if (!(rs->state & rs_readable)) + return ERR(EINVAL); + + if (!rs_have_rdata(rs)) { -+ printf("%s need rdata \n", __func__); ++// printf("%s need rdata \n", __func__); + ret = ds_get_comp(rs, rs_nonblocking(rs, flags), + rs_have_rdata); -+ printf("%s ds_get_comp ret %d errno %s\n", __func__, ret, strerror(errno)); ++// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); + if (ret) + return ret; + } @@ -1579,13 +1616,14 @@ index a060f66..c61d689 100644 + rs->rmsg_head = 0; + } + ++ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); + return len; +} + static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len) { size_t left = len; -@@ -1290,6 +2209,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) +@@ -1290,6 +2241,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) int ret; rs = idm_at(&idm, socket); @@ -1599,7 +1637,7 @@ index a060f66..c61d689 100644 if (rs->state & rs_opening) { ret = rs_do_connect(rs); if (ret) { -@@ -1339,7 +2265,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) +@@ -1339,7 +2297,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) rs->rbuf_bytes_avail += rsize; } @@ -1608,7 +1646,7 @@ index a060f66..c61d689 100644 fastlock_release(&rs->rlock); return ret ? ret : len - left; -@@ -1348,8 +2274,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) +@@ -1348,8 +2306,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen) { @@ -1626,7 +1664,7 @@ index a060f66..c61d689 100644 ret = rrecv(socket, buf, len, flags); if (ret > 0 && src_addr) rgetpeername(socket, src_addr, addrlen); -@@ -1391,14 +2326,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags) +@@ -1391,14 +2358,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags) struct rs_iomap iom; int ret; @@ -1643,7 +1681,7 @@ index a060f66..c61d689 100644 ret = ERR(ECONNRESET); break; } -@@ -1447,10 +2382,90 @@ static int rs_send_iomaps(struct rsocket *rs, int flags) +@@ -1447,10 +2414,99 @@ static int rs_send_iomaps(struct rsocket *rs, int flags) } rs->iomap_pending = !dlist_empty(&rs->iomap_queue); @@ -1658,12 +1696,14 @@ index a060f66..c61d689 100644 + struct ds_udp_header hdr; + struct msghdr msg; + struct iovec miov[8]; ++ ssize_t ret; + ++// printf("%s\n", __func__); + if (iovcnt > 8) + return ERR(ENOTSUP); + + hdr.tag = htonl(DS_UDP_TAG); -+ hdr.version = 1; ++ hdr.version = rs->conn_dest->qp->hdr.version; + hdr.op = op; + hdr.reserved = 0; + hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF); @@ -1685,18 +1725,24 @@ index a060f66..c61d689 100644 + msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa); + msg.msg_iov = miov; + msg.msg_iovlen = iovcnt + 1; -+ return sendmsg(rs->udp_sock, &msg, flags); ++// printf("%s iov cnt %d\n", __func__, msg.msg_iovlen); ++ ret = sendmsg(rs->udp_sock, &msg, flags); ++ printf("%s ret %d %s\n", __func__, ret, strerror(errno)); ++ return ret > 0 ? ret - sizeof hdr : ret; +} + +static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, + int flags, uint8_t op) +{ + struct iovec iov; ++ printf("%s\n", __func__); + if (buf && len) { ++// printf("%s have buffer\n", __func__); + iov.iov_base = (void *) buf; + iov.iov_len = len; + return ds_sendv_udp(rs, &iov, 1, flags, op); + } else { ++// printf("%s no buffer\n", __func__); + return ds_sendv_udp(rs, NULL, 0, flags, op); + } +} @@ -1708,6 +1754,7 @@ index a060f66..c61d689 100644 + uint64_t offset; + int ret = 0; + ++ printf("%s\n", __func__); + if (!rs->conn_dest->ah) + return ds_send_udp(rs, buf, len, flags, RS_OP_DATA); + @@ -1735,7 +1782,7 @@ index a060f66..c61d689 100644 /* * We overlap sending the data, by posting a small work request immediately, * then increasing the size of the send on each iteration. -@@ -1464,6 +2479,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) +@@ -1464,6 +2520,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) int ret = 0; rs = idm_at(&idm, socket); @@ -1749,7 +1796,7 @@ index a060f66..c61d689 100644 if (rs->state & rs_opening) { ret = rs_do_connect(rs); if (ret) { -@@ -1485,7 +2507,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) +@@ -1485,7 +2548,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) rs_conn_can_send); if (ret) break; @@ -1758,7 +1805,7 @@ index a060f66..c61d689 100644 ret = ERR(ECONNRESET); break; } -@@ -1538,10 +2560,27 @@ out: +@@ -1538,10 +2601,39 @@ out: ssize_t rsendto(int socket, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen) { @@ -1767,6 +1814,7 @@ index a060f66..c61d689 100644 + struct rsocket *rs; + int ret; + ++ printf("%s\n", __func__); + rs = idm_at(&idm, socket); + if (rs->type == SOCK_STREAM) { + if (dest_addr || addrlen) @@ -1774,14 +1822,25 @@ index a060f66..c61d689 100644 + + return rsend(socket, buf, len, flags); + } - -- return rsend(socket, buf, len, flags); ++ ++ if (rs->state == rs_init) { ++ ret = ds_init_ep(rs); ++ if (ret) ++ return ret; ++ } ++ + fastlock_acquire(&rs->slock); ++ printf("%s check conn dest\n", __func__); + if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) { ++ printf("%s need conn dest\n", __func__); + ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest); + if (ret) + goto out; + } ++ else ++ printf("%s connected\n", __func__); + +- return rsend(socket, buf, len, flags); + ret = dsend(rs, buf, len, flags); +out: + fastlock_release(&rs->slock); @@ -1789,7 +1848,7 @@ index a060f66..c61d689 100644 } static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len) -@@ -1600,7 +2639,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags +@@ -1600,7 +2692,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags rs_conn_can_send); if (ret) break; @@ -1798,7 +1857,7 @@ index a060f66..c61d689 100644 ret = ERR(ECONNRESET); break; } -@@ -1653,7 +2692,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags) +@@ -1653,7 +2745,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags) if (msg->msg_control && msg->msg_controllen) return ERR(ENOTSUP); @@ -1807,7 +1866,7 @@ index a060f66..c61d689 100644 } ssize_t rwrite(int socket, const void *buf, size_t count) -@@ -1690,8 +2729,8 @@ static int rs_poll_rs(struct rsocket *rs, int events, +@@ -1690,8 +2782,8 @@ static int rs_poll_rs(struct rsocket *rs, int events, int ret; check_cq: @@ -1818,7 +1877,7 @@ index a060f66..c61d689 100644 rs_process_cq(rs, nonblock, test); revents = 0; -@@ -1707,6 +2746,16 @@ check_cq: +@@ -1707,6 +2799,16 @@ check_cq: } return revents; @@ -1835,7 +1894,7 @@ index a060f66..c61d689 100644 } if (rs->state == rs_listening) { -@@ -1766,11 +2815,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds) +@@ -1766,11 +2868,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds) if (fds[i].revents) return 1; @@ -1855,7 +1914,7 @@ index a060f66..c61d689 100644 rfds[i].events = POLLIN; } else { rfds[i].fd = fds[i].fd; -@@ -1793,7 +2845,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds) +@@ -1793,7 +2898,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds) rs = idm_lookup(&idm, fds[i].fd); if (rs) { @@ -1867,7 +1926,7 @@ index a060f66..c61d689 100644 fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all); } else { fds[i].revents = rfds[i].revents; -@@ -1949,7 +3004,7 @@ int rshutdown(int socket, int how) +@@ -1949,7 +3057,7 @@ int rshutdown(int socket, int how) rs = idm_at(&idm, socket); if (how == SHUT_RD) { @@ -1876,7 +1935,7 @@ index a060f66..c61d689 100644 return 0; } -@@ -1959,10 +3014,10 @@ int rshutdown(int socket, int how) +@@ -1959,10 +3067,10 @@ int rshutdown(int socket, int how) if (rs->state & rs_connected) { if (how == SHUT_RDWR) { ctrl = RS_CTRL_DISCONNECT; @@ -1890,7 +1949,7 @@ index a060f66..c61d689 100644 RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT; } if (!rs->ctrl_avail) { -@@ -1987,13 +3042,29 @@ int rshutdown(int socket, int how) +@@ -1987,13 +3095,29 @@ int rshutdown(int socket, int how) return 0; } @@ -1922,7 +1981,7 @@ index a060f66..c61d689 100644 rs_free(rs); return 0; -@@ -2018,8 +3089,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen) +@@ -2018,8 +3142,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen) struct rsocket *rs; rs = idm_at(&idm, socket); @@ -1937,7 +1996,7 @@ index a060f66..c61d689 100644 } int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) -@@ -2027,8 +3102,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) +@@ -2027,8 +3155,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) struct rsocket *rs; rs = idm_at(&idm, socket); @@ -1952,7 +2011,7 @@ index a060f66..c61d689 100644 } int rsetsockopt(int socket, int level, int optname, -@@ -2040,18 +3119,26 @@ int rsetsockopt(int socket, int level, int optname, +@@ -2040,18 +3172,26 @@ int rsetsockopt(int socket, int level, int optname, ret = ERR(ENOTSUP); rs = idm_at(&idm, socket); @@ -1986,7 +2045,7 @@ index a060f66..c61d689 100644 opt_on = *(int *) optval; break; case SO_RCVBUF: -@@ -2101,9 +3188,11 @@ int rsetsockopt(int socket, int level, int optname, +@@ -2101,9 +3241,11 @@ int rsetsockopt(int socket, int level, int optname, opts = &rs->ipv6_opts; switch (optname) { case IPV6_V6ONLY: @@ -2001,7 +2060,7 @@ index a060f66..c61d689 100644 opt_on = *(int *) optval; break; default: -@@ -2315,7 +3404,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse +@@ -2315,7 +3457,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE))) return ERR(EINVAL); @@ -2010,7 +2069,7 @@ index a060f66..c61d689 100644 if (prot & PROT_WRITE) { iomr = rs_get_iomap_mr(rs); access |= IBV_ACCESS_REMOTE_WRITE; -@@ -2349,7 +3438,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse +@@ -2349,7 +3491,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse dlist_insert_tail(&iomr->entry, &rs->iomap_list); } out: @@ -2019,7 +2078,7 @@ index a060f66..c61d689 100644 return offset; } -@@ -2361,7 +3450,7 @@ int riounmap(int socket, void *buf, size_t len) +@@ -2361,7 +3503,7 @@ int riounmap(int socket, void *buf, size_t len) int ret = 0; rs = idm_at(&idm, socket); @@ -2028,7 +2087,7 @@ index a060f66..c61d689 100644 for (entry = rs->iomap_list.next; entry != &rs->iomap_list; entry = entry->next) { -@@ -2382,7 +3471,7 @@ int riounmap(int socket, void *buf, size_t len) +@@ -2382,7 +3524,7 @@ int riounmap(int socket, void *buf, size_t len) } ret = ERR(EINVAL); out: @@ -2037,7 +2096,7 @@ index a060f66..c61d689 100644 return ret; } -@@ -2426,7 +3515,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla +@@ -2426,7 +3568,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla rs_conn_can_send); if (ret) break; @@ -2046,7 +2105,7 @@ index a060f66..c61d689 100644 ret = ERR(ECONNRESET); break; } -@@ -2476,3 +3565,269 @@ out: +@@ -2476,3 +3618,296 @@ out: return (ret && left == count) ? ret : count - left; } @@ -2090,9 +2149,11 @@ index a060f66..c61d689 100644 + } + + svc_rss[++svc_cnt] = rs; ++ printf("%s rs %p\n", __func__, rs); + svc_fds[svc_cnt].fd = rs->udp_sock; + svc_fds[svc_cnt].events = POLLIN; + svc_fds[svc_cnt].revents = 0; ++ printf("add rs udp sock %d\n",rs->udp_sock); + return 0; +} + @@ -2116,6 +2177,7 @@ index a060f66..c61d689 100644 + struct rs_svc_msg msg; + + read(svc_sock[1], &msg, sizeof msg); ++ printf("%s op %d\n",__func__, msg.op); + switch (msg.op) { + case RS_SVC_INSERT: + msg.status = rs_svc_add_rs(msg.rs); @@ -2127,6 +2189,7 @@ index a060f66..c61d689 100644 + msg.status = ENOTSUP; + break; + } ++ printf("%s status %d\n",__func__, msg.status); + write(svc_sock[1], &msg, sizeof msg); +} + @@ -2160,6 +2223,7 @@ index a060f66..c61d689 100644 + struct ibv_ah_attr attr; + int ret; + ++ printf("%s\n",__func__); + if (dest->ah) { + fastlock_acquire(&rs->slock); + ibv_destroy_ah(dest->ah); @@ -2211,7 +2275,18 @@ index a060f66..c61d689 100644 +static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr, + union socket_addr *addr) +{ -+ return (udp_hdr->tag == DS_UDP_TAG) && ++printf("tag %x ver %d family %d (AF_INET %d) length %d\n", udp_hdr->tag, ++ udp_hdr->version, addr->sa.sa_family, AF_INET, udp_hdr->length); ++ ++printf("tag %d ver %d fam %d len %d ver %d fam %d len %d\n", ++udp_hdr->tag == ntohl(DS_UDP_TAG), ++ udp_hdr->version == 4, addr->sa.sa_family == AF_INET, ++ udp_hdr->length == DS_UDP_IPV4_HDR_LEN, ++ udp_hdr->version == 6, addr->sa.sa_family == AF_INET6, ++ udp_hdr->length == DS_UDP_IPV6_HDR_LEN); ++ ++ ++ return (udp_hdr->tag == ntohl(DS_UDP_TAG)) && + ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET && + udp_hdr->length == DS_UDP_IPV4_HDR_LEN) || + (udp_hdr->version == 6 && addr->sa.sa_family == AF_INET6 && @@ -2226,6 +2301,7 @@ index a060f66..c61d689 100644 + struct ibv_sge sge; + uint64_t offset; + ++ printf("%s\n",__func__); + if (!ds_can_send(rs)) { + if (ds_get_comp(rs, 0, ds_can_send)) + return; @@ -2254,7 +2330,9 @@ index a060f66..c61d689 100644 + socklen_t addrlen = sizeof addr; + int len, ret; + ++ printf("%s\n",__func__); + ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen); ++ printf("%s recvfrom %d\n",__func__, ret); + if (ret < DS_UDP_IPV4_HDR_LEN) + return; + @@ -2262,10 +2340,12 @@ index a060f66..c61d689 100644 + if (!rs_svc_valid_udp_hdr(udp_hdr, &addr)) + return; + ++ printf("%s valid hdr\n",__func__); + len = ret - udp_hdr->length; + udp_hdr->tag = ntohl(udp_hdr->tag); + udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF; + ret = ds_get_dest(rs, &addr.sa, addrlen, &dest); ++ printf("%s ds_get_dest %d\n",__func__, ret); + if (ret) + return; + @@ -2277,10 +2357,12 @@ index a060f66..c61d689 100644 + cur_dest = rs->conn_dest; + if (udp_hdr->op == RS_OP_DATA) { + rs->conn_dest = &dest->qp->dest; ++ printf("%s forwarding msg\n",__func__); + rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr); + } + + rs->conn_dest = dest; ++ printf("%s sending resp\n",__func__); + ds_send_udp(rs, svc_buf + udp_hdr->length, len, 0, RS_OP_CTRL); + rs->conn_dest = cur_dest; + fastlock_release(&rs->slock); @@ -2291,6 +2373,7 @@ index a060f66..c61d689 100644 + struct rs_svc_msg msg; + int i, ret; + ++ printf("%s\n",__func__); + ret = rs_svc_grow_sets(); + if (ret) { + msg.status = ret; @@ -2301,10 +2384,13 @@ index a060f66..c61d689 100644 + svc_fds[0].fd = svc_sock[1]; + svc_fds[0].events = POLLIN; + do { ++ printf("%s svc cnt %d\n",__func__, svc_cnt); + for (i = 0; i <= svc_cnt; i++) + svc_fds[i].revents = 0; + ++ printf("%s poll\n",__func__); + poll(svc_fds, svc_cnt + 1, -1); ++ printf("%s poll done\n",__func__); + if (svc_fds[0].revents) + rs_svc_process_sock(); + @@ -2312,7 +2398,7 @@ index a060f66..c61d689 100644 + if (svc_fds[i].revents) + rs_svc_process_rs(svc_rss[i]); + } -+ } while (svc_cnt > 1); ++ } while (svc_cnt >= 1); + + return NULL; +} diff --git a/patches/refresh-temp b/patches/refresh-temp deleted file mode 100644 index c15e7fc6..00000000 --- a/patches/refresh-temp +++ /dev/null @@ -1,676 +0,0 @@ -Bottom: f1822f3bbe2c9b92b5e2ca8b4e5c3cece427c5ff -Top: 136936c0a82503ee0da9daccd8b948cd09e58b64 -Author: Sean Hefty -Date: 2012-12-12 16:49:44 -0800 - -Refresh of dsocket - ---- - -diff --git a/src/cma.c b/src/cma.c -index 0f58966..ff9b426 100755 ---- a/src/cma.c -+++ b/src/cma.c -@@ -2238,6 +2238,7 @@ int ucma_max_qpsize(struct rdma_cm_id *id) - if (id && id_priv->cma_dev) { - max_size = id_priv->cma_dev->max_qpsize; - } else { -+ ucma_init(); - for (i = 0; i < cma_dev_cnt; i++) { - if (!max_size || max_size > cma_dev_array[i].max_qpsize) - max_size = cma_dev_array[i].max_qpsize; -diff --git a/src/rsocket.c b/src/rsocket.c -index c61d689..6fa4c68 100644 ---- a/src/rsocket.c -+++ b/src/rsocket.c -@@ -399,6 +399,7 @@ static int rs_add_to_svc(struct rsocket *rs) - - msg.op = RS_SVC_INSERT; - msg.status = EINVAL; -+ printf("%s rs %p\n", __func__, rs); - msg.rs = rs; - write(svc_sock[0], &msg, sizeof msg); - read(svc_sock[0], &msg, sizeof msg); -@@ -602,7 +603,9 @@ static int rs_set_nonblocking(struct rsocket *rs, long arg) - if (!ret && rs->state < rs_connected) - ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg); - } else { -+ printf("%s set nonblock\n", __func__); - ret = fcntl(rs->epfd, F_SETFL, arg); -+ printf("%s fcntl %d\n", __func__, ret); - - if (!ret && rs->qp_list) { - qp = rs->qp_list; -@@ -640,6 +643,8 @@ static void ds_set_qp_size(struct rsocket *rs) - { - uint16_t max_size; - -+ printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size, -+ rs->rq_size, rs->rbuf_size); - max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE); - - if (rs->sq_size > max_size) -@@ -656,6 +661,8 @@ static void ds_set_qp_size(struct rsocket *rs) - rs->sq_size = rs->sbuf_size / RS_SNDLOWAT; - else - rs->sbuf_size = rs->sq_size * RS_SNDLOWAT; -+ printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size, -+ rs->rq_size, rs->rbuf_size); - } - - static int rs_init_bufs(struct rsocket *rs) -@@ -728,15 +735,18 @@ static int ds_init_bufs(struct ds_qp *qp) - static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id) - { - cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs); -+ printf("%s create comp_channel %p\n", __func__, cm_id->recv_cq_channel); - if (!cm_id->recv_cq_channel) - return -1; - - cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size, - cm_id, cm_id->recv_cq_channel, 0); -+ printf("%s create cq %p size %d\n", __func__, cm_id->recv_cq, rs->sq_size + rs->rq_size); - if (!cm_id->recv_cq) - goto err1; - - if (rs->fd_flags & O_NONBLOCK) { -+ printf("%s set nonblock\n", __func__); - if (rs_set_nonblocking(rs, O_NONBLOCK)) - goto err2; - } -@@ -876,6 +886,8 @@ static void ds_free_qp(struct ds_qp *qp) - - static void ds_free(struct rsocket *rs) - { -+ struct ds_qp *qp; -+ - if (rs->state & (rs_readable | rs_writable)) - rs_remove_from_svc(rs); - -@@ -888,12 +900,9 @@ static void ds_free(struct rsocket *rs) - if (rs->dmsg) - free(rs->dmsg); - -- if (rs->smsg_free) -- free(rs->smsg_free); -- -- while (rs->qp_list) { -- ds_remove_qp(rs, rs->qp_list); -- ds_free_qp(rs->qp_list); -+ while ((qp = rs->qp_list)) { -+ ds_remove_qp(rs, qp); -+ ds_free_qp(qp); - } - - if (rs->epfd >= 0) -@@ -1016,6 +1025,40 @@ static int ds_init(struct rsocket *rs, int domain) - return 0; - } - -+static int ds_init_ep(struct rsocket *rs) -+{ -+ struct ds_smsg *msg; -+ int i, ret; -+ -+ ds_set_qp_size(rs); -+ -+ rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT); -+ if (!rs->sbuf) -+ return ERR(ENOMEM); -+ -+ rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg)); -+ if (!rs->dmsg) -+ return ERR(ENOMEM); -+ -+ rs->sqe_avail = rs->sq_size; -+ rs->rqe_avail = rs->rq_size; -+ -+ rs->smsg_free = (struct ds_smsg *) rs->sbuf; -+ msg = rs->smsg_free; -+ for (i = 0; i < rs->sq_size - 1; i++) { -+ msg->next = (void *) msg + RS_SNDLOWAT; -+ msg = msg->next; -+ } -+ msg->next = NULL; -+ -+ ret = rs_add_to_svc(rs); -+ if (ret) -+ return ret; -+ -+ rs->state = rs_readable | rs_writable; -+ return 0; -+} -+ - int rsocket(int domain, int type, int protocol) - { - struct rsocket *rs; -@@ -1040,6 +1083,7 @@ int rsocket(int domain, int type, int protocol) - rs->cm_id->route.addr.src_addr.sa_family = domain; - index = rs->cm_id->channel->fd; - } else { -+ printf("rsocket sq %d rq %d\n", rs->sq_size, rs->rq_size); - ret = ds_init(rs, domain); - if (ret) - goto err; -@@ -1069,12 +1113,12 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen) - if (!ret) - rs->state = rs_bound; - } else { -- ret = bind(rs->udp_sock, addr, addrlen); -- if (!ret) { -- ret = rs_add_to_svc(rs); -- if (!ret) -- rs->state = rs_readable | rs_writable; -+ if (rs->state == rs_init) { -+ ret = ds_init_ep(rs); -+ if (ret) -+ return ret; - } -+ ret = bind(rs->udp_sock, addr, addrlen); - } - return ret; - } -@@ -1256,41 +1300,6 @@ connected: - return ret; - } - --static int ds_init_ep(struct rsocket *rs) --{ -- struct ds_smsg *msg; -- int i, ret; -- -- ds_set_qp_size(rs); -- -- rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT); -- if (!rs->sbuf) -- return ERR(ENOMEM); -- -- rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg)); -- if (!rs->dmsg) -- return ERR(ENOMEM); -- -- rs->sbuf_bytes_avail = rs->sbuf_size; -- rs->sqe_avail = rs->sq_size; -- rs->rqe_avail = rs->rq_size; -- -- rs->smsg_free = (struct ds_smsg *) rs->sbuf; -- msg = rs->smsg_free; -- for (i = 0; i < rs->sq_size - 1; i++) { -- msg->next = (void *) msg + i * RS_SNDLOWAT; -- msg = msg->next; -- } -- msg->next = NULL; -- -- ret = rs_add_to_svc(rs); -- if (ret) -- return ret; -- -- rs->state = rs_readable | rs_writable; -- return 0; --} -- - static int rs_any_addr(const union socket_addr *addr) - { - if (addr->sa.sa_family == AF_INET) { -@@ -1374,38 +1383,44 @@ static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr, - } - - static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr, -- socklen_t addrlen, struct ds_qp **qp) -+ socklen_t addrlen, struct ds_qp **new_qp) - { -+ struct ds_qp *qp; - struct ibv_qp_init_attr qp_attr; - struct epoll_event event; - int i, ret; - -- *qp = calloc(1, sizeof(struct ds_qp)); -- if (!*qp) -+printf("%s\n", __func__); -+ qp = calloc(1, sizeof(*qp)); -+ if (!qp) - return ERR(ENOMEM); - -- (*qp)->rs = rs; -- ret = rdma_create_id(NULL, &(*qp)->cm_id, *qp, RDMA_PS_UDP); -+ qp->rs = rs; -+ ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP); -+ printf("%s rdma_create_id %d\n", __func__, ret); - if (ret) - goto err; - -- ds_format_hdr(&(*qp)->hdr, src_addr); -- ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa); -+ ds_format_hdr(&qp->hdr, src_addr); -+ ret = rdma_bind_addr(qp->cm_id, &src_addr->sa); -+ printf("%s rdma_bind_addr %d\n", __func__, ret); - if (ret) - goto err; - -- ret = ds_init_bufs(*qp); -+ ret = ds_init_bufs(qp); -+ printf("%s ds_init_bufs %d\n", __func__, ret); - if (ret) - goto err; - -- ret = rs_create_cq(rs, (*qp)->cm_id); -+ ret = rs_create_cq(rs, qp->cm_id); -+ printf("%s rs_create_cq %d\n", __func__, ret); - if (ret) - goto err; - - memset(&qp_attr, 0, sizeof qp_attr); - qp_attr.qp_context = qp; -- qp_attr.send_cq = rs->cm_id->send_cq; -- qp_attr.recv_cq = rs->cm_id->recv_cq; -+ qp_attr.send_cq = qp->cm_id->send_cq; -+ qp_attr.recv_cq = qp->cm_id->recv_cq; - qp_attr.qp_type = IBV_QPT_UD; - qp_attr.sq_sig_all = 1; - qp_attr.cap.max_send_wr = rs->sq_size; -@@ -1413,31 +1428,35 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr, - qp_attr.cap.max_send_sge = 2; - qp_attr.cap.max_recv_sge = 1; - qp_attr.cap.max_inline_data = rs->sq_inline; -- ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr); -+ ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr); -+ printf("%s rdma_create_qp %d\n", __func__, ret); - if (ret) - goto err; - -- ret = ds_add_qp_dest(*qp, src_addr, addrlen); -+ ret = ds_add_qp_dest(qp, src_addr, addrlen); -+ printf("%s ds_add_qp_dest %d\n", __func__, ret); - if (ret) - goto err; - - event.events = EPOLLIN; -- event.data.ptr = *qp; -+ event.data.ptr = qp; - ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD, -- (*qp)->cm_id->recv_cq_channel->fd, &event); -+ qp->cm_id->recv_cq_channel->fd, &event); -+ printf("%s epoll_ctl %d\n", __func__, ret); - if (ret) - goto err; - - for (i = 0; i < rs->rq_size; i++) { -- ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT); -+ ret = ds_post_recv(rs, qp, qp->rbuf + i * RS_SNDLOWAT); - if (ret) - goto err; - } - -- ds_insert_qp(rs, *qp); -+ ds_insert_qp(rs, qp); -+ *new_qp = qp; - return 0; - err: -- ds_free_qp(*qp); -+ ds_free_qp(qp); - return ret; - } - -@@ -1464,38 +1483,42 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr, - union socket_addr src_addr; - socklen_t src_len; - struct ds_qp *qp; -+ struct ds_dest **tdest, *new_dest; - int ret = 0; - -+ printf("%s \n", __func__); - fastlock_acquire(&rs->map_lock); -- dest = tfind(addr, &rs->dest_map, ds_compare_addr); -- if (dest) -- goto out; -- -- if (rs->state == rs_init) { -- ret = ds_init_ep(rs); -- if (ret) -- goto out; -- } -+ tdest = tfind(addr, &rs->dest_map, ds_compare_addr); -+ printf("%s tfind %p\n", __func__, dest); -+ if (tdest) -+ goto found; - - ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len); -+ printf("%s ds_get_src_addr %d %s\n", __func__, ret, strerror(errno)); - if (ret) - goto out; - - ret = ds_get_qp(rs, &src_addr, src_len, &qp); -+ printf("%s ds_get_qp %d %s\n", __func__, ret, strerror(errno)); - if (ret) - goto out; - -- if ((addrlen != src_len) || memcmp(addr, &src_addr, addrlen)) { -- *dest = calloc(1, sizeof(struct ds_dest)); -- if (!*dest) { -+ tdest = tfind(addr, &rs->dest_map, ds_compare_addr); -+ if (!tdest) { -+ printf("%s adding dest into map\n", __func__); -+ new_dest = calloc(1, sizeof(*new_dest)); -+ if (!new_dest) { - ret = ERR(ENOMEM); - goto out; - } - -- memcpy(&(*dest)->addr, addr, addrlen); -- (*dest)->qp = qp; -- tsearch(&(*dest)->addr, &rs->dest_map, ds_compare_addr); -+ memcpy(&new_dest->addr, addr, addrlen); -+ new_dest->qp = qp; -+ tdest = tsearch(&new_dest->addr, &rs->dest_map, ds_compare_addr); - } -+ -+found: -+ *dest = *tdest; - out: - fastlock_release(&rs->map_lock); - return ret; -@@ -1511,10 +1534,19 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) - memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen); - ret = rs_do_connect(rs); - } else { -+ printf("%s\n", __func__); -+ if (rs->state == rs_init) { -+ ret = ds_init_ep(rs); -+ if (ret) -+ return ret; -+ } -+ - fastlock_acquire(&rs->slock); - ret = connect(rs->udp_sock, addr, addrlen); -+ printf("%s connect %d %s\n", __func__, ret, strerror(errno)); - if (!ret) - ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest); -+ printf("%s ds_get_dest %d %s\n", __func__, ret, strerror(errno)); - fastlock_release(&rs->slock); - } - return ret; -@@ -1983,14 +2015,14 @@ static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct r - do { - ds_poll_cqs(rs); - if (test(rs)) { -- printf("%s test succeeded\n", __func__); -+// printf("%s test succeeded\n", __func__); - ret = 0; - break; - } else if (nonblock) { - ret = ERR(EWOULDBLOCK); -- printf("%s nonblocking \n", __func__); -+// printf("%s nonblocking \n", __func__); - } else if (!rs->cq_armed) { -- printf("%s req notify \n", __func__); -+// printf("%s req notify \n", __func__); - ds_req_notify_cqs(rs); - rs->cq_armed = 1; - } else { -@@ -1998,14 +2030,14 @@ static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct r - fastlock_release(&rs->cq_lock); - - ret = ds_get_cq_event(rs); -- printf("%s get event ret %d %s\n", __func__, ret, strerror(errno)); -+// printf("%s get event ret %d %s\n", __func__, ret, strerror(errno)); - fastlock_release(&rs->cq_wait_lock); - fastlock_acquire(&rs->cq_lock); - } - } while (!ret); - - fastlock_release(&rs->cq_lock); -- printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); -+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); - return ret; - } - -@@ -2017,7 +2049,7 @@ static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc - - do { - ret = ds_process_cqs(rs, 1, test); -- printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); -+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); - if (!ret || nonblock || errno != EWOULDBLOCK) - return ret; - -@@ -2132,16 +2164,15 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags, - struct ds_header *hdr; - int ret; - --ret = 0; -- printf("%s \n", __func__); -+// printf("%s \n", __func__); - if (!(rs->state & rs_readable)) - return ERR(EINVAL); - - if (!rs_have_rdata(rs)) { -- printf("%s need rdata \n", __func__); -+// printf("%s need rdata \n", __func__); - ret = ds_get_comp(rs, rs_nonblocking(rs, flags), - rs_have_rdata); -- printf("%s ds_get_comp ret %d errno %s\n", __func__, ret, strerror(errno)); -+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); - if (ret) - return ret; - } -@@ -2161,6 +2192,7 @@ ret = 0; - rs->rmsg_head = 0; - } - -+ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); - return len; - } - -@@ -2392,12 +2424,14 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, - struct ds_udp_header hdr; - struct msghdr msg; - struct iovec miov[8]; -+ ssize_t ret; - -+// printf("%s\n", __func__); - if (iovcnt > 8) - return ERR(ENOTSUP); - - hdr.tag = htonl(DS_UDP_TAG); -- hdr.version = 1; -+ hdr.version = rs->conn_dest->qp->hdr.version; - hdr.op = op; - hdr.reserved = 0; - hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF); -@@ -2419,18 +2453,24 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, - msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa); - msg.msg_iov = miov; - msg.msg_iovlen = iovcnt + 1; -- return sendmsg(rs->udp_sock, &msg, flags); -+// printf("%s iov cnt %d\n", __func__, msg.msg_iovlen); -+ ret = sendmsg(rs->udp_sock, &msg, flags); -+ printf("%s ret %d %s\n", __func__, ret, strerror(errno)); -+ return ret > 0 ? ret - sizeof hdr : ret; - } - - static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, - int flags, uint8_t op) - { - struct iovec iov; -+ printf("%s\n", __func__); - if (buf && len) { -+// printf("%s have buffer\n", __func__); - iov.iov_base = (void *) buf; - iov.iov_len = len; - return ds_sendv_udp(rs, &iov, 1, flags, op); - } else { -+// printf("%s no buffer\n", __func__); - return ds_sendv_udp(rs, NULL, 0, flags, op); - } - } -@@ -2442,6 +2482,7 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags) - uint64_t offset; - int ret = 0; - -+ printf("%s\n", __func__); - if (!rs->conn_dest->ah) - return ds_send_udp(rs, buf, len, flags, RS_OP_DATA); - -@@ -2563,6 +2604,7 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags, - struct rsocket *rs; - int ret; - -+ printf("%s\n", __func__); - rs = idm_at(&idm, socket); - if (rs->type == SOCK_STREAM) { - if (dest_addr || addrlen) -@@ -2571,12 +2613,23 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags, - return rsend(socket, buf, len, flags); - } - -+ if (rs->state == rs_init) { -+ ret = ds_init_ep(rs); -+ if (ret) -+ return ret; -+ } -+ - fastlock_acquire(&rs->slock); -+ printf("%s check conn dest\n", __func__); - if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) { -+ printf("%s need conn dest\n", __func__); - ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest); - if (ret) - goto out; - } -+ else -+ printf("%s connected\n", __func__); -+ - ret = dsend(rs, buf, len, flags); - out: - fastlock_release(&rs->slock); -@@ -3605,9 +3658,11 @@ static int rs_svc_add_rs(struct rsocket *rs) - } - - svc_rss[++svc_cnt] = rs; -+ printf("%s rs %p\n", __func__, rs); - svc_fds[svc_cnt].fd = rs->udp_sock; - svc_fds[svc_cnt].events = POLLIN; - svc_fds[svc_cnt].revents = 0; -+ printf("add rs udp sock %d\n",rs->udp_sock); - return 0; - } - -@@ -3631,6 +3686,7 @@ static void rs_svc_process_sock(void) - struct rs_svc_msg msg; - - read(svc_sock[1], &msg, sizeof msg); -+ printf("%s op %d\n",__func__, msg.op); - switch (msg.op) { - case RS_SVC_INSERT: - msg.status = rs_svc_add_rs(msg.rs); -@@ -3642,6 +3698,7 @@ static void rs_svc_process_sock(void) - msg.status = ENOTSUP; - break; - } -+ printf("%s status %d\n",__func__, msg.status); - write(svc_sock[1], &msg, sizeof msg); - } - -@@ -3675,6 +3732,7 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t - struct ibv_ah_attr attr; - int ret; - -+ printf("%s\n",__func__); - if (dest->ah) { - fastlock_acquire(&rs->slock); - ibv_destroy_ah(dest->ah); -@@ -3726,7 +3784,18 @@ out: - static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr, - union socket_addr *addr) - { -- return (udp_hdr->tag == DS_UDP_TAG) && -+printf("tag %x ver %d family %d (AF_INET %d) length %d\n", udp_hdr->tag, -+ udp_hdr->version, addr->sa.sa_family, AF_INET, udp_hdr->length); -+ -+printf("tag %d ver %d fam %d len %d ver %d fam %d len %d\n", -+udp_hdr->tag == ntohl(DS_UDP_TAG), -+ udp_hdr->version == 4, addr->sa.sa_family == AF_INET, -+ udp_hdr->length == DS_UDP_IPV4_HDR_LEN, -+ udp_hdr->version == 6, addr->sa.sa_family == AF_INET6, -+ udp_hdr->length == DS_UDP_IPV6_HDR_LEN); -+ -+ -+ return (udp_hdr->tag == ntohl(DS_UDP_TAG)) && - ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET && - udp_hdr->length == DS_UDP_IPV4_HDR_LEN) || - (udp_hdr->version == 6 && addr->sa.sa_family == AF_INET6 && -@@ -3741,6 +3810,7 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len, - struct ibv_sge sge; - uint64_t offset; - -+ printf("%s\n",__func__); - if (!ds_can_send(rs)) { - if (ds_get_comp(rs, 0, ds_can_send)) - return; -@@ -3769,7 +3839,9 @@ static void rs_svc_process_rs(struct rsocket *rs) - socklen_t addrlen = sizeof addr; - int len, ret; - -+ printf("%s\n",__func__); - ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen); -+ printf("%s recvfrom %d\n",__func__, ret); - if (ret < DS_UDP_IPV4_HDR_LEN) - return; - -@@ -3777,10 +3849,12 @@ static void rs_svc_process_rs(struct rsocket *rs) - if (!rs_svc_valid_udp_hdr(udp_hdr, &addr)) - return; - -+ printf("%s valid hdr\n",__func__); - len = ret - udp_hdr->length; - udp_hdr->tag = ntohl(udp_hdr->tag); - udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF; - ret = ds_get_dest(rs, &addr.sa, addrlen, &dest); -+ printf("%s ds_get_dest %d\n",__func__, ret); - if (ret) - return; - -@@ -3792,10 +3866,12 @@ static void rs_svc_process_rs(struct rsocket *rs) - cur_dest = rs->conn_dest; - if (udp_hdr->op == RS_OP_DATA) { - rs->conn_dest = &dest->qp->dest; -+ printf("%s forwarding msg\n",__func__); - rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr); - } - - rs->conn_dest = dest; -+ printf("%s sending resp\n",__func__); - ds_send_udp(rs, svc_buf + udp_hdr->length, len, 0, RS_OP_CTRL); - rs->conn_dest = cur_dest; - fastlock_release(&rs->slock); -@@ -3806,6 +3882,7 @@ static void *rs_svc_run(void *arg) - struct rs_svc_msg msg; - int i, ret; - -+ printf("%s\n",__func__); - ret = rs_svc_grow_sets(); - if (ret) { - msg.status = ret; -@@ -3816,10 +3893,13 @@ static void *rs_svc_run(void *arg) - svc_fds[0].fd = svc_sock[1]; - svc_fds[0].events = POLLIN; - do { -+ printf("%s svc cnt %d\n",__func__, svc_cnt); - for (i = 0; i <= svc_cnt; i++) - svc_fds[i].revents = 0; - -+ printf("%s poll\n",__func__); - poll(svc_fds, svc_cnt + 1, -1); -+ printf("%s poll done\n",__func__); - if (svc_fds[0].revents) - rs_svc_process_sock(); - -@@ -3827,7 +3907,7 @@ static void *rs_svc_run(void *arg) - if (svc_fds[i].revents) - rs_svc_process_rs(svc_rss[i]); - } -- } while (svc_cnt > 1); -+ } while (svc_cnt >= 1); - - return NULL; - } -- 2.46.0