1 /* $OpenBSD: ioev.c,v 1.42 2019/06/12 17:42:53 eric Exp $ */
3 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
5 * Permission to use, copy, modify, and distribute this software for any
6 * purpose with or without fee is hereby granted, provided that the above
7 * copyright notice and this permission notice appear in all copies.
9 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
11 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
12 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
13 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
14 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
15 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20 #include <sys/types.h>
21 #include <sys/queue.h>
22 #include <sys/socket.h>
34 #include "openbsd-compat.h"
39 #include <openssl/err.h>
40 #include <openssl/ssl.h>
/* Pause flags reuse the public direction bits. */
#define IO_PAUSE_IN IO_IN
#define IO_PAUSE_OUT IO_OUT
#define IO_RW (IO_READ | IO_WRITE)
#define IO_RESET 0x10 /* internal */
#define IO_HELD 0x20 /* internal */
/* struct io members (the struct declaration is only partially visible here). */
void (*cb)(struct io*, int, void *);
const char *error; /* only valid immediately on callback */
/* Internal helpers. */
const char* io_strflags(int);
const char* io_evstr(short);
void io_hold(struct io *);
void io_release(struct io *);
void io_callback(struct io*, int);
void io_dispatch(int, short, void *);
void io_dispatch_connect(int, short, void *);
size_t io_pending(struct io *);
size_t io_queued(struct io*);
void io_reset(struct io *, short, void (*)(int, short, void*));
void io_frame_enter(const char *, struct io *, int);
void io_frame_leave(struct io *);
void ssl_error(const char *); /* XXX external */
/* TLS variants of the dispatch handlers. */
static const char* io_tls_error(void);
void io_dispatch_accept_tls(int, short, void *);
void io_dispatch_connect_tls(int, short, void *);
void io_dispatch_read_tls(int, short, void *);
void io_dispatch_write_tls(int, short, void *);
void io_reload_tls(struct io *io);
/* Event-frame bookkeeping; see io_frame_enter()/io_frame_leave(). */
static struct io *current = NULL;
static uint64_t frame = 0;
static int _io_debug = 0;
/* Debug tracing; enabled via the IO_DEBUG environment variable (see io_init). */
#define io_debug(args...) do { if (_io_debug) printf(args); } while(0)
/*
 * Build a one-line textual description of the io for debug traces.
 * NOTE(review): writes into a static buffer, so the result is
 * overwritten by the next call (not reentrant).
 */
io_strio(struct io *io)
	static char buf[128];
	/* Describe the active TLS session: protocol version, cipher, key bits. */
	(void)snprintf(ssl, sizeof ssl, " tls=%s:%s:%d",
	    SSL_get_version(io->tls),
	    SSL_get_cipher_name(io->tls),
	    SSL_get_cipher_bits(io->tls, NULL));
	/* Summarize fd, timeout, flags and buffered input/output byte counts. */
	(void)snprintf(buf, sizeof buf,
	    "<io:%p fd=%d to=%d fl=%s%s ib=%zu ob=%zu>",
	    io, io->sock, io->timeout, io_strflags(io->flags), ssl,
	    io_pending(io), io_queued(io));
/* Expand an enum constant into its symbolic name for debug output. */
#define CASE(x) case x : return #x
	CASE(IO_DISCONNECTED);
	/* Unknown event: format its numeric value into a static buffer. */
	(void)snprintf(buf, sizeof(buf), "IO_? %d", evt);
/*
 * Put the file descriptor in non-blocking mode.
 * Exits the program on fcntl() failure, so callers need not check.
 */
void
io_set_nonblocking(int fd)
{
	int flags;

	if ((flags = fcntl(fd, F_GETFL)) == -1)
		err(1, "io_set_nonblocking:fcntl(F_GETFL)");

	flags |= O_NONBLOCK;

	if (fcntl(fd, F_SETFL, flags) == -1)
		err(1, "io_set_nonblocking:fcntl(F_SETFL)");
}
/*
 * Disable SO_LINGER on the socket so close() returns immediately
 * instead of lingering while unsent data drains.
 * Exits the program on setsockopt() failure.
 */
void
io_set_nolinger(int fd)
{
	struct linger l;

	memset(&l, 0, sizeof(l));
	if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, sizeof(l)) == -1)
		err(1, "io_set_nolinger:setsockopt()");
}
178 * Event framing must not rely on an io pointer to refer to the "same" io
179 * throughout the frame, because this is not always the case:
181 * 1) enter(addr0) -> free(addr0) -> leave(addr0) = SEGV
182 * 2) enter(addr0) -> free(addr0) -> malloc == addr0 -> leave(addr0) = BAD!
 * In both cases, the problem is that the io is freed in the callback, so
185 * the pointer becomes invalid. If that happens, the user is required to
186 * call io_clear, so we can adapt the frame state there.
/*
 * Mark the start of an event-handling frame for this io.  Aborts if a
 * frame is already in progress: frames must never interleave.
 */
io_frame_enter(const char *where, struct io *io, int ev)
	io_debug("\n=== %" PRIu64 " ===\n"
	    "io_frame_enter(%s, %s, %s)\n",
	    frame, where, io_evstr(ev), io_strio(io));
	errx(1, "io_frame_enter: interleaved frames");
/*
 * Close the current event frame: verify frame consistency, then reload
 * the io's events unless it was cleared or already reset during the
 * callback.
 */
io_frame_leave(struct io *io)
	io_debug("io_frame_leave(%" PRIu64 ")\n", frame);
	if (current && current != io)
		errx(1, "io_frame_leave: io mismatch");
	/* io has been cleared */
	/* TODO: There is a possible optimization there:
	 * In a typical half-duplex request/response scenario,
	 * the io is waiting to read a request, and when done, it queues
	 * the response in the output buffer and goes to write mode.
	 * There, the write event is set and will be triggered in the next
	 * event frame. In most cases, the write call could be done
	 * immediately as part of the last read frame, thus avoiding going
	 * through the event loop machinery. So, as an optimisation, we
	 * could detect that case here and force an event dispatching.
	 */
	/* Reload the io if it has not been reset already. */
	io_debug("=== /%" PRIu64 "\n", frame);
	/* io_init: debug tracing is on when IO_DEBUG is set in the environment. */
	_io_debug = getenv("IO_DEBUG") != NULL;
	/* io_new: allocate a zeroed io and initialize its buffer. */
	if ((io = calloc(1, sizeof(*io))) == NULL)
	if (iobuf_init(&io->iobuf, 0, 0) == -1) {
/*
 * Dispose of an io: detach it from the event loop, close its socket
 * and release its buffers.
 */
io_free(struct io *io)
	io_debug("io_clear(%p)\n", io);
	/* the current io is virtually dead */
	if (event_initialized(&io->ev))
	if (io->sock != -1) {
	iobuf_clear(&io->iobuf);
/*
 * Pin the io so that event reloading is deferred until io_release().
 * Aborts on a double hold.
 */
io_hold(struct io *io)
	io_debug("io_enter(%p)\n", io);
	if (io->flags & IO_HELD)
		errx(1, "io_hold: io is already held");
	/* Clear any pending reset so release knows a reload is still needed. */
	io->flags &= ~IO_RESET;
	io->flags |= IO_HELD;
/*
 * Drop the hold taken by io_hold() and reload events if the io was not
 * reset in the meantime.  Aborts if the io is not held.
 */
io_release(struct io *io)
	if (!(io->flags & IO_HELD))
		errx(1, "io_release: io is not held");
	io->flags &= ~IO_HELD;
	if (!(io->flags & IO_RESET))
/* Attach a file descriptor to the io. */
io_set_fd(struct io *io, int fd)
/* Register the user callback invoked on io events, with its argument. */
io_set_callback(struct io *io, void(*cb)(struct io *, int, void *), void *arg)
/* Set the timeout in milliseconds (io_reset only arms it when >= 0). */
io_set_timeout(struct io *io, int msec)
	io_debug("io_set_timeout(%p, %d)\n", io, msec);
/* Set the output low-water mark used for IO_LOWAT notifications. */
io_set_lowat(struct io *io, size_t lowat)
	io_debug("io_set_lowat(%p, %zu)\n", io, lowat);
/* Suspend event delivery in the given direction(s). */
io_pause(struct io *io, int dir)
	io_debug("io_pause(%p, %x)\n", io, dir);
	io->flags |= dir & (IO_PAUSE_IN | IO_PAUSE_OUT);
/* Resume event delivery in the given direction(s). */
io_resume(struct io *io, int dir)
	io_debug("io_resume(%p, %x)\n", io, dir);
	io->flags &= ~(dir & (IO_PAUSE_IN | IO_PAUSE_OUT));
/*
 * Switch the io to read mode.  Only legal from unset or write mode;
 * aborts otherwise (full-duplex is not supported here).
 */
io_set_read(struct io *io)
	io_debug("io_set_read(%p)\n", io);
	mode = io->flags & IO_RW;
	if (!(mode == 0 || mode == IO_WRITE))
		errx(1, "io_set_read(): full-duplex or reading");
	io->flags |= IO_READ;
/*
 * Switch the io to write mode.  Only legal from unset or read mode;
 * aborts otherwise (full-duplex is not supported here).
 */
io_set_write(struct io *io)
	io_debug("io_set_write(%p)\n", io);
	mode = io->flags & IO_RW;
	if (!(mode == 0 || mode == IO_READ))
		errx(1, "io_set_write(): full-duplex or writing");
	io->flags |= IO_WRITE;
/* Accessor: presumably returns io->error (only valid right after a callback). */
io_error(struct io *io)
/* Accessor: presumably returns the TLS context attached to the io. */
io_tls(struct io *io)
/* Accessor: presumably returns the underlying file descriptor io->sock. */
io_fileno(struct io *io)
/* True when the io's pause flags match exactly the given direction(s). */
io_paused(struct io *io, int what)
	return (io->flags & (IO_PAUSE_IN | IO_PAUSE_OUT)) == what;
/* Buffered output functions */
/* Queue len bytes from buf on the output buffer. */
io_write(struct io *io, const void *buf, size_t len)
	r = iobuf_queue(&io->iobuf, buf, len);
/* Queue an iovec array on the output buffer. */
io_writev(struct io *io, const struct iovec *iov, int iovcount)
	r = iobuf_queuev(&io->iobuf, iov, iovcount);
/* Queue a NUL-terminated string (terminator excluded). */
io_print(struct io *io, const char *s)
	return io_write(io, s, strlen(s));
/* printf-style formatted output into the io buffer. */
io_printf(struct io *io, const char *fmt, ...)
	r = io_vprintf(io, fmt, ap);
/* va_list variant of io_printf; formats into a temporary heap buffer. */
io_vprintf(struct io *io, const char *fmt, va_list ap)
	len = vasprintf(&buf, fmt, ap);
	len = io_write(io, buf, len);
/* Number of bytes currently queued for output. */
io_queued(struct io *io)
	return iobuf_queued(&io->iobuf);
/* Buffered input functions */
/* Pointer to the data currently buffered for reading. */
io_data(struct io *io)
	return iobuf_data(&io->iobuf);
/* Number of bytes currently buffered for reading. */
io_datalen(struct io *io)
	return iobuf_len(&io->iobuf);
/* Get the next line from the input buffer (see iobuf_getline). */
io_getline(struct io *io, size_t *sz)
	return iobuf_getline(&io->iobuf, sz);
/* Discard sz bytes from the input buffer. */
io_drop(struct io *io, size_t sz)
	return iobuf_drop(&io->iobuf, sz);
/* An io is "reading" unless in write-only mode, and symmetrically. */
#define IO_READING(io) (((io)->flags & IO_RW) != IO_WRITE)
#define IO_WRITING(io) (((io)->flags & IO_RW) != IO_READ)
/*
 * Setup the necessary events as required by the current io state,
 * honouring duplex mode and i/o pauses.
 */
io_reload(struct io *io)
	/* io will be reloaded at release time */
	if (io->flags & IO_HELD)
	iobuf_normalize(&io->iobuf);
	io_debug("io_reload(%p)\n", io);
	/* Poll for read unless paused; poll for write only when output is queued. */
	if (IO_READING(io) && !(io->flags & IO_PAUSE_IN))
	if (IO_WRITING(io) && !(io->flags & IO_PAUSE_OUT) && io_queued(io))
	io_reset(io, events, io_dispatch);
/*
 * Set the requested event: (re)register the io with libevent using the
 * given event mask and dispatch handler, arming the timeout if one is
 * configured.
 */
io_reset(struct io *io, short events, void (*dispatch)(int, short, void*))
	struct timeval tv, *ptv;
	io_debug("io_reset(%p, %s, %p) -> %s\n",
	    io, io_evstr(events), dispatch, io_strio(io));
	/*
	 * Indicate that the event has already been reset so that reload
	 * is not called on frame_leave.
	 */
	io->flags |= IO_RESET;
	if (event_initialized(&io->ev))
	/*
	 * The io is paused by the user, so we don't want the timeout to be
	 * effective.
	 */
	event_set(&io->ev, io->sock, events, dispatch, io);
	/* Convert the millisecond timeout into a timeval when one is set. */
	if (io->timeout >= 0) {
		tv.tv_sec = io->timeout / 1000;
		tv.tv_usec = (io->timeout % 1000) * 1000;
	event_add(&io->ev, ptv);
/* Number of bytes pending in the input buffer. */
io_pending(struct io *io)
	return iobuf_len(&io->iobuf);
/*
 * Render the io flags as a short string for debug output.
 * NOTE(review): appears to use a static buffer — not reentrant.
 */
io_strflags(int flags)
	switch (flags & IO_RW) {
		(void)strlcat(buf, "rw", sizeof buf);
		(void)strlcat(buf, "R", sizeof buf);
		(void)strlcat(buf, "W", sizeof buf);
		(void)strlcat(buf, "RW", sizeof buf);
	/* Append pause markers for each suspended direction. */
	if (flags & IO_PAUSE_IN)
		(void)strlcat(buf, ",F_PI", sizeof buf);
	if (flags & IO_PAUSE_OUT)
		(void)strlcat(buf, ",F_PO", sizeof buf);
	/* io_evstr: render a libevent EV_* mask as "EV_A|EV_B|..." for debugging. */
	(void)strlcat(buf, "<NONE>", sizeof(buf));
	if (ev & EV_TIMEOUT) {
		(void)strlcat(buf, "EV_TIMEOUT", sizeof(buf));
		(void)strlcat(buf, "|", sizeof(buf));
		(void)strlcat(buf, "EV_READ", sizeof(buf));
		(void)strlcat(buf, "|", sizeof(buf));
		(void)strlcat(buf, "EV_WRITE", sizeof(buf));
	if (ev & EV_SIGNAL) {
		(void)strlcat(buf, "|", sizeof(buf));
		(void)strlcat(buf, "EV_SIGNAL", sizeof(buf));
	/* Any remaining unknown bits are printed in hex. */
		(void)strlcat(buf, "|", sizeof(buf));
		(void)strlcat(buf, "EV_?=0x", sizeof(buf));
		(void)snprintf(buf2, sizeof(buf2), "%hx", ev);
		(void)strlcat(buf, buf2, sizeof(buf));
/*
 * Main libevent dispatch handler for plain (non-TLS) sockets: flushes
 * queued output on EV_WRITE, fills the input buffer on EV_READ, and
 * translates the outcomes into user callbacks.
 */
io_dispatch(__unused int fd, short ev, void *humppa)
	struct io *io = humppa;
	io_frame_enter("io_dispatch", io, ev);
	if (ev == EV_TIMEOUT) {
		io_callback(io, IO_TIMEOUT);
	if (ev & EV_WRITE && (w = io_queued(io))) {
		if ((n = iobuf_write(&io->iobuf, io->sock)) < 0) {
			if (n == IOBUF_WANT_WRITE) /* kqueue bug? */
			if (n == IOBUF_CLOSED)
				io_callback(io, IO_DISCONNECTED);
			io->error = strerror(errno);
			io_callback(io, IO_ERROR);
		/* Notify once the output queue drains to/below the low-water mark. */
		if (w > io->lowat && w - n <= io->lowat)
			io_callback(io, IO_LOWAT);
		iobuf_normalize(&io->iobuf);
		if ((n = iobuf_read(&io->iobuf, io->sock)) < 0) {
			if (n == IOBUF_CLOSED)
				io_callback(io, IO_DISCONNECTED);
			io->error = strerror(errno);
			io_callback(io, IO_ERROR);
		io_callback(io, IO_DATAIN);
/* Invoke the user callback for the given event. */
io_callback(struct io *io, int evt)
	io->cb(io, evt, io->arg);
/*
 * Start a non-blocking connect() to sa, optionally binding the local
 * side to bsa first.  Completion is reported asynchronously through
 * io_dispatch_connect.
 */
io_connect(struct io *io, const struct sockaddr *sa, const struct sockaddr *bsa)
	int sock, errno_save;
	if ((sock = socket(sa->sa_family, SOCK_STREAM, 0)) == -1)
	io_set_nonblocking(sock);
	io_set_nolinger(sock);
	if (bsa && bind(sock, bsa, SA_LEN(bsa)) == -1)
	if (connect(sock, sa, SA_LEN(sa)) == -1)
		/* EINPROGRESS is the normal non-blocking case, not a failure. */
		if (errno != EINPROGRESS)
	/* Connection in progress: wait for writability to learn the result. */
	io_reset(io, EV_WRITE, io_dispatch_connect);
	io->error = strerror(errno);
/*
 * Dispatch handler for a pending connect(): fetch the socket error
 * status and report IO_CONNECTED, IO_TIMEOUT or IO_ERROR accordingly.
 */
io_dispatch_connect(int fd, short ev, void *humppa)
	struct io *io = humppa;
	io_frame_enter("io_dispatch_connect", io, ev);
	if (ev == EV_TIMEOUT) {
		io_callback(io, IO_TIMEOUT);
	/* The result of the asynchronous connect(2) is read via SO_ERROR. */
	r = getsockopt(fd, SOL_SOCKET, SO_ERROR, &e, &sl);
		warn("io_dispatch_connect: getsockopt");
		io->error = strerror(e);
		/* A kernel-level connect timeout is surfaced as IO_TIMEOUT. */
		io_callback(io, e == ETIMEDOUT ? IO_TIMEOUT : IO_ERROR);
	io->state = IO_STATE_UP;
	io_callback(io, IO_CONNECTED);
	/*
	 * io_tls_error: format the last OpenSSL error into a static buffer
	 * (not reentrant), falling back to a placeholder when the error
	 * queue is empty.
	 */
	static char buf[128];
	e = ERR_peek_last_error();
	ERR_error_string(e, buf);
	return ("No TLS error");
/*
 * Begin a TLS handshake on the io: client-side (SSL_connect) when in
 * write mode, server-side (SSL_accept) when in read mode.  Aborts on
 * full-duplex/unset mode or if TLS was already started.
 */
io_start_tls(struct io *io, void *tls)
	mode = io->flags & IO_RW;
	if (mode == 0 || mode == IO_RW)
		errx(1, "io_start_tls(): full-duplex or unset");
		errx(1, "io_start_tls(): TLS already started");
	if (SSL_set_fd(io->tls, io->sock) == 0) {
		ssl_error("io_start_tls:SSL_set_fd");
	if (mode == IO_WRITE) {
		/* Write mode: we are the client side of the handshake. */
		io->state = IO_STATE_CONNECT_TLS;
		SSL_set_connect_state(io->tls);
		io_reset(io, EV_WRITE, io_dispatch_connect_tls);
		/* Read mode: we are the server side of the handshake. */
		io->state = IO_STATE_ACCEPT_TLS;
		SSL_set_accept_state(io->tls);
		io_reset(io, EV_READ, io_dispatch_accept_tls);
/*
 * Drive the server-side TLS handshake: reschedule on WANT_READ or
 * WANT_WRITE, report IO_TLSREADY on success, IO_ERROR otherwise.
 */
io_dispatch_accept_tls(int fd, short event, void *humppa)
	struct io *io = humppa;
	io_frame_enter("io_dispatch_accept_tls", io, event);
	if (event == EV_TIMEOUT) {
		io_callback(io, IO_TIMEOUT);
	if ((ret = SSL_accept(io->tls)) > 0) {
		io->state = IO_STATE_UP;
		io_callback(io, IO_TLSREADY);
	/* Handshake incomplete: wait for the direction OpenSSL asked for. */
	switch ((e = SSL_get_error(io->tls, ret))) {
	case SSL_ERROR_WANT_READ:
		io_reset(io, EV_READ, io_dispatch_accept_tls);
	case SSL_ERROR_WANT_WRITE:
		io_reset(io, EV_WRITE, io_dispatch_accept_tls);
	io->error = io_tls_error();
	ssl_error("io_dispatch_accept_tls:SSL_accept");
	io_callback(io, IO_ERROR);
/*
 * Drive the client-side TLS handshake: reschedule on WANT_READ or
 * WANT_WRITE, report IO_TLSREADY on success, IO_TLSERROR otherwise.
 */
io_dispatch_connect_tls(int fd, short event, void *humppa)
	struct io *io = humppa;
	io_frame_enter("io_dispatch_connect_tls", io, event);
	if (event == EV_TIMEOUT) {
		io_callback(io, IO_TIMEOUT);
	if ((ret = SSL_connect(io->tls)) > 0) {
		io->state = IO_STATE_UP;
		io_callback(io, IO_TLSREADY);
	/* Handshake incomplete: wait for the direction OpenSSL asked for. */
	switch ((e = SSL_get_error(io->tls, ret))) {
	case SSL_ERROR_WANT_READ:
		io_reset(io, EV_READ, io_dispatch_connect_tls);
	case SSL_ERROR_WANT_WRITE:
		io_reset(io, EV_WRITE, io_dispatch_connect_tls);
	io->error = io_tls_error();
	/* NOTE(review): message says "_ssl" while the function is "_tls". */
	ssl_error("io_dispatch_connect_ssl:SSL_connect");
	io_callback(io, IO_TLSERROR);
/*
 * Read TLS records into the input buffer.  A TLS read may also require
 * socket writability, hence the WANT_WRITE case below.
 */
io_dispatch_read_tls(int fd, short event, void *humppa)
	struct io *io = humppa;
	io_frame_enter("io_dispatch_read_tls", io, event);
	if (event == EV_TIMEOUT) {
		io_callback(io, IO_TIMEOUT);
	iobuf_normalize(&io->iobuf);
	switch ((n = iobuf_read_tls(&io->iobuf, (SSL*)io->tls))) {
	case IOBUF_WANT_READ:
		io_reset(io, EV_READ, io_dispatch_read_tls);
	case IOBUF_WANT_WRITE:
		io_reset(io, EV_WRITE, io_dispatch_read_tls);
		io_callback(io, IO_DISCONNECTED);
		io->error = strerror(errno);
		io_callback(io, IO_ERROR);
		io->error = io_tls_error();
		ssl_error("io_dispatch_read_tls:SSL_read");
		io_callback(io, IO_ERROR);
	io_debug("io_dispatch_read_tls(...) -> r=%d\n", n);
	io_callback(io, IO_DATAIN);
	/* Keep draining data OpenSSL has already decrypted (SSL_pending). */
	if (current == io && IO_READING(io) && SSL_pending(io->tls))
/*
 * Flush buffered output through TLS.  A TLS write may also require
 * socket readability, hence the WANT_READ case below.
 */
io_dispatch_write_tls(int fd, short event, void *humppa)
	struct io *io = humppa;
	io_frame_enter("io_dispatch_write_tls", io, event);
	if (event == EV_TIMEOUT) {
		io_callback(io, IO_TIMEOUT);
	switch ((n = iobuf_write_tls(&io->iobuf, (SSL*)io->tls))) {
	case IOBUF_WANT_READ:
		io_reset(io, EV_READ, io_dispatch_write_tls);
	case IOBUF_WANT_WRITE:
		io_reset(io, EV_WRITE, io_dispatch_write_tls);
		io_callback(io, IO_DISCONNECTED);
		/* strerror() may clobber errno; preserve it across the call. */
		saved_errno = errno;
		io->error = strerror(errno);
		errno = saved_errno;
		io_callback(io, IO_ERROR);
	case IOBUF_TLSERROR:
		io->error = io_tls_error();
		ssl_error("io_dispatch_write_tls:SSL_write");
		io_callback(io, IO_ERROR);
	io_debug("io_dispatch_write_tls(...) -> w=%d\n", n);
	/* Fire IO_LOWAT when the queue crosses the low-water mark downward. */
	if (w > io->lowat && w2 <= io->lowat)
		io_callback(io, IO_LOWAT);
/*
 * TLS counterpart of io_reload: pick the dispatch handler matching the
 * current TLS state (handshake in progress vs. established) and arm
 * the corresponding event.
 */
io_reload_tls(struct io *io)
	void (*dispatch)(int, short, void*) = NULL;
	switch (io->state) {
	case IO_STATE_CONNECT_TLS:
		dispatch = io_dispatch_connect_tls;
	case IO_STATE_ACCEPT_TLS:
		dispatch = io_dispatch_accept_tls;
		/* Established: choose read or write according to mode and pauses. */
		if (IO_READING(io) && !(io->flags & IO_PAUSE_IN)) {
			dispatch = io_dispatch_read_tls;
		else if (IO_WRITING(io) && !(io->flags & IO_PAUSE_OUT) &&
			dispatch = io_dispatch_write_tls;
			return; /* paused */
		errx(1, "io_reload_tls(): bad state");
	io_reset(io, ev, dispatch);