#define RS_OLAP_START_SIZE 2048
#define RS_MAX_TRANSFER 65536
-#define RS_SNDLOWAT 64
+#define RS_SNDLOWAT 2048
#define RS_QP_MAX_SIZE 0xFFFE
#define RS_QP_CTRL_SIZE 4
#define RS_CONN_RETRIES 6
static struct index_map idm;
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
+enum {
+ RS_SVC_INSERT,
+ RS_SVC_REMOVE
+};
+
+struct rsocket;
+
+struct rs_svc_msg {
+ uint32_t op;
+ uint32_t status;
+ struct rsocket *rs;
+};
+
+static pthread_t svc_id;
+static int svc_cnt;
+static int svc_fds[2];
+
static uint16_t def_iomap_size = 0;
static uint16_t def_inline = 64;
static uint16_t def_sqsize = 384;
uint32_t data;
};
+struct ds_qp;
+
+struct ds_msg {
+ struct ds_qp *qp;
+};
+
struct rs_sge {
uint64_t addr;
uint32_t key;
rs_connecting = rs_opening | 0x0040,
rs_accepting = rs_opening | 0x0080,
rs_connected = 0x0100,
- rs_connect_wr = 0x0200,
- rs_connect_rd = 0x0400,
- rs_connect_rdwr = rs_connected | rs_connect_rd | rs_connect_wr,
+ rs_writable = 0x0200,
+ rs_readable = 0x0400,
+ rs_connect_rdwr = rs_connected | rs_readable | rs_writable,
rs_connect_error = 0x0800,
rs_disconnected = 0x1000,
rs_error = 0x2000,
#define RS_OPT_SWAP_SGL 1
-struct rsocket {
+union socket_addr {
+ struct sockaddr sa;
+ struct sockaddr_in sin;
+ struct sockaddr_in6 sin6;
+};
+
+struct ds_qp {
+ dlist_t list;
+ struct rsocket *rs;
struct rdma_cm_id *cm_id;
+
+ struct ibv_mr *rmr;
+ uint8_t *rbuf;
+
+ struct ibv_mr *smr;
+ uint16_t lid;
+ uint8_t sl;
+};
+
+struct ds_dest {
+ union socket_addr addr; /* must be first */
+ struct ds_qp *qp;
+ struct ibv_ah *ah;
+ uint32_t qpn;
+};
+
+struct rsocket {
+ int type;
+ int index;
fastlock_t slock;
fastlock_t rlock;
fastlock_t cq_lock;
fastlock_t cq_wait_lock;
- fastlock_t iomap_lock;
+ fastlock_t map_lock; /* acquire slock first if needed */
+
+ union {
+ /* data stream */
+ struct {
+ struct rdma_cm_id *cm_id;
+
+ int remote_sge;
+ struct rs_sge remote_sgl;
+ struct rs_sge remote_iomap;
+
+ struct ibv_mr *target_mr;
+ int target_sge;
+ int target_iomap_size;
+ void *target_buffer_list;
+ volatile struct rs_sge *target_sgl;
+ struct rs_iomap *target_iomap;
+ };
+ /* datagram */
+ struct {
+ dlist_t qp_list;
+ void *dest_map;
+ struct ds_dest *conn_dest;
+ struct pollfd *fds;
+ nfds_t nfds;
+ int dsock;
+ int sbytes_needed;
+ };
+ };
int opts;
long fd_flags;
int cq_armed;
int retries;
int err;
- int index;
+
int ctrl_avail;
int sqe_avail;
int sbuf_bytes_avail;
int rbuf_offset;
int rmsg_head;
int rmsg_tail;
- struct rs_msg *rmsg;
-
- int remote_sge;
- struct rs_sge remote_sgl;
- struct rs_sge remote_iomap;
+ union {
+ struct rs_msg *rmsg;
+ struct ds_msg *dmsg;
+ };
struct rs_iomap_mr *remote_iomappings;
dlist_entry iomap_list;
dlist_entry iomap_queue;
int iomap_pending;
- struct ibv_mr *target_mr;
- int target_sge;
- int target_iomap_size;
- void *target_buffer_list;
- volatile struct rs_sge *target_sgl;
- struct rs_iomap *target_iomap;
-
uint32_t rbuf_size;
- struct ibv_mr *rmr;
+ struct ibv_mr *rmr;
uint8_t *rbuf;
uint32_t sbuf_size;
- struct ibv_mr *smr;
+ struct ibv_mr *smr;
struct ibv_sge ssgl[2];
uint8_t *sbuf;
};
+#define DS_UDP_TAG 0x5555555555555555ULL
+
+struct ds_udp_header {
+ uint64_t tag;
+ uint8_t version;
+ uint8_t sl;
+ uint16_t slid;
+ uint32_t qpn; /* upper 8-bits reserved */
+};
+
+
+static int rs_svc_run(void *arg)
+{
+ return 0;
+}
+
+static int rs_svc_insert(struct rsocket *rs)
+{
+ struct rs_svc_msg msg;
+ int ret;
+
+ pthread_mutex_lock(&mut);
+ if (!svc_cnt) {
+ ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_fds);
+ if (ret)
+ goto out;
+
+ ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL);
+ if (ret) {
+ close(svc_fds[0]);
+ close(svc_fds[1]);
+ ret = ERR(ret);
+ goto out;
+ }
+ }
+
+ msg.op = RS_SVC_INSERT;
+ msg.status = EINVAL;
+ msg.rs = rs;
+ svc_cnt++;
+ write(svc_fds[0], &msg, sizeof msg);
+ read(svc_fds[0], &msg, sizeof msg);
+ ret = ERR(msg.status);
+out:
+ pthread_mutex_unlock(&mut);
+ return ret;
+}
+
+static int rs_svc_remove(struct rsocket *rs)
+{
+ struct rs_svc_msg msg;
+ int ret;
+
+ pthread_mutex_lock(&mut);
+ msg.op = RS_SVC_REMOVE;
+ msg.status = EINVAL;
+ msg.rs = rs;
+ write(svc_fds[0], &msg, sizeof msg);
+ read(svc_fds[0], &msg, sizeof msg);
+ ret = ERR(msg.status);
+ if (!ret && !--svn_cnt)
+ pthread_join(svc_id, NULL);
+
+ pthread_mutex_unlock(&mut);
+ return ret;
+}
+
static int rs_value_to_scale(int value, int bits)
{
return value <= (1 << (bits - 1)) ?
pthread_mutex_unlock(&mut);
}
-static int rs_insert(struct rsocket *rs)
+static int rs_insert(struct rsocket *rs, index)
{
pthread_mutex_lock(&mut);
- rs->index = idm_set(&idm, rs->cm_id->channel->fd, rs);
+ rs->index = idm_set(&idm, index, rs);
pthread_mutex_unlock(&mut);
return rs->index;
}
pthread_mutex_unlock(&mut);
}
-static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
{
struct rsocket *rs;
if (!rs)
return NULL;
+ rs->type = type;
rs->index = -1;
+ rs->dsock = -1;
if (inherited_rs) {
rs->sbuf_size = inherited_rs->sbuf_size;
rs->rbuf_size = inherited_rs->rbuf_size;
fastlock_init(&rs->rlock);
fastlock_init(&rs->cq_lock);
fastlock_init(&rs->cq_wait_lock);
- fastlock_init(&rs->iomap_lock);
+ fastlock_init(&rs->map_lock);
dlist_init(&rs->iomap_list);
dlist_init(&rs->iomap_queue);
return rs;
}
+/*
+ * TODO: Support datagram rsockets
+ */
static int rs_set_nonblocking(struct rsocket *rs, long arg)
{
int ret = 0;
return 0;
}
-static int rs_create_cq(struct rsocket *rs)
+static int ds_init_bufs(struct ds_qp *qp)
{
- rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs);
- if (!rs->cm_id->recv_cq_channel)
+ qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf));
+ if (!qp->rbuf)
+ return ERR(ENOMEM);
+
+ qp->smr = rdma_reg_msgs(qp->cm_id, qp->rs->sbuf, qp->rs->sbuf_size);
+ if (!qp->smr)
+ return -1;
+
+ qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size);
+ if (!qp->rmr)
return -1;
- rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size,
- rs->cm_id, rs->cm_id->recv_cq_channel, 0);
- if (!rs->cm_id->recv_cq)
+ return 0;
+}
+
+static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
+{
+ cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
+ if (!cm_id->recv_cq_channel)
+ return -1;
+
+ cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
+ cm_id, cm_id->recv_cq_channel, 0);
+ if (!cm_id->recv_cq)
goto err1;
if (rs->fd_flags & O_NONBLOCK) {
goto err2;
}
- rs->cm_id->send_cq_channel = rs->cm_id->recv_cq_channel;
- rs->cm_id->send_cq = rs->cm_id->recv_cq;
+ cm_id->send_cq_channel = cm_id->recv_cq_channel;
+ cm_id->send_cq = cm_id->recv_cq;
return 0;
err2:
- ibv_destroy_cq(rs->cm_id->recv_cq);
- rs->cm_id->recv_cq = NULL;
+ ibv_destroy_cq(cm_id->recv_cq);
+ cm_id->recv_cq = NULL;
err1:
- ibv_destroy_comp_channel(rs->cm_id->recv_cq_channel);
- rs->cm_id->recv_cq_channel = NULL;
+ ibv_destroy_comp_channel(cm_id->recv_cq_channel);
+ cm_id->recv_cq_channel = NULL;
return -1;
}
-static inline int
-rs_post_recv(struct rsocket *rs)
+static inline int rs_post_recv(struct rsocket *rs)
{
struct ibv_recv_wr wr, *bad;
return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
}
+static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf)
+{
+ struct ibv_recv_wr wr, *bad;
+ struct ibv_sge sge;
+
+ sge.addr = (uintptr_t) buf;
+ sge.length = RS_SNDLOWAT;
+ sge.lkey = qp->rmr;
+
+ wr.wr_id = RS_RECV_WR_ID;
+ wr.next = NULL;
+ wr.sg_list = &sge;
+ wr.num_sge = 1;
+
+ return rdma_seterrno(ibv_post_recv(qp->cm_id->qp, &wr, &bad));
+}
+
static int rs_create_ep(struct rsocket *rs)
{
struct ibv_qp_init_attr qp_attr;
if (ret)
return ret;
- ret = rs_create_cq(rs);
+ ret = rs_create_cq(rs, rs->cm_id);
if (ret)
return ret;
}
}
+static void ds_free_qp(struct ds_qp *qp)
+{
+ if (qp->smr)
+ rdma_dereg_mr(qp->smr);
+
+ if (qp->rbuf) {
+ if (qp->rmr)
+ rdma_dereg_mr(qp->rmr);
+ free(qp->rbuf);
+ }
+
+ if (qp->cm_id) {
+ if (qp->cm_id->qp) {
+ dlist_remove(&qp->list);
+ rdma_destroy_qp(qp->cm_id);
+ }
+ rdma_destroy_id(qp->cm_id);
+ }
+ free(qp);
+}
+
+static void ds_free_qps(struct rsocket *rs)
+{
+ struct ds_qp *qp;
+ dlist_t *entry;
+
+ while (!dlist_empty(&rs->qp_list)) {
+ qp = container_of(rs->qp_list.next, struct ds_qp, list);
+ ds_free_qp(qp);
+ }
+}
+
+static void ds_free(struct rsocket *rs)
+{
+ if (rs->state & (rs_readable | rs_writable))
+ rs_svc_remove(rs);
+
+ if (rs->dsock >= 0)
+ close(rs->dsock);
+
+ if (rs->index >= 0)
+ rs_remove(rs);
+
+ ds_free_qps(rs);
+ if (rs->fds)
+ free(rs->fds);
+
+ if (rs->sbuf)
+ free(rs->sbuf);
+
+ fastlock_destroy(&rs->map_lock);
+ fastlock_destroy(&rs->cq_wait_lock);
+ fastlock_destroy(&rs->cq_lock);
+ fastlock_destroy(&rs->rlock);
+ fastlock_destroy(&rs->slock);
+ free(rs);
+}
+
static void rs_free(struct rsocket *rs)
{
+ if (rs->type == SOCK_DGRAM) {
+ ds_free(rs);
+ return;
+ }
+
if (rs->index >= 0)
rs_remove(rs);
rdma_destroy_id(rs->cm_id);
}
- fastlock_destroy(&rs->iomap_lock);
+ fastlock_destroy(&rs->map_lock);
fastlock_destroy(&rs->cq_wait_lock);
fastlock_destroy(&rs->cq_lock);
fastlock_destroy(&rs->rlock);
rs->sseq_comp = ntohs(conn->credits);
}
+static int ds_init(struct rsocket *rs, int domain)
+{
+ rs->dsock = socket(domain, SOCK_DGRAM, 0);
+ if (rs->dsock < 0)
+ return rs->dsock;
+
+ rs->fds = calloc(1, sizeof *fds);
+ if (!rs->fds)
+ return ERR(ENOMEM);
+ rs->nfds = 1;
+ dlist_init(&rs->qp_list);
+
+ return 0;
+}
+
int rsocket(int domain, int type, int protocol)
{
struct rsocket *rs;
- int ret;
+ int index, ret;
if ((domain != PF_INET && domain != PF_INET6) ||
- (type != SOCK_STREAM) || (protocol && protocol != IPPROTO_TCP))
+ ((type != SOCK_STREAM) && (type != SOCK_DGRAM)) ||
+ (type == SOCK_STREAM && protocol && protocol != IPPROTO_TCP) ||
+ (type == SOCK_DGRAM && protocol && protocol != IPPROTO_UDP))
return ERR(ENOTSUP);
rs_configure();
- rs = rs_alloc(NULL);
+ rs = rs_alloc(NULL, type);
if (!rs)
return ERR(ENOMEM);
- ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
- if (ret)
- goto err;
+ if (type == SOCK_STREAM) {
+ ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
+ if (ret)
+ goto err;
+
+ rs->cm_id->route.addr.src_addr.sa_family = domain;
+ index = rs->cm_id->channel->fd;
+ } else {
+ ret = ds_init(rs, domain);
+ if (ret)
+ goto err;
+
+ index = rs->dsock;
+ }
- ret = rs_insert(rs);
+ ret = rs_insert(rs, index);
if (ret < 0)
goto err;
- rs->cm_id->route.addr.src_addr.sa_family = domain;
return rs->index;
err:
int ret;
rs = idm_at(&idm, socket);
- ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr);
- if (!ret)
- rs->state = rs_bound;
+ if (rs->type == SOCK_STREAM) {
+ ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr);
+ if (!ret)
+ rs->state = rs_bound;
+ } else {
+ ret = bind(rs->dsock, addr, addrlen);
+ if (!ret) {
+ ret = rs_svc_insert(rs);
+ if (!ret)
+ rs->state = rs_readable | rs_writable;
+ }
+ }
return ret;
}
int ret;
rs = idm_at(&idm, socket);
- new_rs = rs_alloc(rs);
+ new_rs = rs_alloc(rs, rs->type);
if (!new_rs)
return ERR(ENOMEM);
if (ret)
goto err;
- ret = rs_insert(new_rs);
+ ret = rs_insert(new_rs, rs->cm_id->channel->fd);
if (ret < 0)
goto err;
return ret;
}
+static int ds_init_ep(struct rsocket *rs)
+{
+ int ret;
+
+ rs_set_qp_size(rs);
+ if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
+ rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
+ else
+ rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
+
+ rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
+ if (!rs->sbuf)
+ return ERR(ENOMEM);
+
+ rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf;
+ rs->sbuf_bytes_avail = rs->sbuf_size;
+ rs->sqe_avail = rs->sq_size;
+
+ ret = rs_svc_insert(rs);
+ if (ret)
+ return ret;
+
+ rs->state = rs_readable | rs_writable;
+ return 0;
+}
+
+static int ds_compare_addr(const void *dst1, const void *dst2)
+{
+ const struct sockaddr *sa1, *sa2;
+ size_t len;
+
+ sa1 = (const struct sockaddr *) dst1;
+ sa2 = (const struct sockaddr *) dst2;
+
+ len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ?
+ sizeof(struct sockaddr_in6) : sizeof(struct sockaddr);
+ return memcmp(dst1, dst2, len);
+}
+
+static int rs_any_addr(const union socket_addr *addr)
+{
+ if (addr->sa.sa_family == AF_INET) {
+ return (addr->sin.sin_addr == INADDR_ANY ||
+ addr->sin.sin_addr == INADDR_LOOPBACK);
+ } else {
+ return (addr->sin6.sin6_addr == in6addr_any ||
+ addr->sin6.sin6_addr == in6addr_loopback);
+ }
+}
+
+static int ds_get_src_addr(struct rsocket *rs,
+ const struct sockaddr *dest_addr, socklen_t dest_len,
+ union socket_addr *src_addr, socklen_t *src_len)
+{
+ int sock, ret;
+ uint16_t port;
+
+ *src_len = sizeof src_addr;
+ ret = getsockname(rs->dsock, &src_addr->sa, src_len);
+ if (ret || !rs_any_addr(src_addr))
+ return ret;
+
+ port = src_addr->sin.sin_port;
+ sock = socket(dest_addr->sa_family, SOCK_DGRAM, 0);
+ if (sock < 0)
+ return sock;
+
+ ret = connect(sock, dest_addr, dest_len);
+ if (ret)
+ goto out;
+
+ *src_len = sizeof src_addr;
+ ret = getsockname(sock, &src_addr->sa, src_len);
+ src_addr->sin.sin_port = port;
+out:
+ close(sock);
+ return ret;
+}
+
+static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
+ socklen_t addrlen, struct ds_qp **qp)
+{
+ struct ibv_qp_init_attr qp_attr;
+ int ret;
+
+ *qp = calloc(1, sizeof(struct ds_qp));
+ if (!*qp)
+ return ERR(ENOMEM);
+
+ (*qp)->rs = rs;
+ ret = rdma_create_id(NULL, &(*qp)->cm_id, *qp, RDMA_PS_UDP);
+ if (ret)
+ goto err;
+
+ ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa);
+ if (ret)
+ goto err;
+
+ ret = ds_init_bufs(*qp);
+ if (ret)
+ goto err;
+
+ ret = rs_create_cq(rs, (*qp)->cm_id);
+ if (ret)
+ goto err;
+
+ memset(&qp_attr, 0, sizeof qp_attr);
+ qp_attr.qp_context = qp;
+ qp_attr.send_cq = rs->cm_id->send_cq;
+ qp_attr.recv_cq = rs->cm_id->recv_cq;
+ qp_attr.qp_type = IBV_QPT_UD;
+ qp_attr.sq_sig_all = 1;
+ qp_attr.cap.max_send_wr = rs->sq_size;
+ qp_attr.cap.max_recv_wr = rs->rq_size;
+ qp_attr.cap.max_send_sge = 2;
+ qp_attr.cap.max_recv_sge = 1;
+ qp_attr.cap.max_inline_data = rs->sq_inline;
+
+ ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr);
+ if (ret)
+ return ret;
+
+ for (i = 0; i < rs->rq_size; i++) {
+ ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT);
+ if (ret)
+ goto err;
+ }
+ list_insert_head(&(*qp)->list, &rs->qp_list);
+ return 0;
+err:
+ ds_free_qp(*qp);
+ return ret;
+}
+
+static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr,
+ socklen_t addrlen, struct ds_qp **qp)
+{
+ dlist_t *entry;
+
+ for (entry = rs->qp_list.next; entry != &rs->qp_list; entry = entry->next) {
+ *qp = container_of(entry, struct ds_qp, list);
+ if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
+ return 0;
+ }
+
+ return ds_create_qp(rs, src_addr, addrlen, qp);
+}
+
+static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
+ socklen_t addrlen, struct ds_dest **dest)
+{
+ union socket_addr src_addr;
+ socklen_t src_len;
+ struct ds_qp *qp;
+ int ret = 0;
+
+ fastlock_acquire(&rs->map_lock);
+ dest = tfind(addr, &rs->dest_map, ds_compare_addr);
+ if (dest)
+ goto out;
+
+ if (rs->state == rs_init) {
+ ret = ds_init_ep(rs);
+ if (ret)
+ goto out;
+ }
+
+ ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
+ if (ret)
+ goto out;
+
+ ret = ds_get_qp(rs, src_addr, src_len, &qp);
+ if (ret)
+ goto out;
+
+ *dest = calloc(1, sizeof(struct ds_dest));
+ if (!*dest) {
+ ret = ERR(ENOMEM);
+ goto out;
+ }
+
+ memcpy(&(*dest)->addr, addr, addrlen);
+ (*dest)->qp = qp;
+ tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr);
+out:
+ fastlock_release(&rs->map_lock);
+ return ret;
+}
+
int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
{
struct rsocket *rs;
+ int ret;
rs = idm_at(&idm, socket);
- memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
- return rs_do_connect(rs);
+ if (rs->type == SOCK_STREAM) {
+ memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
+ ret = rs_do_connect(rs);
+ } else {
+ fastlock_acquire(&rs->slock);
+ ret = connect(rs->dsock, addr, addrlen);
+ if (!ret)
+ ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
+ fastlock_release(&rs->slock);
+ }
+ return ret;
}
static int rs_post_write_msg(struct rsocket *rs,
return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
}
+static int ds_post_send(struct rsocket *rs,
+ struct ibv_sge *sgl, int nsge,
+ uint64_t wr_id, int flags)
+{
+ struct ibv_send_wr wr, *bad;
+
+ wr.wr_id = wr_id;
+ wr.next = NULL;
+ wr.sg_list = sgl;
+ wr.num_sge = nsge;
+ wr.opcode = IBV_WR_SEND;
+ wr.send_flags = flags;
+ wr.wr.ud.ah = rs->conn_dest->ah;
+ wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
+ wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
+
+ return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad));
+}
+
/*
* Update target SGE before sending data. Otherwise the remote side may
* update the entry before we do.
flags, addr, rkey);
}
+static int ds_send_data(struct rsocket *rs,
+ struct ibv_sge *sgl, int nsge,
+ uint32_t length, int flags)
+{
+ rs->sqe_avail--;
+ rs->sbuf_bytes_avail -= length;
+ return ds_post_send(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), flags);
+}
+
static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
{
rs->state = rs_disconnected;
return 0;
} else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
- rs->state &= ~rs_connect_rd;
+ rs->state &= ~rs_readable;
}
break;
case RS_OP_WRITE:
(rs->target_sgl[rs->target_sge].length != 0);
}
+static int ds_can_send(struct rsocket *rs)
+{
+ return rs->sqe_avail && (rs->sbuf_bytes_avail >= rs->sbytes_needed);
+}
+
static int rs_conn_can_send(struct rsocket *rs)
{
- return rs_can_send(rs) || !(rs->state & rs_connect_wr);
+ return rs_can_send(rs) || !(rs->state & rs_writable);
}
static int rs_conn_can_send_ctrl(struct rsocket *rs)
static int rs_conn_have_rdata(struct rsocket *rs)
{
- return rs_have_rdata(rs) || !(rs->state & rs_connect_rd);
+ return rs_have_rdata(rs) || !(rs->state & rs_readable);
}
static int rs_conn_all_sends_done(struct rsocket *rs)
rs->rbuf_bytes_avail += rsize;
}
- } while (left && (flags & MSG_WAITALL) && (rs->state & rs_connect_rd));
+ } while (left && (flags & MSG_WAITALL) && (rs->state & rs_readable));
fastlock_release(&rs->rlock);
return ret ? ret : len - left;
struct rs_iomap iom;
int ret;
- fastlock_acquire(&rs->iomap_lock);
+ fastlock_acquire(&rs->map_lock);
while (!dlist_empty(&rs->iomap_queue)) {
if (!rs_can_send(rs)) {
ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
rs_conn_can_send);
if (ret)
break;
- if (!(rs->state & rs_connect_wr)) {
+ if (!(rs->state & rs_writable)) {
ret = ERR(ECONNRESET);
break;
}
}
rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
- fastlock_release(&rs->iomap_lock);
+ fastlock_release(&rs->map_lock);
return ret;
}
+static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
+ int iovcnt, int flags)
+{
+ struct ds_udp_header hdr;
+ struct msghdr msg;
+ struct iovec miov[4];
+ struct ds_qp *qp;
+
+ if (iovcnt > 4)
+ return ERR(ENOTSUP);
+
+ qp = (rs->conn_dest) ? rs->conn_dest->qp : NULL;
+ hdr.tag = htonll(DS_UDP_TAG);
+ hdr.version = 1;
+ if (qp) {
+ hdr.sl = qp->sl;
+ hdr.slid = htons(qp->lid);
+ hdr.qpn = htonl(qp->cm_id->qp->qp_num & 0xFFFFFF);
+ } else {
+ hdr.sl = 0;
+ hdr.slid = 0;
+ hdr.qpn = 0;
+ }
+
+ miov[0].iov_base = &hdr;
+ miov[0].iov_len = sizeof hdr;
+ memcpy(&miov[1], iov, sizeof *iov * iovcnt);
+
+ memset(&msg, 0, sizeof msg);
+ /* TODO: specify name if needed */
+ msg.msg_iov = miov;
+ msg.msg_iovlen = iovcnt + 1;
+ return sendmsg(rs->fd, msg, flags);
+}
+
+static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, int flags)
+{
+ struct iovec iov;
+ iov.iov_base = buf;
+ iov_iov_len = len;
+ return ds_sendv_udp(s, &iov, 1, flags);
+}
+
+static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
+{
+ struct ibv_sge sge;
+ int ret = 0;
+
+ if (!rs->conn_dest || !rs->conn_dest->ah)
+ return ds_send_udp(rs, buf, len, flags);
+
+ rs->sbytes_needed = len;
+ if (!ds_can_send(rs)) {
+ ret = rs_get_comp(rs, 1, ds_can_send);
+ if (ret)
+ return ds_send_udp(rs, buf, len, flags);
+ }
+
+ if (len <= rs->sq_inline) {
+ sge.addr = (uintptr_t) buf;
+ sge.length = len;
+ sge.lkey = 0;
+ ret = ds_send_data(rs, &sge, 1, len, IBV_SEND_INLINE);
+ } else if (len <= rs_sbuf_left(rs)) {
+ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, len);
+ rs->ssgl[0].length = len;
+ ret = ds_send_data(rs, rs->ssgl, 1, len, 0);
+ if (len < rs_sbuf_left(rs))
+ rs->ssgl[0].addr += len;
+ else
+ rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
+ } else {
+ rs->ssgl[0].length = rs_sbuf_left(rs);
+ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
+ rs->ssgl[0].length);
+ rs->ssgl[1].length = len - rs->ssgl[0].length;
+ memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
+ ret = ds_send_data(rs, rs->ssgl, 2, len, 0);
+ rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+ }
+
+ return ret ? ret : len;
+}
+
/*
* We overlap sending the data, by posting a small work request immediately,
* then increasing the size of the send on each iteration.
int ret = 0;
rs = idm_at(&idm, socket);
+ if (rs->type == SOCK_DGRAM) {
+ fastlock_acquire(&rs->slock);
+ ret = dsend(rs, buf, len, flags);
+ fastlock_release(&rs->slock);
+ return ret;
+ }
+
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
rs_conn_can_send);
if (ret)
break;
- if (!(rs->state & rs_connect_wr)) {
+ if (!(rs->state & rs_writable)) {
ret = ERR(ECONNRESET);
break;
}
ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen)
{
- if (dest_addr || addrlen)
- return ERR(EISCONN);
+ struct rsocket *rs;
+
+ rs = idm_at(&idm, socket);
+ if (rs->type == SOCK_STREAM) {
+ if (dest_addr || addrlen)
+ return ERR(EISCONN);
- return rsend(socket, buf, len, flags);
+ return rsend(socket, buf, len, flags);
+ }
+
+ fastlock_acquire(&rs->slock);
+ ds_connect(rs, dest_addr, addrlen);
+ ret = dsend(rs, buf, len, flags);
+ fastlock_release(&rs->slock);
+ return ret;
}
static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
rs_conn_can_send);
if (ret)
break;
- if (!(rs->state & rs_connect_wr)) {
+ if (!(rs->state & rs_writable)) {
ret = ERR(ECONNRESET);
break;
}
if (msg->msg_control && msg->msg_controllen)
return ERR(ENOTSUP);
- return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, msg->msg_flags);
+ return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, flags);
}
ssize_t rwrite(int socket, const void *buf, size_t count)
rs = idm_at(&idm, socket);
if (how == SHUT_RD) {
- rs->state &= ~rs_connect_rd;
+ rs->state &= ~rs_readable;
return 0;
}
if (rs->state & rs_connected) {
if (how == SHUT_RDWR) {
ctrl = RS_CTRL_DISCONNECT;
- rs->state &= ~(rs_connect_rd | rs_connect_wr);
+ rs->state &= ~(rs_readable | rs_writable);
} else {
- rs->state &= ~rs_connect_wr;
- ctrl = (rs->state & rs_connect_rd) ?
+ rs->state &= ~rs_writable;
+ ctrl = (rs->state & rs_readable) ?
RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
}
if (!rs->ctrl_avail) {
struct rsocket *rs;
rs = idm_at(&idm, socket);
- rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
- return 0;
+ if (rs->type == SOCK_STREAM) {
+ rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
+ return 0;
+ } else {
+ return getpeername(rs->fs, addr, addrlen);
+ }
}
int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
- rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
- return 0;
+ if (rs->type == SOCK_STREAM) {
+ rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
+ return 0;
+ } else {
+ return getsockname(rs->fd, addr, addrlen);
+ }
}
int rsetsockopt(int socket, int level, int optname,
ret = ERR(ENOTSUP);
rs = idm_at(&idm, socket);
+ if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
+ ret = setsockopt(rs->fd, optname, optval, optlen);
+ if (ret)
+ return ret;
+ }
+
switch (level) {
case SOL_SOCKET:
opts = &rs->so_opts;
int ret = 0;
rs = idm_at(&idm, socket);
+ if (rs->type == SOCK_DGRAM && level != SOL_RDMA)
+ return getsockopt(rs->fd, level, optname, optval, optlen);
+
switch (level) {
case SOL_SOCKET:
switch (optname) {
if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
return ERR(EINVAL);
- fastlock_acquire(&rs->iomap_lock);
+ fastlock_acquire(&rs->map_lock);
if (prot & PROT_WRITE) {
iomr = rs_get_iomap_mr(rs);
access |= IBV_ACCESS_REMOTE_WRITE;
dlist_insert_tail(&iomr->entry, &rs->iomap_list);
}
out:
- fastlock_release(&rs->iomap_lock);
+ fastlock_release(&rs->map_lock);
return offset;
}
int ret = 0;
rs = idm_at(&idm, socket);
- fastlock_acquire(&rs->iomap_lock);
+ fastlock_acquire(&rs->map_lock);
for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
entry = entry->next) {
}
ret = ERR(EINVAL);
out:
- fastlock_release(&rs->iomap_lock);
+ fastlock_release(&rs->map_lock);
return ret;
}
rs_conn_can_send);
if (ret)
break;
- if (!(rs->state & rs_connect_wr)) {
+ if (!(rs->state & rs_writable)) {
ret = ERR(ECONNRESET);
break;
}
return (ret && left == count) ? ret : count - left;
}
+
+ssize_t urecvfrom(int socket, void *buf, size_t len, int flags,
+ struct sockaddr *src_addr, socklen_t *addrlen)
+{
+ int ret;
+
+ ret = rrecv(socket, buf, len, flags);
+ if (ret > 0 && src_addr)
+ rgetpeername(socket, src_addr, addrlen);
+
+ return ret;
+}
+
+ssize_t usendto(int socket, const void *buf, size_t len, int flags,
+ const struct sockaddr *dest_addr, socklen_t addrlen)
+{
+ if (dest_addr || addrlen)
+ return ERR(EISCONN);
+
+ return usend(socket, buf, len, flags);
+}