#if HAVE_CONFIG_H
# include <config.h>
#endif /* HAVE_CONFIG_H */
+#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
if (var)
sq_inline = atoi(var);
- var = getenv("RDMAV_FORK_SAFE");
+ var = getenv("RS_FORK_SUPPORT");
if (var)
fork_support = atoi(var);
}
return index;
recursive = 1;
+printf("skipping rsocket call\n");
+goto realsock;
ret = rsocket(domain, type, protocol);
recursive = 0;
if (ret >= 0) {
if (fork_support) {
rclose(ret);
+realsock:
ret = real.socket(domain, type, protocol);
if (ret < 0)
return ret;
uint32_t msg;
long flags;
+ printf("fork_active\n");
sfd = fd_getd(socket);
flags = real.fcntl(sfd, F_GETFL);
real.fcntl(sfd, F_SETFL, 0);
+ printf("fork_active - recv\n");
ret = real.recv(sfd, &msg, sizeof msg, MSG_PEEK);
+ printf("fork_active - recv %d\n", ret);
real.fcntl(sfd, F_SETFL, flags);
if ((ret != sizeof msg) || msg)
goto err1;
len = sizeof addr;
ret = real.getpeername(sfd, (struct sockaddr *) &addr, &len);
+ printf("fork_active - getpeername %d\n", ret);
if (ret)
goto err1;
+ printf("fork_active - create rsocket\n");
dfd = rsocket(addr.ss_family, SOCK_STREAM, 0);
if (dfd < 0)
goto err1;
+ printf("fork_active - rconnect\n");
ret = rconnect(dfd, (struct sockaddr *) &addr, len);
if (ret)
goto err2;
socklen_t len;
uint32_t msg;
+ printf("fork_passive\n");
sfd = fd_getd(socket);
len = sizeof sin6;
ret = -1;
goto out;
}
+ printf("fork_passive - create rsocket\n");
lfd = rsocket(sin6.sin6_family, SOCK_STREAM, 0);
if (lfd < 0) {
sem_wait(sem);
ret = rbind(lfd, (struct sockaddr *) &sin6, sizeof sin6);
+ printf("fork_passive bind %d\n", ret);
if (ret)
goto lclose;
ret = rlisten(lfd, 1);
+ printf("fork_passive listen %d\n", ret);
if (ret)
goto lclose;
msg = 0;
len = real.write(sfd, &msg, sizeof msg);
+ printf("fork_passive write %d\n", len);
if (len != sizeof msg)
goto lclose;
+ printf("fork_passive - raccept\n");
dfd = raccept(lfd, NULL, NULL);
+ printf("fork_passive accept %d\n", dfd);
if (dfd < 0) {
ret = dfd;
goto lclose;
fastlock_t cq_lock;
fastlock_t cq_wait_lock;
+ struct rs_msg last_ctrl;
+ uint64_t bytes_sent;
+ uint64_t bytes_received;
+
int opts;
long fd_flags;
uint64_t so_opts;
rs->ctrl_avail--;
rs->rseq_comp = rs->rseq_no + (rs->rq_size >> 1);
+ rs->last_ctrl.op = rs->rseq_no + rs->rq_size;
if (rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) {
if (!(rs->opts & RS_OPT_SWAP_SGL)) {
sge.addr = (uintptr_t) &rs->rbuf[rs->rbuf_free_offset];
ibsge.addr = (uintptr_t) &sge;
ibsge.lkey = 0;
ibsge.length = sizeof(sge);
+ rs->last_ctrl.data = sge.length;
rs_post_write(rs, &ibsge, 1,
rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
if (++rs->remote_sge == rs->remote_sgl.length)
rs->remote_sge = 0;
} else {
+ rs->last_ctrl.data = 0;
rs_post_write(rs, NULL, 0,
rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
}
} while (left && (flags & MSG_WAITALL) && (rs->state & rs_connect_rd));
+ rs->bytes_received += (len - left);
fastlock_release(&rs->rlock);
return ret ? ret : len - left;
}
if (ret)
break;
}
+ rs->bytes_sent += (len - left);
fastlock_release(&rs->slock);
return (ret && left == len) ? ret : len - left;