From: Sean Hefty Date: Sat, 15 Dec 2012 08:15:42 +0000 (-0800) Subject: Refresh of dsocket X-Git-Url: https://openfabrics.org/gitweb/?a=commitdiff_plain;h=72b8742fa8779d5b6c6c21850867ce4c21b0c99c;p=~shefty%2Flibrdmacm.git Refresh of dsocket --- diff --git a/src/rsocket.c b/src/rsocket.c index 04f00dda..aca705bc 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -73,6 +73,14 @@ enum { struct rsocket; + +#define PRINTADDR(a) \ +printf("%s port %x ip %x\n", __func__, \ + ((struct sockaddr_in *)a)->sin_port, \ + ((struct sockaddr_in *)a)->sin_addr.s_addr) + + + struct rs_svc_msg { uint32_t op; uint32_t status; @@ -399,7 +407,6 @@ static int rs_add_to_svc(struct rsocket *rs) msg.op = RS_SVC_INSERT; msg.status = EINVAL; - printf("%s rs %p\n", __func__, rs); msg.rs = rs; write(svc_sock[0], &msg, sizeof msg); read(svc_sock[0], &msg, sizeof msg); @@ -603,10 +610,7 @@ static int rs_set_nonblocking(struct rsocket *rs, long arg) if (!ret && rs->state < rs_connected) ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg); } else { - printf("%s set nonblock\n", __func__); ret = fcntl(rs->epfd, F_SETFL, arg); - printf("%s fcntl %d\n", __func__, ret); - if (!ret && rs->qp_list) { qp = rs->qp_list; do { @@ -643,8 +647,6 @@ static void ds_set_qp_size(struct rsocket *rs) { uint16_t max_size; - printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size, - rs->rq_size, rs->rbuf_size); max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE); if (rs->sq_size > max_size) @@ -661,8 +663,6 @@ static void ds_set_qp_size(struct rsocket *rs) rs->sq_size = rs->sbuf_size / RS_SNDLOWAT; else rs->sbuf_size = rs->sq_size * RS_SNDLOWAT; - printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size, - rs->rq_size, rs->rbuf_size); } static int rs_init_bufs(struct rsocket *rs) @@ -717,7 +717,8 @@ static int rs_init_bufs(struct rsocket *rs) static int ds_init_bufs(struct ds_qp *qp) { - qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf)); + qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh), + sizeof(*qp->rbuf)); if (!qp->rbuf) return ERR(ENOMEM); @@ -725,7 +726,8 @@ static int ds_init_bufs(struct ds_qp *qp) if (!qp->smr) return -1; - qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size); + qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size + + sizeof(struct ibv_grh)); if (!qp->rmr) return -1; @@ -735,20 +737,19 @@ static int ds_init_bufs(struct ds_qp *qp) static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id) { cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs); - printf("%s create comp_channel %p\n", __func__, cm_id->recv_cq_channel); if (!cm_id->recv_cq_channel) return -1; cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size, cm_id, cm_id->recv_cq_channel, 0); - printf("%s create cq %p size %d\n", __func__, cm_id->recv_cq, rs->sq_size + rs->rq_size); if (!cm_id->recv_cq) goto err1; if (rs->fd_flags & O_NONBLOCK) { - printf("%s set nonblock\n", __func__); - if (rs_set_nonblocking(rs, O_NONBLOCK)) + if (fcntl(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK)) goto err2; + } else { + ibv_req_notify_cq(cm_id->recv_cq, 0); } cm_id->send_cq_channel = cm_id->recv_cq_channel; @@ -776,19 +777,22 @@ static inline int rs_post_recv(struct rsocket *rs) return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad)); } -static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf) +static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, uint32_t offset) { struct ibv_recv_wr wr, *bad; - struct ibv_sge sge; + struct ibv_sge sge[2]; - sge.addr = (uintptr_t) buf; - sge.length = RS_SNDLOWAT; - sge.lkey = qp->rmr->lkey; + sge[0].addr = (uintptr_t) qp->rbuf + rs->rbuf_size; + sge[0].length = sizeof(struct ibv_grh); + sge[0].lkey = qp->rmr->lkey; + sge[1].addr = (uintptr_t) qp->rbuf + offset; + sge[1].length = RS_SNDLOWAT; + sge[1].lkey = qp->rmr->lkey; - wr.wr_id = ds_recv_wr_id((uint32_t) ((uint8_t *) buf - rs->rbuf)); + wr.wr_id = ds_recv_wr_id(offset); wr.next = NULL; - wr.sg_list = &sge; - wr.num_sge = 1; + wr.sg_list = sge; + wr.num_sge = 2; return rdma_seterrno(ibv_post_recv(qp->cm_id->qp, &wr, &bad)); } @@ -1083,7 +1087,6 @@ int rsocket(int domain, int type, int protocol) rs->cm_id->route.addr.src_addr.sa_family = domain; index = rs->cm_id->channel->fd; } else { - printf("rsocket sq %d rq %d\n", rs->sq_size, rs->rq_size); ret = ds_init(rs, domain); if (ret) goto err; @@ -1174,7 +1177,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) } if (rs->fd_flags & O_NONBLOCK) - rs_set_nonblocking(new_rs, O_NONBLOCK); + fcntl(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK); ret = rs_create_ep(new_rs); if (ret) @@ -1276,7 +1279,7 @@ connected: break; case rs_accepting: if (!(rs->fd_flags & O_NONBLOCK)) - rs_set_nonblocking(rs, 0); + fcntl(rs->cm_id->channel->fd, F_SETFL, 0); ret = ucma_complete(rs->cm_id); if (ret) @@ -1318,8 +1321,10 @@ static int ds_get_src_addr(struct rsocket *rs, int sock, ret; uint16_t port; +// printf("dest: "); PRINTADDR(dest_addr); *src_len = sizeof src_addr; ret = getsockname(rs->udp_sock, &src_addr->sa, src_len); +// printf("src: "); PRINTADDR(src_addr); if (ret || !rs_any_addr(src_addr)) return ret; @@ -1335,6 +1340,7 @@ static int ds_get_src_addr(struct rsocket *rs, *src_len = sizeof src_addr; ret = getsockname(sock, &src_addr->sa, src_len); src_addr->sin.sin_port = port; +// printf("selected src: "); out: close(sock); return ret; @@ -1343,6 +1349,7 @@ out: static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr) { if (addr->sa.sa_family == AF_INET) { + PRINTADDR(addr); hdr->version = 4; hdr->length = DS_IPV4_HDR_LEN; hdr->port = addr->sin.sin_port; @@ -1363,6 +1370,7 @@ static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr, struct ibv_ah_attr attr; int ret; +// printf("%s\n", __func__); memcpy(&qp->dest.addr, addr, addrlen); qp->dest.qp = qp; qp->dest.qpn = qp->cm_id->qp->qp_num; @@ -1375,6 +1383,8 @@ static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr, attr.dlid = port_attr.lid; attr.port_num = qp->cm_id->port_num; qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr); +// printf("%s ah %p lid %x port %d qpn %x\n", __func__, qp->dest.ah, attr.dlid, +// attr.port_num, qp->dest.qpn); if (!qp->dest.ah) return ERR(ENOMEM); @@ -1390,30 +1400,26 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr, struct epoll_event event; int i, ret; -printf("%s\n", __func__); + PRINTADDR(src_addr); qp = calloc(1, sizeof(*qp)); if (!qp) return ERR(ENOMEM); qp->rs = rs; ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP); - printf("%s rdma_create_id %d\n", __func__, ret); if (ret) goto err; ds_format_hdr(&qp->hdr, src_addr); ret = rdma_bind_addr(qp->cm_id, &src_addr->sa); - printf("%s rdma_bind_addr %d\n", __func__, ret); if (ret) goto err; ret = ds_init_bufs(qp); - printf("%s ds_init_bufs %d\n", __func__, ret); if (ret) goto err; ret = rs_create_cq(rs, qp->cm_id); - printf("%s rs_create_cq %d\n", __func__, ret); if (ret) goto err; @@ -1425,16 +1431,14 @@ printf("%s\n", __func__); qp_attr.sq_sig_all = 1; qp_attr.cap.max_send_wr = rs->sq_size; qp_attr.cap.max_recv_wr = rs->rq_size; - qp_attr.cap.max_send_sge = 2; - qp_attr.cap.max_recv_sge = 1; + qp_attr.cap.max_send_sge = 1; + qp_attr.cap.max_recv_sge = 2; qp_attr.cap.max_inline_data = rs->sq_inline; ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr); - printf("%s rdma_create_qp %d\n", __func__, ret); if (ret) goto err; ret = ds_add_qp_dest(qp, src_addr, addrlen); - printf("%s ds_add_qp_dest %d\n", __func__, ret); if (ret) goto err; @@ -1442,12 +1446,11 @@ printf("%s\n", __func__); event.data.ptr = qp; ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD, qp->cm_id->recv_cq_channel->fd, &event); - printf("%s epoll_ctl %d\n", __func__, ret); if (ret) goto err; for (i = 0; i < rs->rq_size; i++) { - ret = ds_post_recv(rs, qp, qp->rbuf + i * RS_SNDLOWAT); + ret = ds_post_recv(rs, qp, i * RS_SNDLOWAT); if (ret) goto err; } @@ -1486,26 +1489,23 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr, struct ds_dest **tdest, *new_dest; int ret = 0; - printf("%s \n", __func__); + PRINTADDR(addr); fastlock_acquire(&rs->map_lock); tdest = tfind(addr, &rs->dest_map, ds_compare_addr); - printf("%s tfind %p\n", __func__, dest); if (tdest) goto found; ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len); - printf("%s ds_get_src_addr %d %s\n", __func__, ret, strerror(errno)); +// printf("get src: "); PRINTADDR(&src_addr); if (ret) goto out; ret = ds_get_qp(rs, &src_addr, src_len, &qp); - printf("%s ds_get_qp %d %s\n", __func__, ret, strerror(errno)); if (ret) goto out; tdest = tfind(addr, &rs->dest_map, ds_compare_addr); if (!tdest) { - printf("%s adding dest into map\n", __func__); new_dest = calloc(1, sizeof(*new_dest)); if (!new_dest) { ret = ERR(ENOMEM); @@ -1534,7 +1534,6 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen); ret = rs_do_connect(rs); } else { - printf("%s\n", __func__); if (rs->state == rs_init) { ret = ds_init_ep(rs); if (ret) @@ -1542,11 +1541,10 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) } fastlock_acquire(&rs->slock); + PRINTADDR(addr); ret = connect(rs->udp_sock, addr, addrlen); - printf("%s connect %d %s\n", __func__, ret, strerror(errno)); if (!ret) ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest); - printf("%s ds_get_dest %d %s\n", __func__, ret, strerror(errno)); fastlock_release(&rs->slock); } return ret; @@ -1605,6 +1603,8 @@ static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge, wr.wr.ud.ah = rs->conn_dest->ah; wr.wr.ud.remote_qpn = rs->conn_dest->qpn; wr.wr.ud.remote_qkey = RDMA_UDP_QKEY; +// printf("%s ah %p qpn %x\n", __func__, rs->conn_dest->ah, +// rs->conn_dest->qpn); return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad)); } @@ -1894,10 +1894,12 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc return ret; } -static int ds_valid_recv(void *buf, uint32_t len) +static int ds_valid_recv(struct ds_qp *qp, struct ibv_wc *wc) { - struct ds_header *hdr = (struct ds_header *) buf; - return ((len >= sizeof(*hdr)) && + struct ds_header *hdr; + + hdr = (struct ds_header *) (qp->rbuf + ds_wr_offset(wc->wr_id)); + return ((wc->byte_len >= sizeof(struct ibv_grh) + sizeof(*hdr)) && ((hdr->version == 4 && hdr->length == DS_IPV4_HDR_LEN) || (hdr->version == 6 && hdr->length == DS_IPV6_HDR_LEN))); } @@ -1931,22 +1933,22 @@ static void ds_poll_cqs(struct rsocket *rs) if (ds_wr_is_recv(wc.wr_id)) { if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS && - ds_valid_recv(qp->rbuf + ds_wr_offset(wc.wr_id), - wc.byte_len)) { + ds_valid_recv(qp, &wc)) { rs->rqe_avail--; rmsg = &rs->dmsg[rs->rmsg_tail]; rmsg->qp = qp; rmsg->offset = ds_wr_offset(wc.wr_id); - rmsg->length = wc.byte_len; + rmsg->length = wc.byte_len - sizeof(struct ibv_grh); if (++rs->rmsg_tail == rs->rq_size + 1) rs->rmsg_tail = 0; } else { - ds_post_recv(rs, qp, qp->rbuf + - ds_wr_offset(wc.wr_id)); + printf("%s invalid recv\n", __func__); + ds_post_recv(rs, qp, ds_wr_offset(wc.wr_id)); } } else { smsg = (struct ds_smsg *) (rs->sbuf + ds_wr_offset(wc.wr_id)); + printf("%s send smsg %p free %p\n", __func__, smsg, rs->smsg_free); smsg->next = rs->smsg_free; rs->smsg_free = smsg; rs->sqe_avail++; @@ -1986,18 +1988,17 @@ static int ds_get_cq_event(struct rsocket *rs) void *context; int ret; - printf("%s \n", __func__); if (!rs->cq_armed) return 0; +// printf("wait for epoll event\n"); ret = epoll_wait(rs->epfd, &event, 1, -1); - printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno)); +// printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno)); if (ret <= 0) return ret; qp = event.data.ptr; ret = ibv_get_cq_event(qp->cm_id->recv_cq_channel, &cq, &context); - printf("%s get cq event ret %d errno %s\n", __func__, ret, strerror(errno)); if (!ret) { ibv_ack_cq_events(qp->cm_id->recv_cq, 1); qp->cq_armed = 0; @@ -2164,15 +2165,15 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags, struct ds_header *hdr; int ret; -// printf("%s \n", __func__); + printf("%s \n", __func__); if (!(rs->state & rs_readable)) return ERR(EINVAL); if (!rs_have_rdata(rs)) { -// printf("%s need rdata \n", __func__); + printf("%s need rdata \n", __func__); ret = ds_get_comp(rs, rs_nonblocking(rs, flags), rs_have_rdata); -// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); + printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); if (ret) return ret; } @@ -2184,10 +2185,13 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags, memcpy(buf, (void *) hdr + hdr->length, len); if (addrlen) +{ ds_set_src(src_addr, addrlen, hdr); +PRINTADDR(src_addr); +} if (!(flags & MSG_PEEK)) { - ds_post_recv(rs, rmsg->qp, hdr); + ds_post_recv(rs, rmsg->qp, rmsg->offset); if (++rs->rmsg_head == rs->rq_size + 1) rs->rmsg_head = 0; } @@ -2444,7 +2448,7 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, } miov[0].iov_base = &hdr; - miov[0].iov_len = sizeof hdr; + miov[0].iov_len = hdr.length; if (iov && iovcnt) memcpy(&miov[1], iov, sizeof *iov * iovcnt); @@ -2455,7 +2459,6 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, msg.msg_iovlen = iovcnt + 1; // printf("%s iov cnt %d\n", __func__, msg.msg_iovlen); ret = sendmsg(rs->udp_sock, &msg, flags); - printf("%s ret %d %s\n", __func__, ret, strerror(errno)); return ret > 0 ? ret - sizeof hdr : ret; } @@ -2463,7 +2466,7 @@ static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, int flags, uint8_t op) { struct iovec iov; - printf("%s\n", __func__); +// printf("%s\n", __func__); if (buf && len) { // printf("%s have buffer\n", __func__); iov.iov_base = (void *) buf; @@ -2503,6 +2506,7 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags) sge.lkey = rs->conn_dest->qp->smr->lkey; offset = (uint8_t *) msg - rs->sbuf; + printf("%s - sending over QP\n", __func__); ret = ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length)); return ret ? ret : len; } @@ -2604,7 +2608,8 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags, struct rsocket *rs; int ret; - printf("%s\n", __func__); + PRINTADDR(dest_addr); + printf("%s sendto data 0x%x\n", __func__, *((uint32_t*)buf)); rs = idm_at(&idm, socket); if (rs->type == SOCK_STREAM) { if (dest_addr || addrlen) @@ -2620,15 +2625,11 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags, } fastlock_acquire(&rs->slock); - printf("%s check conn dest\n", __func__); if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) { - printf("%s need conn dest\n", __func__); ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest); if (ret) goto out; } - else - printf("%s connected\n", __func__); ret = dsend(rs, buf, len, flags); out: @@ -3195,7 +3196,8 @@ int rsetsockopt(int socket, int level, int optname, opt_on = *(int *) optval; break; case SO_RCVBUF: - if (!rs->rbuf) + if ((rs->type == SOCK_STREAM && !rs->rbuf) || + (rs->type == SOCK_DGRAM && !rs->qp_list)) rs->rbuf_size = (*(uint32_t *) optval) << 1; ret = 0; break; @@ -3658,11 +3660,9 @@ static int rs_svc_add_rs(struct rsocket *rs) } svc_rss[++svc_cnt] = rs; - printf("%s rs %p\n", __func__, rs); svc_fds[svc_cnt].fd = rs->udp_sock; svc_fds[svc_cnt].events = POLLIN; svc_fds[svc_cnt].revents = 0; - printf("add rs udp sock %d\n",rs->udp_sock); return 0; } @@ -3686,7 +3686,6 @@ static void rs_svc_process_sock(void) struct rs_svc_msg msg; read(svc_sock[1], &msg, sizeof msg); - printf("%s op %d\n",__func__, msg.op); switch (msg.op) { case RS_SVC_INSERT: msg.status = rs_svc_add_rs(msg.rs); @@ -3698,7 +3697,6 @@ static void rs_svc_process_sock(void) msg.status = ENOTSUP; break; } - printf("%s status %d\n",__func__, msg.status); write(svc_sock[1], &msg, sizeof msg); } @@ -3732,7 +3730,6 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t struct ibv_ah_attr attr; int ret; - printf("%s\n",__func__); if (dest->ah) { fastlock_acquire(&rs->slock); ibv_destroy_ah(dest->ah); @@ -3741,7 +3738,6 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t } ret = rdma_create_id(NULL, &id, NULL, dest->qp->cm_id->ps); - printf("%s rdma_create_id %d %s\n",__func__, ret, strerror(errno)); if (ret) return; @@ -3752,12 +3748,10 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t else saddr.sin6.sin6_port = 0; ret = rdma_resolve_addr(id, &saddr.sa, &dest->addr.sa, 2000); - printf("%s rdma_resolve_addr %d %s\n",__func__, ret, strerror(errno)); if (ret) goto out; ret = rdma_resolve_route(id, 2000); - printf("%s rdma_resolve_route %d %s\n",__func__, ret, strerror(errno)); if (ret) goto out; @@ -3776,12 +3770,9 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t attr.static_rate = id->route.path_rec->rate; attr.port_num = id->port_num; - printf("%s getting slock \n",__func__); fastlock_acquire(&rs->slock); - printf("%s why am I not here? \n",__func__); dest->qpn = qpn; dest->ah = ibv_create_ah(dest->qp->cm_id->pd, &attr); - printf("%s ibv_create_ah %p %s\n",__func__, dest->ah, strerror(errno)); fastlock_release(&rs->slock); out: rdma_destroy_id(id); @@ -3790,17 +3781,6 @@ out: static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr, union socket_addr *addr) { -printf("tag %x ver %d family %d (AF_INET %d) length %d\n", udp_hdr->tag, - udp_hdr->version, addr->sa.sa_family, AF_INET, udp_hdr->length); - -printf("tag %d ver %d fam %d len %d ver %d fam %d len %d\n", -udp_hdr->tag == ntohl(DS_UDP_TAG), - udp_hdr->version == 4, addr->sa.sa_family == AF_INET, - udp_hdr->length == DS_UDP_IPV4_HDR_LEN, - udp_hdr->version == 6, addr->sa.sa_family == AF_INET6, - udp_hdr->length == DS_UDP_IPV6_HDR_LEN); - - return (udp_hdr->tag == ntohl(DS_UDP_TAG)) && ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET && udp_hdr->length == DS_UDP_IPV4_HDR_LEN) || @@ -3816,7 +3796,7 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len, struct ibv_sge sge; uint64_t offset; - printf("%s\n",__func__); +// PRINTADDR(src); if (!ds_can_send(rs)) { if (ds_get_comp(rs, 0, ds_can_send)) return; @@ -3827,13 +3807,18 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len, rs->sqe_avail--; ds_format_hdr(&hdr, src); +// printf("%s hdr ver %d length %d port %x\n", __func__, hdr.version, +// hdr.length, hdr.port); memcpy((void *) msg, &hdr, hdr.length); memcpy((void *) msg + hdr.length, buf, len); +// printf("%s received data 0x%x\n", __func__, *((uint32_t*)buf)); sge.addr = (uintptr_t) msg; sge.length = hdr.length + len; sge.lkey = rs->conn_dest->qp->smr->lkey; offset = (uint8_t *) msg - rs->sbuf; +// printf("%s ver %d length %d port %x\n", __func__, ((struct ds_header *) msg)->version, +// ((struct ds_header *) msg)->length, ((struct ds_header *) msg)->port); ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length)); } @@ -3845,9 +3830,9 @@ static void rs_svc_process_rs(struct rsocket *rs) socklen_t addrlen = sizeof addr; int len, ret; - printf("%s\n",__func__); ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen); - printf("%s recvfrom %d\n",__func__, ret); +// PRINTADDR(&addr); +// printf("%s received data 0x%x\n", __func__, *((uint32_t*)&svc_buf[8])); if (ret < DS_UDP_IPV4_HDR_LEN) return; @@ -3855,12 +3840,10 @@ static void rs_svc_process_rs(struct rsocket *rs) if (!rs_svc_valid_udp_hdr(udp_hdr, &addr)) return; - printf("%s valid hdr\n",__func__); len = ret - udp_hdr->length; udp_hdr->tag = ntohl(udp_hdr->tag); udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF; ret = ds_get_dest(rs, &addr.sa, addrlen, &dest); - printf("%s ds_get_dest %d\n",__func__, ret); if (ret) return; @@ -3868,16 +3851,15 @@ static void rs_svc_process_rs(struct rsocket *rs) rs_svc_create_ah(rs, dest, udp_hdr->qpn); /* to do: handle when dest local ip address doesn't match udp ip */ + if (udp_hdr->op != RS_OP_DATA) + return; + fastlock_acquire(&rs->slock); cur_dest = rs->conn_dest; - if (udp_hdr->op == RS_OP_DATA) { - rs->conn_dest = &dest->qp->dest; - printf("%s forwarding msg\n",__func__); - rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr); - } + rs->conn_dest = &dest->qp->dest; + rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr); rs->conn_dest = dest; - printf("%s sending resp\n",__func__); ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL); rs->conn_dest = cur_dest; fastlock_release(&rs->slock); @@ -3888,7 +3870,6 @@ static void *rs_svc_run(void *arg) struct rs_svc_msg msg; int i, ret; - printf("%s\n",__func__); ret = rs_svc_grow_sets(); if (ret) { msg.status = ret; @@ -3899,13 +3880,10 @@ static void *rs_svc_run(void *arg) svc_fds[0].fd = svc_sock[1]; svc_fds[0].events = POLLIN; do { - printf("%s svc cnt %d\n",__func__, svc_cnt); for (i = 0; i <= svc_cnt; i++) svc_fds[i].revents = 0; - printf("%s poll\n",__func__); poll(svc_fds, svc_cnt + 1, -1); - printf("%s poll done\n",__func__); if (svc_fds[0].revents) rs_svc_process_sock();