#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/epoll.h>
+#include <search.h>
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
static struct rsocket **svc_rss;
static struct pollfd *svc_fds;
static uint8_t svc_buf[RS_SNDLOWAT];
-static int rs_svc_run(void *arg);
+static void *rs_svc_run(void *arg);
static uint16_t def_iomap_size = 0;
static uint16_t def_inline = 64;
};
struct ds_qp {
- dlist_t list;
+ dlist_entry list;
struct rsocket *rs;
struct rdma_cm_id *cm_id;
struct ds_header hdr;
static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp)
{
if (!rs->qp_list)
- list_init(&qp->list);
+ dlist_init(&qp->list); /* rs has no QP yet: initialise qp's own list node */
else
- list_insert_head(&qp->list, &rs->qp_list->list);
- rs->qp_list = *qp;
+ dlist_insert_head(&qp->list, &rs->qp_list->list); /* otherwise link qp's node in via the current head's node */
+ rs->qp_list = qp; /* in both cases qp becomes the entry rs->qp_list points at */
}
static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
{
- if (qp->list.next != qp->list) {
+ if (qp->list.next != &qp->list) {
rs->qp_list = ds_next_qp(qp);
dlist_remove(&qp->list);
} else {
pthread_mutex_lock(&mut);
if (!svc_cnt) {
- ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_sock);
+ ret = socketpair(AF_INET, SOCK_STREAM, 0, svc_sock);
if (ret)
goto err1;
write(svc_sock[0], &msg, sizeof msg);
read(svc_sock[0], &msg, sizeof msg);
ret = ERR(msg.status);
- if (ret && !svn_cnt)
+ if (ret && !svc_cnt)
goto err3;
pthread_mutex_unlock(&mut);
write(svc_sock[0], &msg, sizeof msg);
read(svc_sock[0], &msg, sizeof msg);
ret = ERR(msg.status);
- if (!svn_cnt) {
+ if (!svc_cnt) {
pthread_join(svc_id, NULL);
close(svc_sock[0]);
close(svc_sock[1]);
return ret;
}
+static int ds_compare_addr(const void *dst1, const void *dst2) /* memcmp()-style comparator over two sockaddr buffers; passed to tsearch()/tdelete() elsewhere in this patch */
+{
+ const struct sockaddr *sa1, *sa2;
+ size_t len;
+
+ sa1 = (const struct sockaddr *) dst1;
+ sa2 = (const struct sockaddr *) dst2;
+
+ len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ? /* the full IPv6 size is compared only when both sides are AF_INET6 */
+ sizeof(struct sockaddr_in6) : sizeof(struct sockaddr); /* any other combination compares just sizeof(struct sockaddr) bytes */
+ return memcmp(dst1, dst2, len); /* raw byte comparison: 0 when the first len bytes match, otherwise memcmp()'s sign */
+}
+
static int rs_value_to_scale(int value, int bits)
{
return value <= (1 << (bits - 1)) ?
pthread_mutex_unlock(&mut);
}
-static int rs_insert(struct rsocket *rs, index)
+static int rs_insert(struct rsocket *rs, int index)
{
pthread_mutex_lock(&mut);
rs->index = idm_set(&idm, index, rs);
rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
if (!rs->rmsg)
- return ERR(ENOEMEM);
+ return ERR(ENOMEM);
rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
if (!rs->sbuf)
- return ERR(ENOEMEM);
+ return ERR(ENOMEM);
rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
if (!rs->smr)
sizeof(*rs->target_iomap) * rs->target_iomap_size;
rs->target_buffer_list = malloc(len);
if (!rs->target_buffer_list)
- return ERR(ENOEMEM);
+ return ERR(ENOMEM);
rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
if (!rs->target_mr)
rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
if (!rs->rbuf)
- return ERR(ENOEMEM);
+ return ERR(ENOMEM);
rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
if (!rs->rmr)
sge.addr = (uintptr_t) buf;
sge.length = RS_SNDLOWAT;
- sge.lkey = qp->rmr;
+ sge.lkey = qp->rmr->lkey;
- wr.wr_id = ds_recv_wr_id((uint32_t) (buf - rs->rbuf));
+ wr.wr_id = ds_recv_wr_id((uint32_t) ((uint8_t *) buf - rs->rbuf));
wr.next = NULL;
wr.sg_list = &sge;
wr.num_sge = 1;
if (qp->cm_id) {
if (qp->cm_id->qp) {
- tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_dest);
+ tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr);
epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL,
qp->cm_id->recv_cq_channel->fd, NULL);
rdma_destroy_qp(qp->cm_id);
static void ds_free(struct rsocket *rs)
{
- struct ds_qp *qp;
-
if (rs->state & (rs_readable | rs_writable))
rs_remove_from_svc(rs);
if (rs->dmsg)
free(rs->dmsg);
- if (rs->smsg)
- free(rs->smsg);
+ if (rs->smsg_free)
+ free(rs->smsg_free);
while (rs->qp_list) {
ds_remove_qp(rs, rs->qp_list);
rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
if (!rs->dmsg)
- return ERR(ENOEMEM);
+ return ERR(ENOMEM);
rs->sbuf_bytes_avail = rs->sbuf_size;
rs->sqe_avail = rs->sq_size;
return 0;
}
-static int ds_compare_addr(const void *dst1, const void *dst2)
-{
- const struct sockaddr *sa1, *sa2;
- size_t len;
-
- sa1 = (const struct sockaddr *) dst1;
- sa2 = (const struct sockaddr *) dst2;
-
- len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ?
- sizeof(struct sockaddr_in6) : sizeof(struct sockaddr);
- return memcmp(dst1, dst2, len);
-}
-
static int rs_any_addr(const union socket_addr *addr)
{
+ /*
+  * Returns nonzero when addr holds the wildcard or the loopback address.
+  * AF_INET is checked against INADDR_ANY / INADDR_LOOPBACK; every other
+  * family takes the IPv6 branch and is checked against in6addr_any /
+  * in6addr_loopback.
+  */
if (addr->sa.sa_family == AF_INET) {
- return (addr->sin.sin_addr == INADDR_ANY ||
- addr->sin.sin_addr == INADDR_LOOPBACK);
+ /* s_addr is stored in network byte order while the INADDR_* constants
+  * are host order, so convert before comparing (INADDR_LOOPBACK would
+  * otherwise never match on little-endian hosts). */
+ return (addr->sin.sin_addr.s_addr == htonl(INADDR_ANY) ||
+ addr->sin.sin_addr.s_addr == htonl(INADDR_LOOPBACK));
} else {
- return (addr->sin6.sin6_addr == in6addr_any ||
- addr->sin6.sin6_addr == in6addr_loopback);
+ /* struct in6_addr cannot be compared with ==; compare its 16 raw bytes. */
+ return (!memcmp(&addr->sin6.sin6_addr, &in6addr_any, 16) ||
+ !memcmp(&addr->sin6.sin6_addr, &in6addr_loopback, 16));
}
}
hdr->version = 4;
hdr->length = DS_IPV4_HDR_LEN;
hdr->port = addr->sin.sin_port;
- hdr->addr.ipv4 = addr->sin.sin_addr;
+ hdr->addr.ipv4 = addr->sin.sin_addr.s_addr;
} else {
hdr->version = 6;
hdr->length = DS_IPV6_HDR_LEN;
{
struct ibv_qp_init_attr qp_attr;
struct epoll_event event;
- int ret;
+ int i, ret;
*qp = calloc(1, sizeof(struct ds_qp));
if (!*qp)
if (rs->qp_list) {
*qp = rs->qp_list;
do {
- if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
+ if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id),
+ src_addr))
return 0;
*qp = ds_next_qp(*qp);
if (ret)
goto out;
- ret = ds_get_qp(rs, src_addr, src_len, &qp);
+ ret = ds_get_qp(rs, &src_addr, src_len, &qp);
if (ret)
goto out;
- if ((addrlen != src_len) || memcmp(addr, src_addr, addrlen)) {
+ if ((addrlen != src_len) || memcmp(addr, &src_addr, addrlen)) {
*dest = calloc(1, sizeof(struct ds_dest));
if (!*dest) {
ret = ERR(ENOMEM);
memcpy(&(*dest)->addr, addr, addrlen);
(*dest)->qp = qp;
- tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr);
+ tsearch(&(*dest)->addr, &rs->dest_map, ds_compare_addr);
}
out:
fastlock_release(&rs->map_lock);
wr.sg_list = sge;
wr.num_sge = 1;
wr.opcode = IBV_WR_SEND;
- wr.send_flags = (sge.length <= rs->sq_inline) ? IBV_SEND_INLINE : 0;
+ wr.send_flags = (sge->length <= rs->sq_inline) ? IBV_SEND_INLINE : 0;
wr.wr.ud.ah = rs->conn_dest->ah;
wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
rs = idm_at(&idm, socket);
if (rs->type == SOCK_DGRAM) {
fastlock_acquire(&rs->slock);
- ret = ds_recvfrom(rs, buf, len, flags, src_addr, addrlen);
+ ret = ds_recvfrom(rs, buf, len, flags, NULL, 0);
fastlock_release(&rs->slock);
return ret;
}
ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen)
{
+ struct rsocket *rs;
int ret;
rs = idm_at(&idm, socket);
struct ds_udp_header hdr;
struct msghdr msg;
struct iovec miov[8];
- struct ds_qp *qp;
if (iovcnt > 8)
return ERR(ENOTSUP);
hdr.addr.ipv4 = rs->conn_dest->qp->hdr.addr.ipv4;
} else {
hdr.length = DS_UDP_IPV6_HDR_LEN;
- memcpy(hdr.addr.ipv6, rs->conn_dest->qp->hdr.addr.ipv6, 16);
+ memcpy(hdr.addr.ipv6, &rs->conn_dest->qp->hdr.addr.ipv6, 16);
}
miov[0].iov_base = &hdr;
memcpy(&miov[1], iov, sizeof *iov * iovcnt);
memset(&msg, 0, sizeof msg);
- msg.msg_name = rs->conn_dest->addr;
+ msg.msg_name = &rs->conn_dest->addr;
msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
msg.msg_iov = miov;
msg.msg_iovlen = iovcnt + 1;
- return sendmsg(rs->udp_sock, msg, flags);
+ return sendmsg(rs->udp_sock, &msg, flags);
}
static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
{
struct iovec iov;
if (buf && len) {
- iov.iov_base = buf;
- iov_iov_len = len;
+ iov.iov_base = (void *) buf;
+ iov.iov_len = len;
return ds_sendv_udp(rs, &iov, 1, flags, op);
} else {
return ds_sendv_udp(rs, NULL, 0, flags, op);
struct ds_smsg *msg;
struct ibv_sge sge;
uint64_t offset;
- int flags, ret = 0;
+ int ret = 0;
if (!rs->conn_dest->ah)
return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
rs->smsg_free = msg->next;
rs->sqe_avail--;
- memcpy((void *) msg, rs->conn_dest->qp->hdr, rs->conn_dest->qp->hdr.length);
+ memcpy((void *) msg, &rs->conn_dest->qp->hdr, rs->conn_dest->qp->hdr.length);
memcpy((void *) msg + rs->conn_dest->qp->hdr.length, buf, len);
sge.addr = (uintptr_t) msg;
sge.length = rs->conn_dest->qp->hdr.length + len;
const struct sockaddr *dest_addr, socklen_t addrlen)
{
struct rsocket *rs;
+ int ret;
rs = idm_at(&idm, socket);
if (rs->type == SOCK_STREAM) {
fastlock_acquire(&rs->slock);
if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
- ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
+ ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
if (ret)
goto out;
}
static void ds_shutdown(struct rsocket *rs)
{
- int ret = 0;
-
if (rs->fd_flags & O_NONBLOCK)
rs_set_nonblocking(rs, 0);
ret = ERR(ENOTSUP);
rs = idm_at(&idm, socket);
if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
- ret = setsockopt(rs->udp_sock, optname, optval, optlen);
+ ret = setsockopt(rs->udp_sock, level, optname, optval, optlen);
if (ret)
return ret;
}
struct ibv_port_attr attr;
if (!ibv_query_port(dest->qp->cm_id->verbs, dest->qp->cm_id->port_num, &attr))
- return (uint8_t) ((1 << attr.lmc) - 1));
+ return (uint8_t) ((1 << attr.lmc) - 1);
return 0x7f;
}
static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t qpn)
{
- struct socket_addr saddr;
+ union socket_addr saddr;
struct rdma_cm_id *id;
struct ibv_ah_attr attr;
int ret;
fastlock_release(&rs->slock);
}
- ret = rdma_create_cm_id(NULL, &id, NULL, dest->qp->cm_id->ps);
+ ret = rdma_create_id(NULL, &id, NULL, dest->qp->cm_id->ps);
if (ret)
return;
saddr.sin.sin_port = 0;
else
saddr.sin6.sin6_port = 0;
- ret = rdma_resolve_addr(id, &saddr, &dest->addr.sa, 2000);
+ ret = rdma_resolve_addr(id, &saddr.sa, &dest->addr.sa, 2000);
if (ret)
goto out;
attr.is_global = 1;
attr.grh.dgid = id->route.path_rec->dgid;
attr.grh.flow_label = id->route.path_rec->flow_label;
- attr.grh.sgid_index = rs_svc_sgid_index(dest, id->route.path_rec->sgid);
+ attr.grh.sgid_index = rs_svc_sgid_index(dest, &id->route.path_rec->sgid);
attr.grh.hop_limit = id->route.path_rec->hop_limit;
attr.grh.traffic_class = id->route.path_rec->traffic_class;
}
rdma_destroy_id(id);
}
-static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_header,
+static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr, /* nonzero when the header carries DS_UDP_TAG and its version/length agree with addr's family */
union socket_addr *addr)
{
return (udp_hdr->tag == DS_UDP_TAG) &&
- ((udp_hdr->version == 4 && addr.sa.sa_family == AF_INET &&
+ ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET && /* version 4 requires an AF_INET address and the IPv4 header length */
udp_hdr->length == DS_UDP_IPV4_HDR_LEN) ||
- (udp_hdr->version == 6 && addr.sa.sa_family == AF_INET6 &&
+ (udp_hdr->version == 6 && addr->sa.sa_family == AF_INET6 && /* version 6 requires an AF_INET6 address and the IPv6 header length */
udp_hdr->length == DS_UDP_IPV6_HDR_LEN));
}
cur_dest = rs->conn_dest;
if (udp_hdr->op == RS_OP_DATA) {
rs->conn_dest = &dest->qp->dest;
- rs_svc_forward(rs, buf, len, addr);
+ rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
}
rs->conn_dest = dest;
fastlock_release(&rs->slock);
}
-static int rs_svc_run(void *arg)
+static void *rs_svc_run(void *arg)
{
struct rs_svc_msg msg;
int i, ret;
ret = rs_svc_grow_sets();
if (ret) {
msg.status = ret;
- write(svc_sock[1] &msg, sizeof msg);
- return ret;
+ write(svc_sock[1], &msg, sizeof msg);
+ return (void *) (uintptr_t) ret;
}
svc_fds[0].fd = svc_sock[1];
}
} while (svc_cnt > 1);
- return 0;
+ return NULL;
}