Bottom: a8d2f5e9e4909e604d5a7b614f02d08080507dcb
-Top: 2c3d4d1bd4b9fe1a653896a8d234d46ca6620399
+Top: 372ba59a4b3584a6777c7ad5fa7340fbcf11dcfd
Author: Sean Hefty <sean.hefty@intel.com>
Date: 2012-07-13 15:25:53 -0700
---
diff --git a/src/preload.c b/src/preload.c
-index 498e813..22c4b65 100644
+index 498e813..8f8fc11 100644
--- a/src/preload.c
+++ b/src/preload.c
-@@ -92,10 +92,12 @@ static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
+@@ -46,6 +46,8 @@
+ #include <string.h>
+ #include <netinet/in.h>
+ #include <netinet/tcp.h>
++#include <unistd.h>
++#include <semaphore.h>
+
+ #include <rdma/rdma_cma.h>
+ #include <rdma/rdma_verbs.h>
+@@ -81,6 +83,7 @@ struct socket_calls {
+ int (*getsockopt)(int socket, int level, int optname,
+ void *optval, socklen_t *optlen);
+ int (*fcntl)(int socket, int cmd, ... /* arg */);
++ pid_t (*fork)(void);
+ };
+
+ static struct socket_calls real;
+@@ -92,10 +95,13 @@ static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
static int sq_size;
static int rq_size;
static int sq_inline;
+static int fork_support;
++static int last_accept = -1;
enum fd_type {
fd_normal,
};
struct fd_info {
-@@ -207,6 +209,10 @@ void getenv_options(void)
+@@ -207,6 +213,10 @@ void getenv_options(void)
var = getenv("RS_INLINE");
if (var)
sq_inline = atoi(var);
}
static void init_preload(void)
-@@ -365,8 +371,16 @@ int socket(int domain, int type, int protocol)
+@@ -244,6 +254,7 @@ static void init_preload(void)
+ real.setsockopt = dlsym(RTLD_NEXT, "setsockopt");
+ real.getsockopt = dlsym(RTLD_NEXT, "getsockopt");
+ real.fcntl = dlsym(RTLD_NEXT, "fcntl");
++ real.fork = dlsym(RTLD_NEXT, "fork");
+
+ rs.socket = dlsym(RTLD_DEFAULT, "rsocket");
+ rs.bind = dlsym(RTLD_DEFAULT, "rbind");
+@@ -280,12 +291,13 @@ out:
+ * the same settings and bindings as the current socket. We currently only
+ * handle setting a few of the more common values.
+ */
+-static int transpose_socket(int index, int *fd, enum fd_type new_type)
++static int transpose_socket(int socket, enum fd_type new_type)
+ {
+- socklen_t len = 0;
+- int new_fd, param, ret;
++ int fd, new_fd, param, ret;
+ struct socket_calls *new, *old;
++ socklen_t len = 0;
+
++ fd = fd_getd(socket);
+ if (new_type == fd_rsocket) {
+ new = &rs;
+ old = ℜ
+@@ -294,7 +306,7 @@ static int transpose_socket(int index, int *fd, enum fd_type new_type)
+ old = &rs;
+ }
+
+- ret = old->getsockname(*fd, NULL, &len);
++ ret = old->getsockname(fd, NULL, &len);
+ if (ret)
+ return ret;
+
+@@ -303,30 +315,28 @@ static int transpose_socket(int index, int *fd, enum fd_type new_type)
+ if (new_fd < 0)
+ return new_fd;
+
+- ret = old->fcntl(*fd, F_GETFL);
++ ret = old->fcntl(fd, F_GETFL);
+ if (ret > 0)
+ ret = new->fcntl(new_fd, F_SETFL, ret);
+ if (ret)
+ goto err;
+
+ len = sizeof param;
+- ret = old->getsockopt(*fd, SOL_SOCKET, SO_REUSEADDR, ¶m, &len);
++ ret = old->getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, ¶m, &len);
+ if (param && !ret)
+ ret = new->setsockopt(new_fd, SOL_SOCKET, SO_REUSEADDR, ¶m, len);
+ if (ret)
+ goto err;
+
+ len = sizeof param;
+- ret = old->getsockopt(*fd, IPPROTO_TCP, TCP_NODELAY, ¶m, &len);
++ ret = old->getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, ¶m, &len);
+ if (param && !ret)
+ ret = new->setsockopt(new_fd, IPPROTO_TCP, TCP_NODELAY, ¶m, len);
+ if (ret)
+ goto err;
+
+- old->close(*fd);
+ fd_store(socket, new_fd, new_type);
+- *fd = new_fd;
+- return 0;
++ return new_fd;
+
+ err:
+ new->close(new_fd);
+@@ -365,8 +375,16 @@ int socket(int domain, int type, int protocol)
ret = rsocket(domain, type, protocol);
recursive = 0;
if (ret >= 0) {
return index;
}
fd_close(index, &ret);
-@@ -421,12 +435,37 @@ int accept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -384,9 +402,11 @@ int bind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+ if (!sin->sin_port || ntohs(sin->sin_port) > 1024)
+ return rbind(fd, addr, addrlen);
+
+- ret = transpose_socket(socket, &fd, fd_normal);
+- if (ret)
++ ret = transpose_socket(socket, fd_normal);
++ if (ret < 0)
+ return ret;
++ rclose(fd);
++ fd = ret;
+ }
+
+ return real.bind(fd, addr, addrlen);
+@@ -415,18 +435,48 @@ int accept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+ }
+
+ fd_store(index, ret, fd_rsocket);
++ last_accept = index;
+ return index;
+ } else {
++ last_accept = -1;
+ return real.accept(fd, addr, addrlen);
}
}
+ int fd, ret;
+ uint32_t msg;
+
-+ fd_get(socket, &fd);
++ fd = fd_getd(socket);
+ ret = real.connect(fd, addr, addrlen);
+ if (ret)
+ return ret;
+
+ ret = real.read(fd, &msg, sizeof msg);
-+ if (ret != sizeof msg)
-+ return ret;
++ if ((ret != sizeof msg) || msg) {
++ fd_store(socket, fd, fd_normal);
++ return 0;
++ }
+
-+ ret = transpose_socket(socket, &fd, fd_rsocket);
-+ if (ret)
++ ret = transpose_socket(socket, fd_rsocket);
++ if (ret < 0)
+ return ret;
+
-+ return rconnect(fd, addr, addrlen);
++ real.close(fd);
++ return rconnect(ret, addr, addrlen);
+}
+
int connect(int socket, const struct sockaddr *addr, socklen_t addrlen)
sin = (struct sockaddr_in *) addr;
if (ntohs(sin->sin_port) > 1024) {
ret = rconnect(fd, addr, addrlen);
+@@ -434,9 +484,12 @@ int connect(int socket, const struct sockaddr *addr, socklen_t addrlen)
+ return ret;
+ }
+
+- ret = transpose_socket(socket, &fd, fd_normal);
+- if (ret)
++ ret = transpose_socket(socket, fd_normal);
++ if (ret < 0)
+ return ret;
++ rclose(fd);
++ fd = ret;
++ break;
+ }
+
+ return real.connect(fd, addr, addrlen);
+@@ -735,3 +788,57 @@ int fcntl(int socket, int cmd, ... /* arg */)
+ va_end(args);
+ return ret;
+ }
++
++/*
++ * We can't fork RDMA connections and pass them from the parent to the child
++ * process. Intercept the fork call, and if we're the child establish the
++ * RDMA connection after calling fork. The assumption is that the last
++ * connection accepted by the server will be processed by the child after the
++ * fork call.
++ *
++ * It would be better to establishing the RDMA connection once the child
++ * process tries to use the connection after the fork call (i.e. in a read
++ * or write call), rather than making the previous assumption.
++ */
++pid_t fork(void)
++{
++ struct sockaddr_storage sa;
++ pid_t pid;
++ sem_t *sem;
++ int fd, lfd, newfd, ret, len, param;
++ uint32_t msg;
++
++ init_preload();
++ pid = real.fork();
++ if (pid || !fork_support || (last_accept < 0) ||
++ (fd_get(last_accept, &fd) != fd_fork))
++ goto out;
++
++ sem = sem_open("/rsocket_fork", O_CREAT, 0644, 1);
++ if (sem == SEM_FAILED)
++ goto out;
++
++ lfd = transpose_socket(last_accept, fd_rsocket);
++ if (lfd < 0)
++ goto out;
++
++ param = 1;
++ len = sizeof param;
++ rsetsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, ¶m, len);
++
++ len = sizeof sa;
++ ret = real.getsockname(fd, &sa, &len);
++ if (ret)
++ goto out;
++
++ sem_wait(sem);
++ ret = rbind()
++
++ real.close(fd);
++
++ sem_post(sem);
++ sem_close(sem);
++out:
++ last_accept = -1;
++ return pid;
++}
+++ /dev/null
-Bottom: 2c3d4d1bd4b9fe1a653896a8d234d46ca6620399
-Top: 372ba59a4b3584a6777c7ad5fa7340fbcf11dcfd
-Author: Sean Hefty <sean.hefty@intel.com>
-Date: 2012-07-17 10:02:57 -0700
-
-Refresh of fork
-
----
-
-diff --git a/src/preload.c b/src/preload.c
-index 22c4b65..8f8fc11 100644
---- a/src/preload.c
-+++ b/src/preload.c
-@@ -46,6 +46,8 @@
- #include <string.h>
- #include <netinet/in.h>
- #include <netinet/tcp.h>
-+#include <unistd.h>
-+#include <semaphore.h>
-
- #include <rdma/rdma_cma.h>
- #include <rdma/rdma_verbs.h>
-@@ -81,6 +83,7 @@ struct socket_calls {
- int (*getsockopt)(int socket, int level, int optname,
- void *optval, socklen_t *optlen);
- int (*fcntl)(int socket, int cmd, ... /* arg */);
-+ pid_t (*fork)(void);
- };
-
- static struct socket_calls real;
-@@ -93,6 +96,7 @@ static int sq_size;
- static int rq_size;
- static int sq_inline;
- static int fork_support;
-+static int last_accept = -1;
-
- enum fd_type {
- fd_normal,
-@@ -250,6 +254,7 @@ static void init_preload(void)
- real.setsockopt = dlsym(RTLD_NEXT, "setsockopt");
- real.getsockopt = dlsym(RTLD_NEXT, "getsockopt");
- real.fcntl = dlsym(RTLD_NEXT, "fcntl");
-+ real.fork = dlsym(RTLD_NEXT, "fork");
-
- rs.socket = dlsym(RTLD_DEFAULT, "rsocket");
- rs.bind = dlsym(RTLD_DEFAULT, "rbind");
-@@ -286,12 +291,13 @@ out:
- * the same settings and bindings as the current socket. We currently only
- * handle setting a few of the more common values.
- */
--static int transpose_socket(int index, int *fd, enum fd_type new_type)
-+static int transpose_socket(int socket, enum fd_type new_type)
- {
-- socklen_t len = 0;
-- int new_fd, param, ret;
-+ int fd, new_fd, param, ret;
- struct socket_calls *new, *old;
-+ socklen_t len = 0;
-
-+ fd = fd_getd(socket);
- if (new_type == fd_rsocket) {
- new = &rs;
- old = ℜ
-@@ -300,7 +306,7 @@ static int transpose_socket(int index, int *fd, enum fd_type new_type)
- old = &rs;
- }
-
-- ret = old->getsockname(*fd, NULL, &len);
-+ ret = old->getsockname(fd, NULL, &len);
- if (ret)
- return ret;
-
-@@ -309,30 +315,28 @@ static int transpose_socket(int index, int *fd, enum fd_type new_type)
- if (new_fd < 0)
- return new_fd;
-
-- ret = old->fcntl(*fd, F_GETFL);
-+ ret = old->fcntl(fd, F_GETFL);
- if (ret > 0)
- ret = new->fcntl(new_fd, F_SETFL, ret);
- if (ret)
- goto err;
-
- len = sizeof param;
-- ret = old->getsockopt(*fd, SOL_SOCKET, SO_REUSEADDR, ¶m, &len);
-+ ret = old->getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, ¶m, &len);
- if (param && !ret)
- ret = new->setsockopt(new_fd, SOL_SOCKET, SO_REUSEADDR, ¶m, len);
- if (ret)
- goto err;
-
- len = sizeof param;
-- ret = old->getsockopt(*fd, IPPROTO_TCP, TCP_NODELAY, ¶m, &len);
-+ ret = old->getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, ¶m, &len);
- if (param && !ret)
- ret = new->setsockopt(new_fd, IPPROTO_TCP, TCP_NODELAY, ¶m, len);
- if (ret)
- goto err;
-
-- old->close(*fd);
- fd_store(socket, new_fd, new_type);
-- *fd = new_fd;
-- return 0;
-+ return new_fd;
-
- err:
- new->close(new_fd);
-@@ -398,9 +402,11 @@ int bind(int socket, const struct sockaddr *addr, socklen_t addrlen)
- if (!sin->sin_port || ntohs(sin->sin_port) > 1024)
- return rbind(fd, addr, addrlen);
-
-- ret = transpose_socket(socket, &fd, fd_normal);
-- if (ret)
-+ ret = transpose_socket(socket, fd_normal);
-+ if (ret < 0)
- return ret;
-+ rclose(fd);
-+ fd = ret;
- }
-
- return real.bind(fd, addr, addrlen);
-@@ -429,8 +435,10 @@ int accept(int socket, struct sockaddr *addr, socklen_t *addrlen)
- }
-
- fd_store(index, ret, fd_rsocket);
-+ last_accept = index;
- return index;
- } else {
-+ last_accept = -1;
- return real.accept(fd, addr, addrlen);
- }
- }
-@@ -441,20 +449,23 @@ static int connect_fork(int socket, const struct sockaddr *addr, socklen_t addrl
- int fd, ret;
- uint32_t msg;
-
-- fd_get(socket, &fd);
-+ fd = fd_getd(socket);
- ret = real.connect(fd, addr, addrlen);
- if (ret)
- return ret;
-
- ret = real.read(fd, &msg, sizeof msg);
-- if (ret != sizeof msg)
-- return ret;
-+ if ((ret != sizeof msg) || msg) {
-+ fd_store(socket, fd, fd_normal);
-+ return 0;
-+ }
-
-- ret = transpose_socket(socket, &fd, fd_rsocket);
-- if (ret)
-+ ret = transpose_socket(socket, fd_rsocket);
-+ if (ret < 0)
- return ret;
-
-- return rconnect(fd, addr, addrlen);
-+ real.close(fd);
-+ return rconnect(ret, addr, addrlen);
- }
-
- int connect(int socket, const struct sockaddr *addr, socklen_t addrlen)
-@@ -473,9 +484,12 @@ int connect(int socket, const struct sockaddr *addr, socklen_t addrlen)
- return ret;
- }
-
-- ret = transpose_socket(socket, &fd, fd_normal);
-- if (ret)
-+ ret = transpose_socket(socket, fd_normal);
-+ if (ret < 0)
- return ret;
-+ rclose(fd);
-+ fd = ret;
-+ break;
- }
-
- return real.connect(fd, addr, addrlen);
-@@ -774,3 +788,57 @@ int fcntl(int socket, int cmd, ... /* arg */)
- va_end(args);
- return ret;
- }
-+
-+/*
-+ * We can't fork RDMA connections and pass them from the parent to the child
-+ * process. Intercept the fork call, and if we're the child establish the
-+ * RDMA connection after calling fork. The assumption is that the last
-+ * connection accepted by the server will be processed by the child after the
-+ * fork call.
-+ *
-+ * It would be better to establishing the RDMA connection once the child
-+ * process tries to use the connection after the fork call (i.e. in a read
-+ * or write call), rather than making the previous assumption.
-+ */
-+pid_t fork(void)
-+{
-+ struct sockaddr_storage sa;
-+ pid_t pid;
-+ sem_t *sem;
-+ int fd, lfd, newfd, ret, len, param;
-+ uint32_t msg;
-+
-+ init_preload();
-+ pid = real.fork();
-+ if (pid || !fork_support || (last_accept < 0) ||
-+ (fd_get(last_accept, &fd) != fd_fork))
-+ goto out;
-+
-+ sem = sem_open("/rsocket_fork", O_CREAT, 0644, 1);
-+ if (sem == SEM_FAILED)
-+ goto out;
-+
-+ lfd = transpose_socket(last_accept, fd_rsocket);
-+ if (lfd < 0)
-+ goto out;
-+
-+ param = 1;
-+ len = sizeof param;
-+ rsetsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, ¶m, len);
-+
-+ len = sizeof sa;
-+ ret = real.getsockname(fd, &sa, &len);
-+ if (ret)
-+ goto out;
-+
-+ sem_wait(sem);
-+ ret = rbind()
-+
-+ real.close(fd);
-+
-+ sem_post(sem);
-+ sem_close(sem);
-+out:
-+ last_accept = -1;
-+ return pid;
-+}