struct rsocket;
+
+/*
+ * Debug helper: print the calling function's name, then the port and IPv4
+ * address stored in the socket address that 'a' points to.
+ *
+ * 'a' is reinterpreted as a struct sockaddr_in pointer, and both fields are
+ * printed in hex exactly as stored (no byte-order conversion is applied).
+ * The argument is parenthesized so that any pointer expression can be passed;
+ * it is evaluated twice, so it must have no side effects, and it must not be
+ * NULL.  The values are cast to unsigned int to match the %x conversions.
+ */
+#define PRINTADDR(a) \
+printf("%s port %x ip %x\n", __func__, \
+ (unsigned int) ((struct sockaddr_in *)(a))->sin_port, \
+ (unsigned int) ((struct sockaddr_in *)(a))->sin_addr.s_addr)
+
+
+
struct rs_svc_msg {
uint32_t op;
uint32_t status;
msg.op = RS_SVC_INSERT;
msg.status = EINVAL;
- printf("%s rs %p\n", __func__, rs);
msg.rs = rs;
write(svc_sock[0], &msg, sizeof msg);
read(svc_sock[0], &msg, sizeof msg);
if (!ret && rs->state < rs_connected)
ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
} else {
- printf("%s set nonblock\n", __func__);
ret = fcntl(rs->epfd, F_SETFL, arg);
- printf("%s fcntl %d\n", __func__, ret);
-
if (!ret && rs->qp_list) {
qp = rs->qp_list;
do {
{
uint16_t max_size;
- printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
- rs->rq_size, rs->rbuf_size);
max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE);
if (rs->sq_size > max_size)
rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
else
rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
- printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
- rs->rq_size, rs->rbuf_size);
}
static int rs_init_bufs(struct rsocket *rs)
static int ds_init_bufs(struct ds_qp *qp)
{
- qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf));
+ qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh),
+ sizeof(*qp->rbuf));
if (!qp->rbuf)
return ERR(ENOMEM);
if (!qp->smr)
return -1;
- qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size);
+ qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size +
+ sizeof(struct ibv_grh));
if (!qp->rmr)
return -1;
static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
{
cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
- printf("%s create comp_channel %p\n", __func__, cm_id->recv_cq_channel);
if (!cm_id->recv_cq_channel)
return -1;
cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
cm_id, cm_id->recv_cq_channel, 0);
- printf("%s create cq %p size %d\n", __func__, cm_id->recv_cq, rs->sq_size + rs->rq_size);
if (!cm_id->recv_cq)
goto err1;
if (rs->fd_flags & O_NONBLOCK) {
- printf("%s set nonblock\n", __func__);
- if (rs_set_nonblocking(rs, O_NONBLOCK))
+ if (fcntl(cm_id->recv_cq_channel->fd, F_SETFL, O_NONBLOCK))
goto err2;
+ } else {
+ ibv_req_notify_cq(cm_id->recv_cq, 0);
}
cm_id->send_cq_channel = cm_id->recv_cq_channel;
return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
}
-static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf)
+static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, uint32_t offset) /* post one receive on qp for the slot at byte 'offset' within qp->rbuf */
{
struct ibv_recv_wr wr, *bad;
- struct ibv_sge sge;
+ struct ibv_sge sge[2]; /* [0] receives the GRH, [1] receives the message slot */
- sge.addr = (uintptr_t) buf;
- sge.length = RS_SNDLOWAT;
- sge.lkey = qp->rmr->lkey;
+ sge[0].addr = (uintptr_t) qp->rbuf + rs->rbuf_size; /* fixed area just past the rbuf_size data region; the same address is used for every posted receive */
+ sge[0].length = sizeof(struct ibv_grh);
+ sge[0].lkey = qp->rmr->lkey; /* both entries use the receive MR's lkey */
+ sge[1].addr = (uintptr_t) qp->rbuf + offset; /* slot selected by the caller */
+ sge[1].length = RS_SNDLOWAT; /* each slot is RS_SNDLOWAT bytes long */
+ sge[1].lkey = qp->rmr->lkey;
- wr.wr_id = ds_recv_wr_id((uint32_t) ((uint8_t *) buf - rs->rbuf));
+ wr.wr_id = ds_recv_wr_id(offset); /* offset is encoded into the work-request id */
wr.next = NULL;
- wr.sg_list = &sge;
- wr.num_sge = 1;
+ wr.sg_list = sge;
+ wr.num_sge = 2; /* must not exceed the QP's max_recv_sge */
return rdma_seterrno(ibv_post_recv(qp->cm_id->qp, &wr, &bad));
}
rs->cm_id->route.addr.src_addr.sa_family = domain;
index = rs->cm_id->channel->fd;
} else {
- printf("rsocket sq %d rq %d\n", rs->sq_size, rs->rq_size);
ret = ds_init(rs, domain);
if (ret)
goto err;
}
if (rs->fd_flags & O_NONBLOCK)
- rs_set_nonblocking(new_rs, O_NONBLOCK);
+ fcntl(new_rs->cm_id->channel->fd, F_SETFL, O_NONBLOCK);
ret = rs_create_ep(new_rs);
if (ret)
break;
case rs_accepting:
if (!(rs->fd_flags & O_NONBLOCK))
- rs_set_nonblocking(rs, 0);
+ fcntl(rs->cm_id->channel->fd, F_SETFL, 0);
ret = ucma_complete(rs->cm_id);
if (ret)
int sock, ret;
uint16_t port;
+// printf("dest: "); PRINTADDR(dest_addr);
*src_len = sizeof src_addr;
ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
+// printf("src: "); PRINTADDR(src_addr);
if (ret || !rs_any_addr(src_addr))
return ret;
*src_len = sizeof src_addr;
ret = getsockname(sock, &src_addr->sa, src_len);
src_addr->sin.sin_port = port;
+// printf("selected src: ");
out:
close(sock);
return ret;
static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
{
if (addr->sa.sa_family == AF_INET) {
+ PRINTADDR(addr);
hdr->version = 4;
hdr->length = DS_IPV4_HDR_LEN;
hdr->port = addr->sin.sin_port;
struct ibv_ah_attr attr;
int ret;
+// printf("%s\n", __func__);
memcpy(&qp->dest.addr, addr, addrlen);
qp->dest.qp = qp;
qp->dest.qpn = qp->cm_id->qp->qp_num;
attr.dlid = port_attr.lid;
attr.port_num = qp->cm_id->port_num;
qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr);
+// printf("%s ah %p lid %x port %d qpn %x\n", __func__, qp->dest.ah, attr.dlid,
+// attr.port_num, qp->dest.qpn);
if (!qp->dest.ah)
return ERR(ENOMEM);
struct epoll_event event;
int i, ret;
-printf("%s\n", __func__);
+ PRINTADDR(src_addr);
qp = calloc(1, sizeof(*qp));
if (!qp)
return ERR(ENOMEM);
qp->rs = rs;
ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP);
- printf("%s rdma_create_id %d\n", __func__, ret);
if (ret)
goto err;
ds_format_hdr(&qp->hdr, src_addr);
ret = rdma_bind_addr(qp->cm_id, &src_addr->sa);
- printf("%s rdma_bind_addr %d\n", __func__, ret);
if (ret)
goto err;
ret = ds_init_bufs(qp);
- printf("%s ds_init_bufs %d\n", __func__, ret);
if (ret)
goto err;
ret = rs_create_cq(rs, qp->cm_id);
- printf("%s rs_create_cq %d\n", __func__, ret);
if (ret)
goto err;
qp_attr.sq_sig_all = 1;
qp_attr.cap.max_send_wr = rs->sq_size;
qp_attr.cap.max_recv_wr = rs->rq_size;
- qp_attr.cap.max_send_sge = 2;
- qp_attr.cap.max_recv_sge = 1;
+ qp_attr.cap.max_send_sge = 1;
+ qp_attr.cap.max_recv_sge = 2;
qp_attr.cap.max_inline_data = rs->sq_inline;
ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr);
- printf("%s rdma_create_qp %d\n", __func__, ret);
if (ret)
goto err;
ret = ds_add_qp_dest(qp, src_addr, addrlen);
- printf("%s ds_add_qp_dest %d\n", __func__, ret);
if (ret)
goto err;
event.data.ptr = qp;
ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD,
qp->cm_id->recv_cq_channel->fd, &event);
- printf("%s epoll_ctl %d\n", __func__, ret);
if (ret)
goto err;
for (i = 0; i < rs->rq_size; i++) {
- ret = ds_post_recv(rs, qp, qp->rbuf + i * RS_SNDLOWAT);
+ ret = ds_post_recv(rs, qp, i * RS_SNDLOWAT);
if (ret)
goto err;
}
struct ds_dest **tdest, *new_dest;
int ret = 0;
- printf("%s \n", __func__);
+ PRINTADDR(addr);
fastlock_acquire(&rs->map_lock);
tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
- printf("%s tfind %p\n", __func__, dest);
if (tdest)
goto found;
ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
- printf("%s ds_get_src_addr %d %s\n", __func__, ret, strerror(errno));
+// printf("get src: "); PRINTADDR(&src_addr);
if (ret)
goto out;
ret = ds_get_qp(rs, &src_addr, src_len, &qp);
- printf("%s ds_get_qp %d %s\n", __func__, ret, strerror(errno));
if (ret)
goto out;
tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
if (!tdest) {
- printf("%s adding dest into map\n", __func__);
new_dest = calloc(1, sizeof(*new_dest));
if (!new_dest) {
ret = ERR(ENOMEM);
memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
ret = rs_do_connect(rs);
} else {
- printf("%s\n", __func__);
if (rs->state == rs_init) {
ret = ds_init_ep(rs);
if (ret)
}
fastlock_acquire(&rs->slock);
+ PRINTADDR(addr);
ret = connect(rs->udp_sock, addr, addrlen);
- printf("%s connect %d %s\n", __func__, ret, strerror(errno));
if (!ret)
ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
- printf("%s ds_get_dest %d %s\n", __func__, ret, strerror(errno));
fastlock_release(&rs->slock);
}
return ret;
wr.wr.ud.ah = rs->conn_dest->ah;
wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
+// printf("%s ah %p qpn %x\n", __func__, rs->conn_dest->ah,
+// rs->conn_dest->qpn);
return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad));
}
return ret;
}
-static int ds_valid_recv(void *buf, uint32_t len)
+static int ds_valid_recv(struct ds_qp *qp, struct ibv_wc *wc) /* non-zero when the completed receive is long enough and has a consistent version 4 or 6 header */
{
- struct ds_header *hdr = (struct ds_header *) buf;
- return ((len >= sizeof(*hdr)) &&
+ struct ds_header *hdr;
+
+ hdr = (struct ds_header *) (qp->rbuf + ds_wr_offset(wc->wr_id)); /* header is read from the rbuf offset recovered from the work-request id */
+ return ((wc->byte_len >= sizeof(struct ibv_grh) + sizeof(*hdr)) && /* byte_len is required to cover a GRH plus a full ds_header */
((hdr->version == 4 && hdr->length == DS_IPV4_HDR_LEN) ||
(hdr->version == 6 && hdr->length == DS_IPV6_HDR_LEN)));
}
if (ds_wr_is_recv(wc.wr_id)) {
if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS &&
- ds_valid_recv(qp->rbuf + ds_wr_offset(wc.wr_id),
- wc.byte_len)) {
+ ds_valid_recv(qp, &wc)) {
rs->rqe_avail--;
rmsg = &rs->dmsg[rs->rmsg_tail];
rmsg->qp = qp;
rmsg->offset = ds_wr_offset(wc.wr_id);
- rmsg->length = wc.byte_len;
+ rmsg->length = wc.byte_len - sizeof(struct ibv_grh);
if (++rs->rmsg_tail == rs->rq_size + 1)
rs->rmsg_tail = 0;
} else {
- ds_post_recv(rs, qp, qp->rbuf +
- ds_wr_offset(wc.wr_id));
+ printf("%s invalid recv\n", __func__);
+ ds_post_recv(rs, qp, ds_wr_offset(wc.wr_id));
}
} else {
smsg = (struct ds_smsg *)
(rs->sbuf + ds_wr_offset(wc.wr_id));
+ printf("%s send smsg %p free %p\n", __func__, smsg, rs->smsg_free);
smsg->next = rs->smsg_free;
rs->smsg_free = smsg;
rs->sqe_avail++;
void *context;
int ret;
- printf("%s \n", __func__);
if (!rs->cq_armed)
return 0;
+// printf("wait for epoll event\n");
ret = epoll_wait(rs->epfd, &event, 1, -1);
- printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno));
+// printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno));
if (ret <= 0)
return ret;
qp = event.data.ptr;
ret = ibv_get_cq_event(qp->cm_id->recv_cq_channel, &cq, &context);
- printf("%s get cq event ret %d errno %s\n", __func__, ret, strerror(errno));
if (!ret) {
ibv_ack_cq_events(qp->cm_id->recv_cq, 1);
qp->cq_armed = 0;
struct ds_header *hdr;
int ret;
-// printf("%s \n", __func__);
+ printf("%s \n", __func__);
if (!(rs->state & rs_readable))
return ERR(EINVAL);
if (!rs_have_rdata(rs)) {
-// printf("%s need rdata \n", __func__);
+ printf("%s need rdata \n", __func__);
ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
rs_have_rdata);
-// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
+ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
if (ret)
return ret;
}
memcpy(buf, (void *) hdr + hdr->length, len);
if (addrlen)
+{
ds_set_src(src_addr, addrlen, hdr);
+PRINTADDR(src_addr);
+}
if (!(flags & MSG_PEEK)) {
- ds_post_recv(rs, rmsg->qp, hdr);
+ ds_post_recv(rs, rmsg->qp, rmsg->offset);
if (++rs->rmsg_head == rs->rq_size + 1)
rs->rmsg_head = 0;
}
}
miov[0].iov_base = &hdr;
- miov[0].iov_len = sizeof hdr;
+ miov[0].iov_len = hdr.length;
if (iov && iovcnt)
memcpy(&miov[1], iov, sizeof *iov * iovcnt);
msg.msg_iovlen = iovcnt + 1;
// printf("%s iov cnt %d\n", __func__, msg.msg_iovlen);
ret = sendmsg(rs->udp_sock, &msg, flags);
- printf("%s ret %d %s\n", __func__, ret, strerror(errno));
return ret > 0 ? ret - sizeof hdr : ret;
}
int flags, uint8_t op)
{
struct iovec iov;
- printf("%s\n", __func__);
+// printf("%s\n", __func__);
if (buf && len) {
// printf("%s have buffer\n", __func__);
iov.iov_base = (void *) buf;
sge.lkey = rs->conn_dest->qp->smr->lkey;
offset = (uint8_t *) msg - rs->sbuf;
+ printf("%s - sending over QP\n", __func__);
ret = ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
return ret ? ret : len;
}
struct rsocket *rs;
int ret;
- printf("%s\n", __func__);
+ PRINTADDR(dest_addr);
+ printf("%s sendto data 0x%x\n", __func__, *((uint32_t*)buf));
rs = idm_at(&idm, socket);
if (rs->type == SOCK_STREAM) {
if (dest_addr || addrlen)
}
fastlock_acquire(&rs->slock);
- printf("%s check conn dest\n", __func__);
if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
- printf("%s need conn dest\n", __func__);
ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
if (ret)
goto out;
}
- else
- printf("%s connected\n", __func__);
ret = dsend(rs, buf, len, flags);
out:
opt_on = *(int *) optval;
break;
case SO_RCVBUF:
- if (!rs->rbuf)
+ if ((rs->type == SOCK_STREAM && !rs->rbuf) ||
+ (rs->type == SOCK_DGRAM && !rs->qp_list))
rs->rbuf_size = (*(uint32_t *) optval) << 1;
ret = 0;
break;
}
svc_rss[++svc_cnt] = rs;
- printf("%s rs %p\n", __func__, rs);
svc_fds[svc_cnt].fd = rs->udp_sock;
svc_fds[svc_cnt].events = POLLIN;
svc_fds[svc_cnt].revents = 0;
- printf("add rs udp sock %d\n",rs->udp_sock);
return 0;
}
struct rs_svc_msg msg;
read(svc_sock[1], &msg, sizeof msg);
- printf("%s op %d\n",__func__, msg.op);
switch (msg.op) {
case RS_SVC_INSERT:
msg.status = rs_svc_add_rs(msg.rs);
msg.status = ENOTSUP;
break;
}
- printf("%s status %d\n",__func__, msg.status);
write(svc_sock[1], &msg, sizeof msg);
}
struct ibv_ah_attr attr;
int ret;
- printf("%s\n",__func__);
if (dest->ah) {
fastlock_acquire(&rs->slock);
ibv_destroy_ah(dest->ah);
}
ret = rdma_create_id(NULL, &id, NULL, dest->qp->cm_id->ps);
- printf("%s rdma_create_id %d %s\n",__func__, ret, strerror(errno));
if (ret)
return;
else
saddr.sin6.sin6_port = 0;
ret = rdma_resolve_addr(id, &saddr.sa, &dest->addr.sa, 2000);
- printf("%s rdma_resolve_addr %d %s\n",__func__, ret, strerror(errno));
if (ret)
goto out;
ret = rdma_resolve_route(id, 2000);
- printf("%s rdma_resolve_route %d %s\n",__func__, ret, strerror(errno));
if (ret)
goto out;
attr.static_rate = id->route.path_rec->rate;
attr.port_num = id->port_num;
- printf("%s getting slock \n",__func__);
fastlock_acquire(&rs->slock);
- printf("%s why am I not here? \n",__func__);
dest->qpn = qpn;
dest->ah = ibv_create_ah(dest->qp->cm_id->pd, &attr);
- printf("%s ibv_create_ah %p %s\n",__func__, dest->ah, strerror(errno));
fastlock_release(&rs->slock);
out:
rdma_destroy_id(id);
static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr,
union socket_addr *addr)
{
-printf("tag %x ver %d family %d (AF_INET %d) length %d\n", udp_hdr->tag,
- udp_hdr->version, addr->sa.sa_family, AF_INET, udp_hdr->length);
-
-printf("tag %d ver %d fam %d len %d ver %d fam %d len %d\n",
-udp_hdr->tag == ntohl(DS_UDP_TAG),
- udp_hdr->version == 4, addr->sa.sa_family == AF_INET,
- udp_hdr->length == DS_UDP_IPV4_HDR_LEN,
- udp_hdr->version == 6, addr->sa.sa_family == AF_INET6,
- udp_hdr->length == DS_UDP_IPV6_HDR_LEN);
-
-
return (udp_hdr->tag == ntohl(DS_UDP_TAG)) &&
((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET &&
udp_hdr->length == DS_UDP_IPV4_HDR_LEN) ||
struct ibv_sge sge;
uint64_t offset;
- printf("%s\n",__func__);
+// PRINTADDR(src);
if (!ds_can_send(rs)) {
if (ds_get_comp(rs, 0, ds_can_send))
return;
rs->sqe_avail--;
ds_format_hdr(&hdr, src);
+// printf("%s hdr ver %d length %d port %x\n", __func__, hdr.version,
+// hdr.length, hdr.port);
memcpy((void *) msg, &hdr, hdr.length);
memcpy((void *) msg + hdr.length, buf, len);
+// printf("%s received data 0x%x\n", __func__, *((uint32_t*)buf));
sge.addr = (uintptr_t) msg;
sge.length = hdr.length + len;
sge.lkey = rs->conn_dest->qp->smr->lkey;
offset = (uint8_t *) msg - rs->sbuf;
+// printf("%s ver %d length %d port %x\n", __func__, ((struct ds_header *) msg)->version,
+// ((struct ds_header *) msg)->length, ((struct ds_header *) msg)->port);
ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
}
socklen_t addrlen = sizeof addr;
int len, ret;
- printf("%s\n",__func__);
ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
- printf("%s recvfrom %d\n",__func__, ret);
+// PRINTADDR(&addr);
+// printf("%s received data 0x%x\n", __func__, *((uint32_t*)&svc_buf[8]));
if (ret < DS_UDP_IPV4_HDR_LEN)
return;
if (!rs_svc_valid_udp_hdr(udp_hdr, &addr))
return;
- printf("%s valid hdr\n",__func__);
len = ret - udp_hdr->length;
udp_hdr->tag = ntohl(udp_hdr->tag);
udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF;
ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
- printf("%s ds_get_dest %d\n",__func__, ret);
if (ret)
return;
rs_svc_create_ah(rs, dest, udp_hdr->qpn);
/* to do: handle when dest local ip address doesn't match udp ip */
+ if (udp_hdr->op != RS_OP_DATA)
+ return;
+
fastlock_acquire(&rs->slock);
cur_dest = rs->conn_dest;
- if (udp_hdr->op == RS_OP_DATA) {
- rs->conn_dest = &dest->qp->dest;
- printf("%s forwarding msg\n",__func__);
- rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
- }
+ rs->conn_dest = &dest->qp->dest;
+ rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
rs->conn_dest = dest;
- printf("%s sending resp\n",__func__);
ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL);
rs->conn_dest = cur_dest;
fastlock_release(&rs->slock);
struct rs_svc_msg msg;
int i, ret;
- printf("%s\n",__func__);
ret = rs_svc_grow_sets();
if (ret) {
msg.status = ret;
svc_fds[0].fd = svc_sock[1];
svc_fds[0].events = POLLIN;
do {
- printf("%s svc cnt %d\n",__func__, svc_cnt);
for (i = 0; i <= svc_cnt; i++)
svc_fds[i].revents = 0;
- printf("%s poll\n",__func__);
poll(svc_fds, svc_cnt + 1, -1);
- printf("%s poll done\n",__func__);
if (svc_fds[0].revents)
rs_svc_process_sock();