miniupnpd/asyncsendto: make sendto_schedule work

This commit is contained in:
Thomas Bernard 2014-02-20 16:08:20 +01:00
parent b2143eff94
commit 06d9d36e99
2 changed files with 62 additions and 12 deletions

View File

@ -16,9 +16,19 @@
#include "asyncsendto.h" #include "asyncsendto.h"
/* state diagram for a packet :
*
* |
* V
* -> ESCHEDULED -> ESENDNOW -> sent
* ^ |
* | V
* EWAITREADY -> sent
*/
struct scheduled_send { struct scheduled_send {
LIST_ENTRY(scheduled_send) entries; LIST_ENTRY(scheduled_send) entries;
struct timeval ts; struct timeval ts;
enum {ESCHEDULED=1, EWAITREADY=2, ESENDNOW=3} state;
int sockfd; int sockfd;
const void * buf; const void * buf;
size_t len; size_t len;
@ -41,6 +51,7 @@ sendto_schedule(int sockfd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen, const struct sockaddr *dest_addr, socklen_t addrlen,
unsigned int delay) unsigned int delay)
{ {
enum {ESCHEDULED, EWAITREADY, ESENDNOW} state;
ssize_t n; ssize_t n;
struct timeval tv; struct timeval tv;
struct scheduled_send * elt; struct scheduled_send * elt;
@ -48,9 +59,21 @@ sendto_schedule(int sockfd, const void *buf, size_t len, int flags,
if(delay == 0) { if(delay == 0) {
/* first try to send at once */ /* first try to send at once */
n = sendto(sockfd, buf, len, flags, dest_addr, addrlen); n = sendto(sockfd, buf, len, flags, dest_addr, addrlen);
if((n >= 0) || (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK)) if(n >= 0)
return n; return n;
else if(errno == EAGAIN || errno == EWOULDBLOCK) {
/* use select() on this socket */
state = EWAITREADY;
} else if(errno == EINTR) {
state = ESENDNOW;
} else {
/* uncatched error */
return n;
}
} else {
state = ESCHEDULED;
} }
/* schedule */ /* schedule */
if(gettimeofday(&tv, 0) < 0) { if(gettimeofday(&tv, 0) < 0) {
return -1; return -1;
@ -62,6 +85,7 @@ sendto_schedule(int sockfd, const void *buf, size_t len, int flags,
(unsigned)(sizeof(struct scheduled_send) + len + addrlen)); (unsigned)(sizeof(struct scheduled_send) + len + addrlen));
return -1; return -1;
} }
elt->state = state;
/* time the packet should be sent */ /* time the packet should be sent */
elt->ts.tv_sec = tv.tv_sec + (delay / 1000); elt->ts.tv_sec = tv.tv_sec + (delay / 1000);
elt->ts.tv_usec = tv.tv_usec + (delay % 1000) * 1000; elt->ts.tv_usec = tv.tv_usec + (delay % 1000) * 1000;
@ -83,6 +107,7 @@ sendto_schedule(int sockfd, const void *buf, size_t len, int flags,
} }
/* try to send at once, and queue the packet if needed */
ssize_t ssize_t
sendto_or_schedule(int sockfd, const void *buf, size_t len, int flags, sendto_or_schedule(int sockfd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen) const struct sockaddr *dest_addr, socklen_t addrlen)
@ -108,23 +133,30 @@ int get_next_scheduled_send(struct timeval * next_send)
return n; return n;
} }
/* update writefds for select() call
* return the number of packets to try to send at once */
int get_sendto_fds(fd_set * writefds, int * max_fd, const struct timeval * now) int get_sendto_fds(fd_set * writefds, int * max_fd, const struct timeval * now)
{ {
int n = 0; int n = 0;
struct scheduled_send * elt; struct scheduled_send * elt;
for(elt = send_list.lh_first; elt != NULL; elt = elt->entries.le_next) { for(elt = send_list.lh_first; elt != NULL; elt = elt->entries.le_next) {
if((elt->ts.tv_sec < now->tv_sec) || if(elt->state == EWAITREADY) {
(elt->ts.tv_sec == now->tv_sec && elt->ts.tv_usec <= now->tv_usec)) { /* last sendto() call returned EAGAIN/EWOULDBLOCK */
FD_SET(elt->sockfd, writefds); FD_SET(elt->sockfd, writefds);
if(elt->sockfd > *max_fd) if(elt->sockfd > *max_fd)
*max_fd = elt->sockfd; *max_fd = elt->sockfd;
n++; n++;
} else if((elt->ts.tv_sec < now->tv_sec) ||
(elt->ts.tv_sec == now->tv_sec && elt->ts.tv_usec <= now->tv_usec)) {
/* we waited long enough, now send ! */
elt->state = ESENDNOW;
n++;
} }
} }
syslog(LOG_DEBUG, "%x", (int)writefds->fds_bits[0]);
return n; return n;
} }
/* executed sendto() when needed */
int try_sendto(fd_set * writefds) int try_sendto(fd_set * writefds)
{ {
ssize_t n; ssize_t n;
@ -132,17 +164,26 @@ int try_sendto(fd_set * writefds)
struct scheduled_send * next; struct scheduled_send * next;
for(elt = send_list.lh_first; elt != NULL; elt = next) { for(elt = send_list.lh_first; elt != NULL; elt = next) {
next = elt->entries.le_next; next = elt->entries.le_next;
syslog(LOG_DEBUG, "s=%d fds=%x", elt->sockfd, (int)writefds->fds_bits[0]); if((elt->state == ESENDNOW) ||
if(FD_ISSET(elt->sockfd, writefds)) { (elt->state == EWAITREADY && FD_ISSET(elt->sockfd, writefds))) {
syslog(LOG_DEBUG, "sending %d bytes", (int)elt->len); syslog(LOG_DEBUG, "try_sendto(): %d bytes on socket %d",
(int)elt->len, elt->sockfd);
n = sendto(elt->sockfd, elt->buf, elt->len, elt->flags, n = sendto(elt->sockfd, elt->buf, elt->len, elt->flags,
elt->dest_addr, elt->addrlen); elt->dest_addr, elt->addrlen);
if(n < 0) { if(n < 0) {
syslog(LOG_DEBUG, "sendto: %m"); if(errno == EINTR) {
if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) /* retry at once */
elt->state = ESENDNOW;
continue; continue;
} else if(errno == EAGAIN || errno == EWOULDBLOCK) {
/* retry once the socket is ready for writing */
elt->state = EWAITREADY;
continue;
}
/* uncatched error */
return n; return n;
} else { } else {
/* remove from the list */
LIST_REMOVE(elt, entries); LIST_REMOVE(elt, entries);
free(elt); free(elt);
} }

View File

@ -58,6 +58,10 @@ int test(void)
n = sendto_or_schedule(s, "1234", 4, 0, n = sendto_or_schedule(s, "1234", 4, 0,
(struct sockaddr *)&dest_addr, sizeof(dest_addr)); (struct sockaddr *)&dest_addr, sizeof(dest_addr));
syslog(LOG_DEBUG, "sendto_or_schedule : %d", (int)n); syslog(LOG_DEBUG, "sendto_or_schedule : %d", (int)n);
n = sendto_schedule(s, "1234", 4, 0,
(struct sockaddr *)&dest_addr, sizeof(dest_addr),
4400);
syslog(LOG_DEBUG, "sendto_schedule : %d", (int)n);
n = sendto_schedule(s, "1234", 4, 0, n = sendto_schedule(s, "1234", 4, 0,
(struct sockaddr *)&dest_addr, sizeof(dest_addr), (struct sockaddr *)&dest_addr, sizeof(dest_addr),
3000); 3000);
@ -72,10 +76,16 @@ int test(void)
FD_ZERO(&writefds); FD_ZERO(&writefds);
max_fd = 0; max_fd = 0;
gettimeofday(&now, NULL); gettimeofday(&now, NULL);
i = get_sendto_fds(&writefds, &max_fd, &now);
if(now.tv_sec > next_send.tv_sec || if(now.tv_sec > next_send.tv_sec ||
(now.tv_sec == next_send.tv_sec && now.tv_usec >= next_send.tv_usec)) { (now.tv_sec == next_send.tv_sec && now.tv_usec >= next_send.tv_usec)) {
/* wait 10sec :) */ if(i > 0) {
timeout.tv_sec = 10; /* dont wait */
timeout.tv_sec = 0;
} else {
/* wait 10sec :) */
timeout.tv_sec = 10;
}
timeout.tv_usec = 0; timeout.tv_usec = 0;
} else { } else {
/* ... */ /* ... */
@ -86,7 +96,6 @@ int test(void)
timeout.tv_sec--; timeout.tv_sec--;
} }
} }
i = get_sendto_fds(&writefds, &max_fd, &now);
syslog(LOG_DEBUG, "get_sendto_fds() returned %d", i); syslog(LOG_DEBUG, "get_sendto_fds() returned %d", i);
syslog(LOG_DEBUG, "select(%d, NULL, xx, NULL, %ld.%06ld)", syslog(LOG_DEBUG, "select(%d, NULL, xx, NULL, %ld.%06ld)",
max_fd, timeout.tv_sec, timeout.tv_usec); max_fd, timeout.tv_sec, timeout.tv_usec);