added poll() support so that the number of file handles is not limited (#502)

This commit is contained in:
Fabrice Bellard 2026-06-01 18:24:48 +02:00
parent 8a3aaa129d
commit 163b9b725d
3 changed files with 118 additions and 21 deletions

@ -456,6 +456,7 @@ test: qjs$(EXE)
$(WINE) ./qjs$(EXE) tests/test_worker.js
ifndef CONFIG_WIN32
$(WINE) ./qjs$(EXE) tests/test_std.js
$(WINE) ./qjs$(EXE) tests/test_rw_handler.js
endif
ifdef CONFIG_SHARED_LIBS
$(WINE) ./qjs$(EXE) tests/test_bjson.js

@ -46,6 +46,7 @@
#include <termios.h>
#include <sys/ioctl.h>
#include <sys/wait.h>
#include <poll.h>
#if defined(__FreeBSD__)
extern char **environ;
@ -87,6 +88,7 @@ typedef sig_t sighandler_t;
typedef struct {
struct list_head link;
int fd;
int poll_fd_index; /* temporary use in js_os_poll() */
JSValue rw_func[2];
} JSOSRWHandler;
@ -134,6 +136,7 @@ typedef struct {
struct list_head link;
JSWorkerMessagePipe *recv_pipe;
JSValue on_message_func;
int poll_fd_index; /* temporary use in js_os_poll() */
} JSWorkerMessageHandler;
typedef struct {
@ -152,6 +155,10 @@ typedef struct JSThreadState {
int next_timer_id; /* for setTimeout() */
/* not used in the main thread */
JSWorkerMessagePipe *recv_pipe, *send_pipe;
#if !defined(_WIN32)
struct pollfd *poll_fds;
int poll_fds_size;
#endif
} JSThreadState;
static uint64_t os_pending_signals;
@ -2507,16 +2514,44 @@ static int js_os_poll(JSContext *ctx)
#else
static no_inline int js_poll_expand(JSThreadState *ts)
{
struct pollfd *new_fds;
int new_size = max_int(ts->poll_fds_size +
ts->poll_fds_size / 2, 16);
new_fds = realloc(ts->poll_fds, new_size * sizeof(struct pollfd));
if (!new_fds)
return -1;
ts->poll_fds = new_fds;
ts->poll_fds_size = new_size;
return 0;
}
static int js_poll_add_poll_fd(JSThreadState *ts, int *pnfds, int fd, int events)
{
struct pollfd *fds;
int nfds;
nfds = *pnfds;
if (unlikely(nfds >= ts->poll_fds_size)) {
if (js_poll_expand(ts))
return -1;
}
fds = &ts->poll_fds[nfds++];
fds->fd = fd;
fds->events = events;
fds->revents = 0;
*pnfds = nfds;
return 0;
}
static int js_os_poll(JSContext *ctx)
{
JSRuntime *rt = JS_GetRuntime(ctx);
JSThreadState *ts = JS_GetRuntimeOpaque(rt);
int ret, fd_max, min_delay;
int min_delay, nfds;
int64_t cur_time, delay;
fd_set rfds, wfds;
JSOSRWHandler *rh;
struct list_head *el;
struct timeval tv, *tvp;
/* only check signals in the main thread */
if (!ts->recv_pipe &&
@ -2558,46 +2593,49 @@ static int js_os_poll(JSContext *ctx)
min_delay = delay;
}
}
tv.tv_sec = min_delay / 1000;
tv.tv_usec = (min_delay % 1000) * 1000;
tvp = &tv;
} else {
tvp = NULL;
min_delay = -1; /* infinite */
}
FD_ZERO(&rfds);
FD_ZERO(&wfds);
fd_max = -1;
nfds = 0;
list_for_each(el, &ts->os_rw_handlers) {
int events;
rh = list_entry(el, JSOSRWHandler, link);
fd_max = max_int(fd_max, rh->fd);
events = 0;
if (!JS_IsNull(rh->rw_func[0]))
FD_SET(rh->fd, &rfds);
events |= POLLIN;
if (!JS_IsNull(rh->rw_func[1]))
FD_SET(rh->fd, &wfds);
events |= POLLOUT;
if (events) {
rh->poll_fd_index = nfds;
if (js_poll_add_poll_fd(ts, &nfds, rh->fd, events))
return -1;
}
}
list_for_each(el, &ts->port_list) {
JSWorkerMessageHandler *port = list_entry(el, JSWorkerMessageHandler, link);
if (!JS_IsNull(port->on_message_func)) {
JSWorkerMessagePipe *ps = port->recv_pipe;
fd_max = max_int(fd_max, ps->waker.read_fd);
FD_SET(ps->waker.read_fd, &rfds);
port->poll_fd_index = nfds;
if (js_poll_add_poll_fd(ts, &nfds, ps->waker.read_fd, POLLIN))
return -1;
}
}
ret = select(fd_max + 1, &rfds, &wfds, NULL, tvp);
if (ret > 0) {
nfds = poll(ts->poll_fds, nfds, min_delay);
if (nfds > 0) {
list_for_each(el, &ts->os_rw_handlers) {
rh = list_entry(el, JSOSRWHandler, link);
if (!JS_IsNull(rh->rw_func[0]) &&
FD_ISSET(rh->fd, &rfds)) {
(ts->poll_fds[rh->poll_fd_index].revents & (POLLERR | POLLHUP | POLLNVAL | POLLIN))) {
call_handler(ctx, rh->rw_func[0]);
/* must stop because the list may have been modified */
goto done;
}
if (!JS_IsNull(rh->rw_func[1]) &&
FD_ISSET(rh->fd, &wfds)) {
(ts->poll_fds[rh->poll_fd_index].revents & (POLLERR | POLLHUP | POLLNVAL | POLLOUT))) {
call_handler(ctx, rh->rw_func[1]);
/* must stop because the list may have been modified */
goto done;
@ -2607,8 +2645,7 @@ static int js_os_poll(JSContext *ctx)
list_for_each(el, &ts->port_list) {
JSWorkerMessageHandler *port = list_entry(el, JSWorkerMessageHandler, link);
if (!JS_IsNull(port->on_message_func)) {
JSWorkerMessagePipe *ps = port->recv_pipe;
if (FD_ISSET(ps->waker.read_fd, &rfds)) {
if (ts->poll_fds[port->poll_fd_index].revents != 0) {
if (handle_posted_message(rt, ctx, port))
goto done;
}
@ -4164,6 +4201,8 @@ void js_std_free_handlers(JSRuntime *rt)
}
#endif
free(ts->poll_fds);
free(ts);
JS_SetRuntimeOpaque(rt, NULL); /* fail safe */
}

57
tests/test_rw_handler.js Normal file

@ -0,0 +1,57 @@
import * as std from "std";
import * as os from "os";
function assert(actual, expected, message) {
if (arguments.length == 1)
expected = true;
if (Object.is(actual, expected))
return;
if (actual !== null && expected !== null
&& typeof actual == 'object' && typeof expected == 'object'
&& actual.toString() === expected.toString())
return;
throw Error("assertion failed: got |" + actual + "|" +
", expected |" + expected + "|" +
(message ? " (" + message + ")" : ""));
}
function handle_read(fd_r, fd_w, i)
{
var buf = new Uint32Array(1);
var val, len;
len = os.read(fd_r, buf.buffer, 0, 4);
os.setReadHandler(fd_r, null);
val = buf[0];
// print("read fd=", fd_r, "val=", val, "len=", len);
assert(val, i);
}
function handle_write(fd_r, fd_w, i)
{
var buf = new Uint32Array(1);
buf[0] = i;
os.write(fd_w, buf.buffer, 0, 4);
os.setWriteHandler(fd_w, null);
}
function test_rw_handlers(n)
{
var tab, fd_r, fd_w, i;
tab = [];
for(i = 0; i < n; i++) {
tab[i] = os.pipe();
fd_r = tab[i][0];
fd_w = tab[i][1];
os.setReadHandler(fd_r, handle_read.bind(null, fd_r, fd_w, i));
}
for(i = n - 1; i >= 0; i--) {
fd_r = tab[i][0];
fd_w = tab[i][1];
os.setWriteHandler(fd_w, handle_write.bind(null, fd_r, fd_w, i));
}
}
test_rw_handlers(100);