diff --git a/Source/Core/Common/WorkQueueThread.h b/Source/Core/Common/WorkQueueThread.h index be723299b2..a9fe55b45f 100644 --- a/Source/Core/Common/WorkQueueThread.h +++ b/Source/Core/Common/WorkQueueThread.h @@ -4,169 +4,167 @@ #pragma once #include -#include #include -#include +#include #include -#include -#include +#include "Common/Event.h" +#include "Common/SPSCQueue.h" +#include "Common/StdJThread.h" #include "Common/Thread.h" // A thread that executes the given function for every item placed into its queue. namespace Common { -template -class WorkQueueThread +namespace detail +{ +template +class WorkQueueThreadBase final { public: - WorkQueueThread() = default; - WorkQueueThread(const std::string_view name, std::function function) + WorkQueueThreadBase() = default; + WorkQueueThreadBase(std::string name, std::function function) { - Reset(name, std::move(function)); + Reset(std::move(name), std::move(function)); } - ~WorkQueueThread() { Shutdown(); } + ~WorkQueueThreadBase() { Shutdown(); } // Shuts the current work thread down (if any) and starts a new thread with the given function // Note: Some consumers of this API push items to the queue before starting the thread. - void Reset(const std::string_view name, std::function function) + void Reset(std::string name, std::function function) { + auto lg = GetLockGuard(); Shutdown(); - std::lock_guard lg(m_lock); - m_thread_name = name; - m_shutdown = false; - m_function = std::move(function); - m_thread = std::thread(&WorkQueueThread::ThreadLoop, this); + m_thread = StdCompat::jthread(std::bind_front(&WorkQueueThreadBase::ThreadLoop, this), + std::move(name), std::move(function)); } // Adds an item to the work queue template void EmplaceItem(Args&&... args) { - std::lock_guard lg(m_lock); - if (m_shutdown) - return; - - m_items.emplace(std::forward(args)...); - m_idle = false; - m_worker_cond_var.notify_one(); + auto lg = GetLockGuard(); + m_items.Emplace(std::forward(args)...); + m_event.Set(); } + void Push(T&& item) { EmplaceItem(std::move(item)); } + void Push(const T& item) { EmplaceItem(item); } - // Adds an item to the work queue - void Push(T&& item) - { - std::lock_guard lg(m_lock); - if (m_shutdown) - return; - - m_items.push(std::move(item)); - m_idle = false; - m_worker_cond_var.notify_one(); - } - - // Adds an item to the work queue - void Push(const T& item) - { - std::lock_guard lg(m_lock); - if (m_shutdown) - return; - - m_items.push(item); - m_idle = false; - m_worker_cond_var.notify_one(); - } - - // Empties the queue - // If the worker polls IsCanceling(), it can abort it's work when Cancelling + // Empties the queue, skipping all work. + // Blocks until the current work is cancelled. void Cancel() { - std::unique_lock lg(m_lock); - if (m_shutdown) - return; - - m_cancelling = true; - m_items = std::queue(); - m_worker_cond_var.notify_one(); + auto lg = GetLockGuard(); + if (IsRunning()) + { + m_skip_work.store(true, std::memory_order_relaxed); + WaitForCompletion(); + m_skip_work.store(false, std::memory_order_relaxed); + } + else + { + m_items.Clear(); + } } - // Tells the worker to shut down when it's queue is empty - // Blocks until the worker thread exits. - // If cancel is true, will Cancel before before telling the worker to exit - // Otherwise, all currently queued items will complete before the worker exits - void Shutdown(bool cancel = false) + // Tells the worker thread to stop when its queue is empty. + // Blocks until the worker thread exits. Does nothing if thread isn't running. + void Shutdown() { StopThread(true); } + + // Tells the worker thread to stop immediately, potentially leaving work in the queue. + // Blocks until the worker thread exits. Does nothing if thread isn't running. + void Stop() { StopThread(false); } + + // Stops the worker thread ASAP and empties the queue. + void StopAndCancel() { - { - std::unique_lock lg(m_lock); - if (m_shutdown || !m_thread.joinable()) - return; - - if (cancel) - { - m_cancelling = true; - m_items = std::queue(); - } - - m_shutdown = true; - m_worker_cond_var.notify_one(); - } - - m_thread.join(); + auto lg = GetLockGuard(); + Stop(); + Cancel(); } // Blocks until all items in the queue have been processed (or cancelled) + // Does nothing if thread isn't running. void WaitForCompletion() { - std::unique_lock lg(m_lock); - // don't check m_shutdown, because it gets set to request a shutdown, and we want to wait until - // after the shutdown completes. - // We also check m_cancelling, because we want to ensure the worker acknowledges our cancel. - if (m_idle && !m_cancelling.load()) - return; - - m_wait_cond_var.wait(lg, [&] { return m_idle && !m_cancelling; }); + auto lg = GetLockGuard(); + if (IsRunning()) + m_items.WaitForEmpty(); } - // If the worker polls IsCanceling(), it can abort its work when Cancelling - bool IsCancelling() const { return m_cancelling.load(); } - private: - void ThreadLoop() + void StopThread(bool wait_for_completion) { - Common::SetCurrentThreadName(m_thread_name.c_str()); + auto lg = GetLockGuard(); - while (true) + if (wait_for_completion) + WaitForCompletion(); + + if (m_thread.request_stop()) { - std::unique_lock lg(m_lock); - while (m_items.empty()) - { - m_idle = true; - m_cancelling = false; - m_wait_cond_var.notify_all(); - if (m_shutdown) - return; - - m_worker_cond_var.wait( - lg, [&] { return !m_items.empty() || m_shutdown || m_cancelling.load(); }); - } - T item{std::move(m_items.front())}; - m_items.pop(); - lg.unlock(); - - m_function(std::move(item)); + m_event.Set(); + m_thread.join(); } } - std::function m_function; - std::string m_thread_name; - std::thread m_thread; - std::mutex m_lock; - std::queue m_items; - std::condition_variable m_wait_cond_var; - std::condition_variable m_worker_cond_var; - std::atomic m_cancelling = false; - bool m_idle = true; - bool m_shutdown = false; + auto GetLockGuard() + { + struct DummyLockGuard + { + // Silences unused variable warning. + ~DummyLockGuard() {} + }; + + if constexpr (IsSingleProducer) + return DummyLockGuard{}; + else + return std::lock_guard(m_mutex); + } + + bool IsRunning() { return m_thread.joinable(); } + + void ThreadLoop(const StdCompat::stop_token& stoken, const std::string& thread_name, + const std::function& function) + { + Common::SetCurrentThreadName(thread_name.c_str()); + + while (!stoken.stop_requested()) + { + if (m_items.Empty()) + { + m_event.Wait(); + continue; + } + + if (m_skip_work.load(std::memory_order_relaxed)) + { + m_items.Clear(); + continue; + } + + function(std::move(m_items.Front())); + m_items.Pop(); + } + } + + StdCompat::jthread m_thread; + Common::WaitableSPSCQueue m_items; + Common::Event m_event; + std::atomic_bool m_skip_work = false; + + using ProducerMutex = std::conditional_t; + ProducerMutex m_mutex; }; +} // namespace detail + +// Multiple threads may use the public interface. +template +using WorkQueueThread = detail::WorkQueueThreadBase; + +// A "Single Producer" WorkQueueThread. +// It uses no mutex but only one thread can safely manipulate the queue. +template +using WorkQueueThreadSP = detail::WorkQueueThreadBase; } // namespace Common diff --git a/Source/Core/DolphinQt/GameList/GameTracker.cpp b/Source/Core/DolphinQt/GameList/GameTracker.cpp index 6031d37a32..f145045ed5 100644 --- a/Source/Core/DolphinQt/GameList/GameTracker.cpp +++ b/Source/Core/DolphinQt/GameList/GameTracker.cpp @@ -37,7 +37,7 @@ GameTracker::GameTracker(QObject* parent) : QFileSystemWatcher(parent) connect(qApp, &QApplication::aboutToQuit, this, [this] { m_processing_halted = true; - m_load_thread.Shutdown(true); + m_load_thread.StopAndCancel(); }); connect(this, &QFileSystemWatcher::directoryChanged, this, &GameTracker::UpdateDirectory); connect(this, &QFileSystemWatcher::fileChanged, this, &GameTracker::UpdateFile); diff --git a/Source/Core/VideoCommon/Assets/CustomAssetLoader.cpp b/Source/Core/VideoCommon/Assets/CustomAssetLoader.cpp index 119d7444ea..e70faa8359 100644 --- a/Source/Core/VideoCommon/Assets/CustomAssetLoader.cpp +++ b/Source/Core/VideoCommon/Assets/CustomAssetLoader.cpp @@ -69,9 +69,9 @@ void CustomAssetLoader::Init() }); } -void CustomAssetLoader ::Shutdown() +void CustomAssetLoader::Shutdown() { - m_asset_load_thread.Shutdown(true); + m_asset_load_thread.StopAndCancel(); m_asset_monitor_thread_shutdown.Set(); m_asset_monitor_thread.join();