fix: make command queue thread-safe with CAS and memory fences

Co-authored-by: aider (deepseek/deepseek-coder) <aider@aider.chat>
This commit is contained in:
Loic Coenen
2026-05-01 23:41:27 +00:00
parent 5423daabb1
commit 31653d8b4a

View File

@@ -248,8 +248,12 @@ int engine_submit_command(Engine *engine, CommandType type, int index, jack_nfra
if (!engine) return -1; if (!engine) return -1;
CommandQueue *q = &engine->command_queue; CommandQueue *q = &engine->command_queue;
unsigned int write = atomic_load(&q->write_index);
unsigned int read = atomic_load(&q->read_index); // Use CAS to atomically claim a slot
unsigned int write, next_write, read;
do {
write = atomic_load(&q->write_index);
read = atomic_load(&q->read_index);
// Check if queue is full // Check if queue is full
if ((write - read) >= MAX_QUEUED_COMMANDS) { if ((write - read) >= MAX_QUEUED_COMMANDS) {
@@ -257,13 +261,17 @@ int engine_submit_command(Engine *engine, CommandType type, int index, jack_nfra
return -1; return -1;
} }
next_write = write + 1;
} while (!atomic_compare_exchange_weak(&q->write_index, &write, next_write));
// We now own this slot exclusively
unsigned int slot = write % MAX_QUEUED_COMMANDS; unsigned int slot = write % MAX_QUEUED_COMMANDS;
q->buffer[slot].type = type; q->buffer[slot].type = type;
q->buffer[slot].index = index; q->buffer[slot].index = index;
q->buffer[slot].value = value; q->buffer[slot].value = value;
// Memory barrier ensures buffer write completes before write_index update // Release fence ensures the buffer write is visible before any consumer reads it
atomic_store(&q->write_index, write + 1); atomic_thread_fence(memory_order_release);
return 0; return 0;
} }
@@ -273,11 +281,15 @@ void engine_process_commands(Engine *engine) {
if (!engine) return; if (!engine) return;
CommandQueue *q = &engine->command_queue; CommandQueue *q = &engine->command_queue;
unsigned int write = atomic_load(&q->write_index);
unsigned int read = atomic_load(&q->read_index); unsigned int read = atomic_load(&q->read_index);
unsigned int write = atomic_load(&q->write_index);
while (read < write) { while (read < write) {
unsigned int slot = read % MAX_QUEUED_COMMANDS; unsigned int slot = read % MAX_QUEUED_COMMANDS;
// Acquire fence ensures we see the fully written command data
atomic_thread_fence(memory_order_acquire);
Command cmd = q->buffer[slot]; Command cmd = q->buffer[slot];
// Process the command directly (we're in the audio thread) // Process the command directly (we're in the audio thread)
@@ -363,7 +375,7 @@ void engine_process_commands(Engine *engine) {
case CMD_RESET_CLIP: { case CMD_RESET_CLIP: {
if (cmd.index < 0 || cmd.index >= MAX_CLIPS) break; if (cmd.index < 0 || cmd.index >= MAX_CLIPS) break;
Clip *clip = &engine->clips[cmd.index]; Clip *clip = &engine->clips[cmd.index];
if (!clip->buffer) break; // ADD THIS - prevent segfault on freed buffer if (!clip->buffer) break;
// Record undo action // Record undo action
UndoAction action; UndoAction action;
@@ -503,10 +515,9 @@ void engine_process_commands(Engine *engine) {
} }
read++; read++;
} // Store read_index after processing each command
// Update read index after processing all commands
atomic_store(&q->read_index, read); atomic_store(&q->read_index, read);
}
} }
// Push an action to the undo history // Push an action to the undo history