From af46180272295b46d1e455944295b3b6f24f40a2 Mon Sep 17 00:00:00 2001 From: Leonid Evdokimov Date: Sun, 3 Apr 2016 16:39:11 +0300 Subject: [PATCH] Add naive zero-copy implementation using `splice` It gives ~33% increase of throughput on CPU-bound box. E.g. following machine single-connection throughput goes from ~30 Mbit/s to ~40 Mbit/s system type: xRX200 rev 1.2 machine: TDW8980 - TP-LINK TD-W8980 cpu model: MIPS 34Kc V5.6 BogoMIPS: 332.54 --- Makefile | 3 +- debug.c | 12 ++ http-relay.c | 16 +- log.c | 2 +- log.h | 2 + main.c | 8 + redsocks.c | 481 +++++++++++++++++++++++++++++++++++++++--- redsocks.conf.example | 4 + redsocks.h | 52 ++++- socks5.c | 6 +- 10 files changed, 546 insertions(+), 40 deletions(-) diff --git a/Makefile b/Makefile index 141dc63..f64db91 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,8 @@ VERSION := 0.4 # -levent_core may be used for space reduction LIBS := -levent CFLAGS += -g -O2 -override CFLAGS += -std=c99 -D_XOPEN_SOURCE=600 -D_BSD_SOURCE -D_DEFAULT_SOURCE -Wall +# _GNU_SOURCE is used to get splice(2), it also implies _BSD_SOURCE +override CFLAGS += -std=c99 -D_XOPEN_SOURCE=600 -D_DEFAULT_SOURCE -D_GNU_SOURCE -Wall all: $(OUT) diff --git a/debug.c b/debug.c index 187c548..af0ad5c 100644 --- a/debug.c +++ b/debug.c @@ -73,6 +73,13 @@ static parser_section debug_conf_section = .onexit = debug_onexit, }; +static void debug_pipe(struct evhttp_request *req, void *arg) +{ + UNUSED(arg); + int fds[2]; + int code = (pipe(fds) == 0) ? HTTP_OK : HTTP_SERVUNAVAIL; + evhttp_send_reply(req, code, NULL, NULL); +} static void debug_meminfo_json(struct evhttp_request *req, void *arg) { UNUSED(arg); @@ -125,6 +132,11 @@ static int debug_init(struct event_base* evbase) goto fail; } + if (evhttp_set_cb(instance.http_server, "/debug/pipe", debug_pipe, NULL) != 0) { + log_errno(LOG_ERR, "evhttp_set_cb()"); + goto fail; + } + if (evhttp_set_cb(instance.http_server, "/debug/meminfo.json", debug_meminfo_json, NULL) != 0) { log_errno(LOG_ERR, "evhttp_set_cb()"); goto fail; diff --git a/http-relay.c b/http-relay.c index 11e23f2..23f3f47 100644 --- a/http-relay.c +++ b/http-relay.c @@ -99,7 +99,7 @@ static int httpr_buffer_append(httpr_buffer *buff, const char *data, int len) static void httpr_client_init(redsocks_client *client) { - httpr_client *httpr = (void*)(client + 1); + httpr_client *httpr = red_payload(client); client->state = httpr_new; memset(httpr, 0, sizeof(*httpr)); @@ -109,7 +109,7 @@ static void httpr_client_init(redsocks_client *client) static void httpr_client_fini(redsocks_client *client) { - httpr_client *httpr = (void*)(client + 1); + httpr_client *httpr = red_payload(client); free(httpr->firstline); httpr->firstline = NULL; @@ -144,7 +144,7 @@ static char *get_auth_request_header(struct evbuffer *buf) static void httpr_relay_read_cb(struct bufferevent *buffev, void *_arg) { redsocks_client *client = _arg; - httpr_client *httpr = (void*)(client + 1); + httpr_client *httpr = red_payload(client); int dropped = 0; assert(client->state >= httpr_request_sent); @@ -263,7 +263,7 @@ static void httpr_relay_read_cb(struct bufferevent *buffev, void *_arg) static void httpr_relay_write_cb(struct bufferevent *buffev, void *_arg) { redsocks_client *client = _arg; - httpr_client *httpr = (void*)(client + 1); + httpr_client *httpr = red_payload(client); int len = 0; assert(client->state >= httpr_recv_request_headers); @@ -365,7 +365,7 @@ static void httpr_relay_write_cb(struct bufferevent *buffev, void *_arg) // drops client on failure static int httpr_append_header(redsocks_client *client, char *line) { - httpr_client *httpr = (void*)(client + 1); + httpr_client *httpr = red_payload(client); if (httpr_buffer_append(&httpr->client_buffer, line, strlen(line)) != 0) return -1; @@ -392,7 +392,7 @@ static char *fmt_http_host(struct sockaddr_in addr) static int httpr_toss_http_firstline(redsocks_client *client) { - httpr_client *httpr = (void*)(client + 1); + httpr_client *httpr = red_payload(client); char *uri = NULL; char *host = httpr->has_host ? httpr->host : fmt_http_host(client->destaddr); @@ -439,7 +439,7 @@ fail: static void httpr_client_read_content(struct bufferevent *buffev, redsocks_client *client) { - httpr_client *httpr = (void*)(client + 1); + httpr_client *httpr = red_payload(client); static int post_buffer_len = 64 * 1024; char *post_buffer = calloc(post_buffer_len, 1); @@ -476,7 +476,7 @@ static void httpr_client_read_content(struct bufferevent *buffev, redsocks_clien static void httpr_client_read_cb(struct bufferevent *buffev, void *_arg) { redsocks_client *client = _arg; - httpr_client *httpr = (void*)(client + 1); + httpr_client *httpr = red_payload(client); redsocks_touch_client(client); diff --git a/log.c b/log.c index f9769be..cada8db 100644 --- a/log.c +++ b/log.c @@ -84,7 +84,7 @@ static log_func log_msg_next = NULL; static bool should_log_info = true; static bool should_log_debug = false; -static bool should_log(int priority) +bool should_log(int priority) { return (priority != LOG_DEBUG && priority != LOG_INFO) || (priority == LOG_DEBUG && should_log_debug) diff --git a/log.h b/log.h index 63308d2..259422e 100644 --- a/log.h +++ b/log.h @@ -13,6 +13,8 @@ extern const char *error_lowmem; int log_preopen(const char *dst, bool log_debug, bool log_info); void log_open(); +bool should_log(int priority); + void _log_vwrite(const char *file, int line, const char *func, int do_errno, int priority, const char *fmt, va_list ap); void _log_write(const char *file, int line, const char *func, int do_errno, int priority, const char *fmt, ...) diff --git a/main.c b/main.c index 82f94dc..3a56199 100644 --- a/main.c +++ b/main.c @@ -97,6 +97,10 @@ int main(int argc, char **argv) } } + if (event_get_struct_event_size() != sizeof(struct event)) { + puts("libevent event_get_struct_event_size() != sizeof(struct event)! Check `redsocks -v` and recompile redsocks"); + return EXIT_FAILURE; + } FILE *f = fopen(confname, "r"); if (!f) { @@ -153,6 +157,10 @@ int main(int argc, char **argv) } } + if (LIBEVENT_VERSION_NUMBER != event_get_version_number()) { + log_error(LOG_WARNING, "libevent version mismatch! headers %8x, runtime %8x\n", LIBEVENT_VERSION_NUMBER, event_get_version_number()); + } + log_error(LOG_NOTICE, "redsocks started"); event_dispatch(); diff --git a/redsocks.c b/redsocks.c index aeac74c..0c038ec 100644 --- a/redsocks.c +++ b/redsocks.c @@ -15,16 +15,19 @@ */ #include +#include #include #include #include #include +#include #include #include #include #include #include #include +#include #include #include "list.h" #include "parser.h" @@ -45,6 +48,8 @@ enum pump_state_t { static void redsocks_shutdown(redsocks_client *client, struct bufferevent *buffev, int how); static const char *redsocks_event_str(unsigned short what); +static int redsocks_start_bufferpump(redsocks_client *client); +static int redsocks_start_splicepump(redsocks_client *client); extern relay_subsys http_connect_subsys; @@ -71,6 +76,7 @@ static parser_entry redsocks_entries[] = { .key = "login", .type = pt_pchar }, { .key = "password", .type = pt_pchar }, { .key = "listenq", .type = pt_uint16 }, + { .key = "splice", .type = pt_bool }, { .key = "min_accept_backoff", .type = pt_uint16 }, { .key = "max_accept_backoff", .type = pt_uint16 }, { } @@ -102,6 +108,28 @@ static int tracked_event_del(struct tracked_event *tev) return ret; } +static bool is_splice_good() +{ + struct utsname u; + if (uname(&u) != 0) { + return false; + } + + unsigned long int v[4] = { 0, 0, 0, 0 }; + char *rel = u.release; + for (int i = 0; i < SIZEOF_ARRAY(v); ++i) { + v[i] = strtoul(rel, &rel, 0); + while (*rel && !isdigit(*rel)) + ++rel; + } + + // haproxy assumes that splice "works" for 2.6.27.13+ + return (v[0] > 2) || + (v[0] == 2 && v[1] > 6) || + (v[0] == 2 && v[1] == 6 && v[2] > 27) || + (v[0] == 2 && v[1] == 6 && v[2] == 27 && v[3] >= 13); +} + static int redsocks_onenter(parser_section *section) { // FIXME: find proper way to calulate instance_payload_len @@ -129,6 +157,7 @@ static int redsocks_onenter(parser_section *section) instance->config.listenq = SOMAXCONN; instance->config.min_backoff_ms = 100; instance->config.max_backoff_ms = 60000; + instance->config.use_splice = is_splice_good(); for (parser_entry *entry = §ion->entries[0]; entry->key; entry++) entry->addr = @@ -140,6 +169,7 @@ static int redsocks_onenter(parser_section *section) (strcmp(entry->key, "login") == 0) ? (void*)&instance->config.login : (strcmp(entry->key, "password") == 0) ? (void*)&instance->config.password : (strcmp(entry->key, "listenq") == 0) ? (void*)&instance->config.listenq : + (strcmp(entry->key, "splice") == 0) ? (void*)&instance->config.use_splice : (strcmp(entry->key, "min_accept_backoff") == 0) ? (void*)&instance->config.min_backoff_ms : (strcmp(entry->key, "max_accept_backoff") == 0) ? (void*)&instance->config.max_backoff_ms : NULL; @@ -210,6 +240,9 @@ void redsocks_log_write_plain( const struct sockaddr_in *clientaddr, const struct sockaddr_in *destaddr, int priority, const char *orig_fmt, ... ) { + if (!should_log(priority)) + return; + int saved_errno = errno; struct evbuffer *fmt = evbuffer_new(); va_list ap; @@ -241,7 +274,18 @@ void redsocks_touch_client(redsocks_client *client) redsocks_time(&client->last_event); } -static inline const char* bufname(redsocks_client *client, struct bufferevent *buf) +static bool shut_both(redsocks_client *client) +{ + return client->relay_evshut == (EV_READ|EV_WRITE) && client->client_evshut == (EV_READ|EV_WRITE); +} + +static int bufprio(redsocks_client *client, struct bufferevent *buffev) +{ + // client errors are logged with LOG_INFO, server errors with LOG_NOTICE + return (buffev == client->client) ? LOG_INFO : LOG_NOTICE; +} + +static const char* bufname(redsocks_client *client, struct bufferevent *buf) { assert(buf == client->client || buf == client->relay); return buf == client->client ? "client" : "relay"; @@ -281,7 +325,6 @@ static void redsocks_relay_writecb(redsocks_client *client, struct bufferevent * } } - static void redsocks_relay_relayreadcb(struct bufferevent *from, void *_client) { redsocks_client *client = _client; @@ -312,13 +355,20 @@ static void redsocks_relay_clientwritecb(struct bufferevent *to, void *_client) void redsocks_start_relay(redsocks_client *client) { - int error; - if (client->instance->relay_ss->fini) client->instance->relay_ss->fini(client); client->state = pump_active; + int error = ((client->instance->config.use_splice) ? redsocks_start_splicepump : redsocks_start_bufferpump)(client); + if (!error) + redsocks_log_error(client, LOG_DEBUG, "data relaying started"); + else + redsocks_drop_client(client); +} + +static int redsocks_start_bufferpump(redsocks_client *client) +{ // wm_write.high is respected by libevent-2.0.22 only for ssl and filters, // so it's implemented in redsocks callbacks. wm_read.high works as expected. bufferevent_setwatermark(client->client, EV_READ|EV_WRITE, 0, REDSOCKS_RELAY_HALFBUFF); @@ -329,19 +379,303 @@ void redsocks_start_relay(redsocks_client *client) client->relay->readcb = redsocks_relay_relayreadcb; client->relay->writecb = redsocks_relay_relaywritecb; - error = bufferevent_enable(client->client, (EV_READ|EV_WRITE) & ~(client->client_evshut)); + int error = bufferevent_enable(client->client, (EV_READ|EV_WRITE) & ~(client->client_evshut)); if (!error) error = bufferevent_enable(client->relay, (EV_READ|EV_WRITE) & ~(client->relay_evshut)); - - if (!error) { - redsocks_log_error(client, LOG_DEBUG, "data relaying started"); - } - else { + if (error) redsocks_log_errno(client, LOG_ERR, "bufferevent_enable"); - redsocks_drop_client(client); + return error; +} + +static int pipeprio(redsocks_pump *pump, int fd) +{ + // client errors are logged with LOG_INFO, server errors with LOG_NOTICE + return (fd == event_get_fd(&pump->client_read)) ? LOG_INFO : LOG_NOTICE; +} + +static const char* pipename(redsocks_pump *pump, int fd) +{ + return (fd == event_get_fd(&pump->client_read)) ? "client" : "relay"; +} + +static void bufferevent_free_unused(struct bufferevent **p) { + if (*p && !evbuffer_get_length((*p)->input) && !evbuffer_get_length((*p)->output)) { + bufferevent_free(*p); + *p = NULL; } } +static bool would_block(int e) { + return e == EAGAIN || e == EWOULDBLOCK; +} + +typedef struct redsplice_write_ctx_t { + // drain ebsrc[0], ebsrc[1], pisrc in that order + struct evbuffer *ebsrc[2]; + splice_pipe *pisrc; + struct event *evsrc; + struct event *evdst; + const evshut_t *shut_src; + evshut_t *shut_dst; +} redsplice_write_ctx; + +static void redsplice_write_cb(redsocks_pump *pump, redsplice_write_ctx *c, int out) +{ + bool has_data = false; // there is some pending data to be written + bool can_write = true; // socket SEEMS TO BE writable + + // short write -- goto read/write management + // write error -- drop client alltogether + // full write -- take next buffer + // got EOF & no data -- relay EOF + + for (int i = 0; i < SIZEOF_ARRAY(c->ebsrc); ++i) { + struct evbuffer *ebsrc = c->ebsrc[i]; + if (ebsrc) { + const size_t avail = evbuffer_get_length(ebsrc); + has_data = !!avail; + if (avail) { + const ssize_t sent = evbuffer_write(ebsrc, out); + if (sent == -1) { + if (would_block(errno)) { // short (zero) write + can_write = false; + goto decide; + } else { + redsocks_log_errno(&pump->c, pipeprio(pump, out), "evbuffer_write(to %s)", pipename(pump, out)); + redsocks_drop_client(&pump->c); + return; + } + } else if (avail == sent) { + has_data = false; // unless stated otherwise + } else { // short write + can_write = false; + goto decide; + } + } + } + } + + do { + has_data = !!c->pisrc->size; + const size_t avail = c->pisrc->size; + if (avail) { + const ssize_t sent = splice(c->pisrc->read, NULL, out, NULL, avail, SPLICE_F_MOVE|SPLICE_F_NONBLOCK); + if (sent == -1) { + if (would_block(errno)) { // short (zero) write + can_write = false; + goto decide; + } else { + redsocks_log_errno(&pump->c, pipeprio(pump, out), "splice(to %s)", pipename(pump, out)); + redsocks_drop_client(&pump->c); + return; + } + } else { + c->pisrc->size -= sent; + if (avail == sent) { + has_data = false; + } else { // short write + can_write = false; + goto decide; + } + } + } + } while (0); + +decide: + if (!has_data && (*c->shut_src & EV_READ) && !(*c->shut_dst & EV_WRITE)) { + if (shutdown(out, SHUT_WR) != 0) { + redsocks_log_errno(&pump->c, LOG_ERR, "shutdown(%s, SHUT_WR)", pipename(pump, out)); + } + *c->shut_dst |= EV_WRITE; + can_write = false; + assert(!c->pisrc->size); + redsocks_close(c->pisrc->read); + c->pisrc->read = -1; + redsocks_close(c->pisrc->write); + c->pisrc->write = -1; + if (shut_both(&pump->c)) { + redsocks_drop_client(&pump->c); + return; + } + } + + assert(!(can_write && has_data)); // incomplete write to writable socket + + if (!can_write && has_data) { + if (event_pending(c->evsrc, EV_READ, NULL)) + redsocks_log_error(&pump->c, LOG_DEBUG, "backpressure: event_del(%s_read)", pipename(pump, event_get_fd(c->evsrc))); + redsocks_event_del(&pump->c, c->evsrc); + redsocks_event_add(&pump->c, c->evdst); + } else if (can_write && !has_data) { + if (!event_pending(c->evsrc, EV_READ, NULL)) + redsocks_log_error(&pump->c, LOG_DEBUG, "backpressure: event_add(%s_read)", pipename(pump, event_get_fd(c->evsrc))); + redsocks_event_add(&pump->c, c->evsrc); + redsocks_event_del(&pump->c, c->evdst); + } else if (!can_write && !has_data) { // something like EOF + redsocks_event_del(&pump->c, c->evsrc); + redsocks_event_del(&pump->c, c->evdst); + } +} + +typedef struct redsplice_read_ctx_t { + splice_pipe *dst; + struct event *evsrc; + struct event *evdst; + evshut_t *shut_src; +} redsplice_read_ctx; + +static void redsplice_read_cb(redsocks_pump *pump, redsplice_read_ctx *c, int in) +{ + const size_t pipesize = 1048576; // some default value from fs.pipe-max-size + const ssize_t got = splice(in, NULL, c->dst->write, NULL, pipesize, SPLICE_F_MOVE|SPLICE_F_NONBLOCK); + if (got == -1) { + if (would_block(errno)) { + // there is data at `in', but pipe is full + if (!event_pending(c->evsrc, EV_READ, NULL)) + redsocks_log_error(&pump->c, LOG_DEBUG, "backpressure: event_del(%s_read)", pipename(pump, event_get_fd(c->evsrc))); + redsocks_event_del(&pump->c, c->evsrc); + } else { + redsocks_log_errno(&pump->c, pipeprio(pump, in), "splice(from %s)", pipename(pump, in)); + redsocks_drop_client(&pump->c); + } + return; + } + + if (got == 0) { // got EOF + if (shutdown(in, SHUT_RD) != 0) { + if (errno != ENOTCONN) { // do not log common case for splice() + redsocks_log_errno(&pump->c, LOG_DEBUG, "shutdown(%s, SHUT_RD) after EOF", pipename(pump, in)); + } + } + *c->shut_src |= EV_READ; + redsocks_event_del(&pump->c, c->evsrc); + } else { + c->dst->size += got; + } + event_active(c->evdst, EV_WRITE, 0); +} + +static void redsocks_touch_pump(redsocks_pump *pump) +{ + redsocks_touch_client(&pump->c); + bufferevent_free_unused(&pump->c.client); + bufferevent_free_unused(&pump->c.relay); +} + +static void redsplice_relay_read(int fd, short what, void *_pump) +{ + redsocks_pump *pump = _pump; + assert(fd == event_get_fd(&pump->relay_read) && (what & EV_READ)); + redsocks_touch_pump(pump); + redsplice_read_ctx c = { + .dst = &pump->reply, + .evsrc = &pump->relay_read, + .evdst = &pump->client_write, + .shut_src = &pump->c.relay_evshut, + }; + redsplice_read_cb(pump, &c, fd); +} + +static void redsplice_client_read(int fd, short what, void *_pump) +{ + redsocks_pump *pump = _pump; + assert(fd == event_get_fd(&pump->client_read) && (what & EV_READ)); + redsocks_touch_pump(pump); + redsplice_read_ctx c = { + .dst = &pump->request, + .evsrc = &pump->client_read, + .evdst = &pump->relay_write, + .shut_src = &pump->c.client_evshut, + }; + redsplice_read_cb(pump, &c, fd); +} + +static void redsplice_relay_write(int fd, short what, void *_pump) +{ + redsocks_pump *pump = _pump; + assert(fd == event_get_fd(&pump->relay_write) && (what & EV_WRITE)); + redsocks_touch_pump(pump); + redsplice_write_ctx c = { + .ebsrc = { + pump->c.relay ? pump->c.relay->output : NULL, + pump->c.client ? pump->c.client->input : NULL, + }, + .pisrc = &pump->request, + .evsrc = &pump->client_read, + .evdst = &pump->relay_write, + .shut_src = &pump->c.client_evshut, + .shut_dst = &pump->c.relay_evshut, + }; + redsplice_write_cb(pump, &c, fd); +} + +static void redsplice_client_write(int fd, short what, void *_pump) +{ + redsocks_pump *pump = _pump; + assert(fd == event_get_fd(&pump->client_write) && (what & EV_WRITE)); + redsocks_touch_pump(pump); + redsplice_write_ctx c = { + .ebsrc = { + pump->c.client ? pump->c.client->output : NULL, + pump->c.relay ? pump->c.relay->input : NULL, + }, + .pisrc = &pump->reply, + .evsrc = &pump->relay_read, + .evdst = &pump->client_write, + .shut_src = &pump->c.relay_evshut, + .shut_dst = &pump->c.client_evshut, + }; + redsplice_write_cb(pump, &c, fd); +} + +static int redsocks_start_splicepump(redsocks_client *client) +{ + int error = bufferevent_disable(client->client, EV_READ|EV_WRITE); + if (!error) + error = bufferevent_disable(client->relay, EV_READ|EV_WRITE); + if (error) { + redsocks_log_errno(client, LOG_ERR, "bufferevent_disable"); + return error; + } + + redsocks_pump *pump = red_pump(client); + if (!error) + error = pipe2(&pump->request.read, O_NONBLOCK); + if (!error) + error = pipe2(&pump->reply.read, O_NONBLOCK); + if (error) { + redsocks_log_errno(client, LOG_ERR, "pipe2"); + return error; + } + + struct event_base *base = NULL; + const int relay_fd = bufferevent_getfd(client->relay); + const int client_fd = bufferevent_getfd(client->client); + if (!error) + error = event_assign(&pump->client_read, base, client_fd, EV_READ|EV_PERSIST, redsplice_client_read, pump); + if (!error) + error = event_assign(&pump->client_write, base, client_fd, EV_WRITE|EV_PERSIST, redsplice_client_write, pump); + if (!error) + error = event_assign(&pump->relay_read, base, relay_fd, EV_READ|EV_PERSIST, redsplice_relay_read, pump); + if (!error) + error = event_assign(&pump->relay_write, base, relay_fd, EV_WRITE|EV_PERSIST, redsplice_relay_write, pump); + if (error) { + redsocks_log_errno(client, LOG_ERR, "event_assign"); + return error; + } + + redsocks_bufferevent_dropfd(client, client->relay); + redsocks_bufferevent_dropfd(client, client->client); + + // flush buffers (if any) and enable EV_READ callbacks + event_active(&pump->client_write, EV_WRITE, 0); + event_active(&pump->relay_write, EV_WRITE, 0); + redsocks_event_add(&pump->c, &pump->client_read); + redsocks_event_add(&pump->c, &pump->relay_read); + + return 0; +} + static bool has_loopback_destination(redsocks_client *client) { const uint32_t net = ntohl(client->destaddr.sin_addr.s_addr) >> 24; @@ -351,7 +685,7 @@ static bool has_loopback_destination(redsocks_client *client) void redsocks_drop_client(redsocks_client *client) { - if (client->relay_evshut == (EV_READ|EV_WRITE) && client->client_evshut == (EV_READ|EV_WRITE)) { + if (shut_both(client)) { redsocks_log_error(client, LOG_INFO, "connection closed"); } else { if (has_loopback_destination(client)) { @@ -378,6 +712,38 @@ void redsocks_drop_client(redsocks_client *client) if (client->relay) redsocks_bufferevent_free(client->relay); + if (client->instance->config.use_splice) { + redsocks_pump *pump = red_pump(client); + + if (pump->request.read != -1) + redsocks_close(pump->request.read); + if (pump->request.write != -1) + redsocks_close(pump->request.write); + if (pump->reply.read != -1) + redsocks_close(pump->reply.read); + if (pump->reply.write != -1) + redsocks_close(pump->reply.write); + + // redsocks_close MAY log error if some of events was not properly initialized + if (event_initialized(&pump->client_read)) { + int fd = event_get_fd(&pump->client_read); + redsocks_event_del(&pump->c, &pump->client_read); + redsocks_close(fd); + } + if (event_initialized(&pump->client_write)) { + redsocks_event_del(&pump->c, &pump->client_write); + } + + if (event_initialized(&pump->relay_read)) { + int fd = event_get_fd(&pump->relay_read); + redsocks_event_del(&pump->c, &pump->relay_read); + redsocks_close(fd); + } + if (event_initialized(&pump->relay_write)) { + redsocks_event_del(&pump->c, &pump->relay_write); + } + } + list_del(&client->list); free(client); } @@ -426,7 +792,7 @@ static void redsocks_shutdown(redsocks_client *client, struct bufferevent *buffe *pevshut |= evhow; - if (client->relay_evshut == (EV_READ|EV_WRITE) && client->client_evshut == (EV_READ|EV_WRITE)) { + if (shut_both(client)) { redsocks_log_error(client, LOG_DEBUG, "both client and server disconnected"); redsocks_drop_client(client); } @@ -474,8 +840,7 @@ static void redsocks_event_error(struct bufferevent *buffev, short what, void *_ } else { errno = bakerrno; } - // client errors are logged with LOG_INFO, server errors with LOG_NOTICE - redsocks_log_errno(client, (buffev == client->client) ? LOG_INFO : LOG_NOTICE, "%s %serror, code " event_fmt_str, + redsocks_log_errno(client, bufprio(client, buffev), "%s %serror, code " event_fmt_str, bufname(client, buffev), errsrc, event_fmt(what)); @@ -659,12 +1024,39 @@ void redsocks_close_internal(int fd, const char* file, int line, const char *fun } } +void redsocks_event_add_internal(redsocks_client *client, struct event *ev, const char *file, int line, const char *func) +{ + if (event_add(ev, NULL) != 0) { + const int do_errno = 1; + redsocks_log_write_plain(file, line, func, do_errno, &(client)->clientaddr, &(client)->destaddr, LOG_WARNING, "event_add"); + } +} + +void redsocks_event_del_internal(redsocks_client *client, struct event *ev, const char *file, int line, const char *func) +{ + if (event_del(ev) != 0) { + const int do_errno = 1; + redsocks_log_write_plain(file, line, func, do_errno, &(client)->clientaddr, &(client)->destaddr, LOG_WARNING, "event_del"); + } +} + +void redsocks_bufferevent_dropfd_internal(redsocks_client *client, struct bufferevent *ev, const char *file, int line, const char *func) +{ + if (bufferevent_setfd(ev, -1) != 0) { + const int do_errno = 1; + redsocks_log_write_plain(file, line, func, do_errno, &(client)->clientaddr, &(client)->destaddr, LOG_WARNING, "bufferevent_setfd"); + } +} + void redsocks_bufferevent_free(struct bufferevent *buffev) { int fd = bufferevent_getfd(buffev); - bufferevent_setfd(buffev, -1); // to avoid EBADFD warnings from epoll + if (bufferevent_setfd(buffev, -1)) { // to avoid EBADFD warnings from epoll + log_errno(LOG_WARNING, "bufferevent_setfd"); + } bufferevent_free(buffev); - redsocks_close(fd); + if (fd != -1) + redsocks_close(fd); } static void redsocks_accept_client(int fd, short what, void *_arg) @@ -718,12 +1110,19 @@ static void redsocks_accept_client(int fd, short what, void *_arg) goto fail; // everything seems to be ok, let's allocate some memory - client = calloc(1, sizeof(redsocks_client) + self->relay_ss->payload_len); + client = calloc(1, sizeof_client(self)); if (!client) { log_errno(LOG_ERR, "calloc"); goto fail; } client->instance = self; + if (client->instance->config.use_splice) { + redsocks_pump *pump = red_pump(client); + pump->request.read = -1; + pump->request.write = -1; + pump->reply.read = -1; + pump->reply.write = -1; + } memcpy(&client->clientaddr, &clientaddr, sizeof(clientaddr)); memcpy(&client->destaddr, &destaddr, sizeof(destaddr)); INIT_LIST_HEAD(&client->list); @@ -796,15 +1195,47 @@ static void redsocks_debug_dump_instance(redsocks_instance *instance, time_t now const char *s_client_evshut = redsocks_evshut_str(client->client_evshut); const char *s_relay_evshut = redsocks_evshut_str(client->relay_evshut); - redsocks_log_error(client, LOG_NOTICE, "client: %i (%s)%s%s, relay: %i (%s)%s%s, age: %li sec, idle: %li sec.", - client->client ? event_get_fd(&client->client->ev_write) : -1, + if (!instance->config.use_splice) { + redsocks_log_error(client, LOG_NOTICE, "client: %i (%s)%s%s, relay: %i (%s)%s%s, age: %li sec, idle: %li sec.", + client->client ? bufferevent_getfd(client->client) : -1, client->client ? redsocks_event_str(client->client->enabled) : "NULL", - s_client_evshut[0] ? " " : "", s_client_evshut, - client->relay ? event_get_fd(&client->relay->ev_write) : -1, + s_client_evshut[0] ? " " : "", + s_client_evshut, + client->relay ? bufferevent_getfd(client->relay) : -1, client->relay ? redsocks_event_str(client->relay->enabled) : "NULL", - s_relay_evshut[0] ? " " : "", s_relay_evshut, - now - client->first_event, - now - client->last_event); + s_relay_evshut[0] ? " " : "", + s_relay_evshut, + now - client->first_event, + now - client->last_event); + } else { + redsocks_pump *pump = red_pump(client); + + redsocks_log_error(client, LOG_NOTICE, "client: buf %i (%s) splice %i (%s/%s) pipe[%d, %d]=%zu%s%s, relay: buf %i (%s) splice %i (%s/%s) pipe[%d, %d]=%zu%s%s, age: %li sec, idle: %li sec.", + client->client ? bufferevent_getfd(client->client) : -1, + client->client ? redsocks_event_str(client->client->enabled) : "NULL", + event_get_fd(&pump->client_read), + event_pending(&pump->client_read, EV_READ, NULL) ? "R" : "-", + event_pending(&pump->client_write, EV_WRITE, NULL) ? "W" : "-", + pump->request.read, + pump->request.write, + pump->request.size, + s_client_evshut[0] ? " " : "", + s_client_evshut, + + client->relay ? bufferevent_getfd(client->relay) : -1, + client->relay ? redsocks_event_str(client->relay->enabled) : "NULL", + event_get_fd(&pump->relay_read), + event_pending(&pump->relay_read, EV_READ, NULL) ? "R" : "-", + event_pending(&pump->relay_write, EV_WRITE, NULL) ? "W" : "-", + pump->reply.read, + pump->reply.write, + pump->reply.size, + s_relay_evshut[0] ? " " : "", + s_relay_evshut, + + now - client->first_event, + now - client->last_event); + } } log_error(LOG_NOTICE, "End of client list."); } diff --git a/redsocks.conf.example b/redsocks.conf.example index 478de4e..b9ffd94 100644 --- a/redsocks.conf.example +++ b/redsocks.conf.example @@ -57,6 +57,10 @@ redsocks { // good enough for most of us. // listenq = 128; // SOMAXCONN equals 128 on my Linux box. + // Enable or disable faster data pump based on splice(2) syscall. + // Default value depends on your kernel version, true for 2.6.27.13+ + // splice = false; + // `max_accept_backoff` is a delay to retry `accept()` after accept // failure (e.g. due to lack of file descriptors). It's measured in // milliseconds and maximal value is 65535. `min_accept_backoff` is diff --git a/redsocks.h b/redsocks.h index 2704e0c..6b06f7e 100644 --- a/redsocks.h +++ b/redsocks.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include "list.h" @@ -32,6 +33,7 @@ typedef struct redsocks_config_t { uint16_t min_backoff_ms; uint16_t max_backoff_ms; // backoff capped by 65 seconds is enough :) uint16_t listenq; + bool use_splice; } redsocks_config; struct tracked_event { @@ -49,6 +51,8 @@ typedef struct redsocks_instance_t { relay_subsys *relay_ss; } redsocks_instance; +typedef unsigned short evshut_t; // EV_READ | EV_WRITE + typedef struct redsocks_client_t { list_head list; redsocks_instance *instance; @@ -57,12 +61,47 @@ typedef struct redsocks_client_t { struct sockaddr_in clientaddr; struct sockaddr_in destaddr; int state; // it's used by bottom layer - unsigned short client_evshut; - unsigned short relay_evshut; + evshut_t client_evshut; + evshut_t relay_evshut; time_t first_event; time_t last_event; } redsocks_client; +typedef struct splice_pipe_t { + int read; + int write; + size_t size; +} splice_pipe; + +typedef struct redsocks_pump_t { + /* Quick-n-dirty test show, that some Linux 4.4.0 build uses ~1.5 kb of + * slab_unreclaimable RAM per every pipe pair. Most of connections are + * usually idle and it's possble to save some measurable amount of RAM + * using shared pipe pool. */ + redsocks_client c; + splice_pipe request; + splice_pipe reply; + struct event client_read; + struct event client_write; + struct event relay_read; + struct event relay_write; +} redsocks_pump; + +static inline size_t sizeof_client(redsocks_instance *i) +{ + return ((i->config.use_splice) ? sizeof(redsocks_pump) : sizeof(redsocks_client)) + i->relay_ss->payload_len; +} + +static inline void* red_payload(redsocks_client *c) +{ + return (c->instance->config.use_splice) ? (void*)(((redsocks_pump*)c) + 1) : (void*)(c + 1); +} + +static inline redsocks_pump* red_pump(redsocks_client *c) +{ + assert(c->instance->config.use_splice); + return (redsocks_pump*)c; +} void redsocks_drop_client(redsocks_client *client); void redsocks_touch_client(redsocks_client *client); @@ -94,6 +133,15 @@ int redsocks_write_helper( #define redsocks_close(fd) redsocks_close_internal((fd), __FILE__, __LINE__, __func__) void redsocks_close_internal(int fd, const char* file, int line, const char *func); +#define redsocks_event_add(client, ev) redsocks_event_add_internal((client), (ev), __FILE__, __LINE__, __func__) +void redsocks_event_add_internal(redsocks_client *client, struct event *ev, const char *file, int line, const char *func); + +#define redsocks_event_del(client, ev) redsocks_event_del_internal((client), (ev), __FILE__, __LINE__, __func__) +void redsocks_event_del_internal(redsocks_client *client, struct event *ev, const char *file, int line, const char *func); + +#define redsocks_bufferevent_dropfd(client, ev) redsocks_bufferevent_dropfd_internal((client), (ev), __FILE__, __LINE__, __func__) +void redsocks_bufferevent_dropfd_internal(redsocks_client *client, struct bufferevent *ev, const char *file, int line, const char *func); + // I have to account descriptiors for accept-backoff, that's why BEV_OPT_CLOSE_ON_FREE is not used. void redsocks_bufferevent_free(struct bufferevent *buffev); diff --git a/socks5.c b/socks5.c index 4b5456d..71b36e3 100644 --- a/socks5.c +++ b/socks5.c @@ -78,7 +78,7 @@ int socks5_is_valid_cred(const char *login, const char *password) void socks5_client_init(redsocks_client *client) { - socks5_client *socks5 = (void*)(client + 1); + socks5_client *socks5 = red_payload(client); const redsocks_config *config = &client->instance->config; client->state = socks5_new; @@ -87,7 +87,7 @@ void socks5_client_init(redsocks_client *client) static struct evbuffer *socks5_mkmethods(redsocks_client *client) { - socks5_client *socks5 = (void*)(client + 1); + socks5_client *socks5 = red_payload(client); return socks5_mkmethods_plain(socks5->do_password); } @@ -274,7 +274,7 @@ static void socks5_read_reply(struct bufferevent *buffev, redsocks_client *clien static void socks5_read_cb(struct bufferevent *buffev, void *_arg) { redsocks_client *client = _arg; - socks5_client *socks5 = (void*)(client + 1); + socks5_client *socks5 = red_payload(client); redsocks_touch_client(client);