msg.op = RS_SVC_INSERT;
msg.status = EINVAL;
+ printf("%s rs %p\n", __func__, rs);
msg.rs = rs;
write(svc_sock[0], &msg, sizeof msg);
read(svc_sock[0], &msg, sizeof msg);
if (!ret && rs->state < rs_connected)
ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
} else {
+ printf("%s set nonblock\n", __func__);
ret = fcntl(rs->epfd, F_SETFL, arg);
+ printf("%s fcntl %d\n", __func__, ret);
if (!ret && rs->qp_list) {
qp = rs->qp_list;
{
uint16_t max_size;
+ printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
+ rs->rq_size, rs->rbuf_size);
max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE);
if (rs->sq_size > max_size)
rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
else
rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
+ printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
+ rs->rq_size, rs->rbuf_size);
}
static int rs_init_bufs(struct rsocket *rs)
static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
{
cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
+ printf("%s create comp_channel %p\n", __func__, cm_id->recv_cq_channel);
if (!cm_id->recv_cq_channel)
return -1;
cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
cm_id, cm_id->recv_cq_channel, 0);
+ printf("%s create cq %p size %d\n", __func__, cm_id->recv_cq, rs->sq_size + rs->rq_size);
if (!cm_id->recv_cq)
goto err1;
if (rs->fd_flags & O_NONBLOCK) {
+ printf("%s set nonblock\n", __func__);
if (rs_set_nonblocking(rs, O_NONBLOCK))
goto err2;
}
static void ds_free(struct rsocket *rs)
{
+ struct ds_qp *qp;
+
if (rs->state & (rs_readable | rs_writable))
rs_remove_from_svc(rs);
if (rs->dmsg)
free(rs->dmsg);
- if (rs->smsg_free)
- free(rs->smsg_free);
-
- while (rs->qp_list) {
- ds_remove_qp(rs, rs->qp_list);
- ds_free_qp(rs->qp_list);
+ while ((qp = rs->qp_list)) {
+ ds_remove_qp(rs, qp);
+ ds_free_qp(qp);
}
if (rs->epfd >= 0)
return 0;
}
+static int ds_init_ep(struct rsocket *rs)
+{
+ struct ds_smsg *msg;
+ int i, ret;
+
+ ds_set_qp_size(rs);
+
+ rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
+ if (!rs->sbuf)
+ return ERR(ENOMEM);
+
+ rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
+ if (!rs->dmsg)
+ return ERR(ENOMEM);
+
+ rs->sqe_avail = rs->sq_size;
+ rs->rqe_avail = rs->rq_size;
+
+ rs->smsg_free = (struct ds_smsg *) rs->sbuf;
+ msg = rs->smsg_free;
+ for (i = 0; i < rs->sq_size - 1; i++) {
+ msg->next = (void *) msg + RS_SNDLOWAT;
+ msg = msg->next;
+ }
+ msg->next = NULL;
+
+ ret = rs_add_to_svc(rs);
+ if (ret)
+ return ret;
+
+ rs->state = rs_readable | rs_writable;
+ return 0;
+}
+
int rsocket(int domain, int type, int protocol)
{
struct rsocket *rs;
rs->cm_id->route.addr.src_addr.sa_family = domain;
index = rs->cm_id->channel->fd;
} else {
+ printf("rsocket sq %d rq %d\n", rs->sq_size, rs->rq_size);
ret = ds_init(rs, domain);
if (ret)
goto err;
if (!ret)
rs->state = rs_bound;
} else {
- ret = bind(rs->udp_sock, addr, addrlen);
- if (!ret) {
- ret = rs_add_to_svc(rs);
- if (!ret)
- rs->state = rs_readable | rs_writable;
+ if (rs->state == rs_init) {
+ ret = ds_init_ep(rs);
+ if (ret)
+ return ret;
}
+ ret = bind(rs->udp_sock, addr, addrlen);
}
return ret;
}
return ret;
}
-static int ds_init_ep(struct rsocket *rs)
-{
- struct ds_smsg *msg;
- int i, ret;
-
- ds_set_qp_size(rs);
-
- rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
- if (!rs->sbuf)
- return ERR(ENOMEM);
-
- rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
- if (!rs->dmsg)
- return ERR(ENOMEM);
-
- rs->sbuf_bytes_avail = rs->sbuf_size;
- rs->sqe_avail = rs->sq_size;
- rs->rqe_avail = rs->rq_size;
-
- rs->smsg_free = (struct ds_smsg *) rs->sbuf;
- msg = rs->smsg_free;
- for (i = 0; i < rs->sq_size - 1; i++) {
- msg->next = (void *) msg + i * RS_SNDLOWAT;
- msg = msg->next;
- }
- msg->next = NULL;
-
- ret = rs_add_to_svc(rs);
- if (ret)
- return ret;
-
- rs->state = rs_readable | rs_writable;
- return 0;
-}
-
static int rs_any_addr(const union socket_addr *addr)
{
if (addr->sa.sa_family == AF_INET) {
}
static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
- socklen_t addrlen, struct ds_qp **qp)
+ socklen_t addrlen, struct ds_qp **new_qp)
{
+ struct ds_qp *qp;
struct ibv_qp_init_attr qp_attr;
struct epoll_event event;
int i, ret;
- *qp = calloc(1, sizeof(struct ds_qp));
- if (!*qp)
+printf("%s\n", __func__);
+ qp = calloc(1, sizeof(*qp));
+ if (!qp)
return ERR(ENOMEM);
- (*qp)->rs = rs;
- ret = rdma_create_id(NULL, &(*qp)->cm_id, *qp, RDMA_PS_UDP);
+ qp->rs = rs;
+ ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP);
+ printf("%s rdma_create_id %d\n", __func__, ret);
if (ret)
goto err;
- ds_format_hdr(&(*qp)->hdr, src_addr);
- ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa);
+ ds_format_hdr(&qp->hdr, src_addr);
+ ret = rdma_bind_addr(qp->cm_id, &src_addr->sa);
+ printf("%s rdma_bind_addr %d\n", __func__, ret);
if (ret)
goto err;
- ret = ds_init_bufs(*qp);
+ ret = ds_init_bufs(qp);
+ printf("%s ds_init_bufs %d\n", __func__, ret);
if (ret)
goto err;
- ret = rs_create_cq(rs, (*qp)->cm_id);
+ ret = rs_create_cq(rs, qp->cm_id);
+ printf("%s rs_create_cq %d\n", __func__, ret);
if (ret)
goto err;
memset(&qp_attr, 0, sizeof qp_attr);
qp_attr.qp_context = qp;
- qp_attr.send_cq = rs->cm_id->send_cq;
- qp_attr.recv_cq = rs->cm_id->recv_cq;
+ qp_attr.send_cq = qp->cm_id->send_cq;
+ qp_attr.recv_cq = qp->cm_id->recv_cq;
qp_attr.qp_type = IBV_QPT_UD;
qp_attr.sq_sig_all = 1;
qp_attr.cap.max_send_wr = rs->sq_size;
qp_attr.cap.max_send_sge = 2;
qp_attr.cap.max_recv_sge = 1;
qp_attr.cap.max_inline_data = rs->sq_inline;
- ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr);
+ ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr);
+ printf("%s rdma_create_qp %d\n", __func__, ret);
if (ret)
goto err;
- ret = ds_add_qp_dest(*qp, src_addr, addrlen);
+ ret = ds_add_qp_dest(qp, src_addr, addrlen);
+ printf("%s ds_add_qp_dest %d\n", __func__, ret);
if (ret)
goto err;
event.events = EPOLLIN;
- event.data.ptr = *qp;
+ event.data.ptr = qp;
ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD,
- (*qp)->cm_id->recv_cq_channel->fd, &event);
+ qp->cm_id->recv_cq_channel->fd, &event);
+ printf("%s epoll_ctl %d\n", __func__, ret);
if (ret)
goto err;
for (i = 0; i < rs->rq_size; i++) {
- ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT);
+ ret = ds_post_recv(rs, qp, qp->rbuf + i * RS_SNDLOWAT);
if (ret)
goto err;
}
- ds_insert_qp(rs, *qp);
+ ds_insert_qp(rs, qp);
+ *new_qp = qp;
return 0;
err:
- ds_free_qp(*qp);
+ ds_free_qp(qp);
return ret;
}
union socket_addr src_addr;
socklen_t src_len;
struct ds_qp *qp;
+ struct ds_dest **tdest, *new_dest;
int ret = 0;
+ printf("%s \n", __func__);
fastlock_acquire(&rs->map_lock);
- dest = tfind(addr, &rs->dest_map, ds_compare_addr);
- if (dest)
- goto out;
-
- if (rs->state == rs_init) {
- ret = ds_init_ep(rs);
- if (ret)
- goto out;
- }
+ tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
+ printf("%s tfind %p\n", __func__, dest);
+ if (tdest)
+ goto found;
ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
+ printf("%s ds_get_src_addr %d %s\n", __func__, ret, strerror(errno));
if (ret)
goto out;
ret = ds_get_qp(rs, &src_addr, src_len, &qp);
+ printf("%s ds_get_qp %d %s\n", __func__, ret, strerror(errno));
if (ret)
goto out;
- if ((addrlen != src_len) || memcmp(addr, &src_addr, addrlen)) {
- *dest = calloc(1, sizeof(struct ds_dest));
- if (!*dest) {
+ tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
+ if (!tdest) {
+ printf("%s adding dest into map\n", __func__);
+ new_dest = calloc(1, sizeof(*new_dest));
+ if (!new_dest) {
ret = ERR(ENOMEM);
goto out;
}
- memcpy(&(*dest)->addr, addr, addrlen);
- (*dest)->qp = qp;
- tsearch(&(*dest)->addr, &rs->dest_map, ds_compare_addr);
+ memcpy(&new_dest->addr, addr, addrlen);
+ new_dest->qp = qp;
+ tdest = tsearch(&new_dest->addr, &rs->dest_map, ds_compare_addr);
}
+
+found:
+ *dest = *tdest;
out:
fastlock_release(&rs->map_lock);
return ret;
memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
ret = rs_do_connect(rs);
} else {
+ printf("%s\n", __func__);
+ if (rs->state == rs_init) {
+ ret = ds_init_ep(rs);
+ if (ret)
+ return ret;
+ }
+
fastlock_acquire(&rs->slock);
ret = connect(rs->udp_sock, addr, addrlen);
+ printf("%s connect %d %s\n", __func__, ret, strerror(errno));
if (!ret)
ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
+ printf("%s ds_get_dest %d %s\n", __func__, ret, strerror(errno));
fastlock_release(&rs->slock);
}
return ret;
do {
ds_poll_cqs(rs);
if (test(rs)) {
- printf("%s test succeeded\n", __func__);
+// printf("%s test succeeded\n", __func__);
ret = 0;
break;
} else if (nonblock) {
ret = ERR(EWOULDBLOCK);
- printf("%s nonblocking \n", __func__);
+// printf("%s nonblocking \n", __func__);
} else if (!rs->cq_armed) {
- printf("%s req notify \n", __func__);
+// printf("%s req notify \n", __func__);
ds_req_notify_cqs(rs);
rs->cq_armed = 1;
} else {
fastlock_release(&rs->cq_lock);
ret = ds_get_cq_event(rs);
- printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
+// printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
fastlock_release(&rs->cq_wait_lock);
fastlock_acquire(&rs->cq_lock);
}
} while (!ret);
fastlock_release(&rs->cq_lock);
- printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
return ret;
}
do {
ret = ds_process_cqs(rs, 1, test);
- printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
if (!ret || nonblock || errno != EWOULDBLOCK)
return ret;
struct ds_header *hdr;
int ret;
-ret = 0;
- printf("%s \n", __func__);
+// printf("%s \n", __func__);
if (!(rs->state & rs_readable))
return ERR(EINVAL);
if (!rs_have_rdata(rs)) {
- printf("%s need rdata \n", __func__);
+// printf("%s need rdata \n", __func__);
ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
rs_have_rdata);
- printf("%s ds_get_comp ret %d errno %s\n", __func__, ret, strerror(errno));
+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
if (ret)
return ret;
}
rs->rmsg_head = 0;
}
+ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
return len;
}
struct ds_udp_header hdr;
struct msghdr msg;
struct iovec miov[8];
+ ssize_t ret;
+// printf("%s\n", __func__);
if (iovcnt > 8)
return ERR(ENOTSUP);
hdr.tag = htonl(DS_UDP_TAG);
- hdr.version = 1;
+ hdr.version = rs->conn_dest->qp->hdr.version;
hdr.op = op;
hdr.reserved = 0;
hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
msg.msg_iov = miov;
msg.msg_iovlen = iovcnt + 1;
- return sendmsg(rs->udp_sock, &msg, flags);
+// printf("%s iov cnt %d\n", __func__, msg.msg_iovlen);
+ ret = sendmsg(rs->udp_sock, &msg, flags);
+ printf("%s ret %d %s\n", __func__, ret, strerror(errno));
+ return ret > 0 ? ret - sizeof hdr : ret;
}
static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
int flags, uint8_t op)
{
struct iovec iov;
+ printf("%s\n", __func__);
if (buf && len) {
+// printf("%s have buffer\n", __func__);
iov.iov_base = (void *) buf;
iov.iov_len = len;
return ds_sendv_udp(rs, &iov, 1, flags, op);
} else {
+// printf("%s no buffer\n", __func__);
return ds_sendv_udp(rs, NULL, 0, flags, op);
}
}
uint64_t offset;
int ret = 0;
+ printf("%s\n", __func__);
if (!rs->conn_dest->ah)
return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
struct rsocket *rs;
int ret;
+ printf("%s\n", __func__);
rs = idm_at(&idm, socket);
if (rs->type == SOCK_STREAM) {
if (dest_addr || addrlen)
return rsend(socket, buf, len, flags);
}
+ if (rs->state == rs_init) {
+ ret = ds_init_ep(rs);
+ if (ret)
+ return ret;
+ }
+
fastlock_acquire(&rs->slock);
+ printf("%s check conn dest\n", __func__);
if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
+ printf("%s need conn dest\n", __func__);
ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
if (ret)
goto out;
}
+ else
+ printf("%s connected\n", __func__);
+
ret = dsend(rs, buf, len, flags);
out:
fastlock_release(&rs->slock);
}
svc_rss[++svc_cnt] = rs;
+ printf("%s rs %p\n", __func__, rs);
svc_fds[svc_cnt].fd = rs->udp_sock;
svc_fds[svc_cnt].events = POLLIN;
svc_fds[svc_cnt].revents = 0;
+ printf("add rs udp sock %d\n",rs->udp_sock);
return 0;
}
struct rs_svc_msg msg;
read(svc_sock[1], &msg, sizeof msg);
+ printf("%s op %d\n",__func__, msg.op);
switch (msg.op) {
case RS_SVC_INSERT:
msg.status = rs_svc_add_rs(msg.rs);
msg.status = ENOTSUP;
break;
}
+ printf("%s status %d\n",__func__, msg.status);
write(svc_sock[1], &msg, sizeof msg);
}
struct ibv_ah_attr attr;
int ret;
+ printf("%s\n",__func__);
if (dest->ah) {
fastlock_acquire(&rs->slock);
ibv_destroy_ah(dest->ah);
static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr,
union socket_addr *addr)
{
- return (udp_hdr->tag == DS_UDP_TAG) &&
+printf("tag %x ver %d family %d (AF_INET %d) length %d\n", udp_hdr->tag,
+ udp_hdr->version, addr->sa.sa_family, AF_INET, udp_hdr->length);
+
+printf("tag %d ver %d fam %d len %d ver %d fam %d len %d\n",
+udp_hdr->tag == ntohl(DS_UDP_TAG),
+ udp_hdr->version == 4, addr->sa.sa_family == AF_INET,
+ udp_hdr->length == DS_UDP_IPV4_HDR_LEN,
+ udp_hdr->version == 6, addr->sa.sa_family == AF_INET6,
+ udp_hdr->length == DS_UDP_IPV6_HDR_LEN);
+
+
+ return (udp_hdr->tag == ntohl(DS_UDP_TAG)) &&
((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET &&
udp_hdr->length == DS_UDP_IPV4_HDR_LEN) ||
(udp_hdr->version == 6 && addr->sa.sa_family == AF_INET6 &&
struct ibv_sge sge;
uint64_t offset;
+ printf("%s\n",__func__);
if (!ds_can_send(rs)) {
if (ds_get_comp(rs, 0, ds_can_send))
return;
socklen_t addrlen = sizeof addr;
int len, ret;
+ printf("%s\n",__func__);
ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
+ printf("%s recvfrom %d\n",__func__, ret);
if (ret < DS_UDP_IPV4_HDR_LEN)
return;
if (!rs_svc_valid_udp_hdr(udp_hdr, &addr))
return;
+ printf("%s valid hdr\n",__func__);
len = ret - udp_hdr->length;
udp_hdr->tag = ntohl(udp_hdr->tag);
udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF;
ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
+ printf("%s ds_get_dest %d\n",__func__, ret);
if (ret)
return;
cur_dest = rs->conn_dest;
if (udp_hdr->op == RS_OP_DATA) {
rs->conn_dest = &dest->qp->dest;
+ printf("%s forwarding msg\n",__func__);
rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
}
rs->conn_dest = dest;
+ printf("%s sending resp\n",__func__);
ds_send_udp(rs, svc_buf + udp_hdr->length, len, 0, RS_OP_CTRL);
rs->conn_dest = cur_dest;
fastlock_release(&rs->slock);
struct rs_svc_msg msg;
int i, ret;
+ printf("%s\n",__func__);
ret = rs_svc_grow_sets();
if (ret) {
msg.status = ret;
svc_fds[0].fd = svc_sock[1];
svc_fds[0].events = POLLIN;
do {
+ printf("%s svc cnt %d\n",__func__, svc_cnt);
for (i = 0; i <= svc_cnt; i++)
svc_fds[i].revents = 0;
+ printf("%s poll\n",__func__);
poll(svc_fds, svc_cnt + 1, -1);
+ printf("%s poll done\n",__func__);
if (svc_fds[0].revents)
rs_svc_process_sock();
if (svc_fds[i].revents)
rs_svc_process_rs(svc_rss[i]);
}
- } while (svc_cnt > 1);
+ } while (svc_cnt >= 1);
return NULL;
}