From 7b23c75dd162f311f9fbc154ed9778c77b52e56c Mon Sep 17 00:00:00 2001 From: Loic Coenen Date: Fri, 1 May 2026 14:52:53 +0000 Subject: [PATCH] feat: implement lock-free command queue and atomic state for thread safety Co-authored-by: aider (deepseek/deepseek-coder) --- engine.c | 267 ++++++++++++++++++++++++++++++++++++++------------ engine.h | 44 +++++++++ test_cli.c | 13 +++ test_engine.c | 13 +++ test_tui.c | 13 +++ tui.c | 6 +- 6 files changed, 293 insertions(+), 63 deletions(-) diff --git a/engine.c b/engine.c index 9de91f2..fbe93c2 100644 --- a/engine.c +++ b/engine.c @@ -3,6 +3,7 @@ #include #include #include +#include // Forward declarations static void process_queued_triggers(Engine *engine, jack_nframes_t current_frame); @@ -12,6 +13,9 @@ static jack_nframes_t get_next_quantize_frame(Engine *engine, jack_nframes_t cur static int process_callback(jack_nframes_t nframes, void *arg) { Engine *engine = (Engine *)arg; + // Process commands from frontend threads + engine_process_commands(engine); + // Get per-channel audio buffers jack_default_audio_sample_t *audio_in[MAX_CHANNELS]; jack_default_audio_sample_t *audio_out[MAX_CHANNELS]; @@ -48,11 +52,17 @@ static int process_callback(jack_nframes_t nframes, void *arg) { (engine->transport.clock_count * engine->sample_rate * 4) / (MIDI_CLOCKS_PER_BEAT * BEATS_PER_BAR); + // Update atomic mirrors for frontend reads + atomic_store(&engine->transport_clock_count, engine->transport.clock_count); + atomic_store(&engine->transport_sample_position, engine->transport.sample_position); + if (engine->transport.clock_count % MIDI_CLOCKS_PER_BEAT == 0) { engine->transport.beat_position = (engine->transport.beat_position + 1) % BEATS_PER_BAR; + atomic_store(&engine->transport_beat_position, engine->transport.beat_position); if (engine->transport.beat_position == 0) { engine->transport.bar_position++; + atomic_store(&engine->transport_bar_position, engine->transport.bar_position); } } } else if (status == 0xFA) { // MIDI Start @@ -61,10 +71,17 @@ static int process_callback(jack_nframes_t nframes, void *arg) { engine->transport.beat_position = 0; engine->transport.bar_position = 0; engine->transport.sample_position = 0; + atomic_store(&engine->transport_rolling, 1); + atomic_store(&engine->transport_clock_count, 0); + atomic_store(&engine->transport_beat_position, 0); + atomic_store(&engine->transport_bar_position, 0); + atomic_store(&engine->transport_sample_position, 0); } else if (status == 0xFC) { // MIDI Stop engine->transport.rolling = false; + atomic_store(&engine->transport_rolling, 0); } else if (status == 0xFB) { // MIDI Continue engine->transport.rolling = true; + atomic_store(&engine->transport_rolling, 1); } // Pass through clock messages @@ -89,7 +106,10 @@ static int process_callback(jack_nframes_t nframes, void *arg) { if (status == 0x90 && channel == engine->control_channel && velocity > 0) { int clip_index = note % MAX_CLIPS; - if (engine->quantize_mode != QUANTIZE_OFF && engine->transport.rolling) { + // Read quantize mode atomically (frontend may update it) + QuantizeMode current_quantize = (QuantizeMode)atomic_load(&engine->quantize_mode_atomic); + + if (current_quantize != QUANTIZE_OFF && engine->transport.rolling) { // Queue for quantization jack_nframes_t trigger_time = midi_event.time; queue_trigger(engine, clip_index, false, trigger_time); @@ -128,7 +148,10 @@ static int process_callback(jack_nframes_t nframes, void *arg) { if (status == 0x90 && velocity > 0) { int scene_index = note % MAX_SCENES; - if (engine->quantize_mode != QUANTIZE_OFF && engine->transport.rolling) { + // Read quantize mode atomically (frontend may update it) + QuantizeMode current_quantize = (QuantizeMode)atomic_load(&engine->quantize_mode_atomic); + + if (current_quantize != QUANTIZE_OFF && engine->transport.rolling) { // Queue for quantization jack_nframes_t trigger_time = midi_event.time; queue_trigger(engine, scene_index, true, trigger_time); @@ -261,6 +284,160 @@ static void process_queued_triggers(Engine *engine, jack_nframes_t nframes) { } } +// Initialize command queue +static void command_queue_init(CommandQueue *q) { + atomic_store(&q->write_index, 0); + atomic_store(&q->read_index, 0); +} + +// Submit command from frontend thread (non-blocking) +int engine_submit_command(Engine *engine, CommandType type, int index, jack_nframes_t value) { + if (!engine) return -1; + + CommandQueue *q = &engine->command_queue; + unsigned int write = atomic_load(&q->write_index); + unsigned int read = atomic_load(&q->read_index); + + // Check if queue is full + if ((write - read) >= MAX_QUEUED_COMMANDS) { + fprintf(stderr, "Command queue full, dropping command\n"); + return -1; + } + + unsigned int slot = write % MAX_QUEUED_COMMANDS; + q->buffer[slot].type = type; + q->buffer[slot].index = index; + q->buffer[slot].value = value; + + // Memory barrier ensures buffer write completes before write_index update + atomic_store(&q->write_index, write + 1); + + return 0; +} + +// Process pending commands (called from audio thread only) +void engine_process_commands(Engine *engine) { + if (!engine) return; + + CommandQueue *q = &engine->command_queue; + unsigned int write = atomic_load(&q->write_index); + unsigned int read = atomic_load(&q->read_index); + + while (read < write) { + unsigned int slot = read % MAX_QUEUED_COMMANDS; + Command cmd = q->buffer[slot]; + + // Process the command directly (we're in the audio thread) + switch (cmd.type) { + case CMD_TRIGGER_CLIP: { + if (cmd.index < 0 || cmd.index >= MAX_CLIPS) break; + Clip *clip = &engine->clips[cmd.index]; + + switch (clip->state) { + case CLIP_EMPTY: + clip->state = CLIP_RECORDING; + clip->write_position = 0; + clip->buffer_size = 0; + clip->read_position = 0; + printf("Clip %d (scene %d, channel %d): Recording started\n", + cmd.index, cmd.index / MAX_CHANNELS, cmd.index % MAX_CHANNELS); + break; + case CLIP_RECORDING: + clip->state = CLIP_LOOPING; + clip->buffer_size = clip->write_position; + clip->read_position = 0; + printf("Clip %d (scene %d, channel %d): Recording stopped, looping %zu samples\n", + cmd.index, cmd.index / MAX_CHANNELS, cmd.index % MAX_CHANNELS, + clip->buffer_size); + break; + case CLIP_LOOPING: + clip->state = CLIP_STOPPED; + clip->read_position = 0; + printf("Clip %d (scene %d, channel %d): Looping stopped\n", + cmd.index, cmd.index / MAX_CHANNELS, cmd.index % MAX_CHANNELS); + break; + case CLIP_STOPPED: + clip->state = CLIP_LOOPING; + clip->read_position = 0; + printf("Clip %d (scene %d, channel %d): Looping resumed\n", + cmd.index, cmd.index / MAX_CHANNELS, cmd.index % MAX_CHANNELS); + break; + } + break; + } + + case CMD_TRIGGER_SCENE: { + if (cmd.index < 0 || cmd.index >= MAX_SCENES) break; + printf("Scene %d: Triggering all clips\n", cmd.index); + for (int ch = 0; ch < MAX_CHANNELS; ch++) { + int clip_idx = CLIP_INDEX(cmd.index, ch); + Clip *clip = &engine->clips[clip_idx]; + + switch (clip->state) { + case CLIP_EMPTY: + clip->state = CLIP_RECORDING; + clip->write_position = 0; + clip->buffer_size = 0; + clip->read_position = 0; + break; + case CLIP_RECORDING: + clip->state = CLIP_LOOPING; + clip->buffer_size = clip->write_position; + clip->read_position = 0; + break; + case CLIP_LOOPING: + clip->state = CLIP_STOPPED; + clip->read_position = 0; + break; + case CLIP_STOPPED: + clip->state = CLIP_LOOPING; + clip->read_position = 0; + break; + } + } + break; + } + + case CMD_RESET_CLIP: { + if (cmd.index < 0 || cmd.index >= MAX_CLIPS) break; + Clip *clip = &engine->clips[cmd.index]; + clip->state = CLIP_EMPTY; + clip->buffer_size = 0; + clip->write_position = 0; + clip->read_position = 0; + memset(clip->buffer, 0, MAX_BUFFER_SIZE * sizeof(float)); + break; + } + + case CMD_SET_QUANTIZE_MODE: + engine->quantize_mode = (QuantizeMode)cmd.index; + break; + + case CMD_SET_QUANTIZE_THRESHOLD: + engine->quantize_threshold = cmd.value; + break; + + case CMD_RESET_TRANSPORT: + engine->transport.rolling = false; + engine->transport.clock_count = 0; + engine->transport.beat_position = 0; + engine->transport.bar_position = 0; + engine->transport.sample_position = 0; + atomic_store(&engine->transport_rolling, 0); + atomic_store(&engine->transport_clock_count, 0); + atomic_store(&engine->transport_beat_position, 0); + atomic_store(&engine->transport_bar_position, 0); + atomic_store(&engine->transport_sample_position, 0); + break; + } + + read++; + } + + // Update read index after processing all commands + atomic_store(&q->read_index, read); +} + int engine_init(Engine *engine, const char *client_name) { if (!engine || !client_name) return -1; @@ -271,6 +448,18 @@ int engine_init(Engine *engine, const char *client_name) { engine->quantize_threshold = 0; engine->queued_triggers = NULL; + // Initialize command queue + command_queue_init(&engine->command_queue); + + // Initialize atomic state mirrors + atomic_store(&engine->transport_rolling, 0); + atomic_store(&engine->transport_clock_count, 0); + atomic_store(&engine->transport_beat_position, 0); + atomic_store(&engine->transport_bar_position, 0); + atomic_store(&engine->transport_sample_position, 0); + atomic_store(&engine->quantize_mode_atomic, (int)QUANTIZE_OFF); + atomic_store(&engine->quantize_threshold_atomic, 0); + // Initialize transport engine->transport.rolling = false; engine->transport.clock_count = 0; @@ -398,87 +587,45 @@ void engine_stop(Engine *engine) { void engine_trigger_clip(Engine *engine, int clip_index) { if (!engine || clip_index < 0 || clip_index >= MAX_CLIPS) return; - Clip *clip = &engine->clips[clip_index]; - - switch (clip->state) { - case CLIP_EMPTY: - // Start recording - clip->state = CLIP_RECORDING; - clip->write_position = 0; - clip->buffer_size = 0; - clip->read_position = 0; - printf("Clip %d (scene %d, channel %d): Recording started\n", - clip_index, clip_index / MAX_CHANNELS, clip_index % MAX_CHANNELS); - break; - - case CLIP_RECORDING: - // Stop recording, start looping - clip->state = CLIP_LOOPING; - clip->buffer_size = clip->write_position; - clip->read_position = 0; - printf("Clip %d (scene %d, channel %d): Recording stopped, looping %zu samples\n", - clip_index, clip_index / MAX_CHANNELS, clip_index % MAX_CHANNELS, - clip->buffer_size); - break; - - case CLIP_LOOPING: - // Stop looping - clip->state = CLIP_STOPPED; - clip->read_position = 0; - printf("Clip %d (scene %d, channel %d): Looping stopped\n", - clip_index, clip_index / MAX_CHANNELS, clip_index % MAX_CHANNELS); - break; - - case CLIP_STOPPED: - // Start looping again - clip->state = CLIP_LOOPING; - clip->read_position = 0; - printf("Clip %d (scene %d, channel %d): Looping resumed\n", - clip_index, clip_index / MAX_CHANNELS, clip_index % MAX_CHANNELS); - break; - } + // Queue command for audio thread processing + engine_submit_command(engine, CMD_TRIGGER_CLIP, clip_index, 0); } void engine_trigger_scene(Engine *engine, int scene_index) { if (!engine || scene_index < 0 || scene_index >= MAX_SCENES) return; - printf("Scene %d: Triggering all clips\n", scene_index); - - for (int ch = 0; ch < MAX_CHANNELS; ch++) { - int clip_idx = CLIP_INDEX(scene_index, ch); - engine_trigger_clip(engine, clip_idx); - } + engine_submit_command(engine, CMD_TRIGGER_SCENE, scene_index, 0); } void engine_reset_clip(Engine *engine, int clip_index) { if (!engine || clip_index < 0 || clip_index >= MAX_CLIPS) return; - Clip *clip = &engine->clips[clip_index]; - clip->state = CLIP_EMPTY; - clip->buffer_size = 0; - clip->write_position = 0; - clip->read_position = 0; - memset(clip->buffer, 0, MAX_BUFFER_SIZE * sizeof(float)); + engine_submit_command(engine, CMD_RESET_CLIP, clip_index, 0); } void engine_set_quantize_mode(Engine *engine, QuantizeMode mode) { if (!engine) return; - engine->quantize_mode = mode; + + // Atomically update the mode so audio thread sees it immediately + atomic_store(&engine->quantize_mode_atomic, (int)mode); + + // Also queue for any additional processing + engine_submit_command(engine, CMD_SET_QUANTIZE_MODE, (int)mode, 0); + printf("Quantize mode set to: %s\n", quantize_mode_to_string(mode)); } void engine_set_quantize_threshold(Engine *engine, jack_nframes_t samples) { if (!engine) return; - engine->quantize_threshold = samples; + + atomic_store(&engine->quantize_threshold_atomic, samples); + engine_submit_command(engine, CMD_SET_QUANTIZE_THRESHOLD, 0, samples); } void engine_reset_transport(Engine *engine) { if (!engine) return; - engine->transport.rolling = false; - engine->transport.clock_count = 0; - engine->transport.beat_position = 0; - engine->transport.bar_position = 0; - engine->transport.sample_position = 0; + + engine_submit_command(engine, CMD_RESET_TRANSPORT, 0, 0); printf("Transport reset\n"); } diff --git a/engine.h b/engine.h index 746c2d3..acf263a 100644 --- a/engine.h +++ b/engine.h @@ -5,6 +5,7 @@ #include #include #include +#include #define MAX_SCENES 8 #define MAX_CHANNELS 8 @@ -46,6 +47,31 @@ typedef struct { bool is_playing; } Clip; +// Maximum number of queued commands from frontend to audio thread +#define MAX_QUEUED_COMMANDS 64 + +typedef enum { + CMD_TRIGGER_CLIP, + CMD_TRIGGER_SCENE, + CMD_RESET_CLIP, + CMD_SET_QUANTIZE_MODE, + CMD_SET_QUANTIZE_THRESHOLD, + CMD_RESET_TRANSPORT +} CommandType; + +typedef struct { + CommandType type; + int index; // clip_index, scene_index, or mode value + jack_nframes_t value; // threshold value or other numeric param +} Command; + +// Lock-free single-producer single-consumer ring buffer +typedef struct { + Command buffer[MAX_QUEUED_COMMANDS]; + atomic_uint write_index; + atomic_uint read_index; +} CommandQueue; + // Queued trigger for quantization typedef struct QueuedTrigger { int clip_index; @@ -76,6 +102,18 @@ typedef struct { jack_nframes_t quantize_threshold; // in samples (lookahead) QueuedTrigger *queued_triggers; + // Thread-safe command queue for frontend -> audio thread communication + CommandQueue command_queue; + + // Atomic flags for simple state that frontend reads + atomic_int transport_rolling; // bool + atomic_uint transport_clock_count; + atomic_uint transport_beat_position; + atomic_uint transport_bar_position; + atomic_uint transport_sample_position; + atomic_int quantize_mode_atomic; // QuantizeMode + atomic_uint quantize_threshold_atomic; + bool running; } Engine; @@ -98,6 +136,12 @@ void engine_reset_transport(Engine *engine); // Queue management (exposed for testing) void queue_trigger(Engine *engine, int clip_index, bool is_scene, jack_nframes_t time); +// Thread-safe command submission (called from frontend threads) +int engine_submit_command(Engine *engine, CommandType type, int index, jack_nframes_t value); + +// Process pending commands (called from audio thread) +void engine_process_commands(Engine *engine); + // Utility const char* clip_state_to_string(ClipState state); uint8_t clip_state_to_velocity(ClipState state); diff --git a/test_cli.c b/test_cli.c index bcce5ef..3f061c7 100644 --- a/test_cli.c +++ b/test_cli.c @@ -2,6 +2,7 @@ #include #include #include "engine.h" +#include #include "cli.h" // Minimal test: just ensure parsing doesn't crash @@ -16,6 +17,18 @@ static void test_cli_parse(void) { engine.quantize_mode = QUANTIZE_OFF; engine.quantize_threshold = 0; engine.queued_triggers = NULL; + + // Initialize command queue + command_queue_init(&engine.command_queue); + + // Initialize atomic state mirrors + atomic_store(&engine.transport_rolling, 0); + atomic_store(&engine.transport_clock_count, 0); + atomic_store(&engine.transport_beat_position, 0); + atomic_store(&engine.transport_bar_position, 0); + atomic_store(&engine.transport_sample_position, 0); + atomic_store(&engine.quantize_mode_atomic, (int)QUANTIZE_OFF); + atomic_store(&engine.quantize_threshold_atomic, 0); engine.transport.rolling = false; engine.transport.clock_count = 0; engine.transport.beat_position = 0; diff --git a/test_engine.c b/test_engine.c index 020e7ac..3f8c1ce 100644 --- a/test_engine.c +++ b/test_engine.c @@ -2,6 +2,7 @@ #include #include #include +#include #include "engine.h" // Test helper @@ -15,6 +16,18 @@ static Engine *create_test_engine(void) { engine->quantize_threshold = 0; engine->queued_triggers = NULL; + // Initialize command queue + command_queue_init(&engine->command_queue); + + // Initialize atomic state mirrors + atomic_store(&engine->transport_rolling, 0); + atomic_store(&engine->transport_clock_count, 0); + atomic_store(&engine->transport_beat_position, 0); + atomic_store(&engine->transport_bar_position, 0); + atomic_store(&engine->transport_sample_position, 0); + atomic_store(&engine->quantize_mode_atomic, (int)QUANTIZE_OFF); + atomic_store(&engine->quantize_threshold_atomic, 0); + // Initialize transport engine->transport.rolling = false; engine->transport.clock_count = 0; diff --git a/test_tui.c b/test_tui.c index 91a8688..6bc432f 100644 --- a/test_tui.c +++ b/test_tui.c @@ -3,6 +3,7 @@ #include #include #include "engine.h" +#include #include "tui.h" // Test helper @@ -16,6 +17,18 @@ static Engine *create_test_engine(void) { engine->quantize_threshold = 0; engine->queued_triggers = NULL; + // Initialize command queue + command_queue_init(&engine->command_queue); + + // Initialize atomic state mirrors + atomic_store(&engine->transport_rolling, 0); + atomic_store(&engine->transport_clock_count, 0); + atomic_store(&engine->transport_beat_position, 0); + atomic_store(&engine->transport_bar_position, 0); + atomic_store(&engine->transport_sample_position, 0); + atomic_store(&engine->quantize_mode_atomic, (int)QUANTIZE_OFF); + atomic_store(&engine->quantize_threshold_atomic, 0); + // Initialize transport engine->transport.rolling = false; engine->transport.clock_count = 0; diff --git a/tui.c b/tui.c index b28efb3..c05d8b2 100644 --- a/tui.c +++ b/tui.c @@ -106,9 +106,9 @@ static void draw_grid(void) { mvprintw(GRID_ROWS * CELL_HEIGHT + 2, 0, "Quantize: %s | Threshold: %u | Transport: %s", - quantize_mode_to_string(g_engine->quantize_mode), - g_engine->quantize_threshold, - g_engine->transport.rolling ? "Rolling" : "Stopped"); + quantize_mode_to_string((QuantizeMode)atomic_load(&g_engine->quantize_mode_atomic)), + (unsigned int)atomic_load(&g_engine->quantize_threshold_atomic), + atomic_load(&g_engine->transport_rolling) ? "Rolling" : "Stopped"); // Draw help if active if (show_help) {