Merge pull request #13431 from jordan-woyak/spsc-waiting

Common: SPSCQueue cleanups and improvements.
This commit is contained in:
Admiral H. Curtiss 2025-04-23 22:19:15 +02:00 committed by GitHub
commit 879a8889aa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 121 additions and 81 deletions

View file

@ -6,104 +6,123 @@
// a simple lockless thread-safe, // a simple lockless thread-safe,
// single producer, single consumer queue // single producer, single consumer queue
#include <algorithm>
#include <atomic> #include <atomic>
#include <cstddef> #include <cassert>
#include <memory>
#include "Common/CommonTypes.h"
namespace Common namespace Common
{ {
template <typename T, bool NeedSize = true>
class SPSCQueue namespace detail
{
template <typename T, bool IncludeWaitFunctionality>
class SPSCQueueBase final
{ {
public: public:
SPSCQueue() : m_size(0) { m_write_ptr = m_read_ptr = new ElementPtr(); } SPSCQueueBase() = default;
~SPSCQueue() ~SPSCQueueBase()
{ {
// this will empty out the whole queue Clear();
delete m_read_ptr; delete m_read_ptr;
} }
u32 Size() const SPSCQueueBase(const SPSCQueueBase&) = delete;
SPSCQueueBase& operator=(const SPSCQueueBase&) = delete;
std::size_t Size() const { return m_size.load(std::memory_order_acquire); }
bool Empty() const { return Size() == 0; }
// The following are only safe from the "producer thread":
void Push(const T& arg) { Emplace(arg); }
void Push(T&& arg) { Emplace(std::move(arg)); }
template <typename... Args>
void Emplace(Args&&... args)
{ {
static_assert(NeedSize, "using Size() on SPSCQueue without NeedSize"); std::construct_at(&m_write_ptr->value.data, std::forward<Args>(args)...);
return m_size.load();
Node* const new_ptr = new Node;
m_write_ptr->next = new_ptr;
m_write_ptr = new_ptr;
AdjustSize(1);
} }
bool Empty() const { return !m_read_ptr->next.load(); } void WaitForEmpty() requires(IncludeWaitFunctionality)
T& Front() const { return m_read_ptr->current; }
template <typename Arg>
void Push(Arg&& t)
{ {
// create the element, add it to the queue while (const std::size_t old_size = Size())
m_write_ptr->current = std::forward<Arg>(t); m_size.wait(old_size, std::memory_order_acquire);
// set the next pointer to a new element ptr
// then advance the write pointer
ElementPtr* new_ptr = new ElementPtr();
m_write_ptr->next.store(new_ptr, std::memory_order_release);
m_write_ptr = new_ptr;
if (NeedSize)
m_size++;
} }
// The following are only safe from the "consumer thread":
T& Front() { return m_read_ptr->value.data; }
const T& Front() const { return m_read_ptr->value.data; }
void Pop() void Pop()
{ {
if (NeedSize) assert(!Empty());
m_size--;
ElementPtr* tmpptr = m_read_ptr; std::destroy_at(&Front());
// advance the read pointer
m_read_ptr = tmpptr->next.load(); Node* const old_node = m_read_ptr;
// set the next element to nullptr to stop the recursive deletion m_read_ptr = old_node->next;
tmpptr->next.store(nullptr); delete old_node;
delete tmpptr; // this also deletes the element
AdjustSize(-1);
} }
bool Pop(T& t) bool Pop(T& result)
{ {
if (Empty()) if (Empty())
return false; return false;
if (NeedSize) result = std::move(Front());
m_size--; Pop();
ElementPtr* tmpptr = m_read_ptr;
m_read_ptr = tmpptr->next.load(std::memory_order_acquire);
t = std::move(tmpptr->current);
tmpptr->next.store(nullptr);
delete tmpptr;
return true; return true;
} }
// not thread-safe void WaitForData() requires(IncludeWaitFunctionality)
{
m_size.wait(0, std::memory_order_acquire);
}
void Clear() void Clear()
{ {
m_size.store(0); while (!Empty())
delete m_read_ptr; Pop();
m_write_ptr = m_read_ptr = new ElementPtr();
} }
private: private:
// stores a pointer to element struct Node
// and a pointer to the next ElementPtr
class ElementPtr
{ {
public: // union allows value construction to be deferred until Push.
ElementPtr() : next(nullptr) {} union Value
~ElementPtr()
{ {
ElementPtr* next_ptr = next.load(); T data;
Value() {}
~Value() {}
} value;
if (next_ptr) Node* next;
delete next_ptr;
}
T current{};
std::atomic<ElementPtr*> next;
}; };
ElementPtr* m_write_ptr; Node* m_write_ptr = new Node;
ElementPtr* m_read_ptr; Node* m_read_ptr = m_write_ptr;
std::atomic<u32> m_size;
void AdjustSize(std::size_t value)
{
m_size.fetch_add(value, std::memory_order_release);
if constexpr (IncludeWaitFunctionality)
m_size.notify_one();
}
std::atomic<std::size_t> m_size = 0;
}; };
} // namespace detail
template <typename T>
using SPSCQueue = detail::SPSCQueueBase<T, false>;
template <typename T>
using WaitableSPSCQueue = detail::SPSCQueueBase<T, true>;
} // namespace Common } // namespace Common

View file

@ -184,7 +184,7 @@ private:
std::vector<Event> m_event_queue; std::vector<Event> m_event_queue;
u64 m_event_fifo_id = 0; u64 m_event_fifo_id = 0;
std::mutex m_ts_write_lock; std::mutex m_ts_write_lock;
Common::SPSCQueue<Event, false> m_ts_queue; Common::SPSCQueue<Event> m_ts_queue;
float m_last_oc_factor = 0.0f; float m_last_oc_factor = 0.0f;

View file

@ -135,8 +135,8 @@ private:
Common::Event m_result_queue_expanded; // Is set by DVD thread Common::Event m_result_queue_expanded; // Is set by DVD thread
Common::Flag m_dvd_thread_exiting = Common::Flag(false); // Is set by CPU thread Common::Flag m_dvd_thread_exiting = Common::Flag(false); // Is set by CPU thread
Common::SPSCQueue<ReadRequest, false> m_request_queue; Common::SPSCQueue<ReadRequest> m_request_queue;
Common::SPSCQueue<ReadResult, false> m_result_queue; Common::SPSCQueue<ReadResult> m_result_queue;
std::map<u64, ReadResult> m_result_map; std::map<u64, ReadResult> m_result_map;
std::unique_ptr<DiscIO::Volume> m_disc; std::unique_ptr<DiscIO::Volume> m_disc;

View file

@ -196,7 +196,7 @@ protected:
std::recursive_mutex async_queue_write; std::recursive_mutex async_queue_write;
} m_crit; } m_crit;
Common::SPSCQueue<AsyncQueueEntry, false> m_async_queue; Common::SPSCQueue<AsyncQueueEntry> m_async_queue;
std::array<Common::SPSCQueue<GCPadStatus>, 4> m_pad_buffer; std::array<Common::SPSCQueue<GCPadStatus>, 4> m_pad_buffer;
std::array<Common::SPSCQueue<WiimoteEmu::SerializedWiimoteState>, 4> m_wiimote_buffer; std::array<Common::SPSCQueue<WiimoteEmu::SerializedWiimoteState>, 4> m_wiimote_buffer;

View file

@ -197,8 +197,8 @@ private:
std::recursive_mutex chunked_data_queue_write; std::recursive_mutex chunked_data_queue_write;
} m_crit; } m_crit;
Common::SPSCQueue<AsyncQueueEntry, false> m_async_queue; Common::SPSCQueue<AsyncQueueEntry> m_async_queue;
Common::SPSCQueue<ChunkedDataQueueEntry, false> m_chunked_data_queue; Common::SPSCQueue<ChunkedDataQueueEntry> m_chunked_data_queue;
SyncIdentifier m_selected_game_identifier; SyncIdentifier m_selected_game_identifier;
std::string m_selected_game_name; std::string m_selected_game_name;

View file

@ -59,7 +59,7 @@ private:
// Push'd from Count() // Push'd from Count()
// and Pop'd from UpdateStats() // and Pop'd from UpdateStats()
Common::SPSCQueue<DT, false> m_raw_dts; Common::SPSCQueue<DT> m_raw_dts;
std::atomic<DT> m_last_raw_dt = DT::zero(); std::atomic<DT> m_last_raw_dt = DT::zero();
// Amount of time to sample dt's over (defaults to config) // Amount of time to sample dt's over (defaults to config)

View file

@ -2,8 +2,11 @@
// SPDX-License-Identifier: GPL-2.0-or-later // SPDX-License-Identifier: GPL-2.0-or-later
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <memory>
#include <thread> #include <thread>
#include "Common/CommonTypes.h"
#include "Common/SPSCQueue.h" #include "Common/SPSCQueue.h"
TEST(SPSCQueue, Simple) TEST(SPSCQueue, Simple)
@ -44,21 +47,36 @@ TEST(SPSCQueue, Simple)
TEST(SPSCQueue, MultiThreaded) TEST(SPSCQueue, MultiThreaded)
{ {
Common::SPSCQueue<u32> q; struct Foo
{
auto inserter = [&q]() { std::shared_ptr<int> ptr;
for (u32 i = 0; i < 100000; ++i) u32 i;
q.Push(i);
}; };
auto popper = [&q]() { // A shared_ptr held by every element in the queue.
for (u32 i = 0; i < 100000; ++i) auto sptr = std::make_shared<int>(0);
auto queue_ptr = std::make_unique<Common::WaitableSPSCQueue<Foo>>();
auto& q = *queue_ptr;
constexpr u32 reps = 100000;
auto inserter = [&]() {
for (u32 i = 0; i != reps; ++i)
q.Push({sptr, i});
q.WaitForEmpty();
EXPECT_EQ(sptr.use_count(), 1);
q.Push({sptr, 0});
EXPECT_EQ(sptr.use_count(), 2);
};
auto popper = [&]() {
for (u32 i = 0; i != reps; ++i)
{ {
while (q.Empty()) q.WaitForData();
; EXPECT_EQ(i, q.Front().i);
u32 v; q.Pop();
q.Pop(v);
EXPECT_EQ(i, v);
} }
}; };
@ -67,4 +85,7 @@ TEST(SPSCQueue, MultiThreaded)
popper_thread.join(); popper_thread.join();
inserter_thread.join(); inserter_thread.join();
queue_ptr.reset();
EXPECT_EQ(sptr.use_count(), 1);
} }