1 /* $OpenBSD: ioev.c,v 1.42 2019/06/12 17:42:53 eric Exp $ */
3 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
5 * Permission to use, copy, modify, and distribute this software for any
6 * purpose with or without fee is hereby granted, provided that the above
7 * copyright notice and this permission notice appear in all copies.
9 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
11 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
12 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
13 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
14 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
15 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20 #include <sys/types.h>
21 #include <sys/queue.h>
22 #include <sys/socket.h>
34 #include "openbsd-compat.h"
39 #include <openssl/err.h>
40 #include <openssl/ssl.h>
53 #define IO_PAUSE_IN IO_IN
54 #define IO_PAUSE_OUT IO_OUT
57 #define IO_RW (IO_READ | IO_WRITE)
58 #define IO_RESET 0x10 /* internal */
59 #define IO_HELD 0x20 /* internal */
64 void (*cb)(struct io*, int, void *);
72 const char *error; /* only valid immediately on callback */
75 const char* io_strflags(int);
76 const char* io_evstr(short);
79 void io_hold(struct io *);
80 void io_release(struct io *);
81 void io_callback(struct io*, int);
82 void io_dispatch(int, short, void *);
83 void io_dispatch_connect(int, short, void *);
84 size_t io_pending(struct io *);
85 size_t io_queued(struct io*);
86 void io_reset(struct io *, short, void (*)(int, short, void*));
87 void io_frame_enter(const char *, struct io *, int);
88 void io_frame_leave(struct io *);
91 void ssl_error(const char *); /* XXX external */
93 static const char* io_tls_error(void);
94 void io_dispatch_accept_tls(int, short, void *);
95 void io_dispatch_connect_tls(int, short, void *);
96 void io_dispatch_read_tls(int, short, void *);
97 void io_dispatch_write_tls(int, short, void *);
98 void io_reload_tls(struct io *io);
101 static struct io *current = NULL;
102 static uint64_t frame = 0;
103 static int _io_debug = 0;
105 #define io_debug(args...) do { if (_io_debug) printf(args); } while(0)
/*
 * io_strio: format a one-line debug description of an io into a static
 * buffer (not reentrant).  NOTE(review): this is a sampled listing; body
 * lines (the tls scratch buffer, the return statement) are not visible.
 */
109 io_strio(struct io *io)
111 static char buf[128];
/* Describe the attached TLS session: protocol version, cipher, key bits. */
117 (void)snprintf(ssl, sizeof ssl, " tls=%s:%s:%d",
118 SSL_get_version(io->tls),
119 SSL_get_cipher_name(io->tls),
120 SSL_get_cipher_bits(io->tls, NULL));
/* Summary line: pointer, fd, timeout, flag string, buffered byte counts. */
124 (void)snprintf(buf, sizeof buf,
125 "<io:%p fd=%d to=%d fl=%s%s ib=%zu ob=%zu>",
126 io, io->sock, io->timeout, io_strflags(io->flags), ssl,
127 io_pending(io), io_queued(io));
132 #define CASE(x) case x : return #x
144 CASE(IO_DISCONNECTED);
148 (void)snprintf(buf, sizeof(buf), "IO_? %d", evt);
/*
 * io_set_nonblocking: switch fd to non-blocking mode via fcntl(),
 * aborting the whole process on failure (err(1, ...)).
 * NOTE(review): sampled listing — the line OR-ing O_NONBLOCK into flags
 * and the surrounding braces are not visible here.
 * Fix: the diagnostics previously named "io_set_blocking", which is not
 * this function; use the real function name so failures are traceable.
 */
154 io_set_nonblocking(int fd)
158 if ((flags = fcntl(fd, F_GETFL)) == -1)
159 err(1, "io_set_nonblocking:fcntl(F_GETFL)");
163 if (fcntl(fd, F_SETFL, flags) == -1)
164 err(1, "io_set_nonblocking:fcntl(F_SETFL)");
/*
 * io_set_nolinger: clear SO_LINGER on the socket (zeroed struct linger
 * disables lingering close), aborting the process on setsockopt failure.
 * Fix: the diagnostic previously named "io_set_linger", which is not
 * this function; use the real function name.
 */
168 io_set_nolinger(int fd)
172 memset(&l, 0, sizeof(l));
173 if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, sizeof(l)) == -1)
174 err(1, "io_set_nolinger:setsockopt()");
178 * Event framing must not rely on an io pointer to refer to the "same" io
179 * throughout the frame, because this is not always the case:
181 * 1) enter(addr0) -> free(addr0) -> leave(addr0) = SEGV
182 * 2) enter(addr0) -> free(addr0) -> malloc == addr0 -> leave(addr0) = BAD!
184 * In both cases, the problem is that the io is freed in the callback, so
185 * the pointer becomes invalid. If that happens, the user is required to
186 * call io_clear, so we can adapt the frame state there.
/*
 * io_frame_enter: mark the beginning of an event-dispatch frame for this
 * io (see the framing comment above).  Aborts if a frame is already in
 * progress ("interleaved frames").  NOTE(review): sampled listing — the
 * assignments establishing the current frame are not visible here.
 */
189 io_frame_enter(const char *where, struct io *io, int ev)
191 io_debug("\n=== %" PRIu64 " ===\n"
192 "io_frame_enter(%s, %s, %s)\n",
193 frame, where, io_evstr(ev), io_strio(io));
196 errx(1, "io_frame_enter: interleaved frames");
/*
 * io_frame_leave: close the current event frame.  Validates that the io
 * matches the one the frame was entered with (it may have been cleared
 * by the user callback), then reloads the io's events unless a reset
 * already happened during the frame.  NOTE(review): sampled listing —
 * several statements of the body are not visible here.
 */
204 io_frame_leave(struct io *io)
206 io_debug("io_frame_leave(%" PRIu64 ")\n", frame);
208 if (current && current != io)
209 errx(1, "io_frame_leave: io mismatch");
211 /* io has been cleared */
215 /* TODO: There is a possible optimization there:
216 * In a typical half-duplex request/response scenario,
217 * the io is waiting to read a request, and when done, it queues
218 * the response in the output buffer and goes to write mode.
219 * There, the write event is set and will be triggered in the next
220 * event frame. In most cases, the write call could be done
221 * immediately as part of the last read frame, thus avoiding to go
222 * through the event loop machinery. So, as an optimisation, we
223 * could detect that case here and force an event dispatching.
226 /* Reload the io if it has not been reset already. */
230 io_debug("=== /%" PRIu64 "\n", frame);
244 _io_debug = getenv("IO_DEBUG") != NULL;
254 if ((io = calloc(1, sizeof(*io))) == NULL)
260 if (iobuf_init(&io->iobuf, 0, 0) == -1) {
/*
 * io_free: tear down an io — delete its pending event, close the socket
 * if open, and release the buffer.  NOTE(review): the debug string still
 * says "io_clear" (historical name); the frame-adjustment and free()
 * lines are not visible in this sampled listing.
 */
269 io_free(struct io *io)
271 io_debug("io_clear(%p)\n", io);
273 /* the current io is virtually dead */
282 if (event_initialized(&io->ev))
284 if (io->sock != -1) {
289 iobuf_clear(&io->iobuf);
/*
 * io_hold: pin the io so io_reload() defers until io_release().
 * Clears IO_RESET and sets IO_HELD; aborts if already held.
 * (Debug string says "io_enter" — historical name.)
 */
294 io_hold(struct io *io)
296 io_debug("io_enter(%p)\n", io);
298 if (io->flags & IO_HELD)
299 errx(1, "io_hold: io is already held");
/* Forget any reset that happened before the hold so release reloads. */
301 io->flags &= ~IO_RESET;
302 io->flags |= IO_HELD;
/*
 * io_release: drop a hold taken by io_hold(); aborts if not held.
 * If no reset happened while held, the events are reloaded (the call
 * itself is on a line not visible in this sampled listing).
 */
306 io_release(struct io *io)
308 if (!(io->flags & IO_HELD))
309 errx(1, "io_release: io is not held");
311 io->flags &= ~IO_HELD;
312 if (!(io->flags & IO_RESET))
317 io_set_fd(struct io *io, int fd)
325 io_set_callback(struct io *io, void(*cb)(struct io *, int, void *), void *arg)
332 io_set_timeout(struct io *io, int msec)
334 io_debug("io_set_timeout(%p, %d)\n", io, msec);
340 io_set_lowat(struct io *io, size_t lowat)
342 io_debug("io_set_lowat(%p, %zu)\n", io, lowat);
/*
 * io_pause: set the pause flag(s) for the given direction(s)
 * (IO_PAUSE_IN and/or IO_PAUSE_OUT); other bits in dir are ignored.
 */
348 io_pause(struct io *io, int dir)
350 io_debug("io_pause(%p, %x)\n", io, dir);
352 io->flags |= dir & (IO_PAUSE_IN | IO_PAUSE_OUT);
/*
 * io_resume: clear the pause flag(s) for the given direction(s) —
 * the inverse of io_pause().
 */
357 io_resume(struct io *io)
359 io_debug("io_resume(%p, %x)\n", io, dir);
361 io->flags &= ~(dir & (IO_PAUSE_IN | IO_PAUSE_OUT));
/*
 * io_set_read: switch the io into read mode.  Only legal when the mode
 * is unset or currently IO_WRITE; aborts otherwise.  NOTE(review): the
 * line clearing IO_WRITE is not visible in this sampled listing.
 */
366 io_set_read(struct io *io)
370 io_debug("io_set_read(%p)\n", io);
372 mode = io->flags & IO_RW;
373 if (!(mode == 0 || mode == IO_WRITE))
374 errx(1, "io_set_read(): full-duplex or reading");
377 io->flags |= IO_READ;
/*
 * io_set_write: switch the io into write mode.  Only legal when the mode
 * is unset or currently IO_READ; aborts otherwise.  NOTE(review): the
 * line clearing IO_READ is not visible in this sampled listing.
 */
382 io_set_write(struct io *io)
386 io_debug("io_set_write(%p)\n", io);
388 mode = io->flags & IO_RW;
389 if (!(mode == 0 || mode == IO_READ))
390 errx(1, "io_set_write(): full-duplex or writing");
393 io->flags |= IO_WRITE;
398 io_error(struct io *io)
404 io_tls(struct io *io)
410 io_fileno(struct io *io)
/*
 * io_paused: true only when the io's pause bits exactly equal `what`
 * (equality test, not a mask test: asking for IO_PAUSE_IN is false
 * if IO_PAUSE_OUT is also set).
 */
416 io_paused(struct io *io, int what)
418 return (io->flags & (IO_PAUSE_IN | IO_PAUSE_OUT)) == what;
422 * Buffered output functions
/* io_write: queue len bytes from buf on the output buffer.  NOTE(review):
 * the error/return handling after iobuf_queue is not visible here. */
426 io_write(struct io *io, const void *buf, size_t len)
430 r = iobuf_queue(&io->iobuf, buf, len);
/* io_writev: queue an iovec array on the output buffer (vector form of
 * io_write).  NOTE(review): return handling not visible in this listing. */
438 io_writev(struct io *io, const struct iovec *iov, int iovcount)
442 r = iobuf_queuev(&io->iobuf, iov, iovcount);
/* io_print: queue a NUL-terminated string (without the NUL). */
450 io_print(struct io *io, const char *s)
452 return io_write(io, s, strlen(s));
/* io_printf: printf-style convenience wrapper over io_vprintf().
 * NOTE(review): the va_start/va_end lines are not visible here. */
456 io_printf(struct io *io, const char *fmt, ...)
462 r = io_vprintf(io, fmt, ap);
/* io_vprintf: format into a heap buffer with vasprintf(), then queue it
 * with io_write().  NOTE(review): the error check on vasprintf and the
 * free(buf) are on lines not visible in this sampled listing. */
469 io_vprintf(struct io *io, const char *fmt, va_list ap)
475 len = vasprintf(&buf, fmt, ap);
478 len = io_write(io, buf, len);
/* io_queued: number of bytes waiting in the output buffer. */
485 io_queued(struct io *io)
487 return iobuf_queued(&io->iobuf);
491 * Buffered input functions
/* io_data: pointer to the buffered input data. */
495 io_data(struct io *io)
497 return iobuf_data(&io->iobuf);
/* io_datalen: number of buffered input bytes available to the caller. */
501 io_datalen(struct io *io)
503 return iobuf_len(&io->iobuf);
/* io_getline: extract the next line from the input buffer; length is
 * returned through *sz (delegates entirely to iobuf_getline). */
507 io_getline(struct io *io, size_t *sz)
509 return iobuf_getline(&io->iobuf, sz);
/* io_drop: discard sz bytes from the front of the input buffer. */
513 io_drop(struct io *io, size_t sz)
515 return iobuf_drop(&io->iobuf, sz);
519 #define IO_READING(io) (((io)->flags & IO_RW) != IO_WRITE)
520 #define IO_WRITING(io) (((io)->flags & IO_RW) != IO_READ)
523 * Setup the necessary events as required by the current io state,
524 * honouring duplex mode and i/o pauses.
/*
 * io_reload: recompute and arm the libevent events required by the io's
 * current mode, honouring duplex mode and pauses (see comment above).
 * Deferred while the io is held.  NOTE(review): sampled listing — the
 * TLS branch and the `events` accumulation lines are not visible here.
 */
527 io_reload(struct io *io)
531 /* io will be reloaded at release time */
532 if (io->flags & IO_HELD)
535 iobuf_normalize(&io->iobuf);
544 io_debug("io_reload(%p)\n", io);
/* Read when in read mode and input is not paused; write only when there
 * is actually queued output and output is not paused. */
547 if (IO_READING(io) && !(io->flags & IO_PAUSE_IN))
549 if (IO_WRITING(io) && !(io->flags & IO_PAUSE_OUT) && io_queued(io))
552 io_reset(io, events, io_dispatch);
555 /* Set the requested event. */
/*
 * io_reset: (re)arm the io's libevent event for `events` with the given
 * dispatch callback, applying io->timeout (milliseconds, -1 = none).
 * Sets IO_RESET so io_frame_leave() knows not to reload again.
 * NOTE(review): sampled listing — event_del for a pending event and the
 * pause/timeout interaction lines are not fully visible here.
 */
557 io_reset(struct io *io, short events, void (*dispatch)(int, short, void*))
559 struct timeval tv, *ptv;
561 io_debug("io_reset(%p, %s, %p) -> %s\n",
562 io, io_evstr(events), dispatch, io_strio(io));
565 * Indicate that the event has already been reset so that reload
566 * is not called on frame_leave.
568 io->flags |= IO_RESET;
/* An already-pending read/write event must be dealt with first. */
570 if (event_initialized(&io->ev) &&
571 event_pending(&io->ev, EV_READ|EV_WRITE, NULL))
575 * The io is paused by the user, so we don't want the timeout to be
/* Convert the millisecond timeout into a struct timeval for libevent. */
581 event_set(&io->ev, io->sock, events, dispatch, io);
582 if (io->timeout >= 0) {
583 tv.tv_sec = io->timeout / 1000;
584 tv.tv_usec = (io->timeout % 1000) * 1000;
589 event_add(&io->ev, ptv);
/* io_pending: bytes pending in the input buffer (same underlying count
 * as io_datalen; used by the debug formatter). */
593 io_pending(struct io *io)
595 return iobuf_len(&io->iobuf);
/*
 * io_strflags: render the io flag bits as a short static string for
 * debug output ("rw"/"R"/"W"/"RW" plus ",F_PI"/",F_PO" pause markers).
 * NOTE(review): sampled listing — the case labels, buffer declaration
 * and return are on lines not visible here.
 */
599 io_strflags(int flags)
605 switch (flags & IO_RW) {
607 (void)strlcat(buf, "rw", sizeof buf);
610 (void)strlcat(buf, "R", sizeof buf);
613 (void)strlcat(buf, "W", sizeof buf);
616 (void)strlcat(buf, "RW", sizeof buf);
620 if (flags & IO_PAUSE_IN)
621 (void)strlcat(buf, ",F_PI", sizeof buf);
622 if (flags & IO_PAUSE_OUT)
623 (void)strlcat(buf, ",F_PO", sizeof buf);
639 (void)strlcat(buf, "<NONE>", sizeof(buf));
643 if (ev & EV_TIMEOUT) {
644 (void)strlcat(buf, "EV_TIMEOUT", sizeof(buf));
651 (void)strlcat(buf, "|", sizeof(buf));
652 (void)strlcat(buf, "EV_READ", sizeof(buf));
659 (void)strlcat(buf, "|", sizeof(buf));
660 (void)strlcat(buf, "EV_WRITE", sizeof(buf));
665 if (ev & EV_SIGNAL) {
667 (void)strlcat(buf, "|", sizeof(buf));
668 (void)strlcat(buf, "EV_SIGNAL", sizeof(buf));
675 (void)strlcat(buf, "|", sizeof(buf));
676 (void)strlcat(buf, "EV_?=0x", sizeof(buf));
677 (void)snprintf(buf2, sizeof(buf2), "%hx", ev);
678 (void)strlcat(buf, buf2, sizeof(buf));
/*
 * io_dispatch: main libevent callback for plain (non-TLS) sockets.
 * Wraps the whole body in io_frame_enter/io_frame_leave.  Handles
 * timeout, flushes queued output on EV_WRITE (reporting IO_LOWAT when
 * the queue drains past the low-water mark), and reads into the input
 * buffer on EV_READ, invoking IO_DATAIN / IO_DISCONNECTED / IO_ERROR
 * callbacks as appropriate.  NOTE(review): sampled listing — `goto`
 * targets, the EV_READ test and the frame-leave call are not visible.
 */
685 io_dispatch(__unused int fd, short ev, void *humppa)
687 struct io *io = humppa;
692 io_frame_enter("io_dispatch", io, ev);
694 if (ev == EV_TIMEOUT) {
695 io_callback(io, IO_TIMEOUT);
/* Flush queued output first; w is the amount queued before the write. */
699 if (ev & EV_WRITE && (w = io_queued(io))) {
700 if ((n = iobuf_write(&io->iobuf, io->sock)) < 0) {
701 if (n == IOBUF_WANT_WRITE) /* kqueue bug? */
703 if (n == IOBUF_CLOSED)
704 io_callback(io, IO_DISCONNECTED);
707 io->error = strerror(errno);
709 io_callback(io, IO_ERROR);
/* Report LOWAT only on the crossing from above to at-or-below. */
713 if (w > io->lowat && w - n <= io->lowat)
714 io_callback(io, IO_LOWAT);
719 iobuf_normalize(&io->iobuf);
720 if ((n = iobuf_read(&io->iobuf, io->sock)) < 0) {
721 if (n == IOBUF_CLOSED)
722 io_callback(io, IO_DISCONNECTED);
725 io->error = strerror(errno);
727 io_callback(io, IO_ERROR);
732 io_callback(io, IO_DATAIN);
/* io_callback: invoke the user callback with the event and user arg. */
740 io_callback(struct io *io, int evt)
742 io->cb(io, evt, io->arg);
/*
 * io_connect: create a non-blocking, no-linger TCP socket, optionally
 * bind it to bsa, and start a connect to sa.  EINPROGRESS is expected
 * for a non-blocking connect: arm EV_WRITE with io_dispatch_connect to
 * learn the outcome.  NOTE(review): sampled listing — the success path,
 * the `fail` label and the return statements are not visible here.
 */
746 io_connect(struct io *io, const struct sockaddr *sa, const struct sockaddr *bsa)
748 int sock, errno_save;
750 if ((sock = socket(sa->sa_family, SOCK_STREAM, 0)) == -1)
753 io_set_nonblocking(sock);
754 io_set_nolinger(sock);
756 if (bsa && bind(sock, bsa, SA_LEN(bsa)) == -1)
759 if (connect(sock, sa, SA_LEN(sa)) == -1)
760 if (errno != EINPROGRESS)
/* Connect is in flight: wait for writability to get the result. */
764 io_reset(io, EV_WRITE, io_dispatch_connect);
773 io->error = strerror(errno);
/*
 * io_dispatch_connect: completion handler for a pending non-blocking
 * connect.  Reads the final status with getsockopt(SO_ERROR); on
 * success moves the io to IO_STATE_UP and fires IO_CONNECTED, otherwise
 * reports IO_TIMEOUT (for ETIMEDOUT) or IO_ERROR.  NOTE(review):
 * sampled listing — socket close on failure and the frame-leave call
 * are not visible here.
 */
779 io_dispatch_connect(int fd, short ev, void *humppa)
781 struct io *io = humppa;
785 io_frame_enter("io_dispatch_connect", io, ev);
787 if (ev == EV_TIMEOUT) {
790 io_callback(io, IO_TIMEOUT);
793 r = getsockopt(fd, SOL_SOCKET, SO_ERROR, &e, &sl);
795 warn("io_dispatch_connect: getsockopt");
801 io->error = strerror(e);
802 io_callback(io, e == ETIMEDOUT ? IO_TIMEOUT : IO_ERROR);
805 io->state = IO_STATE_UP;
806 io_callback(io, IO_CONNECTED);
818 static char buf[128];
821 e = ERR_peek_last_error();
823 ERR_error_string(e, buf);
827 return ("No TLS error");
/*
 * io_start_tls: attach an SSL object to the io and begin the handshake.
 * The io must be in exactly one of read or write mode: write mode means
 * we are the client (SSL_connect path), read mode means we are the
 * server (SSL_accept path).  NOTE(review): sampled listing — the tls
 * assignment and return statements are not visible here.
 */
831 io_start_tls(struct io *io, void *tls)
835 mode = io->flags & IO_RW;
836 if (mode == 0 || mode == IO_RW)
837 errx(1, "io_start_tls(): full-duplex or unset");
840 errx(1, "io_start_tls(): TLS already started");
843 if (SSL_set_fd(io->tls, io->sock) == 0) {
844 ssl_error("io_start_tls:SSL_set_fd");
848 if (mode == IO_WRITE) {
849 io->state = IO_STATE_CONNECT_TLS;
850 SSL_set_connect_state(io->tls);
851 io_reset(io, EV_WRITE, io_dispatch_connect_tls);
853 io->state = IO_STATE_ACCEPT_TLS;
854 SSL_set_accept_state(io->tls);
855 io_reset(io, EV_READ, io_dispatch_accept_tls);
/*
 * io_dispatch_accept_tls: drive the server-side TLS handshake.
 * On SSL_accept() success fire IO_TLSREADY; on WANT_READ/WANT_WRITE
 * re-arm the matching event and retry later; anything else is a fatal
 * handshake error (IO_ERROR).  NOTE(review): sampled listing — break
 * statements, the `leave` label and frame-leave are not visible here.
 */
862 io_dispatch_accept_tls(int fd, short event, void *humppa)
864 struct io *io = humppa;
867 io_frame_enter("io_dispatch_accept_tls", io, event);
869 if (event == EV_TIMEOUT) {
870 io_callback(io, IO_TIMEOUT);
874 if ((ret = SSL_accept(io->tls)) > 0) {
875 io->state = IO_STATE_UP;
876 io_callback(io, IO_TLSREADY);
880 switch ((e = SSL_get_error(io->tls, ret))) {
881 case SSL_ERROR_WANT_READ:
882 io_reset(io, EV_READ, io_dispatch_accept_tls);
884 case SSL_ERROR_WANT_WRITE:
885 io_reset(io, EV_WRITE, io_dispatch_accept_tls);
888 io->error = io_tls_error();
889 ssl_error("io_dispatch_accept_tls:SSL_accept");
890 io_callback(io, IO_ERROR);
/*
 * io_dispatch_connect_tls: drive the client-side TLS handshake.
 * On SSL_connect() success fire IO_TLSREADY; on WANT_READ/WANT_WRITE
 * re-arm the matching event and retry later; anything else is a fatal
 * handshake error (IO_TLSERROR).  NOTE(review): sampled listing — break
 * statements, the `leave` label and frame-leave are not visible here.
 * Fix: the ssl_error() tag said "io_dispatch_connect_ssl"; use the real
 * function name for consistency with the accept-side handler.
 */
899 io_dispatch_connect_tls(int fd, short event, void *humppa)
901 struct io *io = humppa;
904 io_frame_enter("io_dispatch_connect_tls", io, event);
906 if (event == EV_TIMEOUT) {
907 io_callback(io, IO_TIMEOUT);
911 if ((ret = SSL_connect(io->tls)) > 0) {
912 io->state = IO_STATE_UP;
913 io_callback(io, IO_TLSREADY);
917 switch ((e = SSL_get_error(io->tls, ret))) {
918 case SSL_ERROR_WANT_READ:
919 io_reset(io, EV_READ, io_dispatch_connect_tls);
921 case SSL_ERROR_WANT_WRITE:
922 io_reset(io, EV_WRITE, io_dispatch_connect_tls);
925 io->error = io_tls_error();
926 ssl_error("io_dispatch_connect_tls:SSL_connect");
927 io_callback(io, IO_TLSERROR);
/*
 * io_dispatch_read_tls: read handler for an established TLS session.
 * Reads via iobuf_read_tls(); WANT_READ/WANT_WRITE re-arm the matching
 * event (TLS renegotiation can require writing while reading).  On data,
 * fires IO_DATAIN, and loops while SSL has buffered plaintext pending
 * that a plain event poll would not see.  NOTE(review): sampled listing —
 * case labels, break statements and the `again:` label are not visible.
 */
936 io_dispatch_read_tls(int fd, short event, void *humppa)
938 struct io *io = humppa;
941 io_frame_enter("io_dispatch_read_tls", io, event);
943 if (event == EV_TIMEOUT) {
944 io_callback(io, IO_TIMEOUT);
949 iobuf_normalize(&io->iobuf);
950 switch ((n = iobuf_read_tls(&io->iobuf, (SSL*)io->tls))) {
951 case IOBUF_WANT_READ:
952 io_reset(io, EV_READ, io_dispatch_read_tls);
954 case IOBUF_WANT_WRITE:
955 io_reset(io, EV_WRITE, io_dispatch_read_tls);
958 io_callback(io, IO_DISCONNECTED);
962 io->error = strerror(errno);
964 io_callback(io, IO_ERROR);
967 io->error = io_tls_error();
968 ssl_error("io_dispatch_read_tls:SSL_read");
969 io_callback(io, IO_ERROR);
972 io_debug("io_dispatch_read_tls(...) -> r=%d\n", n);
973 io_callback(io, IO_DATAIN);
/* Drain plaintext already decrypted inside the SSL object. */
974 if (current == io && IO_READING(io) && SSL_pending(io->tls))
/*
 * io_dispatch_write_tls: write handler for an established TLS session.
 * Flushes the output buffer via iobuf_write_tls(); WANT_READ/WANT_WRITE
 * re-arm the matching event.  Fires IO_LOWAT when the queue drains past
 * the low-water mark.  NOTE(review): sampled listing — case labels,
 * break statements and the w/w2 bookkeeping lines are not visible.
 */
983 io_dispatch_write_tls(int fd, short event, void *humppa)
985 struct io *io = humppa;
989 io_frame_enter("io_dispatch_write_tls", io, event);
991 if (event == EV_TIMEOUT) {
992 io_callback(io, IO_TIMEOUT);
997 switch ((n = iobuf_write_tls(&io->iobuf, (SSL*)io->tls))) {
998 case IOBUF_WANT_READ:
999 io_reset(io, EV_READ, io_dispatch_write_tls);
1001 case IOBUF_WANT_WRITE:
1002 io_reset(io, EV_WRITE, io_dispatch_write_tls);
1005 io_callback(io, IO_DISCONNECTED);
/* strerror() may clobber errno; preserve it across the call. */
1008 saved_errno = errno;
1009 io->error = strerror(errno);
1010 errno = saved_errno;
1011 io_callback(io, IO_ERROR);
1013 case IOBUF_TLSERROR:
1014 io->error = io_tls_error();
1015 ssl_error("io_dispatch_write_tls:SSL_write");
1016 io_callback(io, IO_ERROR);
1019 io_debug("io_dispatch_write_tls(...) -> w=%d\n", n);
/* Crossing from above the low-water mark to at-or-below it. */
1021 if (w > io->lowat && w2 <= io->lowat)
1022 io_callback(io, IO_LOWAT);
/*
 * io_reload_tls: TLS counterpart of io_reload — pick the dispatch
 * routine and event matching the io state (handshake in progress, or
 * established read/write), honouring pauses; aborts on an unexpected
 * state.  NOTE(review): sampled listing — the `ev` assignments, break
 * statements and IO_STATE_UP label are not visible here.
 */
1031 io_reload_tls(struct io *io)
1034 void (*dispatch)(int, short, void*) = NULL;
1036 switch (io->state) {
1037 case IO_STATE_CONNECT_TLS:
1039 dispatch = io_dispatch_connect_tls;
1041 case IO_STATE_ACCEPT_TLS:
1043 dispatch = io_dispatch_accept_tls;
1047 if (IO_READING(io) && !(io->flags & IO_PAUSE_IN)) {
1049 dispatch = io_dispatch_read_tls;
1051 else if (IO_WRITING(io) && !(io->flags & IO_PAUSE_OUT) &&
1054 dispatch = io_dispatch_write_tls;
1057 return; /* paused */
1060 errx(1, "io_reload_tls(): bad state");
1063 io_reset(io, ev, dispatch);