From 6712b56a64b82164fea31c72f51bd560bccfcb81 Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Sat, 15 Dec 2012 00:15:43 -0800 Subject: [PATCH] refresh --- meta | 7 +- patches/dsocket | 357 +++++++++++----------- patches/refresh-temp | 687 ------------------------------------------- 3 files changed, 180 insertions(+), 871 deletions(-) delete mode 100644 patches/refresh-temp diff --git a/meta b/meta index f1676466..b2cef2fd 100644 --- a/meta +++ b/meta @@ -1,9 +1,8 @@ Version: 1 -Previous: bb35237871619ddf1d13e8ef9caa2f292a8738f7 -Head: 72b8742fa8779d5b6c6c21850867ce4c21b0c99c +Previous: 624ef320265b36eae7d53aa5dc7e886ae28ca6dd +Head: 89347e2cbf949879c15b3d25bb155a653006be08 Applied: - dsocket: 152ed1d8cb5d62607a834e4c94a36e0b69cd58f8 - refresh-temp: 72b8742fa8779d5b6c6c21850867ce4c21b0c99c + dsocket: 89347e2cbf949879c15b3d25bb155a653006be08 Unapplied: udpong: a42957509acbde99a7d8469e0819b7d75af51289 test-udp: f6c78ad2a26f452cf166aff1baa7b76160bd8bf7 diff --git a/patches/dsocket b/patches/dsocket index 617543d6..fad4d8e5 100644 --- a/patches/dsocket +++ b/patches/dsocket @@ -1,5 +1,5 @@ Bottom: 1fa07c62817ac4b6cb8d9c5e327ea2cdc75dbd21 -Top: 49030a049bcacc4789ad20b05e6a7a3ee28c5e0d +Top: 232d6a57cc2f2d81d4457edeeef4cb9e418b9640 Author: Sean Hefty Date: 2012-11-09 10:26:38 -0800 @@ -113,7 +113,7 @@ index 0a0370e..7135a61 100644 { errno = err; diff --git a/src/rsocket.c b/src/rsocket.c -index a060f66..04f00dd 100644 +index a060f66..aca705b 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -47,6 +47,8 @@ @@ -134,7 +134,7 @@ index a060f66..04f00dd 100644 #define RS_QP_MAX_SIZE 0xFFFE #define RS_QP_CTRL_SIZE 4 #define RS_CONN_RETRIES 6 -@@ -64,6 +66,28 @@ +@@ -64,6 +66,36 @@ static struct index_map idm; static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; @@ -145,6 +145,14 @@ index a060f66..04f00dd 100644 + +struct rsocket; + ++ ++#define PRINTADDR(a) \ ++printf("%s port %x ip %x\n", __func__, \ ++ ((struct sockaddr_in *)a)->sin_port, \ ++ ((struct sockaddr_in *)a)->sin_addr.s_addr) ++ ++ ++ +struct rs_svc_msg { + uint32_t op; + uint32_t status; @@ -163,7 +171,7 @@ index a060f66..04f00dd 100644 static uint16_t def_iomap_size = 0; static uint16_t def_inline = 64; static uint16_t def_sqsize = 384; -@@ -100,6 +124,14 @@ enum { +@@ -100,6 +132,14 @@ enum { #define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data)) #define rs_msg_op(imm_data) (imm_data >> 29) #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF) @@ -178,7 +186,7 @@ index a060f66..04f00dd 100644 enum { RS_CTRL_DISCONNECT, -@@ -111,6 +143,18 @@ struct rs_msg { +@@ -111,6 +151,18 @@ struct rs_msg { uint32_t data; }; @@ -197,7 +205,7 @@ index a060f66..04f00dd 100644 struct rs_sge { uint64_t addr; uint32_t key; -@@ -145,8 +189,6 @@ struct rs_conn_data { +@@ -145,8 +197,6 @@ struct rs_conn_data { struct rs_sge data_buf; }; @@ -206,7 +214,7 @@ index a060f66..04f00dd 100644 /* * rsocket states are ordered as passive, connecting, connected, disconnected. */ -@@ -160,9 +202,9 @@ enum rs_state { +@@ -160,9 +210,9 @@ enum rs_state { rs_connecting = rs_opening | 0x0040, rs_accepting = rs_opening | 0x0080, rs_connected = 0x0100, @@ -219,7 +227,7 @@ index a060f66..04f00dd 100644 rs_connect_error = 0x0800, rs_disconnected = 0x1000, rs_error = 0x2000, -@@ -170,68 +212,249 @@ enum rs_state { +@@ -170,68 +220,248 @@ enum rs_state { #define RS_OPT_SWAP_SGL 1 @@ -375,12 +383,12 @@ index a060f66..04f00dd 100644 - void *target_buffer_list; - volatile struct rs_sge *target_sgl; - struct rs_iomap *target_iomap; -- ++#define DS_UDP_TAG 0x55555555 + - uint32_t rbuf_size; - struct ibv_mr *rmr; - uint8_t *rbuf; -+#define DS_UDP_TAG 0x55555555 - +- - uint32_t sbuf_size; - struct ibv_mr *smr; - struct ibv_sge ssgl[2]; @@ -442,7 +450,6 @@ index a060f66..04f00dd 100644 + + msg.op = RS_SVC_INSERT; + msg.status = EINVAL; -+ printf("%s rs %p\n", __func__, rs); + msg.rs = rs; + write(svc_sock[0], &msg, sizeof msg); + read(svc_sock[0], &msg, sizeof msg); @@ -501,7 +508,7 @@ index a060f66..04f00dd 100644 static int rs_value_to_scale(int value, int bits) { return value <= (1 << (bits - 1)) ? -@@ -307,10 +530,10 @@ out: +@@ -307,10 +537,10 @@ out: pthread_mutex_unlock(&mut); } @@ -514,7 +521,7 @@ index a060f66..04f00dd 100644 pthread_mutex_unlock(&mut); return rs->index; } -@@ -322,7 +545,7 @@ static void rs_remove(struct rsocket *rs) +@@ -322,7 +552,7 @@ static void rs_remove(struct rsocket *rs) pthread_mutex_unlock(&mut); } @@ -523,7 +530,7 @@ index a060f66..04f00dd 100644 { struct rsocket *rs; -@@ -330,29 +553,39 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) +@@ -330,29 +560,39 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) if (!rs) return NULL; @@ -568,7 +575,7 @@ index a060f66..04f00dd 100644 dlist_init(&rs->iomap_list); dlist_init(&rs->iomap_queue); return rs; -@@ -360,13 +593,29 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) +@@ -360,13 +600,26 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) static int rs_set_nonblocking(struct rsocket *rs, long arg) { @@ -586,10 +593,7 @@ index a060f66..04f00dd 100644 + if (!ret && rs->state < rs_connected) + ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg); + } else { -+ printf("%s set nonblock\n", __func__); + ret = fcntl(rs->epfd, F_SETFL, arg); -+ printf("%s fcntl %d\n", __func__, ret); -+ + if (!ret && rs->qp_list) { + qp = rs->qp_list; + do { @@ -602,7 +606,7 @@ index a060f66..04f00dd 100644 return ret; } -@@ -390,17 +639,43 @@ static void rs_set_qp_size(struct rsocket *rs) +@@ -390,17 +643,39 @@ static void rs_set_qp_size(struct rsocket *rs) rs->rq_size = 2; } @@ -610,8 +614,6 @@ index a060f66..04f00dd 100644 +{ + uint16_t max_size; + -+ printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size, -+ rs->rq_size, rs->rbuf_size); + max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE); + + if (rs->sq_size > max_size) @@ -628,8 +630,6 @@ index a060f66..04f00dd 100644 + rs->sq_size = rs->sbuf_size / RS_SNDLOWAT; + else + rs->sbuf_size = rs->sq_size * RS_SNDLOWAT; -+ printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size, -+ rs->rq_size, rs->rbuf_size); +} + static int rs_init_bufs(struct rsocket *rs) @@ -666,14 +666,15 @@ index a060f66..04f00dd 100644 rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size); if (!rs->rmr) -@@ -440,37 +715,56 @@ static int rs_init_bufs(struct rsocket *rs) +@@ -440,37 +715,57 @@ static int rs_init_bufs(struct rsocket *rs) return 0; } -static int rs_create_cq(struct rsocket *rs) +static int ds_init_bufs(struct ds_qp *qp) +{ -+ qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf)); ++ qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh), ++ sizeof(*qp->rbuf)); + if (!qp->rbuf) + return ERR(ENOMEM); + @@ -681,7 +682,8 @@ index a060f66..04f00dd 100644 + if (!qp->smr) + return -1; + -+ qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size); ++ qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size + ++ sizeof(struct ibv_grh)); + if (!qp->rmr) + return -1; + @@ -693,7 +695,6 @@ index a060f66..04f00dd 100644 - rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs); - if (!rs->cm_id->recv_cq_channel) + cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs); -+ printf("%s create comp_channel %p\n", __func__, cm_id->recv_cq_channel); + if (!cm_id->recv_cq_channel) return -1; @@ -702,14 +703,15 @@ index a060f66..04f00dd 100644 - if (!rs->cm_id->recv_cq) + cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size, + cm_id, cm_id->recv_cq_channel, 0); -+ printf("%s create cq %p size %d\n", __func__, cm_id->recv_cq, rs->sq_size + rs->rq_size); + if (!cm_id->recv_cq) goto err1; if (rs->fd_flags & O_NONBLOCK) { -+ printf("%s set nonblock\n", __func__); - if (rs_set_nonblocking(rs, O_NONBLOCK)) +- if (rs_set_nonblocking(rs, O_NONBLOCK)) ++ if (fcntl(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK)) goto err2; ++ } else { ++ ibv_req_notify_cq(cm_id->recv_cq, 0); } - rs->cm_id->send_cq_channel = rs->cm_id->recv_cq_channel; @@ -737,23 +739,26 @@ index a060f66..04f00dd 100644 { struct ibv_recv_wr wr, *bad; -@@ -482,6 +776,23 @@ rs_post_recv(struct rsocket *rs) +@@ -482,6 +777,26 @@ rs_post_recv(struct rsocket *rs) return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad)); } -+static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf) ++static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, uint32_t offset) +{ + struct ibv_recv_wr wr, *bad; -+ struct ibv_sge sge; ++ struct ibv_sge sge[2]; + -+ sge.addr = (uintptr_t) buf; -+ sge.length = RS_SNDLOWAT; -+ sge.lkey = qp->rmr->lkey; ++ sge[0].addr = (uintptr_t) qp->rbuf + rs->rbuf_size; ++ sge[0].length = sizeof(struct ibv_grh); ++ sge[0].lkey = qp->rmr->lkey; ++ sge[1].addr = (uintptr_t) qp->rbuf + offset; ++ sge[1].length = RS_SNDLOWAT; ++ sge[1].lkey = qp->rmr->lkey; + -+ wr.wr_id = ds_recv_wr_id((uint32_t) ((uint8_t *) buf - rs->rbuf)); ++ wr.wr_id = ds_recv_wr_id(offset); + wr.next = NULL; -+ wr.sg_list = &sge; -+ wr.num_sge = 1; ++ wr.sg_list = sge; ++ wr.num_sge = 2; + + return rdma_seterrno(ibv_post_recv(qp->cm_id->qp, &wr, &bad)); +} @@ -761,7 +766,7 @@ index a060f66..04f00dd 100644 static int rs_create_ep(struct rsocket *rs) { struct ibv_qp_init_attr qp_attr; -@@ -492,7 +803,7 @@ static int rs_create_ep(struct rsocket *rs) +@@ -492,7 +807,7 @@ static int rs_create_ep(struct rsocket *rs) if (ret) return ret; @@ -770,7 +775,7 @@ index a060f66..04f00dd 100644 if (ret) return ret; -@@ -549,8 +860,73 @@ static void rs_free_iomappings(struct rsocket *rs) +@@ -549,8 +864,73 @@ static void rs_free_iomappings(struct rsocket *rs) } } @@ -844,7 +849,7 @@ index a060f66..04f00dd 100644 if (rs->index >= 0) rs_remove(rs); -@@ -582,7 +958,7 @@ static void rs_free(struct rsocket *rs) +@@ -582,7 +962,7 @@ static void rs_free(struct rsocket *rs) rdma_destroy_id(rs->cm_id); } @@ -853,7 +858,7 @@ index a060f66..04f00dd 100644 fastlock_destroy(&rs->cq_wait_lock); fastlock_destroy(&rs->cq_lock); fastlock_destroy(&rs->rlock); -@@ -636,29 +1012,89 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn) +@@ -636,29 +1016,88 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn) rs->sseq_comp = ntohs(conn->credits); } @@ -930,16 +935,15 @@ index a060f66..04f00dd 100644 + ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP); + if (ret) + goto err; -+ + +- ret = rs_insert(rs); + rs->cm_id->route.addr.src_addr.sa_family = domain; + index = rs->cm_id->channel->fd; + } else { -+ printf("rsocket sq %d rq %d\n", rs->sq_size, rs->rq_size); + ret = ds_init(rs, domain); + if (ret) + goto err; - -- ret = rs_insert(rs); ++ + index = rs->udp_sock; + } + @@ -951,7 +955,7 @@ index a060f66..04f00dd 100644 return rs->index; err: -@@ -672,9 +1108,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen) +@@ -672,9 +1111,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen) int ret; rs = idm_at(&idm, socket); @@ -973,7 +977,7 @@ index a060f66..04f00dd 100644 return ret; } -@@ -710,7 +1155,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) +@@ -710,7 +1158,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) int ret; rs = idm_at(&idm, socket); @@ -982,7 +986,7 @@ index a060f66..04f00dd 100644 if (!new_rs) return ERR(ENOMEM); -@@ -718,7 +1163,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) +@@ -718,7 +1166,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) if (ret) goto err; @@ -991,7 +995,25 @@ index a060f66..04f00dd 100644 if (ret < 0) goto err; -@@ -855,13 +1300,256 @@ connected: +@@ -729,7 +1177,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) + } + + if (rs->fd_flags & O_NONBLOCK) +- rs_set_nonblocking(new_rs, O_NONBLOCK); ++ fcntl(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK); + + ret = rs_create_ep(new_rs); + if (ret) +@@ -831,7 +1279,7 @@ connected: + break; + case rs_accepting: + if (!(rs->fd_flags & O_NONBLOCK)) +- rs_set_nonblocking(rs, 0); ++ fcntl(rs->cm_id->channel->fd, F_SETFL, 0); + + ret = ucma_complete(rs->cm_id); + if (ret) +@@ -855,13 +1303,251 @@ connected: return ret; } @@ -1013,8 +1035,10 @@ index a060f66..04f00dd 100644 + int sock, ret; + uint16_t port; + ++// printf("dest: "); PRINTADDR(dest_addr); + *src_len = sizeof src_addr; + ret = getsockname(rs->udp_sock, &src_addr->sa, src_len); ++// printf("src: "); PRINTADDR(src_addr); + if (ret || !rs_any_addr(src_addr)) + return ret; + @@ -1030,6 +1054,7 @@ index a060f66..04f00dd 100644 + *src_len = sizeof src_addr; + ret = getsockname(sock, &src_addr->sa, src_len); + src_addr->sin.sin_port = port; ++// printf("selected src: "); +out: + close(sock); + return ret; @@ -1038,6 +1063,7 @@ index a060f66..04f00dd 100644 +static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr) +{ + if (addr->sa.sa_family == AF_INET) { ++ PRINTADDR(addr); + hdr->version = 4; + hdr->length = DS_IPV4_HDR_LEN; + hdr->port = addr->sin.sin_port; @@ -1058,6 +1084,7 @@ index a060f66..04f00dd 100644 + struct ibv_ah_attr attr; + int ret; + ++// printf("%s\n", __func__); + memcpy(&qp->dest.addr, addr, addrlen); + qp->dest.qp = qp; + qp->dest.qpn = qp->cm_id->qp->qp_num; @@ -1070,6 +1097,8 @@ index a060f66..04f00dd 100644 + attr.dlid = port_attr.lid; + attr.port_num = qp->cm_id->port_num; + qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr); ++// printf("%s ah %p lid %x port %d qpn %x\n", __func__, qp->dest.ah, attr.dlid, ++// attr.port_num, qp->dest.qpn); + if (!qp->dest.ah) + return ERR(ENOMEM); + @@ -1085,30 +1114,26 @@ index a060f66..04f00dd 100644 + struct epoll_event event; + int i, ret; + -+printf("%s\n", __func__); ++ PRINTADDR(src_addr); + qp = calloc(1, sizeof(*qp)); + if (!qp) + return ERR(ENOMEM); + + qp->rs = rs; + ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP); -+ printf("%s rdma_create_id %d\n", __func__, ret); + if (ret) + goto err; + + ds_format_hdr(&qp->hdr, src_addr); + ret = rdma_bind_addr(qp->cm_id, &src_addr->sa); -+ printf("%s rdma_bind_addr %d\n", __func__, ret); + if (ret) + goto err; + + ret = ds_init_bufs(qp); -+ printf("%s ds_init_bufs %d\n", __func__, ret); + if (ret) + goto err; + + ret = rs_create_cq(rs, qp->cm_id); -+ printf("%s rs_create_cq %d\n", __func__, ret); + if (ret) + goto err; + @@ -1120,16 +1145,14 @@ index a060f66..04f00dd 100644 + qp_attr.sq_sig_all = 1; + qp_attr.cap.max_send_wr = rs->sq_size; + qp_attr.cap.max_recv_wr = rs->rq_size; -+ qp_attr.cap.max_send_sge = 2; -+ qp_attr.cap.max_recv_sge = 1; ++ qp_attr.cap.max_send_sge = 1; ++ qp_attr.cap.max_recv_sge = 2; + qp_attr.cap.max_inline_data = rs->sq_inline; + ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr); -+ printf("%s rdma_create_qp %d\n", __func__, ret); + if (ret) + goto err; + + ret = ds_add_qp_dest(qp, src_addr, addrlen); -+ printf("%s ds_add_qp_dest %d\n", __func__, ret); + if (ret) + goto err; + @@ -1137,12 +1160,11 @@ index a060f66..04f00dd 100644 + event.data.ptr = qp; + ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD, + qp->cm_id->recv_cq_channel->fd, &event); -+ printf("%s epoll_ctl %d\n", __func__, ret); + if (ret) + goto err; + + for (i = 0; i < rs->rq_size; i++) { -+ ret = ds_post_recv(rs, qp, qp->rbuf + i * RS_SNDLOWAT); ++ ret = ds_post_recv(rs, qp, i * RS_SNDLOWAT); + if (ret) + goto err; + } @@ -1181,26 +1203,23 @@ index a060f66..04f00dd 100644 + struct ds_dest **tdest, *new_dest; + int ret = 0; + -+ printf("%s \n", __func__); ++ PRINTADDR(addr); + fastlock_acquire(&rs->map_lock); + tdest = tfind(addr, &rs->dest_map, ds_compare_addr); -+ printf("%s tfind %p\n", __func__, dest); + if (tdest) + goto found; + + ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len); -+ printf("%s ds_get_src_addr %d %s\n", __func__, ret, strerror(errno)); ++// printf("get src: "); PRINTADDR(&src_addr); + if (ret) + goto out; + + ret = ds_get_qp(rs, &src_addr, src_len, &qp); -+ printf("%s ds_get_qp %d %s\n", __func__, ret, strerror(errno)); + if (ret) + goto out; + + tdest = tfind(addr, &rs->dest_map, ds_compare_addr); + if (!tdest) { -+ printf("%s adding dest into map\n", __func__); + new_dest = calloc(1, sizeof(*new_dest)); + if (!new_dest) { + ret = ERR(ENOMEM); @@ -1231,7 +1250,6 @@ index a060f66..04f00dd 100644 + memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen); + ret = rs_do_connect(rs); + } else { -+ printf("%s\n", __func__); + if (rs->state == rs_init) { + ret = ds_init_ep(rs); + if (ret) @@ -1239,18 +1257,17 @@ index a060f66..04f00dd 100644 + } + + fastlock_acquire(&rs->slock); ++ PRINTADDR(addr); + ret = connect(rs->udp_sock, addr, addrlen); -+ printf("%s connect %d %s\n", __func__, ret, strerror(errno)); + if (!ret) + ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest); -+ printf("%s ds_get_dest %d %s\n", __func__, ret, strerror(errno)); + fastlock_release(&rs->slock); + } + return ret; } static int rs_post_write_msg(struct rsocket *rs, -@@ -903,6 +1591,24 @@ static int rs_post_write(struct rsocket *rs, +@@ -903,6 +1589,26 @@ static int rs_post_write(struct rsocket *rs, return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); } @@ -1268,6 +1285,8 @@ index a060f66..04f00dd 100644 + wr.wr.ud.ah = rs->conn_dest->ah; + wr.wr.ud.remote_qpn = rs->conn_dest->qpn; + wr.wr.ud.remote_qkey = RDMA_UDP_QKEY; ++// printf("%s ah %p qpn %x\n", __func__, rs->conn_dest->ah, ++// rs->conn_dest->qpn); + + return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad)); +} @@ -1284,15 +1303,12 @@ index a060f66..04f00dd 100644 } break; case RS_OP_WRITE: -@@ -1133,46 +1839,217 @@ static int rs_get_cq_event(struct rsocket *rs) - */ - static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs)) - { -- int ret; -+ int ret; -+ -+ fastlock_acquire(&rs->cq_lock); -+ do { +@@ -1137,42 +1843,214 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs + + fastlock_acquire(&rs->cq_lock); + do { +- rs_update_credits(rs); +- ret = rs_poll_cq(rs); + rs_update_credits(rs); + ret = rs_poll_cq(rs); + if (test(rs)) { @@ -1344,10 +1360,12 @@ index a060f66..04f00dd 100644 + return ret; +} + -+static int ds_valid_recv(void *buf, uint32_t len) ++static int ds_valid_recv(struct ds_qp *qp, struct ibv_wc *wc) +{ -+ struct ds_header *hdr = (struct ds_header *) buf; -+ return ((len >= sizeof(*hdr)) && ++ struct ds_header *hdr; ++ ++ hdr = (struct ds_header *) (qp->rbuf + ds_wr_offset(wc->wr_id)); ++ return ((wc->byte_len >= sizeof(struct ibv_grh) + sizeof(*hdr)) && + ((hdr->version == 4 && hdr->length == DS_IPV4_HDR_LEN) || + (hdr->version == 6 && hdr->length == DS_IPV6_HDR_LEN))); +} @@ -1381,22 +1399,22 @@ index a060f66..04f00dd 100644 + + if (ds_wr_is_recv(wc.wr_id)) { + if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS && -+ ds_valid_recv(qp->rbuf + ds_wr_offset(wc.wr_id), -+ wc.byte_len)) { ++ ds_valid_recv(qp, &wc)) { + rs->rqe_avail--; + rmsg = &rs->dmsg[rs->rmsg_tail]; + rmsg->qp = qp; + rmsg->offset = ds_wr_offset(wc.wr_id); -+ rmsg->length = wc.byte_len; ++ rmsg->length = wc.byte_len - sizeof(struct ibv_grh); + if (++rs->rmsg_tail == rs->rq_size + 1) + rs->rmsg_tail = 0; + } else { -+ ds_post_recv(rs, qp, qp->rbuf + -+ ds_wr_offset(wc.wr_id)); ++ printf("%s invalid recv\n", __func__); ++ ds_post_recv(rs, qp, ds_wr_offset(wc.wr_id)); + } + } else { + smsg = (struct ds_smsg *) + (rs->sbuf + ds_wr_offset(wc.wr_id)); ++ printf("%s send smsg %p free %p\n", __func__, smsg, rs->smsg_free); + smsg->next = rs->smsg_free; + rs->smsg_free = smsg; + rs->sqe_avail++; @@ -1436,18 +1454,17 @@ index a060f66..04f00dd 100644 + void *context; + int ret; + -+ printf("%s \n", __func__); + if (!rs->cq_armed) + return 0; + ++// printf("wait for epoll event\n"); + ret = epoll_wait(rs->epfd, &event, 1, -1); -+ printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno)); ++// printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno)); + if (ret <= 0) + return ret; + + qp = event.data.ptr; + ret = ibv_get_cq_event(qp->cm_id->recv_cq_channel, &cq, &context); -+ printf("%s get cq event ret %d errno %s\n", __func__, ret, strerror(errno)); + if (!ret) { + ibv_ack_cq_events(qp->cm_id->recv_cq, 1); + qp->cq_armed = 0; @@ -1460,11 +1477,9 @@ index a060f66..04f00dd 100644 +static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs)) +{ + int ret = 0; - - fastlock_acquire(&rs->cq_lock); - do { -- rs_update_credits(rs); -- ret = rs_poll_cq(rs); ++ ++ fastlock_acquire(&rs->cq_lock); ++ do { + ds_poll_cqs(rs); if (test(rs)) { +// printf("%s test succeeded\n", __func__); @@ -1513,7 +1528,7 @@ index a060f66..04f00dd 100644 if (!ret || nonblock || errno != EWOULDBLOCK) return ret; -@@ -1184,7 +2061,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc +@@ -1184,7 +2062,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc (e.tv_usec - s.tv_usec) + 1; } while (poll_time <= polling_time); @@ -1522,7 +1537,7 @@ index a060f66..04f00dd 100644 return ret; } -@@ -1219,9 +2096,19 @@ static int rs_can_send(struct rsocket *rs) +@@ -1219,9 +2097,19 @@ static int rs_can_send(struct rsocket *rs) (rs->target_sgl[rs->target_sge].length != 0); } @@ -1543,7 +1558,7 @@ index a060f66..04f00dd 100644 } static int rs_conn_can_send_ctrl(struct rsocket *rs) -@@ -1236,7 +2123,7 @@ static int rs_have_rdata(struct rsocket *rs) +@@ -1236,7 +2124,7 @@ static int rs_have_rdata(struct rsocket *rs) static int rs_conn_have_rdata(struct rsocket *rs) { @@ -1552,7 +1567,7 @@ index a060f66..04f00dd 100644 } static int rs_conn_all_sends_done(struct rsocket *rs) -@@ -1245,6 +2132,70 @@ static int rs_conn_all_sends_done(struct rsocket *rs) +@@ -1245,6 +2133,73 @@ static int rs_conn_all_sends_done(struct rsocket *rs) !(rs->state & rs_connected); } @@ -1588,15 +1603,15 @@ index a060f66..04f00dd 100644 + struct ds_header *hdr; + int ret; + -+// printf("%s \n", __func__); ++ printf("%s \n", __func__); + if (!(rs->state & rs_readable)) + return ERR(EINVAL); + + if (!rs_have_rdata(rs)) { -+// printf("%s need rdata \n", __func__); ++ printf("%s need rdata \n", __func__); + ret = ds_get_comp(rs, rs_nonblocking(rs, flags), + rs_have_rdata); -+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); ++ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); + if (ret) + return ret; + } @@ -1608,10 +1623,13 @@ index a060f66..04f00dd 100644 + + memcpy(buf, (void *) hdr + hdr->length, len); + if (addrlen) ++{ + ds_set_src(src_addr, addrlen, hdr); ++PRINTADDR(src_addr); ++} + + if (!(flags & MSG_PEEK)) { -+ ds_post_recv(rs, rmsg->qp, hdr); ++ ds_post_recv(rs, rmsg->qp, rmsg->offset); + if (++rs->rmsg_head == rs->rq_size + 1) + rs->rmsg_head = 0; + } @@ -1623,7 +1641,7 @@ index a060f66..04f00dd 100644 static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len) { size_t left = len; -@@ -1290,6 +2241,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) +@@ -1290,6 +2245,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) int ret; rs = idm_at(&idm, socket); @@ -1637,7 +1655,7 @@ index a060f66..04f00dd 100644 if (rs->state & rs_opening) { ret = rs_do_connect(rs); if (ret) { -@@ -1339,7 +2297,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) +@@ -1339,7 +2301,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) rs->rbuf_bytes_avail += rsize; } @@ -1646,7 +1664,7 @@ index a060f66..04f00dd 100644 fastlock_release(&rs->rlock); return ret ? ret : len - left; -@@ -1348,8 +2306,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) +@@ -1348,8 +2310,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen) { @@ -1664,7 +1682,7 @@ index a060f66..04f00dd 100644 ret = rrecv(socket, buf, len, flags); if (ret > 0 && src_addr) rgetpeername(socket, src_addr, addrlen); -@@ -1391,14 +2358,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags) +@@ -1391,14 +2362,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags) struct rs_iomap iom; int ret; @@ -1681,7 +1699,7 @@ index a060f66..04f00dd 100644 ret = ERR(ECONNRESET); break; } -@@ -1447,10 +2414,99 @@ static int rs_send_iomaps(struct rsocket *rs, int flags) +@@ -1447,10 +2418,99 @@ static int rs_send_iomaps(struct rsocket *rs, int flags) } rs->iomap_pending = !dlist_empty(&rs->iomap_queue); @@ -1716,7 +1734,7 @@ index a060f66..04f00dd 100644 + } + + miov[0].iov_base = &hdr; -+ miov[0].iov_len = sizeof hdr; ++ miov[0].iov_len = hdr.length; + if (iov && iovcnt) + memcpy(&miov[1], iov, sizeof *iov * iovcnt); + @@ -1727,7 +1745,6 @@ index a060f66..04f00dd 100644 + msg.msg_iovlen = iovcnt + 1; +// printf("%s iov cnt %d\n", __func__, msg.msg_iovlen); + ret = sendmsg(rs->udp_sock, &msg, flags); -+ printf("%s ret %d %s\n", __func__, ret, strerror(errno)); + return ret > 0 ? ret - sizeof hdr : ret; +} + @@ -1735,7 +1752,7 @@ index a060f66..04f00dd 100644 + int flags, uint8_t op) +{ + struct iovec iov; -+ printf("%s\n", __func__); ++// printf("%s\n", __func__); + if (buf && len) { +// printf("%s have buffer\n", __func__); + iov.iov_base = (void *) buf; @@ -1775,6 +1792,7 @@ index a060f66..04f00dd 100644 + sge.lkey = rs->conn_dest->qp->smr->lkey; + offset = (uint8_t *) msg - rs->sbuf; + ++ printf("%s - sending over QP\n", __func__); + ret = ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length)); + return ret ? ret : len; +} @@ -1782,7 +1800,7 @@ index a060f66..04f00dd 100644 /* * We overlap sending the data, by posting a small work request immediately, * then increasing the size of the send on each iteration. -@@ -1464,6 +2520,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) +@@ -1464,6 +2524,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) int ret = 0; rs = idm_at(&idm, socket); @@ -1796,7 +1814,7 @@ index a060f66..04f00dd 100644 if (rs->state & rs_opening) { ret = rs_do_connect(rs); if (ret) { -@@ -1485,7 +2548,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) +@@ -1485,7 +2552,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) rs_conn_can_send); if (ret) break; @@ -1805,7 +1823,7 @@ index a060f66..04f00dd 100644 ret = ERR(ECONNRESET); break; } -@@ -1538,10 +2601,39 @@ out: +@@ -1538,10 +2605,36 @@ out: ssize_t rsendto(int socket, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen) { @@ -1813,8 +1831,10 @@ index a060f66..04f00dd 100644 - return ERR(EISCONN); + struct rsocket *rs; + int ret; -+ -+ printf("%s\n", __func__); + +- return rsend(socket, buf, len, flags); ++ PRINTADDR(dest_addr); ++ printf("%s sendto data 0x%x\n", __func__, *((uint32_t*)buf)); + rs = idm_at(&idm, socket); + if (rs->type == SOCK_STREAM) { + if (dest_addr || addrlen) @@ -1830,17 +1850,12 @@ index a060f66..04f00dd 100644 + } + + fastlock_acquire(&rs->slock); -+ printf("%s check conn dest\n", __func__); + if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) { -+ printf("%s need conn dest\n", __func__); + ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest); + if (ret) + goto out; + } -+ else -+ printf("%s connected\n", __func__); - -- return rsend(socket, buf, len, flags); ++ + ret = dsend(rs, buf, len, flags); +out: + fastlock_release(&rs->slock); @@ -1848,7 +1863,7 @@ index a060f66..04f00dd 100644 } static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len) -@@ -1600,7 +2692,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags +@@ -1600,7 +2693,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags rs_conn_can_send); if (ret) break; @@ -1857,7 +1872,7 @@ index a060f66..04f00dd 100644 ret = ERR(ECONNRESET); break; } -@@ -1653,7 +2745,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags) +@@ -1653,7 +2746,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags) if (msg->msg_control && msg->msg_controllen) return ERR(ENOTSUP); @@ -1866,7 +1881,7 @@ index a060f66..04f00dd 100644 } ssize_t rwrite(int socket, const void *buf, size_t count) -@@ -1690,8 +2782,8 @@ static int rs_poll_rs(struct rsocket *rs, int events, +@@ -1690,8 +2783,8 @@ static int rs_poll_rs(struct rsocket *rs, int events, int ret; check_cq: @@ -1877,7 +1892,7 @@ index a060f66..04f00dd 100644 rs_process_cq(rs, nonblock, test); revents = 0; -@@ -1707,6 +2799,16 @@ check_cq: +@@ -1707,6 +2800,16 @@ check_cq: } return revents; @@ -1894,7 +1909,7 @@ index a060f66..04f00dd 100644 } if (rs->state == rs_listening) { -@@ -1766,11 +2868,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds) +@@ -1766,11 +2869,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds) if (fds[i].revents) return 1; @@ -1914,7 +1929,7 @@ index a060f66..04f00dd 100644 rfds[i].events = POLLIN; } else { rfds[i].fd = fds[i].fd; -@@ -1793,7 +2898,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds) +@@ -1793,7 +2899,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds) rs = idm_lookup(&idm, fds[i].fd); if (rs) { @@ -1926,7 +1941,7 @@ index a060f66..04f00dd 100644 fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all); } else { fds[i].revents = rfds[i].revents; -@@ -1949,7 +3057,7 @@ int rshutdown(int socket, int how) +@@ -1949,7 +3058,7 @@ int rshutdown(int socket, int how) rs = idm_at(&idm, socket); if (how == SHUT_RD) { @@ -1935,7 +1950,7 @@ index a060f66..04f00dd 100644 return 0; } -@@ -1959,10 +3067,10 @@ int rshutdown(int socket, int how) +@@ -1959,10 +3068,10 @@ int rshutdown(int socket, int how) if (rs->state & rs_connected) { if (how == SHUT_RDWR) { ctrl = RS_CTRL_DISCONNECT; @@ -1949,7 +1964,7 @@ index a060f66..04f00dd 100644 RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT; } if (!rs->ctrl_avail) { -@@ -1987,13 +3095,29 @@ int rshutdown(int socket, int how) +@@ -1987,13 +3096,29 @@ int rshutdown(int socket, int how) return 0; } @@ -1981,7 +1996,7 @@ index a060f66..04f00dd 100644 rs_free(rs); return 0; -@@ -2018,8 +3142,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen) +@@ -2018,8 +3143,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen) struct rsocket *rs; rs = idm_at(&idm, socket); @@ -1996,7 +2011,7 @@ index a060f66..04f00dd 100644 } int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) -@@ -2027,8 +3155,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) +@@ -2027,8 +3156,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) struct rsocket *rs; rs = idm_at(&idm, socket); @@ -2011,7 +2026,7 @@ index a060f66..04f00dd 100644 } int rsetsockopt(int socket, int level, int optname, -@@ -2040,18 +3172,26 @@ int rsetsockopt(int socket, int level, int optname, +@@ -2040,22 +3173,31 @@ int rsetsockopt(int socket, int level, int optname, ret = ERR(ENOTSUP); rs = idm_at(&idm, socket); @@ -2045,7 +2060,13 @@ index a060f66..04f00dd 100644 opt_on = *(int *) optval; break; case SO_RCVBUF: -@@ -2101,9 +3241,11 @@ int rsetsockopt(int socket, int level, int optname, +- if (!rs->rbuf) ++ if ((rs->type == SOCK_STREAM && !rs->rbuf) || ++ (rs->type == SOCK_DGRAM && !rs->qp_list)) + rs->rbuf_size = (*(uint32_t *) optval) << 1; + ret = 0; + break; +@@ -2101,9 +3243,11 @@ int rsetsockopt(int socket, int level, int optname, opts = &rs->ipv6_opts; switch (optname) { case IPV6_V6ONLY: @@ -2060,7 +2081,7 @@ index a060f66..04f00dd 100644 opt_on = *(int *) optval; break; default: -@@ -2315,7 +3457,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse +@@ -2315,7 +3459,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE))) return ERR(EINVAL); @@ -2069,7 +2090,7 @@ index a060f66..04f00dd 100644 if (prot & PROT_WRITE) { iomr = rs_get_iomap_mr(rs); access |= IBV_ACCESS_REMOTE_WRITE; -@@ -2349,7 +3491,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse +@@ -2349,7 +3493,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse dlist_insert_tail(&iomr->entry, &rs->iomap_list); } out: @@ -2078,7 +2099,7 @@ index a060f66..04f00dd 100644 return offset; } -@@ -2361,7 +3503,7 @@ int riounmap(int socket, void *buf, size_t len) +@@ -2361,7 +3505,7 @@ int riounmap(int socket, void *buf, size_t len) int ret = 0; rs = idm_at(&idm, socket); @@ -2087,7 +2108,7 @@ index a060f66..04f00dd 100644 for (entry = rs->iomap_list.next; entry != &rs->iomap_list; entry = entry->next) { -@@ -2382,7 +3524,7 @@ int riounmap(int socket, void *buf, size_t len) +@@ -2382,7 +3526,7 @@ int riounmap(int socket, void *buf, size_t len) } ret = ERR(EINVAL); out: @@ -2096,7 +2117,7 @@ index a060f66..04f00dd 100644 return ret; } -@@ -2426,7 +3568,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla +@@ -2426,7 +3570,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla rs_conn_can_send); if (ret) break; @@ -2105,7 +2126,7 @@ index a060f66..04f00dd 100644 ret = ERR(ECONNRESET); break; } -@@ -2476,3 +3618,302 @@ out: +@@ -2476,3 +3620,278 @@ out: return (ret && left == count) ? ret : count - left; } @@ -2149,11 +2170,9 @@ index a060f66..04f00dd 100644 + } + + svc_rss[++svc_cnt] = rs; -+ printf("%s rs %p\n", __func__, rs); + svc_fds[svc_cnt].fd = rs->udp_sock; + svc_fds[svc_cnt].events = POLLIN; + svc_fds[svc_cnt].revents = 0; -+ printf("add rs udp sock %d\n",rs->udp_sock); + return 0; +} + @@ -2177,7 +2196,6 @@ index a060f66..04f00dd 100644 + struct rs_svc_msg msg; + + read(svc_sock[1], &msg, sizeof msg); -+ printf("%s op %d\n",__func__, msg.op); + switch (msg.op) { + case RS_SVC_INSERT: + msg.status = rs_svc_add_rs(msg.rs); @@ -2189,7 +2207,6 @@ index a060f66..04f00dd 100644 + msg.status = ENOTSUP; + break; + } -+ printf("%s status %d\n",__func__, msg.status); + write(svc_sock[1], &msg, sizeof msg); +} + @@ -2223,7 +2240,6 @@ index a060f66..04f00dd 100644 + struct ibv_ah_attr attr; + int ret; + -+ printf("%s\n",__func__); + if (dest->ah) { + fastlock_acquire(&rs->slock); + ibv_destroy_ah(dest->ah); @@ -2232,7 +2248,6 @@ index a060f66..04f00dd 100644 + } + + ret = rdma_create_id(NULL, &id, NULL, dest->qp->cm_id->ps); -+ printf("%s rdma_create_id %d %s\n",__func__, ret, strerror(errno)); + if (ret) + return; + @@ -2243,12 +2258,10 @@ index a060f66..04f00dd 100644 + else + saddr.sin6.sin6_port = 0; + ret = rdma_resolve_addr(id, &saddr.sa, &dest->addr.sa, 2000); -+ printf("%s rdma_resolve_addr %d %s\n",__func__, ret, strerror(errno)); + if (ret) + goto out; + + ret = rdma_resolve_route(id, 2000); -+ printf("%s rdma_resolve_route %d %s\n",__func__, ret, strerror(errno)); + if (ret) + goto out; + @@ -2267,12 +2280,9 @@ index a060f66..04f00dd 100644 + attr.static_rate = id->route.path_rec->rate; + attr.port_num = id->port_num; + -+ printf("%s getting slock \n",__func__); + fastlock_acquire(&rs->slock); -+ printf("%s why am I not here? \n",__func__); + dest->qpn = qpn; + dest->ah = ibv_create_ah(dest->qp->cm_id->pd, &attr); -+ printf("%s ibv_create_ah %p %s\n",__func__, dest->ah, strerror(errno)); + fastlock_release(&rs->slock); +out: + rdma_destroy_id(id); @@ -2281,17 +2291,6 @@ index a060f66..04f00dd 100644 +static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr, + union socket_addr *addr) +{ -+printf("tag %x ver %d family %d (AF_INET %d) length %d\n", udp_hdr->tag, -+ udp_hdr->version, addr->sa.sa_family, AF_INET, udp_hdr->length); -+ -+printf("tag %d ver %d fam %d len %d ver %d fam %d len %d\n", -+udp_hdr->tag == ntohl(DS_UDP_TAG), -+ udp_hdr->version == 4, addr->sa.sa_family == AF_INET, -+ udp_hdr->length == DS_UDP_IPV4_HDR_LEN, -+ udp_hdr->version == 6, addr->sa.sa_family == AF_INET6, -+ udp_hdr->length == DS_UDP_IPV6_HDR_LEN); -+ -+ + return (udp_hdr->tag == ntohl(DS_UDP_TAG)) && + ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET && + udp_hdr->length == DS_UDP_IPV4_HDR_LEN) || @@ -2307,7 +2306,7 @@ index a060f66..04f00dd 100644 + struct ibv_sge sge; + uint64_t offset; + -+ printf("%s\n",__func__); ++// PRINTADDR(src); + if (!ds_can_send(rs)) { + if (ds_get_comp(rs, 0, ds_can_send)) + return; @@ -2318,13 +2317,18 @@ index a060f66..04f00dd 100644 + rs->sqe_avail--; + + ds_format_hdr(&hdr, src); ++// printf("%s hdr ver %d length %d port %x\n", __func__, hdr.version, ++// hdr.length, hdr.port); + memcpy((void *) msg, &hdr, hdr.length); + memcpy((void *) msg + hdr.length, buf, len); ++// printf("%s received data 0x%x\n", __func__, *((uint32_t*)buf)); + sge.addr = (uintptr_t) msg; + sge.length = hdr.length + len; + sge.lkey = rs->conn_dest->qp->smr->lkey; + offset = (uint8_t *) msg - rs->sbuf; + ++// printf("%s ver %d length %d port %x\n", __func__, ((struct ds_header *) msg)->version, ++// ((struct ds_header *) msg)->length, ((struct ds_header *) msg)->port); + ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length)); +} + @@ -2336,9 +2340,9 @@ index a060f66..04f00dd 100644 + socklen_t addrlen = sizeof addr; + int len, ret; + -+ printf("%s\n",__func__); + ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen); -+ printf("%s recvfrom %d\n",__func__, ret); ++// PRINTADDR(&addr); ++// printf("%s received data 0x%x\n", __func__, *((uint32_t*)&svc_buf[8])); + if (ret < DS_UDP_IPV4_HDR_LEN) + return; + @@ -2346,12 +2350,10 @@ index a060f66..04f00dd 100644 + if (!rs_svc_valid_udp_hdr(udp_hdr, &addr)) + return; + -+ printf("%s valid hdr\n",__func__); + len = ret - udp_hdr->length; + udp_hdr->tag = ntohl(udp_hdr->tag); + udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF; + ret = ds_get_dest(rs, &addr.sa, addrlen, &dest); -+ printf("%s ds_get_dest %d\n",__func__, ret); + if (ret) + return; + @@ -2359,16 +2361,15 @@ index a060f66..04f00dd 100644 + rs_svc_create_ah(rs, dest, udp_hdr->qpn); + + /* to do: handle when dest local ip address doesn't match udp ip */ ++ if (udp_hdr->op != RS_OP_DATA) ++ return; ++ + fastlock_acquire(&rs->slock); + cur_dest = rs->conn_dest; -+ if (udp_hdr->op == RS_OP_DATA) { -+ rs->conn_dest = &dest->qp->dest; -+ printf("%s forwarding msg\n",__func__); -+ rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr); -+ } ++ rs->conn_dest = &dest->qp->dest; ++ rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr); + + rs->conn_dest = dest; -+ printf("%s sending resp\n",__func__); + ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL); + rs->conn_dest = cur_dest; + fastlock_release(&rs->slock); @@ -2379,7 +2380,6 @@ index a060f66..04f00dd 100644 + struct rs_svc_msg msg; + int i, ret; + -+ printf("%s\n",__func__); + ret = rs_svc_grow_sets(); + if (ret) { + msg.status = ret; @@ -2390,13 +2390,10 @@ index a060f66..04f00dd 100644 + svc_fds[0].fd = svc_sock[1]; + svc_fds[0].events = POLLIN; + do { -+ printf("%s svc cnt %d\n",__func__, svc_cnt); + for (i = 0; i <= svc_cnt; i++) + svc_fds[i].revents = 0; + -+ printf("%s poll\n",__func__); + poll(svc_fds, svc_cnt + 1, -1); -+ printf("%s poll done\n",__func__); + if (svc_fds[0].revents) + rs_svc_process_sock(); + diff --git a/patches/refresh-temp b/patches/refresh-temp deleted file mode 100644 index 94b69fc3..00000000 --- a/patches/refresh-temp +++ /dev/null @@ -1,687 +0,0 @@ -Bottom: 49030a049bcacc4789ad20b05e6a7a3ee28c5e0d -Top: 232d6a57cc2f2d81d4457edeeef4cb9e418b9640 -Author: Sean Hefty -Date: 2012-12-15 00:15:42 -0800 - -Refresh of dsocket - ---- - -diff --git a/src/rsocket.c b/src/rsocket.c -index 04f00dd..aca705b 100644 ---- a/src/rsocket.c -+++ b/src/rsocket.c -@@ -73,6 +73,14 @@ enum { - - struct rsocket; - -+ -+#define PRINTADDR(a) \ -+printf("%s port %x ip %x\n", __func__, \ -+ ((struct sockaddr_in *)a)->sin_port, \ -+ ((struct sockaddr_in *)a)->sin_addr.s_addr) -+ -+ -+ - struct rs_svc_msg { - uint32_t op; - uint32_t status; -@@ -399,7 +407,6 @@ static int rs_add_to_svc(struct rsocket *rs) - - msg.op = RS_SVC_INSERT; - msg.status = EINVAL; -- printf("%s rs %p\n", __func__, rs); - msg.rs = rs; - write(svc_sock[0], &msg, sizeof msg); - read(svc_sock[0], &msg, sizeof msg); -@@ -603,10 +610,7 @@ static int rs_set_nonblocking(struct rsocket *rs, long arg) - if (!ret && rs->state < rs_connected) - ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg); - } else { -- printf("%s set nonblock\n", __func__); - ret = fcntl(rs->epfd, F_SETFL, arg); -- printf("%s fcntl %d\n", __func__, ret); -- - if (!ret && rs->qp_list) { - qp = rs->qp_list; - do { -@@ -643,8 +647,6 @@ static void ds_set_qp_size(struct rsocket *rs) - { - uint16_t max_size; - -- printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size, -- rs->rq_size, rs->rbuf_size); - max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE); - - if (rs->sq_size > max_size) -@@ -661,8 +663,6 @@ static void ds_set_qp_size(struct rsocket *rs) - rs->sq_size = rs->sbuf_size / RS_SNDLOWAT; - else - rs->sbuf_size = rs->sq_size * RS_SNDLOWAT; -- printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size, -- rs->rq_size, rs->rbuf_size); - } - - static int rs_init_bufs(struct rsocket *rs) -@@ -717,7 +717,8 @@ static int rs_init_bufs(struct rsocket *rs) - - static int ds_init_bufs(struct ds_qp *qp) - { -- qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf)); -+ qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh), -+ sizeof(*qp->rbuf)); - if (!qp->rbuf) - return ERR(ENOMEM); - -@@ -725,7 +726,8 @@ static int ds_init_bufs(struct ds_qp *qp) - if (!qp->smr) - return -1; - -- qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size); -+ qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size + -+ sizeof(struct ibv_grh)); - if (!qp->rmr) - return -1; - -@@ -735,20 +737,19 @@ static int ds_init_bufs(struct ds_qp *qp) - static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id) - { - cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs); -- printf("%s create comp_channel %p\n", __func__, cm_id->recv_cq_channel); - if (!cm_id->recv_cq_channel) - return -1; - - cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size, - cm_id, cm_id->recv_cq_channel, 0); -- printf("%s create cq %p size %d\n", __func__, cm_id->recv_cq, rs->sq_size + rs->rq_size); - if (!cm_id->recv_cq) - goto err1; - - if (rs->fd_flags & O_NONBLOCK) { -- printf("%s set nonblock\n", __func__); -- if (rs_set_nonblocking(rs, O_NONBLOCK)) -+ if (fcntl(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK)) - goto err2; -+ } else { -+ ibv_req_notify_cq(cm_id->recv_cq, 0); - } - - cm_id->send_cq_channel = cm_id->recv_cq_channel; -@@ -776,19 +777,22 @@ static inline int rs_post_recv(struct rsocket *rs) - return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad)); - } - --static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf) -+static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, uint32_t offset) - { - struct ibv_recv_wr wr, *bad; -- struct ibv_sge sge; -+ struct ibv_sge sge[2]; - -- sge.addr = (uintptr_t) buf; -- sge.length = RS_SNDLOWAT; -- sge.lkey = qp->rmr->lkey; -+ sge[0].addr = (uintptr_t) qp->rbuf + rs->rbuf_size; -+ sge[0].length = sizeof(struct ibv_grh); -+ sge[0].lkey = qp->rmr->lkey; -+ sge[1].addr = (uintptr_t) qp->rbuf + offset; -+ sge[1].length = RS_SNDLOWAT; -+ sge[1].lkey = qp->rmr->lkey; - -- wr.wr_id = ds_recv_wr_id((uint32_t) ((uint8_t *) buf - rs->rbuf)); -+ wr.wr_id = ds_recv_wr_id(offset); - wr.next = NULL; -- wr.sg_list = &sge; -- wr.num_sge = 1; -+ wr.sg_list = sge; -+ wr.num_sge = 2; - - return rdma_seterrno(ibv_post_recv(qp->cm_id->qp, &wr, &bad)); - } -@@ -1083,7 +1087,6 @@ int rsocket(int domain, int type, int protocol) - rs->cm_id->route.addr.src_addr.sa_family = domain; - index = rs->cm_id->channel->fd; - } else { -- printf("rsocket sq %d rq %d\n", rs->sq_size, rs->rq_size); - ret = ds_init(rs, domain); - if (ret) - goto err; -@@ -1174,7 +1177,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) - } - - if (rs->fd_flags & O_NONBLOCK) -- rs_set_nonblocking(new_rs, O_NONBLOCK); -+ fcntl(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK); - - ret = rs_create_ep(new_rs); - if (ret) -@@ -1276,7 +1279,7 @@ connected: - break; - case rs_accepting: - if (!(rs->fd_flags & O_NONBLOCK)) -- rs_set_nonblocking(rs, 0); -+ fcntl(rs->cm_id->channel->fd, F_SETFL, 0); - - ret = ucma_complete(rs->cm_id); - if (ret) -@@ -1318,8 +1321,10 @@ static int ds_get_src_addr(struct rsocket *rs, - int sock, ret; - uint16_t port; - -+// printf("dest: "); PRINTADDR(dest_addr); - *src_len = sizeof src_addr; - ret = getsockname(rs->udp_sock, &src_addr->sa, src_len); -+// printf("src: "); PRINTADDR(src_addr); - if (ret || !rs_any_addr(src_addr)) - return ret; - -@@ -1335,6 +1340,7 @@ static int ds_get_src_addr(struct rsocket *rs, - *src_len = sizeof src_addr; - ret = getsockname(sock, &src_addr->sa, src_len); - src_addr->sin.sin_port = port; -+// printf("selected src: "); - out: - close(sock); - return ret; -@@ -1343,6 +1349,7 @@ out: - static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr) - { - if (addr->sa.sa_family == AF_INET) { -+ PRINTADDR(addr); - hdr->version = 4; - hdr->length = DS_IPV4_HDR_LEN; - hdr->port = addr->sin.sin_port; -@@ -1363,6 +1370,7 @@ static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr, - struct ibv_ah_attr attr; - int ret; - -+// printf("%s\n", __func__); - memcpy(&qp->dest.addr, addr, addrlen); - qp->dest.qp = qp; - qp->dest.qpn = qp->cm_id->qp->qp_num; -@@ -1375,6 +1383,8 @@ static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr, - attr.dlid = port_attr.lid; - attr.port_num = qp->cm_id->port_num; - qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr); -+// printf("%s ah %p lid %x port %d qpn %x\n", __func__, qp->dest.ah, attr.dlid, -+// attr.port_num, qp->dest.qpn); - if (!qp->dest.ah) - return ERR(ENOMEM); - -@@ -1390,30 +1400,26 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr, - struct epoll_event event; - int i, ret; - --printf("%s\n", __func__); -+ PRINTADDR(src_addr); - qp = calloc(1, sizeof(*qp)); - if (!qp) - return ERR(ENOMEM); - - qp->rs = rs; - ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP); -- printf("%s rdma_create_id %d\n", __func__, ret); - if (ret) - goto err; - - ds_format_hdr(&qp->hdr, src_addr); - ret = rdma_bind_addr(qp->cm_id, &src_addr->sa); -- printf("%s rdma_bind_addr %d\n", __func__, ret); - if (ret) - goto err; - - ret = ds_init_bufs(qp); -- printf("%s ds_init_bufs %d\n", __func__, ret); - if (ret) - goto err; - - ret = rs_create_cq(rs, qp->cm_id); -- printf("%s rs_create_cq %d\n", __func__, ret); - if (ret) - goto err; - -@@ -1425,16 +1431,14 @@ printf("%s\n", __func__); - qp_attr.sq_sig_all = 1; - qp_attr.cap.max_send_wr = rs->sq_size; - qp_attr.cap.max_recv_wr = rs->rq_size; -- qp_attr.cap.max_send_sge = 2; -- qp_attr.cap.max_recv_sge = 1; -+ qp_attr.cap.max_send_sge = 1; -+ qp_attr.cap.max_recv_sge = 2; - qp_attr.cap.max_inline_data = rs->sq_inline; - ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr); -- printf("%s rdma_create_qp %d\n", __func__, ret); - if (ret) - goto err; - - ret = ds_add_qp_dest(qp, src_addr, addrlen); -- printf("%s ds_add_qp_dest %d\n", __func__, ret); - if (ret) - goto err; - -@@ -1442,12 +1446,11 @@ printf("%s\n", __func__); - event.data.ptr = qp; - ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD, - qp->cm_id->recv_cq_channel->fd, &event); -- printf("%s epoll_ctl %d\n", __func__, ret); - if (ret) - goto err; - - for (i = 0; i < rs->rq_size; i++) { -- ret = ds_post_recv(rs, qp, qp->rbuf + i * RS_SNDLOWAT); -+ ret = ds_post_recv(rs, qp, i * RS_SNDLOWAT); - if (ret) - goto err; - } -@@ -1486,26 +1489,23 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr, - struct ds_dest **tdest, *new_dest; - int ret = 0; - -- printf("%s \n", __func__); -+ PRINTADDR(addr); - fastlock_acquire(&rs->map_lock); - tdest = tfind(addr, &rs->dest_map, ds_compare_addr); -- printf("%s tfind %p\n", __func__, dest); - if (tdest) - goto found; - - ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len); -- printf("%s ds_get_src_addr %d %s\n", __func__, ret, strerror(errno)); -+// printf("get src: "); PRINTADDR(&src_addr); - if (ret) - goto out; - - ret = ds_get_qp(rs, &src_addr, src_len, &qp); -- printf("%s ds_get_qp %d %s\n", __func__, ret, strerror(errno)); - if (ret) - goto out; - - tdest = tfind(addr, &rs->dest_map, ds_compare_addr); - if (!tdest) { -- printf("%s adding dest into map\n", __func__); - new_dest = calloc(1, sizeof(*new_dest)); - if (!new_dest) { - ret = ERR(ENOMEM); -@@ -1534,7 +1534,6 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) - memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen); - ret = rs_do_connect(rs); - } else { -- printf("%s\n", __func__); - if (rs->state == rs_init) { - ret = ds_init_ep(rs); - if (ret) -@@ -1542,11 +1541,10 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) - } - - fastlock_acquire(&rs->slock); -+ PRINTADDR(addr); - ret = connect(rs->udp_sock, addr, addrlen); -- printf("%s connect %d %s\n", __func__, ret, strerror(errno)); - if (!ret) - ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest); -- printf("%s ds_get_dest %d %s\n", __func__, ret, strerror(errno)); - fastlock_release(&rs->slock); - } - return ret; -@@ -1605,6 +1603,8 @@ static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge, - wr.wr.ud.ah = rs->conn_dest->ah; - wr.wr.ud.remote_qpn = rs->conn_dest->qpn; - wr.wr.ud.remote_qkey = RDMA_UDP_QKEY; -+// printf("%s ah %p qpn %x\n", __func__, rs->conn_dest->ah, -+// rs->conn_dest->qpn); - - return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad)); - } -@@ -1894,10 +1894,12 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc - return ret; - } - --static int ds_valid_recv(void *buf, uint32_t len) -+static int ds_valid_recv(struct ds_qp *qp, struct ibv_wc *wc) - { -- struct ds_header *hdr = (struct ds_header *) buf; -- return ((len >= sizeof(*hdr)) && -+ struct ds_header *hdr; -+ -+ hdr = (struct ds_header *) (qp->rbuf + ds_wr_offset(wc->wr_id)); -+ return ((wc->byte_len >= sizeof(struct ibv_grh) + sizeof(*hdr)) && - ((hdr->version == 4 && hdr->length == DS_IPV4_HDR_LEN) || - (hdr->version == 6 && hdr->length == DS_IPV6_HDR_LEN))); - } -@@ -1931,22 +1933,22 @@ static void ds_poll_cqs(struct rsocket *rs) - - if (ds_wr_is_recv(wc.wr_id)) { - if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS && -- ds_valid_recv(qp->rbuf + ds_wr_offset(wc.wr_id), -- wc.byte_len)) { -+ ds_valid_recv(qp, &wc)) { - rs->rqe_avail--; - rmsg = &rs->dmsg[rs->rmsg_tail]; - rmsg->qp = qp; - rmsg->offset = ds_wr_offset(wc.wr_id); -- rmsg->length = wc.byte_len; -+ rmsg->length = wc.byte_len - sizeof(struct ibv_grh); - if (++rs->rmsg_tail == rs->rq_size + 1) - rs->rmsg_tail = 0; - } else { -- ds_post_recv(rs, qp, qp->rbuf + -- ds_wr_offset(wc.wr_id)); -+ printf("%s invalid recv\n", __func__); -+ ds_post_recv(rs, qp, ds_wr_offset(wc.wr_id)); - } - } else { - smsg = (struct ds_smsg *) - (rs->sbuf + ds_wr_offset(wc.wr_id)); -+ printf("%s send smsg %p free %p\n", __func__, smsg, rs->smsg_free); - smsg->next = rs->smsg_free; - rs->smsg_free = smsg; - rs->sqe_avail++; -@@ -1986,18 +1988,17 @@ static int ds_get_cq_event(struct rsocket *rs) - void *context; - int ret; - -- printf("%s \n", __func__); - if (!rs->cq_armed) - return 0; - -+// printf("wait for epoll event\n"); - ret = epoll_wait(rs->epfd, &event, 1, -1); -- printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno)); -+// printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno)); - if (ret <= 0) - return ret; - - qp = event.data.ptr; - ret = ibv_get_cq_event(qp->cm_id->recv_cq_channel, &cq, &context); -- printf("%s get cq event ret %d errno %s\n", __func__, ret, strerror(errno)); - if (!ret) { - ibv_ack_cq_events(qp->cm_id->recv_cq, 1); - qp->cq_armed = 0; -@@ -2164,15 +2165,15 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags, - struct ds_header *hdr; - int ret; - --// printf("%s \n", __func__); -+ printf("%s \n", __func__); - if (!(rs->state & rs_readable)) - return ERR(EINVAL); - - if (!rs_have_rdata(rs)) { --// printf("%s need rdata \n", __func__); -+ printf("%s need rdata \n", __func__); - ret = ds_get_comp(rs, rs_nonblocking(rs, flags), - rs_have_rdata); --// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); -+ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); - if (ret) - return ret; - } -@@ -2184,10 +2185,13 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags, - - memcpy(buf, (void *) hdr + hdr->length, len); - if (addrlen) -+{ - ds_set_src(src_addr, addrlen, hdr); -+PRINTADDR(src_addr); -+} - - if (!(flags & MSG_PEEK)) { -- ds_post_recv(rs, rmsg->qp, hdr); -+ ds_post_recv(rs, rmsg->qp, rmsg->offset); - if (++rs->rmsg_head == rs->rq_size + 1) - rs->rmsg_head = 0; - } -@@ -2444,7 +2448,7 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, - } - - miov[0].iov_base = &hdr; -- miov[0].iov_len = sizeof hdr; -+ miov[0].iov_len = hdr.length; - if (iov && iovcnt) - memcpy(&miov[1], iov, sizeof *iov * iovcnt); - -@@ -2455,7 +2459,6 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, - msg.msg_iovlen = iovcnt + 1; - // printf("%s iov cnt %d\n", __func__, msg.msg_iovlen); - ret = sendmsg(rs->udp_sock, &msg, flags); -- printf("%s ret %d %s\n", __func__, ret, strerror(errno)); - return ret > 0 ? ret - sizeof hdr : ret; - } - -@@ -2463,7 +2466,7 @@ static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, - int flags, uint8_t op) - { - struct iovec iov; -- printf("%s\n", __func__); -+// printf("%s\n", __func__); - if (buf && len) { - // printf("%s have buffer\n", __func__); - iov.iov_base = (void *) buf; -@@ -2503,6 +2506,7 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags) - sge.lkey = rs->conn_dest->qp->smr->lkey; - offset = (uint8_t *) msg - rs->sbuf; - -+ printf("%s - sending over QP\n", __func__); - ret = ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length)); - return ret ? ret : len; - } -@@ -2604,7 +2608,8 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags, - struct rsocket *rs; - int ret; - -- printf("%s\n", __func__); -+ PRINTADDR(dest_addr); -+ printf("%s sendto data 0x%x\n", __func__, *((uint32_t*)buf)); - rs = idm_at(&idm, socket); - if (rs->type == SOCK_STREAM) { - if (dest_addr || addrlen) -@@ -2620,15 +2625,11 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags, - } - - fastlock_acquire(&rs->slock); -- printf("%s check conn dest\n", __func__); - if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) { -- printf("%s need conn dest\n", __func__); - ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest); - if (ret) - goto out; - } -- else -- printf("%s connected\n", __func__); - - ret = dsend(rs, buf, len, flags); - out: -@@ -3195,7 +3196,8 @@ int rsetsockopt(int socket, int level, int optname, - opt_on = *(int *) optval; - break; - case SO_RCVBUF: -- if (!rs->rbuf) -+ if ((rs->type == SOCK_STREAM && !rs->rbuf) || -+ (rs->type == SOCK_DGRAM && !rs->qp_list)) - rs->rbuf_size = (*(uint32_t *) optval) << 1; - ret = 0; - break; -@@ -3658,11 +3660,9 @@ static int rs_svc_add_rs(struct rsocket *rs) - } - - svc_rss[++svc_cnt] = rs; -- printf("%s rs %p\n", __func__, rs); - svc_fds[svc_cnt].fd = rs->udp_sock; - svc_fds[svc_cnt].events = POLLIN; - svc_fds[svc_cnt].revents = 0; -- printf("add rs udp sock %d\n",rs->udp_sock); - return 0; - } - -@@ -3686,7 +3686,6 @@ static void rs_svc_process_sock(void) - struct rs_svc_msg msg; - - read(svc_sock[1], &msg, sizeof msg); -- printf("%s op %d\n",__func__, msg.op); - switch (msg.op) { - case RS_SVC_INSERT: - msg.status = rs_svc_add_rs(msg.rs); -@@ -3698,7 +3697,6 @@ static void rs_svc_process_sock(void) - msg.status = ENOTSUP; - break; - } -- printf("%s status %d\n",__func__, msg.status); - write(svc_sock[1], &msg, sizeof msg); - } - -@@ -3732,7 +3730,6 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t - struct ibv_ah_attr attr; - int ret; - -- printf("%s\n",__func__); - if (dest->ah) { - fastlock_acquire(&rs->slock); - ibv_destroy_ah(dest->ah); -@@ -3741,7 +3738,6 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t - } - - ret = rdma_create_id(NULL, &id, NULL, dest->qp->cm_id->ps); -- printf("%s rdma_create_id %d %s\n",__func__, ret, strerror(errno)); - if (ret) - return; - -@@ -3752,12 +3748,10 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t - else - saddr.sin6.sin6_port = 0; - ret = rdma_resolve_addr(id, &saddr.sa, &dest->addr.sa, 2000); -- printf("%s rdma_resolve_addr %d %s\n",__func__, ret, strerror(errno)); - if (ret) - goto out; - - ret = rdma_resolve_route(id, 2000); -- printf("%s rdma_resolve_route %d %s\n",__func__, ret, strerror(errno)); - if (ret) - goto out; - -@@ -3776,12 +3770,9 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t - attr.static_rate = id->route.path_rec->rate; - attr.port_num = id->port_num; - -- printf("%s getting slock \n",__func__); - fastlock_acquire(&rs->slock); -- printf("%s why am I not here? \n",__func__); - dest->qpn = qpn; - dest->ah = ibv_create_ah(dest->qp->cm_id->pd, &attr); -- printf("%s ibv_create_ah %p %s\n",__func__, dest->ah, strerror(errno)); - fastlock_release(&rs->slock); - out: - rdma_destroy_id(id); -@@ -3790,17 +3781,6 @@ out: - static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr, - union socket_addr *addr) - { --printf("tag %x ver %d family %d (AF_INET %d) length %d\n", udp_hdr->tag, -- udp_hdr->version, addr->sa.sa_family, AF_INET, udp_hdr->length); -- --printf("tag %d ver %d fam %d len %d ver %d fam %d len %d\n", --udp_hdr->tag == ntohl(DS_UDP_TAG), -- udp_hdr->version == 4, addr->sa.sa_family == AF_INET, -- udp_hdr->length == DS_UDP_IPV4_HDR_LEN, -- udp_hdr->version == 6, addr->sa.sa_family == AF_INET6, -- udp_hdr->length == DS_UDP_IPV6_HDR_LEN); -- -- - return (udp_hdr->tag == ntohl(DS_UDP_TAG)) && - ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET && - udp_hdr->length == DS_UDP_IPV4_HDR_LEN) || -@@ -3816,7 +3796,7 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len, - struct ibv_sge sge; - uint64_t offset; - -- printf("%s\n",__func__); -+// PRINTADDR(src); - if (!ds_can_send(rs)) { - if (ds_get_comp(rs, 0, ds_can_send)) - return; -@@ -3827,13 +3807,18 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len, - rs->sqe_avail--; - - ds_format_hdr(&hdr, src); -+// printf("%s hdr ver %d length %d port %x\n", __func__, hdr.version, -+// hdr.length, hdr.port); - memcpy((void *) msg, &hdr, hdr.length); - memcpy((void *) msg + hdr.length, buf, len); -+// printf("%s received data 0x%x\n", __func__, *((uint32_t*)buf)); - sge.addr = (uintptr_t) msg; - sge.length = hdr.length + len; - sge.lkey = rs->conn_dest->qp->smr->lkey; - offset = (uint8_t *) msg - rs->sbuf; - -+// printf("%s ver %d length %d port %x\n", __func__, ((struct ds_header *) msg)->version, -+// ((struct ds_header *) msg)->length, ((struct ds_header *) msg)->port); - ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length)); - } - -@@ -3845,9 +3830,9 @@ static void rs_svc_process_rs(struct rsocket *rs) - socklen_t addrlen = sizeof addr; - int len, ret; - -- printf("%s\n",__func__); - ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen); -- printf("%s recvfrom %d\n",__func__, ret); -+// PRINTADDR(&addr); -+// printf("%s received data 0x%x\n", __func__, *((uint32_t*)&svc_buf[8])); - if (ret < DS_UDP_IPV4_HDR_LEN) - return; - -@@ -3855,12 +3840,10 @@ static void rs_svc_process_rs(struct rsocket *rs) - if (!rs_svc_valid_udp_hdr(udp_hdr, &addr)) - return; - -- printf("%s valid hdr\n",__func__); - len = ret - udp_hdr->length; - udp_hdr->tag = ntohl(udp_hdr->tag); - udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF; - ret = ds_get_dest(rs, &addr.sa, addrlen, &dest); -- printf("%s ds_get_dest %d\n",__func__, ret); - if (ret) - return; - -@@ -3868,16 +3851,15 @@ static void rs_svc_process_rs(struct rsocket *rs) - rs_svc_create_ah(rs, dest, udp_hdr->qpn); - - /* to do: handle when dest local ip address doesn't match udp ip */ -+ if (udp_hdr->op != RS_OP_DATA) -+ return; -+ - fastlock_acquire(&rs->slock); - cur_dest = rs->conn_dest; -- if (udp_hdr->op == RS_OP_DATA) { -- rs->conn_dest = &dest->qp->dest; -- printf("%s forwarding msg\n",__func__); -- rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr); -- } -+ rs->conn_dest = &dest->qp->dest; -+ rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr); - - rs->conn_dest = dest; -- printf("%s sending resp\n",__func__); - ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL); - rs->conn_dest = cur_dest; - fastlock_release(&rs->slock); -@@ -3888,7 +3870,6 @@ static void *rs_svc_run(void *arg) - struct rs_svc_msg msg; - int i, ret; - -- printf("%s\n",__func__); - ret = rs_svc_grow_sets(); - if (ret) { - msg.status = ret; -@@ -3899,13 +3880,10 @@ static void *rs_svc_run(void *arg) - svc_fds[0].fd = svc_sock[1]; - svc_fds[0].events = POLLIN; - do { -- printf("%s svc cnt %d\n",__func__, svc_cnt); - for (i = 0; i <= svc_cnt; i++) - svc_fds[i].revents = 0; - -- printf("%s poll\n",__func__); - poll(svc_fds, svc_cnt + 1, -1); -- printf("%s poll done\n",__func__); - if (svc_fds[0].revents) - rs_svc_process_sock(); -- 2.46.0