#include <sys/types.h>
#include <sys/socket.h>
+#include <sys/time.h>
#include <stdarg.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
+#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
static struct index_map idm;
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
+static uint32_t polling_time = 10;
+
/*
* Immediate data format is determined by the upper bits
* bit 31: message type, 0 - data, 1 - control
uint8_t *sbuf;
};
+void rs_configure(void)
+{
+ FILE *f;
+ static int init;
+
+ if (init)
+ return;
+
+ pthread_mutex_lock(&mut);
+ if (init)
+ goto out;
+
+ if ((f = fopen(RS_CONF_DIR "/polling_time", "r"))) {
+ fscanf(f, "%u", &polling_time);
+ fclose(f);
+ }
+ init = 1;
+out:
+ pthread_mutex_unlock(&mut);
+}
+
/*
* We currently generate a completion per send. sqe_count = 1
*/
(type != SOCK_STREAM) || (protocol && protocol != IPPROTO_TCP))
return ERR(ENOTSUP);
+ rs_configure();
rs = rs_alloc(NULL);
if (!rs)
return ERR(ENOMEM);
return ret;
}
+static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
+{
+ struct timeval s, e;
+ uint32_t poll_time = 0;
+ int ret;
+
+ do {
+ ret = rs_process_cq(rs, 1, test);
+ if (!ret || nonblock || errno != EWOULDBLOCK)
+ return ret;
+
+ if (!poll_time)
+ gettimeofday(&s, NULL);
+
+ gettimeofday(&e, NULL);
+ poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
+ (e.tv_usec - s.tv_usec) + 1;
+ } while (poll_time <= polling_time);
+
+ ret = rs_process_cq(rs, 0, test);
+ return ret;
+}
+
static int rs_nonblocking(struct rsocket *rs, int flags)
{
return (rs->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT);
}
fastlock_acquire(&rs->rlock);
if (!rs_have_rdata(rs)) {
- ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_conn_have_rdata);
+ ret = rs_get_comp(rs, rs_nonblocking(rs, flags), rs_conn_have_rdata);
if (ret && errno != ECONNRESET)
goto out;
}
fastlock_acquire(&rs->slock);
for (left = len; left; left -= xfer_size, buf += xfer_size) {
if (!rs_can_send(rs)) {
- ret = rs_process_cq(rs, rs_nonblocking(rs, flags),
- rs_conn_can_send);
+ ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+ rs_conn_can_send);
if (ret)
break;
if (rs->state != rs_connected) {
fastlock_acquire(&rs->slock);
for (left = len; left; left -= xfer_size) {
if (!rs_can_send(rs)) {
- ret = rs_process_cq(rs, rs_nonblocking(rs, flags),
- rs_conn_can_send);
+ ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+ rs_conn_can_send);
if (ret)
break;
if (rs->state != rs_connected) {
*/
int rpoll(struct pollfd *fds, nfds_t nfds, int timeout)
{
+ struct timeval s, e;
struct pollfd *rfds;
+ uint32_t poll_time = 0;
int ret;
- ret = rs_poll_check(fds, nfds);
- if (ret || !timeout)
- return ret;
+ do {
+ ret = rs_poll_check(fds, nfds);
+ if (ret || !timeout)
+ return ret;
+
+ if (!poll_time)
+ gettimeofday(&s, NULL);
+
+ gettimeofday(&e, NULL);
+ poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
+ (e.tv_usec - s.tv_usec) + 1;
+ } while (poll_time <= polling_time);
rfds = rs_fds_alloc(nfds);
if (!rfds)