feat: implement lock-free command queue and atomic state for thread safety

Co-authored-by: aider (deepseek/deepseek-coder) <aider@aider.chat>
This commit is contained in:
Loic Coenen
2026-05-01 14:52:53 +00:00
parent 563380df66
commit 7b23c75dd1
6 changed files with 293 additions and 63 deletions

267
engine.c
View File

@@ -3,6 +3,7 @@
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <stdatomic.h>
// Forward declarations
static void process_queued_triggers(Engine *engine, jack_nframes_t current_frame);
@@ -12,6 +13,9 @@ static jack_nframes_t get_next_quantize_frame(Engine *engine, jack_nframes_t cur
static int process_callback(jack_nframes_t nframes, void *arg) {
Engine *engine = (Engine *)arg;
// Process commands from frontend threads
engine_process_commands(engine);
// Get per-channel audio buffers
jack_default_audio_sample_t *audio_in[MAX_CHANNELS];
jack_default_audio_sample_t *audio_out[MAX_CHANNELS];
@@ -48,11 +52,17 @@ static int process_callback(jack_nframes_t nframes, void *arg) {
(engine->transport.clock_count * engine->sample_rate * 4) /
(MIDI_CLOCKS_PER_BEAT * BEATS_PER_BAR);
// Update atomic mirrors for frontend reads
atomic_store(&engine->transport_clock_count, engine->transport.clock_count);
atomic_store(&engine->transport_sample_position, engine->transport.sample_position);
if (engine->transport.clock_count % MIDI_CLOCKS_PER_BEAT == 0) {
engine->transport.beat_position =
(engine->transport.beat_position + 1) % BEATS_PER_BAR;
atomic_store(&engine->transport_beat_position, engine->transport.beat_position);
if (engine->transport.beat_position == 0) {
engine->transport.bar_position++;
atomic_store(&engine->transport_bar_position, engine->transport.bar_position);
}
}
} else if (status == 0xFA) { // MIDI Start
@@ -61,10 +71,17 @@ static int process_callback(jack_nframes_t nframes, void *arg) {
engine->transport.beat_position = 0;
engine->transport.bar_position = 0;
engine->transport.sample_position = 0;
atomic_store(&engine->transport_rolling, 1);
atomic_store(&engine->transport_clock_count, 0);
atomic_store(&engine->transport_beat_position, 0);
atomic_store(&engine->transport_bar_position, 0);
atomic_store(&engine->transport_sample_position, 0);
} else if (status == 0xFC) { // MIDI Stop
engine->transport.rolling = false;
atomic_store(&engine->transport_rolling, 0);
} else if (status == 0xFB) { // MIDI Continue
engine->transport.rolling = true;
atomic_store(&engine->transport_rolling, 1);
}
// Pass through clock messages
@@ -89,7 +106,10 @@ static int process_callback(jack_nframes_t nframes, void *arg) {
if (status == 0x90 && channel == engine->control_channel && velocity > 0) {
int clip_index = note % MAX_CLIPS;
if (engine->quantize_mode != QUANTIZE_OFF && engine->transport.rolling) {
// Read quantize mode atomically (frontend may update it)
QuantizeMode current_quantize = (QuantizeMode)atomic_load(&engine->quantize_mode_atomic);
if (current_quantize != QUANTIZE_OFF && engine->transport.rolling) {
// Queue for quantization
jack_nframes_t trigger_time = midi_event.time;
queue_trigger(engine, clip_index, false, trigger_time);
@@ -128,7 +148,10 @@ static int process_callback(jack_nframes_t nframes, void *arg) {
if (status == 0x90 && velocity > 0) {
int scene_index = note % MAX_SCENES;
if (engine->quantize_mode != QUANTIZE_OFF && engine->transport.rolling) {
// Read quantize mode atomically (frontend may update it)
QuantizeMode current_quantize = (QuantizeMode)atomic_load(&engine->quantize_mode_atomic);
if (current_quantize != QUANTIZE_OFF && engine->transport.rolling) {
// Queue for quantization
jack_nframes_t trigger_time = midi_event.time;
queue_trigger(engine, scene_index, true, trigger_time);
@@ -261,6 +284,160 @@ static void process_queued_triggers(Engine *engine, jack_nframes_t nframes) {
}
}
// Initialize command queue
static void command_queue_init(CommandQueue *q) {
atomic_store(&q->write_index, 0);
atomic_store(&q->read_index, 0);
}
// Submit command from frontend thread (non-blocking)
int engine_submit_command(Engine *engine, CommandType type, int index, jack_nframes_t value) {
if (!engine) return -1;
CommandQueue *q = &engine->command_queue;
unsigned int write = atomic_load(&q->write_index);
unsigned int read = atomic_load(&q->read_index);
// Check if queue is full
if ((write - read) >= MAX_QUEUED_COMMANDS) {
fprintf(stderr, "Command queue full, dropping command\n");
return -1;
}
unsigned int slot = write % MAX_QUEUED_COMMANDS;
q->buffer[slot].type = type;
q->buffer[slot].index = index;
q->buffer[slot].value = value;
// Memory barrier ensures buffer write completes before write_index update
atomic_store(&q->write_index, write + 1);
return 0;
}
// Process pending commands (called from audio thread only)
void engine_process_commands(Engine *engine) {
if (!engine) return;
CommandQueue *q = &engine->command_queue;
unsigned int write = atomic_load(&q->write_index);
unsigned int read = atomic_load(&q->read_index);
while (read < write) {
unsigned int slot = read % MAX_QUEUED_COMMANDS;
Command cmd = q->buffer[slot];
// Process the command directly (we're in the audio thread)
switch (cmd.type) {
case CMD_TRIGGER_CLIP: {
if (cmd.index < 0 || cmd.index >= MAX_CLIPS) break;
Clip *clip = &engine->clips[cmd.index];
switch (clip->state) {
case CLIP_EMPTY:
clip->state = CLIP_RECORDING;
clip->write_position = 0;
clip->buffer_size = 0;
clip->read_position = 0;
printf("Clip %d (scene %d, channel %d): Recording started\n",
cmd.index, cmd.index / MAX_CHANNELS, cmd.index % MAX_CHANNELS);
break;
case CLIP_RECORDING:
clip->state = CLIP_LOOPING;
clip->buffer_size = clip->write_position;
clip->read_position = 0;
printf("Clip %d (scene %d, channel %d): Recording stopped, looping %zu samples\n",
cmd.index, cmd.index / MAX_CHANNELS, cmd.index % MAX_CHANNELS,
clip->buffer_size);
break;
case CLIP_LOOPING:
clip->state = CLIP_STOPPED;
clip->read_position = 0;
printf("Clip %d (scene %d, channel %d): Looping stopped\n",
cmd.index, cmd.index / MAX_CHANNELS, cmd.index % MAX_CHANNELS);
break;
case CLIP_STOPPED:
clip->state = CLIP_LOOPING;
clip->read_position = 0;
printf("Clip %d (scene %d, channel %d): Looping resumed\n",
cmd.index, cmd.index / MAX_CHANNELS, cmd.index % MAX_CHANNELS);
break;
}
break;
}
case CMD_TRIGGER_SCENE: {
if (cmd.index < 0 || cmd.index >= MAX_SCENES) break;
printf("Scene %d: Triggering all clips\n", cmd.index);
for (int ch = 0; ch < MAX_CHANNELS; ch++) {
int clip_idx = CLIP_INDEX(cmd.index, ch);
Clip *clip = &engine->clips[clip_idx];
switch (clip->state) {
case CLIP_EMPTY:
clip->state = CLIP_RECORDING;
clip->write_position = 0;
clip->buffer_size = 0;
clip->read_position = 0;
break;
case CLIP_RECORDING:
clip->state = CLIP_LOOPING;
clip->buffer_size = clip->write_position;
clip->read_position = 0;
break;
case CLIP_LOOPING:
clip->state = CLIP_STOPPED;
clip->read_position = 0;
break;
case CLIP_STOPPED:
clip->state = CLIP_LOOPING;
clip->read_position = 0;
break;
}
}
break;
}
case CMD_RESET_CLIP: {
if (cmd.index < 0 || cmd.index >= MAX_CLIPS) break;
Clip *clip = &engine->clips[cmd.index];
clip->state = CLIP_EMPTY;
clip->buffer_size = 0;
clip->write_position = 0;
clip->read_position = 0;
memset(clip->buffer, 0, MAX_BUFFER_SIZE * sizeof(float));
break;
}
case CMD_SET_QUANTIZE_MODE:
engine->quantize_mode = (QuantizeMode)cmd.index;
break;
case CMD_SET_QUANTIZE_THRESHOLD:
engine->quantize_threshold = cmd.value;
break;
case CMD_RESET_TRANSPORT:
engine->transport.rolling = false;
engine->transport.clock_count = 0;
engine->transport.beat_position = 0;
engine->transport.bar_position = 0;
engine->transport.sample_position = 0;
atomic_store(&engine->transport_rolling, 0);
atomic_store(&engine->transport_clock_count, 0);
atomic_store(&engine->transport_beat_position, 0);
atomic_store(&engine->transport_bar_position, 0);
atomic_store(&engine->transport_sample_position, 0);
break;
}
read++;
}
// Update read index after processing all commands
atomic_store(&q->read_index, read);
}
int engine_init(Engine *engine, const char *client_name) {
if (!engine || !client_name) return -1;
@@ -271,6 +448,18 @@ int engine_init(Engine *engine, const char *client_name) {
engine->quantize_threshold = 0;
engine->queued_triggers = NULL;
// Initialize command queue
command_queue_init(&engine->command_queue);
// Initialize atomic state mirrors
atomic_store(&engine->transport_rolling, 0);
atomic_store(&engine->transport_clock_count, 0);
atomic_store(&engine->transport_beat_position, 0);
atomic_store(&engine->transport_bar_position, 0);
atomic_store(&engine->transport_sample_position, 0);
atomic_store(&engine->quantize_mode_atomic, (int)QUANTIZE_OFF);
atomic_store(&engine->quantize_threshold_atomic, 0);
// Initialize transport
engine->transport.rolling = false;
engine->transport.clock_count = 0;
@@ -398,87 +587,45 @@ void engine_stop(Engine *engine) {
void engine_trigger_clip(Engine *engine, int clip_index) {
if (!engine || clip_index < 0 || clip_index >= MAX_CLIPS) return;
Clip *clip = &engine->clips[clip_index];
switch (clip->state) {
case CLIP_EMPTY:
// Start recording
clip->state = CLIP_RECORDING;
clip->write_position = 0;
clip->buffer_size = 0;
clip->read_position = 0;
printf("Clip %d (scene %d, channel %d): Recording started\n",
clip_index, clip_index / MAX_CHANNELS, clip_index % MAX_CHANNELS);
break;
case CLIP_RECORDING:
// Stop recording, start looping
clip->state = CLIP_LOOPING;
clip->buffer_size = clip->write_position;
clip->read_position = 0;
printf("Clip %d (scene %d, channel %d): Recording stopped, looping %zu samples\n",
clip_index, clip_index / MAX_CHANNELS, clip_index % MAX_CHANNELS,
clip->buffer_size);
break;
case CLIP_LOOPING:
// Stop looping
clip->state = CLIP_STOPPED;
clip->read_position = 0;
printf("Clip %d (scene %d, channel %d): Looping stopped\n",
clip_index, clip_index / MAX_CHANNELS, clip_index % MAX_CHANNELS);
break;
case CLIP_STOPPED:
// Start looping again
clip->state = CLIP_LOOPING;
clip->read_position = 0;
printf("Clip %d (scene %d, channel %d): Looping resumed\n",
clip_index, clip_index / MAX_CHANNELS, clip_index % MAX_CHANNELS);
break;
}
// Queue command for audio thread processing
engine_submit_command(engine, CMD_TRIGGER_CLIP, clip_index, 0);
}
void engine_trigger_scene(Engine *engine, int scene_index) {
if (!engine || scene_index < 0 || scene_index >= MAX_SCENES) return;
printf("Scene %d: Triggering all clips\n", scene_index);
for (int ch = 0; ch < MAX_CHANNELS; ch++) {
int clip_idx = CLIP_INDEX(scene_index, ch);
engine_trigger_clip(engine, clip_idx);
}
engine_submit_command(engine, CMD_TRIGGER_SCENE, scene_index, 0);
}
void engine_reset_clip(Engine *engine, int clip_index) {
if (!engine || clip_index < 0 || clip_index >= MAX_CLIPS) return;
Clip *clip = &engine->clips[clip_index];
clip->state = CLIP_EMPTY;
clip->buffer_size = 0;
clip->write_position = 0;
clip->read_position = 0;
memset(clip->buffer, 0, MAX_BUFFER_SIZE * sizeof(float));
engine_submit_command(engine, CMD_RESET_CLIP, clip_index, 0);
}
void engine_set_quantize_mode(Engine *engine, QuantizeMode mode) {
if (!engine) return;
engine->quantize_mode = mode;
// Atomically update the mode so audio thread sees it immediately
atomic_store(&engine->quantize_mode_atomic, (int)mode);
// Also queue for any additional processing
engine_submit_command(engine, CMD_SET_QUANTIZE_MODE, (int)mode, 0);
printf("Quantize mode set to: %s\n", quantize_mode_to_string(mode));
}
void engine_set_quantize_threshold(Engine *engine, jack_nframes_t samples) {
if (!engine) return;
engine->quantize_threshold = samples;
atomic_store(&engine->quantize_threshold_atomic, samples);
engine_submit_command(engine, CMD_SET_QUANTIZE_THRESHOLD, 0, samples);
}
void engine_reset_transport(Engine *engine) {
if (!engine) return;
engine->transport.rolling = false;
engine->transport.clock_count = 0;
engine->transport.beat_position = 0;
engine->transport.bar_position = 0;
engine->transport.sample_position = 0;
engine_submit_command(engine, CMD_RESET_TRANSPORT, 0, 0);
printf("Transport reset\n");
}