fix: split main command queue into per-source SPSC queues

Co-authored-by: aider (deepseek/deepseek-reasoner) <aider@aider.chat>
This commit is contained in:
Loic Coenen
2026-05-09 23:32:21 +00:00
parent de0389e144
commit 7edd95d06e
3 changed files with 57 additions and 16 deletions

View File

@@ -16,7 +16,8 @@
struct channel_t channels[MAX_CHANNELS]; struct channel_t channels[MAX_CHANNELS];
atomic_int channel_count = 0; atomic_int channel_count = 0;
int next_channel_id = 1; int next_channel_id = 1;
spsc_queue_t cmd_queue_main; spsc_queue_t cmd_queue_main_midi;
spsc_queue_t cmd_queue_main_fifo;
atomic_int global_rt_cycles = 0; atomic_int global_rt_cycles = 0;
jack_port_t *midi_control_port = NULL; jack_port_t *midi_control_port = NULL;
jack_port_t *midi_clock_port = NULL; jack_port_t *midi_clock_port = NULL;
@@ -218,7 +219,8 @@ void jack_shutdown_cb(void *arg) {
* ---------------------------------------------------------------- */ * ---------------------------------------------------------------- */
int looper_init(jack_client_t *client) { int looper_init(jack_client_t *client) {
queue_init(&cmd_queue); queue_init(&cmd_queue);
queue_init(&cmd_queue_main); queue_init(&cmd_queue_main_midi);
queue_init(&cmd_queue_main_fifo);
/* channel 0 */ /* channel 0 */
channels[0].active = 1; channels[0].active = 1;
atomic_store(&channels[0].state, STATE_IDLE); atomic_store(&channels[0].state, STATE_IDLE);
@@ -253,9 +255,36 @@ int looper_init(jack_client_t *client) {
* mainloop command processing * mainloop command processing
* ---------------------------------------------------------------- */ * ---------------------------------------------------------------- */
void looper_process_commands(jack_client_t *client) { void looper_process_commands(jack_client_t *client) {
/* Drain mainloop command queue (add/remove) */ /* Drain mainloop command queues (add/remove) */
command_t cmd; command_t cmd;
while (queue_pop(&cmd_queue_main, &cmd)) { while (queue_pop(&cmd_queue_main_midi, &cmd)) {
switch (cmd.type) {
case CMD_ADD_CHANNEL: {
int idx;
for (idx = 0; idx < MAX_CHANNELS; idx++)
if (!channels[idx].active)
break;
if (idx < MAX_CHANNELS)
channel_add(client, idx);
break;
}
case CMD_REMOVE_CHANNEL: {
int remove_idx = -1;
for (int idx = 1; idx < MAX_CHANNELS; idx++)
if (channels[idx].active)
remove_idx = idx;
if (remove_idx != -1) {
channel_remove(client, remove_idx);
pending_unregister_idx = remove_idx;
pending_unregister_cycle = atomic_load(&global_rt_cycles);
}
break;
}
default:
break;
}
}
while (queue_pop(&cmd_queue_main_fifo, &cmd)) {
switch (cmd.type) { switch (cmd.type) {
case CMD_ADD_CHANNEL: { case CMD_ADD_CHANNEL: {
int idx; int idx;

View File

@@ -10,7 +10,7 @@
extern atomic_int control_key_active; extern atomic_int control_key_active;
extern atomic_int bind_channel; extern atomic_int bind_channel;
extern spsc_queue_t cmd_queue; extern spsc_queue_t cmd_queue;
extern spsc_queue_t cmd_queue_main; extern spsc_queue_t cmd_queue_main_midi;
void midi_handle_events(void *port_buffer, jack_nframes_t nframes) { void midi_handle_events(void *port_buffer, jack_nframes_t nframes) {
(void)nframes; (void)nframes;
@@ -40,10 +40,16 @@ void midi_handle_events(void *port_buffer, jack_nframes_t nframes) {
queue_push(&cmd_queue, cmd); queue_push(&cmd_queue, cmd);
} else { } else {
switch (note) { switch (note) {
case 60: /* add channel not rtsafe, moved to FIFO pipe */ case 60:
case 61: /* remove channel not rtsafe, moved to FIFO pipe */ {
/* no operation here */ command_t cmd = { .type = CMD_ADD_CHANNEL, .channel = -1, .data = 0 };
break; queue_push(&cmd_queue_main_midi, cmd);
} break;
case 61:
{
command_t cmd = { .type = CMD_REMOVE_CHANNEL, .channel = -1, .data = 0 };
queue_push(&cmd_queue_main_midi, cmd);
} break;
case 62: case 62:
{ {
int bch = atomic_load(&bind_channel); int bch = atomic_load(&bind_channel);
@@ -69,10 +75,16 @@ void midi_handle_events(void *port_buffer, jack_nframes_t nframes) {
command_t cmd = { .type = CMD_CYCLE, .channel = 0, .data = 0 }; command_t cmd = { .type = CMD_CYCLE, .channel = 0, .data = 0 };
queue_push(&cmd_queue, cmd); queue_push(&cmd_queue, cmd);
} break; } break;
case 60: /* add channel not rtsafe, moved to FIFO pipe */ case 60:
case 61: /* remove channel not rtsafe, moved to FIFO pipe */ {
/* no operation here */ command_t cmd = { .type = CMD_ADD_CHANNEL, .channel = -1, .data = 0 };
break; queue_push(&cmd_queue_main_midi, cmd);
} break;
case 61:
{
command_t cmd = { .type = CMD_REMOVE_CHANNEL, .channel = -1, .data = 0 };
queue_push(&cmd_queue_main_midi, cmd);
} break;
default: default:
break; break;
} }

View File

@@ -15,7 +15,7 @@
/* forwarddeclare the global queues (defined in looper.c) */ /* forwarddeclare the global queues (defined in looper.c) */
extern spsc_queue_t cmd_queue; extern spsc_queue_t cmd_queue;
extern spsc_queue_t cmd_queue_main; extern spsc_queue_t cmd_queue_main_fifo;
static void *pipe_thread_func(void *arg) { static void *pipe_thread_func(void *arg) {
(void)arg; (void)arg;
@@ -33,10 +33,10 @@ static void *pipe_thread_func(void *arg) {
if (strcmp(line, "add") == 0) { if (strcmp(line, "add") == 0) {
command_t cmd = { .type = CMD_ADD_CHANNEL, .channel = -1, .data = 0 }; command_t cmd = { .type = CMD_ADD_CHANNEL, .channel = -1, .data = 0 };
queue_push(&cmd_queue_main, cmd); queue_push(&cmd_queue_main_fifo, cmd);
} else if (strcmp(line, "remove") == 0) { } else if (strcmp(line, "remove") == 0) {
command_t cmd = { .type = CMD_REMOVE_CHANNEL, .channel = -1, .data = 0 }; command_t cmd = { .type = CMD_REMOVE_CHANNEL, .channel = -1, .data = 0 };
queue_push(&cmd_queue_main, cmd); queue_push(&cmd_queue_main_fifo, cmd);
} else if (strncmp(line, "record ", 7) == 0) { } else if (strncmp(line, "record ", 7) == 0) {
int ch = atoi(line + 7); int ch = atoi(line + 7);
command_t cmd = { .type = CMD_CYCLE, .channel = ch, .data = 0 }; command_t cmd = { .type = CMD_CYCLE, .channel = ch, .data = 0 };