#include <string.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
+#include <sys/epoll.h>
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
#define rs_msg_op(imm_data) (imm_data >> 29)
#define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
+#define RS_RECV_WR_ID (~((uint64_t) 0))
+
+#define DS_WR_RECV 0xFFFFFFFF
+#define ds_send_wr_id(offset, length) (((uint64_t) (offset)) << 32 | (uint64_t) length)
+#define ds_recv_wr_id(offset) (((uint64_t) (offset)) << 32 | (uint64_t) DS_WR_RECV)
+#define ds_wr_offset(wr_id) ((uint32_t) (wr_id >> 32))
+#define ds_wr_length(wr_id) ((uint32_t) wr_id)
+#define ds_wr_is_recv(wr_id) (ds_wr_length(wr_id) == DS_WR_RECV)
enum {
RS_CTRL_DISCONNECT,
struct ds_qp;
-struct ds_msg {
- struct ds_qp *qp;
+struct ds_rmsg {
+ struct ds_qp *qp;
+ uint32_t offset;
+ uint32_t length;
+};
+
+struct ds_smsg {
+ struct ds_smsg *next;
};
struct rs_sge {
struct rs_sge data_buf;
};
-#define RS_RECV_WR_ID (~((uint64_t) 0))
-
/*
* rsocket states are ordered as passive, connecting, connected, disconnected.
*/
struct rsocket *rs;
struct rdma_cm_id *cm_id;
+ struct ibv_mr *smr;
struct ibv_mr *rmr;
uint8_t *rbuf;
- struct ibv_mr *smr;
- uint16_t lid;
- uint8_t sl;
+ int cq_armed;
};
struct ds_dest {
/* data stream */
struct {
struct rdma_cm_id *cm_id;
+ uint64_t tcp_opts;
+
+ int ctrl_avail;
+ uint16_t sseq_no;
+ uint16_t sseq_comp;
+ uint16_t rseq_no;
+ uint16_t rseq_comp;
int remote_sge;
struct rs_sge remote_sgl;
void *target_buffer_list;
volatile struct rs_sge *target_sgl;
struct rs_iomap *target_iomap;
+
+ int rbuf_bytes_avail;
+ int rbuf_free_offset;
+ int rbuf_offset;
+ struct ibv_mr *rmr;
+ uint8_t *rbuf;
+
+ int sbuf_bytes_avail;
+ struct ibv_mr *smr;
+ struct ibv_sge ssgl[2];
};
/* datagram */
struct {
- dlist_t qp_list;
+ struct ds_qp *qp_list;
void *dest_map;
struct ds_dest *conn_dest;
- struct pollfd *fds;
- nfds_t nfds;
- int dsock;
- int sbytes_needed;
+
+ int udp_sock;
+ int epfd;
+ int rqe_avail;
+ struct ds_smsg *smsg_free;
};
};
int opts;
long fd_flags;
uint64_t so_opts;
- uint64_t tcp_opts;
uint64_t ipv6_opts;
int state;
int cq_armed;
int retries;
int err;
- int ctrl_avail;
int sqe_avail;
- int sbuf_bytes_avail;
- uint16_t sseq_no;
- uint16_t sseq_comp;
+ uint32_t sbuf_size;
uint16_t sq_size;
uint16_t sq_inline;
+ uint32_t rbuf_size;
uint16_t rq_size;
- uint16_t rseq_no;
- uint16_t rseq_comp;
- int rbuf_bytes_avail;
- int rbuf_free_offset;
- int rbuf_offset;
int rmsg_head;
int rmsg_tail;
union {
struct rs_msg *rmsg;
- struct ds_msg *dmsg;
+ struct ds_rmsg *dmsg;
};
+ uint8_t *sbuf;
struct rs_iomap_mr *remote_iomappings;
dlist_entry iomap_list;
dlist_entry iomap_queue;
int iomap_pending;
-
- uint32_t rbuf_size;
- struct ibv_mr *rmr;
- uint8_t *rbuf;
-
- uint32_t sbuf_size;
- struct ibv_mr *smr;
- struct ibv_sge ssgl[2];
- uint8_t *sbuf;
};
#define DS_UDP_TAG 0x5555555555555555ULL
struct ds_udp_header {
uint64_t tag;
uint8_t version;
- uint8_t sl;
- uint16_t slid;
+ uint8_t reserved[3];
uint32_t qpn; /* upper 8-bits reserved */
};
+#define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list)
+
+static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp)
+{
+ if (!rs->qp_list)
+ list_init(&qp->list);
+ else
+ list_insert_head(&qp->list, &rs->qp_list->list);
+ rs->qp_list = *qp;
+}
+
+static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
+{
+ if (qp->list.next != qp->list) {
+ rs->qp_list = ds_next_qp(qp);
+ dlist_remove(&qp->list);
+ } else {
+ rs->qp_list = NULL;
+ }
+}
+
static int rs_svc_run(void *arg)
{
return 0;
rs->type = type;
rs->index = -1;
- rs->dsock = -1;
+ rs->udp_sock = -1;
+ rs->epfd = -1;
+
if (inherited_rs) {
rs->sbuf_size = inherited_rs->sbuf_size;
rs->rbuf_size = inherited_rs->rbuf_size;
return rs;
}
-/*
- * TODO: Support datagram rsockets
- */
static int rs_set_nonblocking(struct rsocket *rs, long arg)
{
+ struct ds_qp *qp;
int ret = 0;
- if (rs->cm_id->recv_cq_channel)
- ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
+ if (rs->type == SOCK_STREAM) {
+ if (rs->cm_id->recv_cq_channel)
+ ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
- if (!ret && rs->state < rs_connected)
- ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
+ if (!ret && rs->state < rs_connected)
+ ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
+ } else {
+ ret = fcntl(rs->epfd, F_SETFL, arg);
+
+ if (!ret && rs->qp_list) {
+ qp = rs->qp_list;
+ do {
+ ret = fcntl(qp->cm_id->recv_cq_channel->fd,
+ F_SETFL, arg);
+ qp = ds_next_qp(qp);
+ } while (qp != rs->qp_list && !ret);
+ }
+ }
return ret;
}
rs->rq_size = 2;
}
+static void ds_set_qp_size(struct rsocket *rs)
+{
+ uint16_t max_size;
+
+ max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE);
+
+ if (rs->sq_size > max_size)
+ rs->sq_size = max_size;
+ if (rs->rq_size > max_size)
+ rs->rq_size = max_size;
+
+ if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
+ rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
+ else
+ rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
+
+ if (rs->sq_size > (rs->sbuf_size / RS_SNDLOWAT))
+ rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
+ else
+ rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
+}
+
static int rs_init_bufs(struct rsocket *rs)
{
size_t len;
rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
if (!rs->rmsg)
- return -1;
+ return ERR(ENOEMEM);
rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
if (!rs->sbuf)
- return -1;
+ return ERR(ENOEMEM);
rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
if (!rs->smr)
sizeof(*rs->target_iomap) * rs->target_iomap_size;
rs->target_buffer_list = malloc(len);
if (!rs->target_buffer_list)
- return -1;
+ return ERR(ENOEMEM);
rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
if (!rs->target_mr)
rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
if (!rs->rbuf)
- return -1;
+ return ERR(ENOEMEM);
rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
if (!rs->rmr)
sge.length = RS_SNDLOWAT;
sge.lkey = qp->rmr;
- wr.wr_id = RS_RECV_WR_ID;
+ wr.wr_id = ds_recv_wr_id((uint32_t) (buf - rs->rbuf));
wr.next = NULL;
wr.sg_list = &sge;
wr.num_sge = 1;
if (qp->cm_id) {
if (qp->cm_id->qp) {
- dlist_remove(&qp->list);
+ epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL,
+ qp->cm_id->recv_cq_channel->fd, NULL);
rdma_destroy_qp(qp->cm_id);
}
rdma_destroy_id(qp->cm_id);
free(qp);
}
-static void ds_free_qps(struct rsocket *rs)
+static void ds_free(struct rsocket *rs)
{
struct ds_qp *qp;
- dlist_t *entry;
- while (!dlist_empty(&rs->qp_list)) {
- qp = container_of(rs->qp_list.next, struct ds_qp, list);
- ds_free_qp(qp);
- }
-}
-
-static void ds_free(struct rsocket *rs)
-{
if (rs->state & (rs_readable | rs_writable))
rs_svc_remove(rs);
- if (rs->dsock >= 0)
- close(rs->dsock);
+ if (rs->udp_sock >= 0)
+ close(rs->udp_sock);
if (rs->index >= 0)
rs_remove(rs);
- ds_free_qps(rs);
- if (rs->fds)
- free(rs->fds);
+ if (rs->dmsg)
+ free(rs->dmsg);
+
+ if (rs->smsg)
+ free(rs->smsg);
+
+ while (rs->qp_list) {
+ ds_remove_qp(rs, rs->qp_list);
+ ds_free_qp(rs->qp_list);
+ }
+
+ if (rs->epfd >= 0)
+ close(rs->epfd);
if (rs->sbuf)
free(rs->sbuf);
/*
 * Datagram-socket initialization: create the UDP side channel used for
 * address resolution/fallback and the epoll instance that multiplexes
 * the per-QP CQ channels.  Returns 0 on success or a negative value on
 * failure (cleanup of partially created fds is handled by ds_free()).
 */
static int ds_init(struct rsocket *rs, int domain)
{
	rs->udp_sock = socket(domain, SOCK_DGRAM, 0);
	if (rs->udp_sock < 0)
		return rs->udp_sock;

	/* Fix: epoll_create() requires a size argument > 0 — the value is
	 * otherwise ignored on modern kernels, but 0 fails with EINVAL. */
	rs->epfd = epoll_create(2);
	if (rs->epfd < 0)
		return rs->epfd;

	return 0;
}
if (ret)
goto err;
- index = rs->dsock;
+ index = rs->udp_sock;
}
ret = rs_insert(rs, index);
if (!ret)
rs->state = rs_bound;
} else {
- ret = bind(rs->dsock, addr, addrlen);
+ ret = bind(rs->udp_sock, addr, addrlen);
if (!ret) {
ret = rs_svc_insert(rs);
if (!ret)
static int ds_init_ep(struct rsocket *rs)
{
- int ret;
+ struct ds_smsg *msg;
+ int i, ret;
- rs_set_qp_size(rs);
- if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
- rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
- else
- rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
+ ds_set_qp_size(rs);
- rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
+ rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
if (!rs->sbuf)
return ERR(ENOMEM);
- rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf;
+ rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
+ if (!rs->dmsg)
+ return ERR(ENOEMEM);
+
rs->sbuf_bytes_avail = rs->sbuf_size;
rs->sqe_avail = rs->sq_size;
+ rs->rqe_avail = rs->rq_size;
+
+ rs->smsg_free = (struct ds_smsg *) rs->sbuf;
+ msg = rs->smsg_free;
+ for (i = 0; i < rs->sq_size - 1; i++) {
+ msg->next = (void *) msg + i * RS_SNDLOWAT;
+ msg = msg->next;
+ }
+ msg->next = NULL;
ret = rs_svc_insert(rs);
if (ret)
uint16_t port;
*src_len = sizeof src_addr;
- ret = getsockname(rs->dsock, &src_addr->sa, src_len);
+ ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
if (ret || !rs_any_addr(src_addr))
return ret;
socklen_t addrlen, struct ds_qp **qp)
{
struct ibv_qp_init_attr qp_attr;
+ struct epoll_event event;
int ret;
*qp = calloc(1, sizeof(struct ds_qp));
qp_attr.cap.max_send_sge = 2;
qp_attr.cap.max_recv_sge = 1;
qp_attr.cap.max_inline_data = rs->sq_inline;
-
ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr);
if (ret)
- return ret;
+ goto err;
+
+ event.events = EPOLLIN | EPOLLOUT;
+ event.data.ptr = *qp;
+ ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD,
+ (*qp)->cm_id->recv_cq_channel->fd, &event);
+ if (ret)
+ goto err;
for (i = 0; i < rs->rq_size; i++) {
ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT);
if (ret)
goto err;
}
- list_insert_head(&(*qp)->list, &rs->qp_list);
+
+ ds_insert_qp(rs, *qp);
return 0;
err:
ds_free_qp(*qp);
static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr,
socklen_t addrlen, struct ds_qp **qp)
{
- dlist_t *entry;
+ if (rs->qp_list) {
+ *qp = rs->qp_list;
+ do {
+ if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
+ return 0;
- for (entry = rs->qp_list.next; entry != &rs->qp_list; entry = entry->next) {
- *qp = container_of(entry, struct ds_qp, list);
- if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
- return 0;
+ *qp = ds_next_qp(*qp);
+ } while (*qp != rs->qp_list);
}
return ds_create_qp(rs, src_addr, addrlen, qp);
ret = rs_do_connect(rs);
} else {
fastlock_acquire(&rs->slock);
- ret = connect(rs->dsock, addr, addrlen);
+ ret = connect(rs->udp_sock, addr, addrlen);
if (!ret)
ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
fastlock_release(&rs->slock);
struct ibv_sge *sgl, int nsge,
uint32_t length, int flags)
{
+ uint64_t offset;
+
rs->sqe_avail--;
rs->sbuf_bytes_avail -= length;
- return ds_post_send(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), flags);
+ offset = sgl->addr - (uintptr_t) rs->sbuf;
+ return ds_post_send(rs, sgl, nsge, ds_send_wr_id(offset, length), flags);
}
static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
return ret;
}
+/*
+ * Poll all CQs associated with a datagram rsocket. We need to drop any
+ * received messages that we do not have room to store. To limit drops,
+ * we only poll if we have room to store the receive or we need a send
+ * buffer. To ensure fairness, we poll the CQs round robin, remembering
+ * where we left off.
+ */
static void ds_poll_cqs(struct rsocket *rs)
{
	struct ds_qp *qp;
	struct ds_smsg *msg;
	struct ibv_wc wc;
	int ret, cnt;

	/* NOTE(review): rs->qp_list is dereferenced without a NULL check;
	 * callers must guarantee at least one QP exists — verify. */
	qp = rs->qp_list;
	do {
		/* cnt counts completions reaped in one full pass; the outer
		 * loop stops once a pass finds nothing. */
		cnt = 0;
		do {
			ret = ibv_poll_cq(qp->cm_id->recv_cq, 1, &wc);
			if (ret <= 0) {
				/* Nothing on this CQ; try the next QP. */
				qp = ds_next_qp(qp);
				continue;
			}

			if (ds_wr_is_recv(wc.wr_id)) {
				if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS) {
					/* Queue the received message for the app.
					 * NOTE(review): only .qp is recorded here;
					 * the ds_rmsg offset/length fields are not
					 * filled in — confirm they are set elsewhere
					 * or this drops the message location. */
					rs->rqe_avail--;
					rs->dmsg[rs->rmsg_tail].qp = qp;
					if (++rs->rmsg_tail == rs->rq_size + 1)
						rs->rmsg_tail = 0;
				} else {
					/* No room (or error): drop the message and
					 * repost the receive buffer immediately. */
					ds_post_recv(rs, qp, qp->rbuf +
						     ds_wr_offset(wc.wr_id));
				}
			} else {
				/* Send completion: inline sends never consumed a
				 * buffer slot, so only larger sends return one to
				 * the free list. */
				if (ds_wr_length(wc.wr_id) > rs->sq_inline) {
					msg = (struct ds_smsg *)
					      (rs->sbuf + ds_wr_offset(wc.wr_id));
					msg->next = rs->smsg_free;
					rs->smsg_free = msg;
				}
				rs->sqe_avail++;
			}

			/* Round-robin: advance and remember where we left off so
			 * no single QP can starve the others. */
			qp = ds_next_qp(qp);
			if (!rs->rqe_avail && rs->sqe_avail) {
				rs->qp_list = qp;
				return;
			}
			cnt++;
		} while (qp != rs->qp_list);
	} while (cnt);
}
+
+static void ds_req_notify_cqs(struct rsocket *rs)
+{
+ struct ds_qp *qp;
+
+ qp = rs->qp_list;
+ do {
+ if (!qp->cq_armed) {
+ ibv_req_notify_cq(qp->cm_id->recv_cq, 0);
+ qp->cq_armed = 1;
+ }
+ qp = ds_next_qp(qp);
+ } while (qp != rs->qp_list);
+}
+
+static int ds_get_cq_event(struct rsocket *rs)
+{
+ struct epoll_event event;
+ struct ds_qp *qp;
+ struct ibv_cq *cq;
+ void *context;
+ int ret;
+
+ if (!rs->cq_armed)
+ return 0;
+
+ ret = epoll_wait(rs->epfd, &event, 1, -1);
+ if (ret <= 0)
+ return ret;
+
+ qp = event.data.ptr;
+ ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context);
+ if (!ret) {
+ ibv_ack_cq_events(rs->cm_id->recv_cq, 1);
+ qp->cq_armed = 0;
+ rs->cq_armed = 0;
+ }
+
+ return ret;
+}
+
static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
{
	int ret = 0;

	fastlock_acquire(&rs->cq_lock);
	do {
		/* Poll first; only block after arming notifications and
		 * re-polling, which closes the race where a completion
		 * arrives between the poll and the arm. */
		ds_poll_cqs(rs);
		if (test(rs)) {
			ret = 0;
			break;
		} else if (nonblock) {
			ret = ERR(EWOULDBLOCK);
		} else if (!rs->cq_armed) {
			ds_req_notify_cqs(rs);
			rs->cq_armed = 1;
		} else {
			/* NOTE(review): rs_update_credits() looks like the
			 * stream-socket credit path — confirm it applies to a
			 * datagram rsocket. */
			rs_update_credits(rs);
			/* Hand off: take cq_wait_lock before dropping cq_lock
			 * so pollers and waiters serialize; order matters. */
			fastlock_acquire(&rs->cq_wait_lock);
			fastlock_release(&rs->cq_lock);

			ret = ds_get_cq_event(rs);
			fastlock_release(&rs->cq_wait_lock);
			fastlock_acquire(&rs->cq_lock);
		}
	} while (!ret);

	fastlock_release(&rs->cq_lock);
	return ret;
}
+
+static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
+{
+ struct timeval s, e;
+ uint32_t poll_time = 0;
+ int ret;
+
+ do {
+ ret = ds_process_cqs(rs, 1, test);
+ if (!ret || nonblock || errno != EWOULDBLOCK)
+ return ret;
+
+ if (!poll_time)
+ gettimeofday(&s, NULL);
+
+ gettimeofday(&e, NULL);
+ poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
+ (e.tv_usec - s.tv_usec) + 1;
+ } while (poll_time <= polling_time);
+
+ ret = ds_process_cqs(rs, 0, test);
+ return ret;
+}
+
static int rs_nonblocking(struct rsocket *rs, int flags)
{
return (rs->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT);
static int ds_can_send(struct rsocket *rs)
{
- return rs->sqe_avail && (rs->sbuf_bytes_avail >= rs->sbytes_needed);
+ return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT);
}
static int rs_conn_can_send(struct rsocket *rs)
{
struct ds_udp_header hdr;
struct msghdr msg;
- struct iovec miov[4];
+ struct iovec miov[8];
struct ds_qp *qp;
- if (iovcnt > 4)
+ if (iovcnt > 8)
return ERR(ENOTSUP);
- qp = (rs->conn_dest) ? rs->conn_dest->qp : NULL;
hdr.tag = htonll(DS_UDP_TAG);
hdr.version = 1;
- if (qp) {
- hdr.sl = qp->sl;
- hdr.slid = htons(qp->lid);
- hdr.qpn = htonl(qp->cm_id->qp->qp_num & 0xFFFFFF);
- } else {
- hdr.sl = 0;
- hdr.slid = 0;
- hdr.qpn = 0;
- }
+ memset(&hdr->reserved, 0, sizeof hdr->reserved);
+ hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
miov[0].iov_base = &hdr;
miov[0].iov_len = sizeof hdr;
memcpy(&miov[1], iov, sizeof *iov * iovcnt);
memset(&msg, 0, sizeof msg);
- /* TODO: specify name if needed */
msg.msg_iov = miov;
msg.msg_iovlen = iovcnt + 1;
return sendmsg(rs->fd, msg, flags);
struct ibv_sge sge;
int ret = 0;
- if (!rs->conn_dest || !rs->conn_dest->ah)
+ if (!rs->conn_dest->ah)
return ds_send_udp(rs, buf, len, flags);
- rs->sbytes_needed = len;
if (!ds_can_send(rs)) {
- ret = rs_get_comp(rs, 1, ds_can_send);
+ ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
if (ret)
- return ds_send_udp(rs, buf, len, flags);
+ return ret;
}
if (len <= rs->sq_inline) {
}
fastlock_acquire(&rs->slock);
- ds_connect(rs, dest_addr, addrlen);
+ if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
+ ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
+ if (ret)
+ goto out;
+ }
ret = dsend(rs, buf, len, flags);
+out:
fastlock_release(&rs->slock);
return ret;
}
rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
return 0;
} else {
- return getpeername(rs->fs, addr, addrlen);
+ return getpeername(rs->udp_sock, addr, addrlen);
}
}
rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
return 0;
} else {
- return getsockname(rs->fd, addr, addrlen);
+ return getsockname(rs->udp_sock, addr, addrlen);
}
}
ret = ERR(ENOTSUP);
rs = idm_at(&idm, socket);
if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
- ret = setsockopt(rs->fd, optname, optval, optlen);
+ ret = setsockopt(rs->udp_sock, optname, optval, optlen);
if (ret)
return ret;
}
opts = &rs->so_opts;
switch (optname) {
case SO_REUSEADDR:
- ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
- RDMA_OPTION_ID_REUSEADDR,
- (void *) optval, optlen);
- if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) &&
- rs->cm_id->context &&
- (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB))))
- ret = 0;
+ if (rs->type == SOCK_STREAM) {
+ ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
+ RDMA_OPTION_ID_REUSEADDR,
+ (void *) optval, optlen);
+ if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) &&
+ rs->cm_id->context &&
+ (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB))))
+ ret = 0;
+ }
opt_on = *(int *) optval;
break;
case SO_RCVBUF:
opts = &rs->ipv6_opts;
switch (optname) {
case IPV6_V6ONLY:
- ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
- RDMA_OPTION_ID_AFONLY,
- (void *) optval, optlen);
+ if (rs->type == SOCK_STREAM) {
+ ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
+ RDMA_OPTION_ID_AFONLY,
+ (void *) optval, optlen);
+ }
opt_on = *(int *) optval;
break;
default:
int ret = 0;
rs = idm_at(&idm, socket);
- if (rs->type == SOCK_DGRAM && level != SOL_RDMA)
- return getsockopt(rs->fd, level, optname, optval, optlen);
-
switch (level) {
case SOL_SOCKET:
switch (optname) {
return (ret && left == count) ? ret : count - left;
}
-
-ssize_t urecvfrom(int socket, void *buf, size_t len, int flags,
- struct sockaddr *src_addr, socklen_t *addrlen)
-{
- int ret;
-
- ret = rrecv(socket, buf, len, flags);
- if (ret > 0 && src_addr)
- rgetpeername(socket, src_addr, addrlen);
-
- return ret;
-}
-
-ssize_t usendto(int socket, const void *buf, size_t len, int flags,
- const struct sockaddr *dest_addr, socklen_t addrlen)
-{
- if (dest_addr || addrlen)
- return ERR(EISCONN);
-
- return usend(socket, buf, len, flags);
-}