]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
rspreload: Move fork handling to first data transfer
authorSean Hefty <sean.hefty@intel.com>
Fri, 20 Jul 2012 00:09:29 +0000 (17:09 -0700)
committerSean Hefty <sean.hefty@intel.com>
Fri, 20 Jul 2012 00:09:29 +0000 (17:09 -0700)
Instead of hooking fork and migrating the last accepted
connection to rsockets, perform the migration when the socket
first tries to send or receive data.

This is necessary to handle more complex cases of fork, where
the child process establishes connections.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
src/preload.c

index f824af3fb2a0fc41927cfe00189a8e962fe2c374..79340c6f4965b98b260d2fcd775b2eba0e570548 100644 (file)
@@ -96,7 +96,6 @@ static int sq_size;
 static int rq_size;
 static int sq_inline;
 static int fork_support;
-static int last_accept = -1;
 
 enum fd_type {
        fd_normal,
@@ -453,15 +452,20 @@ int accept(int socket, struct sockaddr *addr, socklen_t *addrlen)
                }
 
                fd_store(index, ret, type);
-               last_accept = (type == fd_fork) ? index : -1;
                return index;
        } else {
-               last_accept = -1;
                return real.accept(fd, addr, addrlen);
        }
 }
 
-static int connect_fork(int socket, const struct sockaddr *addr, socklen_t addrlen)
+/*
+ * We can't fork RDMA connections and pass them from the parent to the child
+ * process.  Instead, we need to establish the RDMA connection after calling
+ * fork.  To do this, we delay establishing the RDMA connection until we try
+ * to send/receive on the server side.  On the client side, we don't expect
+ * to fork, so we switch from a TCP connection to an rsocket when connecting.
+ */
+static int fork_active(int socket, const struct sockaddr *addr, socklen_t addrlen)
 {
        int fd, ret;
        uint32_t msg;
@@ -489,6 +493,95 @@ static int connect_fork(int socket, const struct sockaddr *addr, socklen_t addrl
        return rconnect(ret, addr, addrlen);
 }
 
+static void fork_passive(int socket)
+{
+       struct sockaddr_in6 sin6;
+       sem_t *sem;
+       int lfd, sfd, dfd, ret, param;
+       socklen_t len;
+       uint32_t msg;
+
+       fd_get(socket, &sfd);
+
+       len = sizeof sin6;
+       ret = real.getsockname(sfd, (struct sockaddr *) &sin6, &len);
+       if (ret)
+               goto out;
+       sin6.sin6_flowinfo = sin6.sin6_scope_id = 0;
+       memset(&sin6.sin6_addr, 0, sizeof sin6.sin6_addr);
+
+       sem = sem_open("/rsocket_fork", O_CREAT | O_RDWR,
+                      S_IRWXU | S_IRWXG, 1);
+       if (sem == SEM_FAILED) {
+               ret = -1;
+               goto out;
+       }
+
+       lfd = rsocket(sin6.sin6_family, SOCK_STREAM, 0);
+       if (lfd < 0) {
+               ret  = lfd;
+               goto sclose;
+       }
+
+       param = 1;
+       rsetsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &param, sizeof param);
+
+       sem_wait(sem);
+       ret = rbind(lfd, (struct sockaddr *) &sin6, sizeof sin6);
+       if (ret)
+               goto lclose;
+
+       ret = rlisten(lfd, 1);
+       if (ret)
+               goto lclose;
+
+       msg = 0;
+       len = real.write(sfd, &msg, sizeof msg);
+       if (len != sizeof msg)
+               goto lclose;
+
+       dfd = raccept(lfd, NULL, NULL);
+       if (dfd < 0) {
+               ret  = dfd;
+               goto lclose;
+       }
+
+       param = 1;
+       rsetsockopt(dfd, IPPROTO_TCP, TCP_NODELAY, &param, sizeof param);
+       set_rsocket_options(dfd);
+
+       copysockopts(dfd, sfd, &rs, &real);
+       real.shutdown(sfd, SHUT_RDWR);
+       real.close(sfd);
+       fd_store(socket, dfd, fd_rsocket);
+
+lclose:
+       rclose(lfd);
+       sem_post(sem);
+sclose:
+       sem_close(sem);
+out:
+       if (ret)
+               fd_store(socket, sfd, fd_normal);
+}
+
+static inline enum fd_type fd_fork_get(int index, int *fd)
+{
+       struct fd_info *fdi;
+
+       fdi = idm_lookup(&idm, index);
+       if (fdi) {
+               if (fdi->type == fd_fork)
+                       fork_passive(index);
+               *fd = fdi->fd;
+               return fdi->type;
+
+       } else {
+               *fd = index;
+               return fd_normal;
+       }
+}
+
 int connect(int socket, const struct sockaddr *addr, socklen_t addrlen)
 {
        struct sockaddr_in *sin;
@@ -496,7 +589,7 @@ int connect(int socket, const struct sockaddr *addr, socklen_t addrlen)
 
        switch (fd_get(socket, &fd)) {
        case fd_fork:
-               return connect_fork(socket, addr, addrlen);
+               return fork_active(socket, addr, addrlen);
        case fd_rsocket:
                sin = (struct sockaddr_in *) addr;
                if (ntohs(sin->sin_port) > 1024) {
@@ -522,7 +615,7 @@ int connect(int socket, const struct sockaddr *addr, socklen_t addrlen)
 ssize_t recv(int socket, void *buf, size_t len, int flags)
 {
        int fd;
-       return (fd_get(socket, &fd) == fd_rsocket) ?
+       return (fd_fork_get(socket, &fd) == fd_rsocket) ?
                rrecv(fd, buf, len, flags) : real.recv(fd, buf, len, flags);
 }
 
@@ -530,7 +623,7 @@ ssize_t recvfrom(int socket, void *buf, size_t len, int flags,
                 struct sockaddr *src_addr, socklen_t *addrlen)
 {
        int fd;
-       return (fd_get(socket, &fd) == fd_rsocket) ?
+       return (fd_fork_get(socket, &fd) == fd_rsocket) ?
                rrecvfrom(fd, buf, len, flags, src_addr, addrlen) :
                real.recvfrom(fd, buf, len, flags, src_addr, addrlen);
 }
@@ -538,7 +631,7 @@ ssize_t recvfrom(int socket, void *buf, size_t len, int flags,
 ssize_t recvmsg(int socket, struct msghdr *msg, int flags)
 {
        int fd;
-       return (fd_get(socket, &fd) == fd_rsocket) ?
+       return (fd_fork_get(socket, &fd) == fd_rsocket) ?
                rrecvmsg(fd, msg, flags) : real.recvmsg(fd, msg, flags);
 }
 
@@ -546,7 +639,7 @@ ssize_t read(int socket, void *buf, size_t count)
 {
        int fd;
        init_preload();
-       return (fd_get(socket, &fd) == fd_rsocket) ?
+       return (fd_fork_get(socket, &fd) == fd_rsocket) ?
                rread(fd, buf, count) : real.read(fd, buf, count);
 }
 
@@ -554,14 +647,14 @@ ssize_t readv(int socket, const struct iovec *iov, int iovcnt)
 {
        int fd;
        init_preload();
-       return (fd_get(socket, &fd) == fd_rsocket) ?
+       return (fd_fork_get(socket, &fd) == fd_rsocket) ?
                rreadv(fd, iov, iovcnt) : real.readv(fd, iov, iovcnt);
 }
 
 ssize_t send(int socket, const void *buf, size_t len, int flags)
 {
        int fd;
-       return (fd_get(socket, &fd) == fd_rsocket) ?
+       return (fd_fork_get(socket, &fd) == fd_rsocket) ?
                rsend(fd, buf, len, flags) : real.send(fd, buf, len, flags);
 }
 
@@ -569,7 +662,7 @@ ssize_t sendto(int socket, const void *buf, size_t len, int flags,
                const struct sockaddr *dest_addr, socklen_t addrlen)
 {
        int fd;
-       return (fd_get(socket, &fd) == fd_rsocket) ?
+       return (fd_fork_get(socket, &fd) == fd_rsocket) ?
                rsendto(fd, buf, len, flags, dest_addr, addrlen) :
                real.sendto(fd, buf, len, flags, dest_addr, addrlen);
 }
@@ -577,7 +670,7 @@ ssize_t sendto(int socket, const void *buf, size_t len, int flags,
 ssize_t sendmsg(int socket, const struct msghdr *msg, int flags)
 {
        int fd;
-       return (fd_get(socket, &fd) == fd_rsocket) ?
+       return (fd_fork_get(socket, &fd) == fd_rsocket) ?
                rsendmsg(fd, msg, flags) : real.sendmsg(fd, msg, flags);
 }
 
@@ -585,7 +678,7 @@ ssize_t write(int socket, const void *buf, size_t count)
 {
        int fd;
        init_preload();
-       return (fd_get(socket, &fd) == fd_rsocket) ?
+       return (fd_fork_get(socket, &fd) == fd_rsocket) ?
                rwrite(fd, buf, count) : real.write(fd, buf, count);
 }
 
@@ -593,7 +686,7 @@ ssize_t writev(int socket, const struct iovec *iov, int iovcnt)
 {
        int fd;
        init_preload();
-       return (fd_get(socket, &fd) == fd_rsocket) ?
+       return (fd_fork_get(socket, &fd) == fd_rsocket) ?
                rwritev(fd, iov, iovcnt) : real.writev(fd, iov, iovcnt);
 }
 
@@ -812,85 +905,3 @@ int fcntl(int socket, int cmd, ... /* arg */)
        va_end(args);
        return ret;
 }
-
-/*
- * We can't fork RDMA connections and pass them from the parent to the child
- * process.  Intercept the fork call, and if we're the child establish the
- * RDMA connection after calling fork.  The assumption is that the last
- * connection accepted by the server will be processed by the child after the
- * fork call.
- *
- * It would be better to establishing the RDMA connection once the child
- * process tries to use the connection after the fork call (i.e. in a read
- * or write call), rather than making the previous assumption.
- */
-pid_t fork(void)
-{
-       struct sockaddr_in6 sin6;
-       pid_t pid;
-       sem_t *sem;
-       int lfd, sfd, dfd, ret, param;
-       socklen_t len;
-       uint32_t msg;
-
-       init_preload();
-       pid = real.fork();
-       if (pid || !fork_support || (last_accept < 0) ||
-           (fd_get(last_accept, &sfd) != fd_fork))
-               goto out;
-
-       len = sizeof sin6;
-       ret = real.getsockname(sfd, (struct sockaddr *) &sin6, &len);
-       if (ret)
-               goto out;
-       sin6.sin6_flowinfo = sin6.sin6_scope_id = 0;
-       memset(&sin6.sin6_addr, 0, sizeof sin6.sin6_addr);
-
-       sem = sem_open("/rsocket_fork", O_CREAT | O_RDWR,
-                      S_IRWXU | S_IRWXG, 1);
-       if (sem == SEM_FAILED)
-               goto out;
-
-       lfd = rsocket(sin6.sin6_family, SOCK_STREAM, 0);
-       if (lfd < 0)
-               goto sclose;
-
-       param = 1;
-       rsetsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &param, sizeof param);
-
-       sem_wait(sem);
-       ret = rbind(lfd, (struct sockaddr *) &sin6, sizeof sin6);
-       if (ret)
-               goto lclose;
-
-       ret = rlisten(lfd, 1);
-       if (ret)
-               goto lclose;
-
-       msg = 0;
-       ret = real.write(sfd, &msg, sizeof msg);
-       if (ret != sizeof msg)
-               goto lclose;
-
-       dfd = raccept(lfd, NULL, NULL);
-       if (dfd < 0)
-               goto lclose;
-
-       param = 1;
-       rsetsockopt(dfd, IPPROTO_TCP, TCP_NODELAY, &param, sizeof param);
-       set_rsocket_options(dfd);
-
-       copysockopts(dfd, sfd, &rs, &real);
-       real.shutdown(sfd, SHUT_RDWR);
-       real.close(sfd);
-       fd_store(last_accept, dfd, fd_rsocket);
-
-lclose:
-       rclose(lfd);
-       sem_post(sem);
-sclose:
-       sem_close(sem);
-out:
-       last_accept = -1;
-       return pid;
-}