diff --git a/Core/AppRuntime/CMakeLists.txt b/Core/AppRuntime/CMakeLists.txt index ad8f670d..ba671233 100644 --- a/Core/AppRuntime/CMakeLists.txt +++ b/Core/AppRuntime/CMakeLists.txt @@ -9,9 +9,7 @@ set(SOURCES "Include/Babylon/AppRuntime.h" "Source/AppRuntime.cpp" "Source/AppRuntime_${NAPI_JAVASCRIPT_ENGINE}.cpp" - "Source/AppRuntime_${JSRUNTIMEHOST_PLATFORM}.${IMPL_EXT}" - "Source/WorkQueue.cpp" - "Source/WorkQueue.h") + "Source/AppRuntime_${JSRUNTIMEHOST_PLATFORM}.${IMPL_EXT}") add_library(AppRuntime ${SOURCES}) warnings_as_errors(AppRuntime) diff --git a/Core/AppRuntime/Include/Babylon/AppRuntime.h b/Core/AppRuntime/Include/Babylon/AppRuntime.h index ba0ffa71..0de98b8d 100644 --- a/Core/AppRuntime/Include/Babylon/AppRuntime.h +++ b/Core/AppRuntime/Include/Babylon/AppRuntime.h @@ -6,14 +6,19 @@ #include +#include +#include + #include #include #include +#include +#include +#include +#include namespace Babylon { - class WorkQueue; - class AppRuntime final { public: @@ -43,6 +48,23 @@ namespace Babylon static void BABYLON_API DefaultUnhandledExceptionHandler(const Napi::Error& error); private: + template + void Append(CallableT callable) + { + if constexpr (std::is_copy_constructible::value) + { + m_dispatcher.queue([this, callable = std::move(callable)]() { + callable(m_env.value()); + }); + } + else + { + m_dispatcher.queue([this, callablePtr = std::make_shared(std::move(callable))]() { + (*callablePtr)(m_env.value()); + }); + } + } + // These three methods are the mechanism by which platform- and JavaScript-specific // code can be "injected" into the execution of the JavaScript thread. These three // functions are implemented in separate files, thus allowing implementations to be @@ -62,6 +84,10 @@ namespace Babylon void Execute(Dispatchable callback); Options m_options; - std::unique_ptr m_workQueue; + std::optional m_env{}; + std::optional> m_suspensionLock{}; + arcana::cancellation_source m_cancelSource{}; + arcana::manual_dispatcher<128> m_dispatcher{}; + std::thread m_thread; }; } diff --git a/Core/AppRuntime/Source/AppRuntime.cpp b/Core/AppRuntime/Source/AppRuntime.cpp index b3a90b1b..ae1ee4f9 100644 --- a/Core/AppRuntime/Source/AppRuntime.cpp +++ b/Core/AppRuntime/Source/AppRuntime.cpp @@ -1,5 +1,4 @@ #include "AppRuntime.h" -#include "WorkQueue.h" #include namespace Babylon @@ -11,7 +10,7 @@ namespace Babylon AppRuntime::AppRuntime(Options options) : m_options{std::move(options)} - , m_workQueue{std::make_unique([this] { RunPlatformTier(); })} + , m_thread{[this] { RunPlatformTier(); }} { Dispatch([this](Napi::Env env) { JsRuntime::CreateForJavaScript(env, [this](auto func) { Dispatch(std::move(func)); }); @@ -20,26 +19,58 @@ namespace Babylon AppRuntime::~AppRuntime() { + if (m_suspensionLock.has_value()) + { + m_suspensionLock.reset(); + } + + // Cancel immediately so pending work is dropped promptly, then append + // a no-op work item to wake the worker thread from blocking_tick. The + // no-op goes through push() which acquires the queue mutex, avoiding + // the race where a bare notify_all() can be missed by wait(). + // + // NOTE: This preserves the existing shutdown behavior where pending + // callbacks are dropped on cancellation. A more complete solution + // would add cooperative shutdown (e.g. NotifyDisposing/Rundown) so + // consumers can finish cleanup work before the runtime is destroyed. + m_cancelSource.cancel(); + Append([](Napi::Env) {}); + + m_thread.join(); } void AppRuntime::Run(Napi::Env env) { - m_workQueue->Run(env); + m_env = std::make_optional(env); + + m_dispatcher.set_affinity(std::this_thread::get_id()); + + while (!m_cancelSource.cancelled()) + { + m_dispatcher.blocking_tick(m_cancelSource); + } + + // The dispatcher can be non-empty if something is dispatched after cancellation. + m_dispatcher.clear(); } void AppRuntime::Suspend() { - m_workQueue->Suspend(); + auto suspensionMutex = std::make_shared(); + m_suspensionLock.emplace(*suspensionMutex); + Append([suspensionMutex{std::move(suspensionMutex)}](Napi::Env) { + std::scoped_lock lock{*suspensionMutex}; + }); } void AppRuntime::Resume() { - m_workQueue->Resume(); + m_suspensionLock.reset(); } void AppRuntime::Dispatch(Dispatchable func) { - m_workQueue->Append([this, func{std::move(func)}](Napi::Env env) mutable { + Append([this, func{std::move(func)}](Napi::Env env) mutable { Execute([this, env, func{std::move(func)}]() mutable { try { diff --git a/Core/AppRuntime/Source/AppRuntime_JSI.cpp b/Core/AppRuntime/Source/AppRuntime_JSI.cpp index 020bf83f..3ddd19da 100644 --- a/Core/AppRuntime/Source/AppRuntime_JSI.cpp +++ b/Core/AppRuntime/Source/AppRuntime_JSI.cpp @@ -1,5 +1,4 @@ #include "AppRuntime.h" -#include "WorkQueue.h" #include #include @@ -10,15 +9,15 @@ namespace class TaskRunnerAdapter : public v8runtime::JSITaskRunner { public: - TaskRunnerAdapter(Babylon::WorkQueue& workQueue) - : m_workQueue(workQueue) + TaskRunnerAdapter(Babylon::AppRuntime& runtime) + : m_runtime(runtime) { } void postTask(std::unique_ptr task) override { - std::shared_ptr shared_task(task.release()); - m_workQueue.Append([shared_task2 = std::move(shared_task)](Napi::Env) { + std::shared_ptr shared_task(std::move(task)); + m_runtime.Dispatch([shared_task2 = std::move(shared_task)](Napi::Env) { shared_task2->run(); }); } @@ -27,7 +26,7 @@ namespace TaskRunnerAdapter(const TaskRunnerAdapter&) = delete; TaskRunnerAdapter& operator=(const TaskRunnerAdapter&) = delete; - Babylon::WorkQueue& m_workQueue; + Babylon::AppRuntime& m_runtime; }; } @@ -37,7 +36,7 @@ namespace Babylon { v8runtime::V8RuntimeArgs args{}; args.inspectorPort = 5643; - args.foreground_task_runner = std::make_shared(*m_workQueue); + args.foreground_task_runner = std::make_shared(*this); const auto runtime = v8runtime::makeV8Runtime(std::move(args)); const auto env = Napi::Attach(*runtime); diff --git a/Core/AppRuntime/Source/WorkQueue.cpp b/Core/AppRuntime/Source/WorkQueue.cpp deleted file mode 100644 index 8ff5efe1..00000000 --- a/Core/AppRuntime/Source/WorkQueue.cpp +++ /dev/null @@ -1,58 +0,0 @@ -#include "WorkQueue.h" - -namespace Babylon -{ - WorkQueue::WorkQueue(std::function threadProcedure) - : m_thread{std::move(threadProcedure)} - { - } - - WorkQueue::~WorkQueue() - { - if (m_suspensionLock.has_value()) - { - Resume(); - } - - // Cancel immediately so pending work is dropped promptly, then append - // a no-op work item to wake the worker thread from blocking_tick. The - // no-op goes through push() which acquires the queue mutex, avoiding - // the race where a bare notify_all() can be missed by wait(). - // - // NOTE: This preserves the existing shutdown behavior where pending - // callbacks are dropped on cancellation. A more complete solution - // would add cooperative shutdown (e.g. NotifyDisposing/Rundown) so - // consumers can finish cleanup work before the runtime is destroyed. - m_cancelSource.cancel(); - Append([](Napi::Env) {}); - - m_thread.join(); - } - - void WorkQueue::Suspend() - { - auto suspensionMutex = std::make_shared(); - m_suspensionLock.emplace(*suspensionMutex); - Append([suspensionMutex{std::move(suspensionMutex)}](Napi::Env) { - std::scoped_lock lock{*suspensionMutex}; - }); - } - - void WorkQueue::Resume() - { - m_suspensionLock.reset(); - } - - void WorkQueue::Run(Napi::Env env) - { - m_env = std::make_optional(env); - m_dispatcher.set_affinity(std::this_thread::get_id()); - - while (!m_cancelSource.cancelled()) - { - m_dispatcher.blocking_tick(m_cancelSource); - } - - m_dispatcher.clear(); - } -} diff --git a/Core/AppRuntime/Source/WorkQueue.h b/Core/AppRuntime/Source/WorkQueue.h deleted file mode 100644 index 95a504d7..00000000 --- a/Core/AppRuntime/Source/WorkQueue.h +++ /dev/null @@ -1,58 +0,0 @@ -#pragma once - -#include -#include -#include - -#include -#include -#include - -namespace Babylon -{ - class WorkQueue - { - public: - WorkQueue(std::function threadProcedure); - ~WorkQueue(); - - template - void Append(CallableT callable) - { - // Manual dispatcher queueing requires a copyable CallableT, we use a shared pointer trick to make a - // copyable callable if necessary. - if constexpr (std::is_copy_constructible::value) - { - m_dispatcher.queue([this, callable = std::move(callable)]() { - Invoke(callable); - }); - } - else - { - m_dispatcher.queue([this, callablePtr = std::make_shared(std::move(callable))]() { - Invoke(*callablePtr); - }); - } - } - - void Suspend(); - void Resume(); - void Run(Napi::Env); - - private: - template - void Invoke(CallableT& callable) - { - callable(m_env.value()); - } - - std::optional m_env{}; - - std::optional> m_suspensionLock{}; - - arcana::cancellation_source m_cancelSource{}; - arcana::manual_dispatcher<128> m_dispatcher{}; - - std::thread m_thread; - }; -}