Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ext/node): do not exit worker thread when there is pending async op #27378

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cli/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,7 @@ impl CliMainWorkerFactory {
serve_port: shared.options.serve_port,
serve_host: shared.options.serve_host.clone(),
otel_config: shared.otel_config.clone(),
close_on_idle: true,
},
extensions: custom_extensions,
startup_snapshot: crate::js::deno_isolate_init(),
Expand Down Expand Up @@ -812,6 +813,7 @@ fn create_web_worker_callback(
serve_port: shared.options.serve_port,
serve_host: shared.options.serve_host.clone(),
otel_config: shared.otel_config.clone(),
close_on_idle: args.close_on_idle,
},
extensions: vec![],
startup_snapshot: crate::js::deno_isolate_init(),
Expand Down
6 changes: 3 additions & 3 deletions ext/node/polyfills/worker_threads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
nodeWorkerThreadCloseCb,
refMessagePort,
serializeJsMessageData,
unrefPollForMessages,
unrefParentPort,
} from "ext:deno_web/13_message_port.js";
import * as webidl from "ext:deno_webidl/00_webidl.js";
import { notImplemented } from "ext:deno_node/_utils.ts";
Expand Down Expand Up @@ -451,10 +451,10 @@ internals.__initWorkerThreads = (
parentPort.emit("close");
});
parentPort.unref = () => {
parentPort[unrefPollForMessages] = true;
parentPort[unrefParentPort] = true;
};
parentPort.ref = () => {
parentPort[unrefPollForMessages] = false;
parentPort[unrefParentPort] = false;
};

if (isWorkerThread) {
Expand Down
4 changes: 2 additions & 2 deletions ext/web/13_message_port.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb");
const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked");
export const refMessagePort = Symbol("refMessagePort");
/** It is used by 99_main.js and worker_threads to
* unref/ref on the global pollForMessages promise. */
export const unrefPollForMessages = Symbol("unrefPollForMessages");
* unref/ref on the global message event handler count. */
export const unrefParentPort = Symbol("unrefParentPort");

/**
* @param {number} id
Expand Down
11 changes: 9 additions & 2 deletions runtime/js/99_main.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ function postMessage(message, transferOrOptions = { __proto__: null }) {

let isClosing = false;
let globalDispatchEvent;
let closeOnIdle;

function hasMessageEventListener() {
// the function name is kind of a misnomer, but we want to behave
// as if we have message event listeners if a node message port is explicitly
// refed (and the inverse as well)
return event.listenerCount(globalThis, "message") > 0 ||
return (event.listenerCount(globalThis, "message") > 0 &&
!globalThis[messagePort.unrefParentPort]) ||
messagePort.refedMessagePortsCount > 0;
}

Expand All @@ -188,7 +190,10 @@ async function pollForMessages() {
}
while (!isClosing) {
const recvMessage = op_worker_recv_message();
if (globalThis[messagePort.unrefPollForMessages] === true) {
// In a Node.js worker, unref() the op promise to prevent it from
// keeping the event loop alive. This avoids the need to explicitly
// call self.close() or worker.terminate().
if (closeOnIdle) {
core.unrefOpPromise(recvMessage);
}
const data = await recvMessage;
Expand Down Expand Up @@ -915,6 +920,7 @@ function bootstrapWorkerRuntime(
6: argv0,
7: nodeDebug,
13: otelConfig,
14: closeOnIdle_,
} = runtimeOptions;

performance.setTimeOrigin();
Expand Down Expand Up @@ -967,6 +973,7 @@ function bootstrapWorkerRuntime(

globalThis.pollForMessages = pollForMessages;
globalThis.hasMessageEventListener = hasMessageEventListener;
closeOnIdle = closeOnIdle_;

for (let i = 0; i <= unstableFeatures.length; i++) {
const id = unstableFeatures[i];
Expand Down
33 changes: 4 additions & 29 deletions runtime/web_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ use std::task::Poll;
use crate::inspector_server::InspectorServer;
use crate::ops;
use crate::ops::process::NpmProcessStateProviderRc;
use crate::ops::worker_host::WorkersTable;
use crate::shared::maybe_transpile_source;
use crate::shared::runtime;
use crate::tokio_util::create_and_run_current_thread;
Expand Down Expand Up @@ -385,7 +384,6 @@ pub struct WebWorker {
pub js_runtime: JsRuntime,
pub name: String,
close_on_idle: bool,
has_executed_main_module: bool,
internal_handle: WebWorkerInternalHandle,
pub worker_type: WebWorkerType,
pub main_module: ModuleSpecifier,
Expand Down Expand Up @@ -658,7 +656,6 @@ impl WebWorker {
has_message_event_listener_fn: None,
bootstrap_fn_global: Some(bootstrap_fn_global),
close_on_idle: options.close_on_idle,
has_executed_main_module: false,
maybe_worker_metadata: options.maybe_worker_metadata,
},
external_handle,
Expand Down Expand Up @@ -799,7 +796,6 @@ impl WebWorker {

maybe_result = &mut receiver => {
debug!("received worker module evaluate {:#?}", maybe_result);
self.has_executed_main_module = true;
maybe_result
}

Expand Down Expand Up @@ -837,6 +833,9 @@ impl WebWorker {
}

if self.close_on_idle {
if self.has_message_event_listener() {
return Poll::Pending;
}
return Poll::Ready(Ok(()));
}

Expand All @@ -851,22 +850,7 @@ impl WebWorker {
Poll::Ready(Ok(()))
}
}
Poll::Pending => {
// This is special code path for workers created from `node:worker_threads`
// module that have different semantics than Web workers.
// We want the worker thread to terminate automatically if we've done executing
// Top-Level await, there are no child workers spawned by that workers
// and there's no "message" event listener.
if self.close_on_idle
&& self.has_executed_main_module
&& !self.has_child_workers()
&& !self.has_message_event_listener()
{
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
Poll::Pending => Poll::Pending,
}
}

Expand Down Expand Up @@ -904,15 +888,6 @@ impl WebWorker {
None => false,
}
}

fn has_child_workers(&mut self) -> bool {
!self
.js_runtime
.op_state()
.borrow()
.borrow::<WorkersTable>()
.is_empty()
}
}

fn print_worker_error(
Expand Down
5 changes: 5 additions & 0 deletions runtime/worker_bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ pub struct BootstrapOptions {
pub serve_port: Option<u16>,
pub serve_host: Option<String>,
pub otel_config: OtelConfig,
pub close_on_idle: bool,
}

impl Default for BootstrapOptions {
Expand Down Expand Up @@ -155,6 +156,7 @@ impl Default for BootstrapOptions {
serve_port: Default::default(),
serve_host: Default::default(),
otel_config: Default::default(),
close_on_idle: false,
}
}
}
Expand Down Expand Up @@ -198,6 +200,8 @@ struct BootstrapV8<'a>(
Option<usize>,
// OTEL config
Box<[u8]>,
// close on idle
bool,
);

impl BootstrapOptions {
Expand Down Expand Up @@ -225,6 +229,7 @@ impl BootstrapOptions {
serve_is_main,
serve_worker_count,
self.otel_config.as_v8(),
self.close_on_idle,
);

bootstrap.serialize(ser).unwrap()
Expand Down
3 changes: 1 addition & 2 deletions tests/specs/permission/allow_import_worker/denied.out
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@ await import(specifier);
^
at async file:///[WILDLINE]
error: Uncaught (in promise) Error: Unhandled error in child worker.
at [WILDLINE]
at [WILDLINE]
at [WILDCARD]
23 changes: 23 additions & 0 deletions tests/unit_node/worker_threads_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -841,3 +841,26 @@ Deno.test({
assertEquals(result, true);
},
});

Deno.test("[node/worker_threads] Worker runs async ops correctly", async () => {
const recvMessage = Promise.withResolvers<void>();
const timer = setTimeout(() => recvMessage.reject(), 1000);
const worker = new workerThreads.Worker(
`
import { parentPort } from "node:worker_threads";
setTimeout(() => {
parentPort.postMessage("Hello from worker");
}, 10);
`,
{ eval: true },
);

worker.on("message", (msg) => {
assertEquals(msg, "Hello from worker");
worker.terminate();
recvMessage.resolve();
clearTimeout(timer);
});

await recvMessage.promise;
});
Loading