}
static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
- int iovcnt, int flags)
+ int iovcnt, int flags, uint8_t op)
{
struct ds_udp_header hdr;
struct msghdr msg;
memcpy(&miov[1], iov, sizeof *iov * iovcnt);
memset(&msg, 0, sizeof msg);
+ msg.msg_name = rs->conn_dest->addr;
+ msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
msg.msg_iov = miov;
msg.msg_iovlen = iovcnt + 1;
return sendmsg(rs->udp_sock, msg, flags);
udp_hdr->length == DS_UDP_IPV6_HDR_LEN));
}
-static int rs_svc_comp_local_net(struct ds_dest *dest,
- struct ds_udp_header *udp_hdr)
+static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len,
+ union socket_addr *src)
{
- return ((udp_hdr->version == 4) &&
- (dest->qp->cm_id->route.addr.src_addr.sa_family == AF_INET) &&
- (udp_hdr->addr.ipv4 == dest->qp->cm_id->route.addr.src_sin.sin_addr)) ||
- ((udp_hdr->version == 6) &&
- (dest->qp->cm_id->route.addr.src_addr.sa_family == AF_INET6) &&
- (!memcmp(udp_hdr->addr.ipv6,
- &dest->qp->cm_id->route.addr.src_sin6.sin6_addr, 16)));
+ struct ds_header hdr;
+ struct ds_smsg *msg;
+ struct ibv_sge sge;
+ uint64_t offset;
+
+ if (!ds_can_send(rs)) {
+ if (ds_get_comp(rs, 0, ds_can_send))
+ return;
+ }
+
+ msg = rs->smsg_free;
+ rs->smsg_free = msg->next;
+ rs->sqe_avail--;
+
+ ds_format_hdr(&hdr, src);
+ memcpy((void *) msg, &hdr, hdr.length);
+ memcpy((void *) msg + hdr.length, buf, len);
+ sge.addr = (uintptr_t) msg;
+ sge.length = hdr.length + len;
+ sge.lkey = rs->smr->lkey;
+ offset = (uint8_t *) msg - rs->sbuf;
+
+ ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
}
static void rs_svc_process_rs(struct rsocket *rs)
{
- struct ds_dest *dest;
+ struct ds_dest *dest, *cur_dest;
struct ds_udp_header *udp_hdr;
union socket_addr addr;
socklen_t addrlen = sizeof addr;
if (!dest->ah || (dest->qpn != udp_hdr->qpn))
rs_svc_create_ah(rs, dest, udp_hdr->qpn);
- if (!rs_svc_comp_local_net(dest, udp_hdr)) {
-
- ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
- if (!ret) {
- if (!dest->ah)
- rs_svc_create_ah(rs, dest, udp_hdr);
- rs_send_udp_reply(rs, dest);
- }
- }
-
+ /* to do: handle when dest local ip address doesn't match udp ip */
+ fastlock_acquire(&rs->slock);
+ cur_dest = rs->conn_dest;
if (udp_hdr->op == RS_OP_DATA) {
- fastlock_acquire(&rs->slock);
- ret = dsend(rs, buf, len, flags);
- fastlock_release(&rs->slock);
+ rs->conn_dest = &dest->qp->dest;
+ rs_svc_forward(rs, buf, len, addr);
}
+
+ rs->conn_dest = dest;
+ ds_send_udp(rs, svc_buf + udp_hdr->length, len, 0, RS_OP_CTRL);
+ rs->conn_dest = cur_dest;
+ fastlock_release(&rs->slock);
}
static int rs_svc_run(void *arg)