From af960651e81edf9fd57aba8e19857a5aa48ca184 Mon Sep 17 00:00:00 2001 From: Jordan Woyak Date: Wed, 12 Mar 2025 03:21:44 -0500 Subject: [PATCH] Common: SPSCQueue cleanups and improvements. --- Source/Core/Common/SPSCQueue.h | 143 +++++++++++-------- Source/Core/Core/CoreTiming.h | 2 +- Source/Core/Core/HW/DVD/DVDThread.h | 4 +- Source/Core/Core/NetPlayClient.h | 2 +- Source/Core/Core/NetPlayServer.h | 4 +- Source/Core/VideoCommon/PerformanceTracker.h | 2 +- Source/UnitTests/Common/SPSCQueueTest.cpp | 45 ++++-- 7 files changed, 121 insertions(+), 81 deletions(-) diff --git a/Source/Core/Common/SPSCQueue.h b/Source/Core/Common/SPSCQueue.h index 27a0a8f282..b586012226 100644 --- a/Source/Core/Common/SPSCQueue.h +++ b/Source/Core/Common/SPSCQueue.h @@ -6,104 +6,123 @@ // a simple lockless thread-safe, // single producer, single consumer queue -#include #include -#include - -#include "Common/CommonTypes.h" +#include +#include namespace Common { -template -class SPSCQueue + +namespace detail +{ +template +class SPSCQueueBase final { public: - SPSCQueue() : m_size(0) { m_write_ptr = m_read_ptr = new ElementPtr(); } - ~SPSCQueue() + SPSCQueueBase() = default; + ~SPSCQueueBase() { - // this will empty out the whole queue + Clear(); delete m_read_ptr; } - u32 Size() const + SPSCQueueBase(const SPSCQueueBase&) = delete; + SPSCQueueBase& operator=(const SPSCQueueBase&) = delete; + + std::size_t Size() const { return m_size.load(std::memory_order_acquire); } + bool Empty() const { return Size() == 0; } + + // The following are only safe from the "producer thread": + void Push(const T& arg) { Emplace(arg); } + void Push(T&& arg) { Emplace(std::move(arg)); } + template + void Emplace(Args&&... args) { - static_assert(NeedSize, "using Size() on SPSCQueue without NeedSize"); - return m_size.load(); + std::construct_at(&m_write_ptr->value.data, std::forward(args)...); + + Node* const new_ptr = new Node; + m_write_ptr->next = new_ptr; + m_write_ptr = new_ptr; + + AdjustSize(1); } - bool Empty() const { return !m_read_ptr->next.load(); } - T& Front() const { return m_read_ptr->current; } - template - void Push(Arg&& t) + void WaitForEmpty() requires(IncludeWaitFunctionality) { - // create the element, add it to the queue - m_write_ptr->current = std::forward(t); - // set the next pointer to a new element ptr - // then advance the write pointer - ElementPtr* new_ptr = new ElementPtr(); - m_write_ptr->next.store(new_ptr, std::memory_order_release); - m_write_ptr = new_ptr; - if (NeedSize) - m_size++; + while (const std::size_t old_size = Size()) + m_size.wait(old_size, std::memory_order_acquire); } + // The following are only safe from the "consumer thread": + T& Front() { return m_read_ptr->value.data; } + const T& Front() const { return m_read_ptr->value.data; } + void Pop() { - if (NeedSize) - m_size--; - ElementPtr* tmpptr = m_read_ptr; - // advance the read pointer - m_read_ptr = tmpptr->next.load(); - // set the next element to nullptr to stop the recursive deletion - tmpptr->next.store(nullptr); - delete tmpptr; // this also deletes the element + assert(!Empty()); + + std::destroy_at(&Front()); + + Node* const old_node = m_read_ptr; + m_read_ptr = old_node->next; + delete old_node; + + AdjustSize(-1); } - bool Pop(T& t) + bool Pop(T& result) { if (Empty()) return false; - if (NeedSize) - m_size--; - - ElementPtr* tmpptr = m_read_ptr; - m_read_ptr = tmpptr->next.load(std::memory_order_acquire); - t = std::move(tmpptr->current); - tmpptr->next.store(nullptr); - delete tmpptr; + result = std::move(Front()); + Pop(); return true; } - // not thread-safe + void WaitForData() requires(IncludeWaitFunctionality) + { + m_size.wait(0, std::memory_order_acquire); + } + void Clear() { - m_size.store(0); - delete m_read_ptr; - m_write_ptr = m_read_ptr = new ElementPtr(); + while (!Empty()) + Pop(); } private: - // stores a pointer to element - // and a pointer to the next ElementPtr - class ElementPtr + struct Node { - public: - ElementPtr() : next(nullptr) {} - ~ElementPtr() + // union allows value construction to be deferred until Push. + union Value { - ElementPtr* next_ptr = next.load(); + T data; + Value() {} + ~Value() {} + } value; - if (next_ptr) - delete next_ptr; - } - - T current{}; - std::atomic next; + Node* next; }; - ElementPtr* m_write_ptr; - ElementPtr* m_read_ptr; - std::atomic m_size; + Node* m_write_ptr = new Node; + Node* m_read_ptr = m_write_ptr; + + void AdjustSize(std::size_t value) + { + m_size.fetch_add(value, std::memory_order_release); + if constexpr (IncludeWaitFunctionality) + m_size.notify_one(); + } + + std::atomic m_size = 0; }; +} // namespace detail + +template +using SPSCQueue = detail::SPSCQueueBase; + +template +using WaitableSPSCQueue = detail::SPSCQueueBase; + } // namespace Common diff --git a/Source/Core/Core/CoreTiming.h b/Source/Core/Core/CoreTiming.h index f0ed02ea64..3ffa5d897f 100644 --- a/Source/Core/Core/CoreTiming.h +++ b/Source/Core/Core/CoreTiming.h @@ -184,7 +184,7 @@ private: std::vector m_event_queue; u64 m_event_fifo_id = 0; std::mutex m_ts_write_lock; - Common::SPSCQueue m_ts_queue; + Common::SPSCQueue m_ts_queue; float m_last_oc_factor = 0.0f; diff --git a/Source/Core/Core/HW/DVD/DVDThread.h b/Source/Core/Core/HW/DVD/DVDThread.h index 6f9e23f37d..4ac0f78bb3 100644 --- a/Source/Core/Core/HW/DVD/DVDThread.h +++ b/Source/Core/Core/HW/DVD/DVDThread.h @@ -135,8 +135,8 @@ private: Common::Event m_result_queue_expanded; // Is set by DVD thread Common::Flag m_dvd_thread_exiting = Common::Flag(false); // Is set by CPU thread - Common::SPSCQueue m_request_queue; - Common::SPSCQueue m_result_queue; + Common::SPSCQueue m_request_queue; + Common::SPSCQueue m_result_queue; std::map m_result_map; std::unique_ptr m_disc; diff --git a/Source/Core/Core/NetPlayClient.h b/Source/Core/Core/NetPlayClient.h index 7ab716aef7..b5ba32bcb4 100644 --- a/Source/Core/Core/NetPlayClient.h +++ b/Source/Core/Core/NetPlayClient.h @@ -196,7 +196,7 @@ protected: std::recursive_mutex async_queue_write; } m_crit; - Common::SPSCQueue m_async_queue; + Common::SPSCQueue m_async_queue; std::array, 4> m_pad_buffer; std::array, 4> m_wiimote_buffer; diff --git a/Source/Core/Core/NetPlayServer.h b/Source/Core/Core/NetPlayServer.h index 621b7cfe40..12bfebb323 100644 --- a/Source/Core/Core/NetPlayServer.h +++ b/Source/Core/Core/NetPlayServer.h @@ -197,8 +197,8 @@ private: std::recursive_mutex chunked_data_queue_write; } m_crit; - Common::SPSCQueue m_async_queue; - Common::SPSCQueue m_chunked_data_queue; + Common::SPSCQueue m_async_queue; + Common::SPSCQueue m_chunked_data_queue; SyncIdentifier m_selected_game_identifier; std::string m_selected_game_name; diff --git a/Source/Core/VideoCommon/PerformanceTracker.h b/Source/Core/VideoCommon/PerformanceTracker.h index e45b5aab4e..fac6435101 100644 --- a/Source/Core/VideoCommon/PerformanceTracker.h +++ b/Source/Core/VideoCommon/PerformanceTracker.h @@ -59,7 +59,7 @@ private: // Push'd from Count() // and Pop'd from UpdateStats() - Common::SPSCQueue m_raw_dts; + Common::SPSCQueue
m_raw_dts; std::atomic
m_last_raw_dt = DT::zero(); // Amount of time to sample dt's over (defaults to config) diff --git a/Source/UnitTests/Common/SPSCQueueTest.cpp b/Source/UnitTests/Common/SPSCQueueTest.cpp index 1673395027..93b3e584fd 100644 --- a/Source/UnitTests/Common/SPSCQueueTest.cpp +++ b/Source/UnitTests/Common/SPSCQueueTest.cpp @@ -2,8 +2,11 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include + +#include #include +#include "Common/CommonTypes.h" #include "Common/SPSCQueue.h" TEST(SPSCQueue, Simple) @@ -44,21 +47,36 @@ TEST(SPSCQueue, Simple) TEST(SPSCQueue, MultiThreaded) { - Common::SPSCQueue q; - - auto inserter = [&q]() { - for (u32 i = 0; i < 100000; ++i) - q.Push(i); + struct Foo + { + std::shared_ptr ptr; + u32 i; }; - auto popper = [&q]() { - for (u32 i = 0; i < 100000; ++i) + // A shared_ptr held by every element in the queue. + auto sptr = std::make_shared(0); + + auto queue_ptr = std::make_unique>(); + auto& q = *queue_ptr; + + constexpr u32 reps = 100000; + + auto inserter = [&]() { + for (u32 i = 0; i != reps; ++i) + q.Push({sptr, i}); + + q.WaitForEmpty(); + EXPECT_EQ(sptr.use_count(), 1); + q.Push({sptr, 0}); + EXPECT_EQ(sptr.use_count(), 2); + }; + + auto popper = [&]() { + for (u32 i = 0; i != reps; ++i) { - while (q.Empty()) - ; - u32 v; - q.Pop(v); - EXPECT_EQ(i, v); + q.WaitForData(); + EXPECT_EQ(i, q.Front().i); + q.Pop(); } }; @@ -67,4 +85,7 @@ TEST(SPSCQueue, MultiThreaded) popper_thread.join(); inserter_thread.join(); + + queue_ptr.reset(); + EXPECT_EQ(sptr.use_count(), 1); }