diff --git a/Source/Core/Common/CMakeLists.txt b/Source/Core/Common/CMakeLists.txt index 608e2f270a..e778a755c8 100644 --- a/Source/Core/Common/CMakeLists.txt +++ b/Source/Core/Common/CMakeLists.txt @@ -134,6 +134,7 @@ add_library(common SocketContext.h SpanUtils.h SPSCQueue.h + StdJThread.h StringLiteral.h StringUtil.cpp StringUtil.h diff --git a/Source/Core/Common/StdJThread.h b/Source/Core/Common/StdJThread.h new file mode 100644 index 0000000000..d0c203556b --- /dev/null +++ b/Source/Core/Common/StdJThread.h @@ -0,0 +1,164 @@ +// Copyright 2025 Dolphin Emulator Project +// SPDX-License-Identifier: GPL-2.0-or-later + +#pragma once + +// This header provides a basic implementation of jthread in the StdCompat namespace +// It pulls the stdlib provided std::jthread into the StdCompat namespace when available. + +// TODO: Eliminate this when we can rely on std::jthread being in the stdlib. +// Clang libc++ provides P0660R10 in version 18 (with -fexperimental-library) or version 20. +// GCC libstdc++ provides it in version 10. + +#include + +#if defined(__cpp_lib_jthread) && __cpp_lib_jthread >= 201911L +namespace StdCompat +{ +using std::jthread; +using std::stop_source; +using std::stop_token; +} // namespace StdCompat +#else +#include +#include + +#include "Common/TypeUtils.h" + +namespace StdCompat +{ +class stop_source; + +class stop_token +{ + friend stop_source; + +public: + stop_token() = default; + + bool stop_requested() const + { + return stop_possible() && m_stop_state->stop_requested_flag.test(); + } + + bool stop_possible() const { return m_stop_state != nullptr; } + + void swap(stop_token& other) { m_stop_state.swap(other.m_stop_state); } + + friend bool operator==(const stop_token&, const stop_token&) = default; + +private: + struct StopState + { + std::atomic_flag stop_requested_flag = false; + }; + + explicit stop_token(std::shared_ptr stop_state) : m_stop_state{std::move(stop_state)} + { + } + + std::shared_ptr m_stop_state; +}; + +struct nostopstate_t +{ + explicit nostopstate_t() = default; +}; +inline constexpr nostopstate_t nostopstate{}; + +class stop_source +{ +public: + stop_source() : m_stop_token{std::make_shared()} {} + explicit stop_source(nostopstate_t) {} + + stop_source(const stop_source&) = default; + stop_source(stop_source&&) = default; + + stop_source& operator=(const stop_source&) = default; + stop_source& operator=(stop_source&&) = default; + + ~stop_source() = default; + + bool request_stop() + { + return m_stop_token.stop_possible() && + !m_stop_token.m_stop_state->stop_requested_flag.test_and_set(); + } + + void swap(stop_source& other) { m_stop_token.swap(other.m_stop_token); } + + stop_token get_token() const { return m_stop_token; } + + bool stop_requested() const { return m_stop_token.stop_requested(); } + bool stop_possible() const { return m_stop_token.stop_possible(); } + + friend bool operator==(const stop_source&, const stop_source&) = default; + +private: + stop_token m_stop_token; +}; + +class jthread +{ +public: + using native_handle_type = std::thread::native_handle_type; + + jthread() : m_stop_source(nostopstate) {} + + template + explicit jthread(F&& f, Args&&... args) + { + if constexpr (std::is_invocable_v, stop_token, std::decay_t...>) + { + m_thread = std::thread{std::forward(f), get_stop_token(), std::forward(args)...}; + } + else if constexpr (std::is_invocable_v, std::decay_t...>) + { + m_thread = std::thread{std::forward(f), std::forward(args)...}; + } + else + { + static_assert(Common::DependentFalse(), + "thread function is not invocable with provided arguments."); + } + } + + jthread(jthread&& other) = default; + jthread& operator=(jthread&& other) = default; + + jthread(const jthread&) = delete; + jthread& operator=(const jthread&) = delete; + + ~jthread() + { + if (joinable()) + { + request_stop(); + join(); + } + } + + bool joinable() const { return m_thread.joinable(); } + auto get_id() const { return m_thread.get_id(); }; + auto native_handle() { return m_thread.native_handle(); } + static auto hardware_concurrency() { return std::thread::hardware_concurrency(); } + + void join() { m_thread.join(); } + void detach() { m_thread.detach(); } + void swap(jthread& other) + { + m_thread.swap(other.m_thread); + m_stop_source.swap(other.m_stop_source); + } + + stop_source get_stop_source() { return m_stop_source; } + stop_token get_stop_token() const { return m_stop_source.get_token(); } + bool request_stop() { return m_stop_source.request_stop(); } + +private: + std::thread m_thread; + stop_source m_stop_source; +}; +} // namespace StdCompat +#endif diff --git a/Source/Core/Common/TypeUtils.h b/Source/Core/Common/TypeUtils.h index 714f5a718d..8079abf4da 100644 --- a/Source/Core/Common/TypeUtils.h +++ b/Source/Core/Common/TypeUtils.h @@ -82,4 +82,15 @@ static_assert(!IsNOf::value); static_assert(IsNOf::value); static_assert(IsNOf::value); // Type conversions ARE allowed static_assert(!IsNOf::value); + +template +struct DependentFalse : std::false_type +{ +}; + +template +struct DependentTrue : std::true_type +{ +}; + } // namespace Common diff --git a/Source/Core/Common/WorkQueueThread.h b/Source/Core/Common/WorkQueueThread.h index be723299b2..a9fe55b45f 100644 --- a/Source/Core/Common/WorkQueueThread.h +++ b/Source/Core/Common/WorkQueueThread.h @@ -4,169 +4,167 @@ #pragma once #include -#include #include -#include +#include #include -#include -#include +#include "Common/Event.h" +#include "Common/SPSCQueue.h" +#include "Common/StdJThread.h" #include "Common/Thread.h" // A thread that executes the given function for every item placed into its queue. namespace Common { -template -class WorkQueueThread +namespace detail +{ +template +class WorkQueueThreadBase final { public: - WorkQueueThread() = default; - WorkQueueThread(const std::string_view name, std::function function) + WorkQueueThreadBase() = default; + WorkQueueThreadBase(std::string name, std::function function) { - Reset(name, std::move(function)); + Reset(std::move(name), std::move(function)); } - ~WorkQueueThread() { Shutdown(); } + ~WorkQueueThreadBase() { Shutdown(); } // Shuts the current work thread down (if any) and starts a new thread with the given function // Note: Some consumers of this API push items to the queue before starting the thread. - void Reset(const std::string_view name, std::function function) + void Reset(std::string name, std::function function) { + auto lg = GetLockGuard(); Shutdown(); - std::lock_guard lg(m_lock); - m_thread_name = name; - m_shutdown = false; - m_function = std::move(function); - m_thread = std::thread(&WorkQueueThread::ThreadLoop, this); + m_thread = StdCompat::jthread(std::bind_front(&WorkQueueThreadBase::ThreadLoop, this), + std::move(name), std::move(function)); } // Adds an item to the work queue template void EmplaceItem(Args&&... args) { - std::lock_guard lg(m_lock); - if (m_shutdown) - return; - - m_items.emplace(std::forward(args)...); - m_idle = false; - m_worker_cond_var.notify_one(); + auto lg = GetLockGuard(); + m_items.Emplace(std::forward(args)...); + m_event.Set(); } + void Push(T&& item) { EmplaceItem(std::move(item)); } + void Push(const T& item) { EmplaceItem(item); } - // Adds an item to the work queue - void Push(T&& item) - { - std::lock_guard lg(m_lock); - if (m_shutdown) - return; - - m_items.push(std::move(item)); - m_idle = false; - m_worker_cond_var.notify_one(); - } - - // Adds an item to the work queue - void Push(const T& item) - { - std::lock_guard lg(m_lock); - if (m_shutdown) - return; - - m_items.push(item); - m_idle = false; - m_worker_cond_var.notify_one(); - } - - // Empties the queue - // If the worker polls IsCanceling(), it can abort it's work when Cancelling + // Empties the queue, skipping all work. + // Blocks until the current work is cancelled. void Cancel() { - std::unique_lock lg(m_lock); - if (m_shutdown) - return; - - m_cancelling = true; - m_items = std::queue(); - m_worker_cond_var.notify_one(); + auto lg = GetLockGuard(); + if (IsRunning()) + { + m_skip_work.store(true, std::memory_order_relaxed); + WaitForCompletion(); + m_skip_work.store(false, std::memory_order_relaxed); + } + else + { + m_items.Clear(); + } } - // Tells the worker to shut down when it's queue is empty - // Blocks until the worker thread exits. - // If cancel is true, will Cancel before before telling the worker to exit - // Otherwise, all currently queued items will complete before the worker exits - void Shutdown(bool cancel = false) + // Tells the worker thread to stop when its queue is empty. + // Blocks until the worker thread exits. Does nothing if thread isn't running. + void Shutdown() { StopThread(true); } + + // Tells the worker thread to stop immediately, potentially leaving work in the queue. + // Blocks until the worker thread exits. Does nothing if thread isn't running. + void Stop() { StopThread(false); } + + // Stops the worker thread ASAP and empties the queue. + void StopAndCancel() { - { - std::unique_lock lg(m_lock); - if (m_shutdown || !m_thread.joinable()) - return; - - if (cancel) - { - m_cancelling = true; - m_items = std::queue(); - } - - m_shutdown = true; - m_worker_cond_var.notify_one(); - } - - m_thread.join(); + auto lg = GetLockGuard(); + Stop(); + Cancel(); } // Blocks until all items in the queue have been processed (or cancelled) + // Does nothing if thread isn't running. void WaitForCompletion() { - std::unique_lock lg(m_lock); - // don't check m_shutdown, because it gets set to request a shutdown, and we want to wait until - // after the shutdown completes. - // We also check m_cancelling, because we want to ensure the worker acknowledges our cancel. - if (m_idle && !m_cancelling.load()) - return; - - m_wait_cond_var.wait(lg, [&] { return m_idle && !m_cancelling; }); + auto lg = GetLockGuard(); + if (IsRunning()) + m_items.WaitForEmpty(); } - // If the worker polls IsCanceling(), it can abort its work when Cancelling - bool IsCancelling() const { return m_cancelling.load(); } - private: - void ThreadLoop() + void StopThread(bool wait_for_completion) { - Common::SetCurrentThreadName(m_thread_name.c_str()); + auto lg = GetLockGuard(); - while (true) + if (wait_for_completion) + WaitForCompletion(); + + if (m_thread.request_stop()) { - std::unique_lock lg(m_lock); - while (m_items.empty()) - { - m_idle = true; - m_cancelling = false; - m_wait_cond_var.notify_all(); - if (m_shutdown) - return; - - m_worker_cond_var.wait( - lg, [&] { return !m_items.empty() || m_shutdown || m_cancelling.load(); }); - } - T item{std::move(m_items.front())}; - m_items.pop(); - lg.unlock(); - - m_function(std::move(item)); + m_event.Set(); + m_thread.join(); } } - std::function m_function; - std::string m_thread_name; - std::thread m_thread; - std::mutex m_lock; - std::queue m_items; - std::condition_variable m_wait_cond_var; - std::condition_variable m_worker_cond_var; - std::atomic m_cancelling = false; - bool m_idle = true; - bool m_shutdown = false; + auto GetLockGuard() + { + struct DummyLockGuard + { + // Silences unused variable warning. + ~DummyLockGuard() {} + }; + + if constexpr (IsSingleProducer) + return DummyLockGuard{}; + else + return std::lock_guard(m_mutex); + } + + bool IsRunning() { return m_thread.joinable(); } + + void ThreadLoop(const StdCompat::stop_token& stoken, const std::string& thread_name, + const std::function& function) + { + Common::SetCurrentThreadName(thread_name.c_str()); + + while (!stoken.stop_requested()) + { + if (m_items.Empty()) + { + m_event.Wait(); + continue; + } + + if (m_skip_work.load(std::memory_order_relaxed)) + { + m_items.Clear(); + continue; + } + + function(std::move(m_items.Front())); + m_items.Pop(); + } + } + + StdCompat::jthread m_thread; + Common::WaitableSPSCQueue m_items; + Common::Event m_event; + std::atomic_bool m_skip_work = false; + + using ProducerMutex = std::conditional_t; + ProducerMutex m_mutex; }; +} // namespace detail + +// Multiple threads may use the public interface. +template +using WorkQueueThread = detail::WorkQueueThreadBase; + +// A "Single Producer" WorkQueueThread. +// It uses no mutex but only one thread can safely manipulate the queue. +template +using WorkQueueThreadSP = detail::WorkQueueThreadBase; } // namespace Common diff --git a/Source/Core/DolphinLib.props b/Source/Core/DolphinLib.props index 67e3a15f4c..71d495cc98 100644 --- a/Source/Core/DolphinLib.props +++ b/Source/Core/DolphinLib.props @@ -158,6 +158,7 @@ + diff --git a/Source/Core/DolphinQt/GameList/GameTracker.cpp b/Source/Core/DolphinQt/GameList/GameTracker.cpp index 6031d37a32..f145045ed5 100644 --- a/Source/Core/DolphinQt/GameList/GameTracker.cpp +++ b/Source/Core/DolphinQt/GameList/GameTracker.cpp @@ -37,7 +37,7 @@ GameTracker::GameTracker(QObject* parent) : QFileSystemWatcher(parent) connect(qApp, &QApplication::aboutToQuit, this, [this] { m_processing_halted = true; - m_load_thread.Shutdown(true); + m_load_thread.StopAndCancel(); }); connect(this, &QFileSystemWatcher::directoryChanged, this, &GameTracker::UpdateDirectory); connect(this, &QFileSystemWatcher::fileChanged, this, &GameTracker::UpdateFile); diff --git a/Source/Core/VideoCommon/Assets/CustomAssetLoader.cpp b/Source/Core/VideoCommon/Assets/CustomAssetLoader.cpp index 119d7444ea..e70faa8359 100644 --- a/Source/Core/VideoCommon/Assets/CustomAssetLoader.cpp +++ b/Source/Core/VideoCommon/Assets/CustomAssetLoader.cpp @@ -69,9 +69,9 @@ void CustomAssetLoader::Init() }); } -void CustomAssetLoader ::Shutdown() +void CustomAssetLoader::Shutdown() { - m_asset_load_thread.Shutdown(true); + m_asset_load_thread.StopAndCancel(); m_asset_monitor_thread_shutdown.Set(); m_asset_monitor_thread.join(); diff --git a/Source/UnitTests/Common/CMakeLists.txt b/Source/UnitTests/Common/CMakeLists.txt index e509f160f2..3b0ffe79e6 100644 --- a/Source/UnitTests/Common/CMakeLists.txt +++ b/Source/UnitTests/Common/CMakeLists.txt @@ -19,6 +19,7 @@ add_dolphin_test(SettingsHandlerTest SettingsHandlerTest.cpp) add_dolphin_test(SPSCQueueTest SPSCQueueTest.cpp) add_dolphin_test(StringUtilTest StringUtilTest.cpp) add_dolphin_test(SwapTest SwapTest.cpp) +add_dolphin_test(WorkQueueThreadTest WorkQueueThreadTest.cpp) if (_M_X86_64) add_dolphin_test(x64EmitterTest x64EmitterTest.cpp) diff --git a/Source/UnitTests/Common/WorkQueueThreadTest.cpp b/Source/UnitTests/Common/WorkQueueThreadTest.cpp new file mode 100644 index 0000000000..a4bfc654f2 --- /dev/null +++ b/Source/UnitTests/Common/WorkQueueThreadTest.cpp @@ -0,0 +1,53 @@ +// Copyright 2025 Dolphin Emulator Project +// SPDX-License-Identifier: GPL-2.0-or-later + +#include + +#include "Common/WorkQueueThread.h" + +TEST(WorkQueueThread, Simple) +{ + Common::WorkQueueThreadSP worker; + + constexpr int BIG_VAL = 1000; + + int x = 0; + const auto func = [&](int value) { x = value; }; + + worker.Push(1); + worker.WaitForCompletion(); + // Still zero because it's not running. + EXPECT_EQ(x, 0); + + // Does nothing if not running. + worker.Shutdown(); + + worker.Reset("test worker", func); + worker.WaitForCompletion(); + // Items pushed before Reset are processed. + EXPECT_EQ(x, 1); + + worker.Shutdown(); + worker.Push(0); + worker.WaitForCompletion(); + // Still 1 because it's no longer running. + EXPECT_EQ(x, 1); + + worker.Cancel(); + worker.Reset("test worker", func); + worker.WaitForCompletion(); + // Still 1 because the work was canceled. + EXPECT_EQ(x, 1); + + for (int i = 0; i != BIG_VAL; ++i) + worker.Push(i); + worker.Cancel(); + // Could be any one of the pushed values. + EXPECT_LT(x, BIG_VAL); + GTEST_LOG_(INFO) << "Canceled work after item " << x; + + worker.Push(2); + worker.WaitForCompletion(); + // Still running after cancelation. + EXPECT_EQ(x, 2); +} diff --git a/Source/UnitTests/UnitTests.vcxproj b/Source/UnitTests/UnitTests.vcxproj index 45702ed7fd..b5e5256915 100644 --- a/Source/UnitTests/UnitTests.vcxproj +++ b/Source/UnitTests/UnitTests.vcxproj @@ -58,6 +58,7 @@ + @@ -118,4 +119,4 @@ - \ No newline at end of file +