enum {
RS_OP_DATA,
RS_OP_RSVD_DATA_MORE,
- RS_OP_RSVD_DRA,
+ RS_OP_WRITE,
RS_OP_RSVD_DRA_MORE,
RS_OP_SGL,
RS_OP_RSVD,
fclose(f);
/* round to supported values */
- def_iomap_size = (uint8_t) rs_value_to_scale(def_iomap_size, 8);
- def_iomap_size = (uint16_t) rs_scale_to_value(def_iomap_size, 8);
+		def_iomap_size = (uint8_t) rs_value_to_scale(
+			(uint16_t) rs_scale_to_value(def_iomap_size, 8), 8);
}
init = 1;
out:
while (!dlist_empty(&rs->iomap_list)) {
iomr = container_of(rs->iomap_list.next,
struct rs_iomap_mr, entry);
- riounmap(iomr->mr->addr, iomr->mr->length);
+ riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
}
while (!dlist_empty(&rs->iomap_queue)) {
iomr = container_of(rs->iomap_queue.next,
struct rs_iomap_mr, entry);
- riounmap(iomr->mr->addr, iomr->mr->length);
+ riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
}
}
return rs_do_connect(rs);
}
-static int rs_post_write(struct rsocket *rs,
+static int rs_post_write_msg(struct rsocket *rs,
struct ibv_sge *sgl, int nsge,
uint32_t imm_data, int flags,
uint64_t addr, uint32_t rkey)
return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
}
+/*
+ * Post a single IBV_WR_RDMA_WRITE work request on rs->cm_id->qp.
+ *
+ * sgl/nsge give the local scatter-gather list, addr/rkey the remote
+ * address and key to write to, wr_id the id stored in the work request
+ * and flags its send flags.  The value returned by ibv_post_send() is
+ * handed to rdma_seterrno() and that result is returned.
+ */
+static int rs_post_write(struct rsocket *rs,
+			 struct ibv_sge *sgl, int nsge,
+			 uint64_t wr_id, int flags,
+			 uint64_t addr, uint32_t rkey)
+{
+	struct ibv_send_wr *bad_wr;
+	struct ibv_send_wr wr;
+
+	/* Remote side of the transfer. */
+	wr.wr.rdma.rkey = rkey;
+	wr.wr.rdma.remote_addr = addr;
+
+	/* Local side of the transfer. */
+	wr.num_sge = nsge;
+	wr.sg_list = sgl;
+
+	/* A lone request: not chained to any other work request. */
+	wr.next = NULL;
+	wr.opcode = IBV_WR_RDMA_WRITE;
+	wr.send_flags = flags;
+	wr.wr_id = wr_id;
+
+	return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad_wr));
+}
+
/*
* Update target SGE before sending data. Otherwise the remote side may
* update the entry before we do.
rs->target_sge = 0;
}
- return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
- flags, addr, rkey);
+ return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
+ flags, addr, rkey);
+}
+
+/*
+ * Post a write of 'length' bytes, described by sgl/nsge, into the region
+ * described by the iomap entry 'iom' at position 'offset'.
+ *
+ * Increments rs->sseq_no, takes one entry from rs->sqe_avail and 'length'
+ * bytes from rs->sbuf_bytes_avail before posting.  The work request id is
+ * rs_msg_set(RS_OP_WRITE, length).  Returns the result of rs_post_write().
+ */
+static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
+			   struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
+{
+	uint64_t target;
+
+	/* Address inside the mapped region that corresponds to 'offset'. */
+	target = iom->sge.addr + offset - iom->offset;
+
+	/* Account for what this write consumes. */
+	rs->sbuf_bytes_avail -= length;
+	rs->sqe_avail--;
+	rs->sseq_no++;
+
+	return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_WRITE, length),
+			     flags, target, iom->sge.key);
}
static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
- return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
- flags, addr, rs->remote_iomap.key);
+ return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
+ flags, addr, rs->remote_iomap.key);
}
static uint32_t rs_sbuf_left(struct rsocket *rs)
ibsge.lkey = 0;
ibsge.length = sizeof(sge);
- rs_post_write(rs, &ibsge, 1,
- rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
- IBV_SEND_INLINE,
- rs->remote_sgl.addr +
- rs->remote_sge * sizeof(struct rs_sge),
- rs->remote_sgl.key);
+ rs_post_write_msg(rs, &ibsge, 1,
+ rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+ IBV_SEND_INLINE,
+ rs->remote_sgl.addr +
+ rs->remote_sge * sizeof(struct rs_sge),
+ rs->remote_sgl.key);
rs->rbuf_bytes_avail -= rs->rbuf_size >> 1;
rs->rbuf_free_offset += rs->rbuf_size >> 1;
if (++rs->remote_sge == rs->remote_sgl.length)
rs->remote_sge = 0;
} else {
- rs_post_write(rs, NULL, 0,
- rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
+ rs_post_write_msg(rs, NULL, 0,
+ rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+ 0, 0, 0);
}
}
case RS_OP_SGL:
rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
break;
+ case RS_OP_IOMAP_SGL:
+ /* The iomap was updated, that's nice to know. */
+ break;
case RS_OP_CTRL:
if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
rs->state = rs_disconnected;
rs->state &= ~rs_connect_rd;
}
break;
+ case RS_OP_WRITE:
+ /* We really shouldn't be here. */
+ break;
default:
rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
rs->state = rs_disconnected;
break;
+ case RS_OP_IOMAP_SGL:
+ rs->sqe_avail++;
+ rs->sbuf_bytes_avail += sizeof(struct rs_iomap);
+ break;
default:
rs->sqe_avail++;
rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
if ((rs->state & rs_connected) && rs->ctrl_avail) {
rs->ctrl_avail--;
- ret = rs_post_write(rs, NULL, 0,
- rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
+ ret = rs_post_write_msg(rs, NULL, 0,
+ rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
}
}
if (rs->sq_inline < RS_MIN_INLINE)
rs->sq_inline = RS_MIN_INLINE;
break;
+ case RDMA_IOMAPSIZE:
+ rs->target_iomap_size = (uint16_t) rs_scale_to_value(
+ (uint8_t) rs_value_to_scale(*(int *) optval, 8), 8);
+ break;
default:
break;
}
*((int *) optval) = rs->sq_inline;
*optlen = sizeof(int);
break;
+ case RDMA_IOMAPSIZE:
+ *((int *) optval) = rs->target_iomap_size;
+ *optlen = sizeof(int);
+ break;
default:
ret = ENOTSUP;
break;
return ret;
}
+/*
+ * Search the first rs->target_iomap_size entries of rs->target_iomap for
+ * the one whose range [offset, offset + sge.length) contains 'offset'.
+ *
+ * Returns a pointer to the first matching entry, or NULL if none matches.
+ */
+static struct rs_iomap *rs_find_iomap(struct rsocket *rs, off_t offset)
+{
+	struct rs_iomap *cand;
+	int idx;
+
+	for (idx = 0; idx < rs->target_iomap_size; idx++) {
+		cand = &rs->target_iomap[idx];
+
+		/* Entry starts beyond the requested offset. */
+		if (!(offset >= cand->offset))
+			continue;
+
+		if (offset < cand->offset + cand->sge.length)
+			return cand;
+	}
+	return NULL;
+}
+
size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
{
+ struct rsocket *rs;
+ struct rs_iomap *iom = NULL;
+ struct ibv_sge sge;
+ size_t left;
+ uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
+ int ret = 0;
+
+ rs = idm_at(&idm, socket);
+ fastlock_acquire(&rs->slock);
+ if (rs->iomap_pending) {
+ ret = rs_send_iomaps(rs, flags);
+ if (ret)
+ goto out;
+ }
+ for (left = count; left;
+ left -= xfer_size, buf += xfer_size, offset += xfer_size) {
+ if (!iom || offset > iom->offset + iom->sge.length) {
+ iom = rs_find_iomap(rs, offset);
+ if (!iom)
+ break;
+ }
+
+ if (!rs_can_send(rs)) {
+ ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+ rs_conn_can_send);
+ if (ret)
+ break;
+ if (!(rs->state & rs_connect_wr)) {
+ ret = ERR(ECONNRESET);
+ break;
+ }
+ }
+
+ if (olen < left) {
+ xfer_size = olen;
+ if (olen < RS_MAX_TRANSFER)
+ olen <<= 1;
+ } else {
+ xfer_size = left;
+ }
+
+ if (xfer_size > rs->sbuf_bytes_avail)
+ xfer_size = rs->sbuf_bytes_avail;
+ if (xfer_size > iom->offset + iom->sge.length - offset)
+ xfer_size = iom->offset + iom->sge.length - offset;
+
+ if (xfer_size <= rs->sq_inline) {
+ sge.addr = (uintptr_t) buf;
+ sge.length = xfer_size;
+ sge.lkey = 0;
+ ret = rs_write_direct(rs, iom, offset, &sge, 1,
+ xfer_size, IBV_SEND_INLINE);
+ } else if (xfer_size <= rs_sbuf_left(rs)) {
+ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
+ rs->ssgl[0].length = xfer_size;
+ ret = rs_write_direct(rs, iom, offset, rs->ssgl, 1, xfer_size, 0);
+ if (xfer_size < rs_sbuf_left(rs))
+ rs->ssgl[0].addr += xfer_size;
+ else
+ rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
+ } else {
+ rs->ssgl[0].length = rs_sbuf_left(rs);
+ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
+ rs->ssgl[0].length);
+ rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
+ memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
+ ret = rs_write_direct(rs, iom, offset, rs->ssgl, 2, xfer_size, 0);
+ rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+ }
+ if (ret)
+ break;
+ }
+out:
+ fastlock_release(&rs->slock);
+ return (ret && left == count) ? ret : count - left;
}