From 392dabbc0fab9c41c5243bb556db93032afe63ef Mon Sep 17 00:00:00 2001 From: Loic Coenen Date: Sat, 9 May 2026 21:31:54 +0000 Subject: [PATCH] feat: add command queue and FIFO pipe for unified input handling Co-authored-by: aider (deepseek/deepseek-reasoner) --- src/command.h | 19 +++++++++++++ src/looper.c | 45 +++++++++++++++++++++++++++++++ src/main.c | 7 +++++ src/midi.c | 56 +++++++++++++------------------------- src/pipe.c | 75 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/pipe.h | 9 +++++++ src/queue.c | 30 +++++++++++++++++++++ src/queue.h | 31 +++++++++++++++++++++ 8 files changed, 234 insertions(+), 38 deletions(-) create mode 100644 src/command.h create mode 100644 src/pipe.c create mode 100644 src/pipe.h create mode 100644 src/queue.c create mode 100644 src/queue.h diff --git a/src/command.h b/src/command.h new file mode 100644 index 0000000..717e13b --- /dev/null +++ b/src/command.h @@ -0,0 +1,19 @@ +#ifndef COMMAND_H +#define COMMAND_H + +typedef enum { + CMD_CYCLE, // toggle record/stop for a channel + CMD_STOP, // force to idle + // CMD_LOOP_TOGGLE not needed, CYCLE covers it + CMD_BIND_CHANNEL, // bind a channel index (data = channel) + CMD_UNBIND, // reset bind to channel 0 + // ADD and REMOVE are still driven via atomics for now +} cmd_type_t; + +typedef struct { + cmd_type_t type; + int channel; // which channel; -1 means "current/bound" + int data; // extra parameter (e.g. bind channel number) +} command_t; + +#endif diff --git a/src/looper.c b/src/looper.c index 610f08a..cbc9dd2 100644 --- a/src/looper.c +++ b/src/looper.c @@ -9,6 +9,8 @@ #include #include #include +#include "command.h" +#include "queue.h" /* Global state (shared across files) */ struct channel_t channels[MAX_CHANNELS]; @@ -20,10 +22,46 @@ jack_port_t *midi_control_port = NULL; jack_port_t *midi_clock_port = NULL; atomic_int control_key_active = 0; atomic_int bind_channel = 0; +spsc_queue_t cmd_queue; /* Deferred removal index (1 second grace) */ static int pending_unregister_idx = -1; +static void apply_command(command_t cmd) { + switch (cmd.type) { + case CMD_CYCLE: + if (cmd.channel >= 0 && cmd.channel < MAX_CHANNELS) { + int cur = atomic_load(&channels[cmd.channel].state); + int next; + switch (cur) { + case STATE_IDLE: next = STATE_RECORD; break; + case STATE_RECORD: next = STATE_LOOPING; break; + case STATE_LOOPING: next = STATE_PAUSED; break; + case STATE_PAUSED: next = STATE_LOOPING; break; + default: next = STATE_IDLE; break; + } + atomic_store(&channels[cmd.channel].state, next); + } + break; + case CMD_STOP: + if (cmd.channel >= 0 && cmd.channel < MAX_CHANNELS) + atomic_store(&channels[cmd.channel].state, STATE_IDLE); + else { + for (int i = 0; i < MAX_CHANNELS; i++) + atomic_store(&channels[i].state, STATE_IDLE); + } + break; + case CMD_BIND_CHANNEL: + atomic_store(&bind_channel, cmd.data); + break; + case CMD_UNBIND: + atomic_store(&bind_channel, 0); + break; + default: + break; + } +} + /* ---------------------------------------------------------------- * process callback * ---------------------------------------------------------------- */ @@ -37,6 +75,12 @@ int process_callback(jack_nframes_t nframes, void *arg) { } } + /* drain RT‑safe commands */ + command_t cmd; + while (queue_pop(&cmd_queue, &cmd)) { + apply_command(cmd); + } + /* process each active channel */ for (int c = 0; c < MAX_CHANNELS; c++) { if (!atomic_load(&channels[c].active)) @@ -171,6 +215,7 @@ void jack_shutdown_cb(void *arg) { * looper initialisation * ---------------------------------------------------------------- */ int looper_init(jack_client_t *client) { + queue_init(&cmd_queue); /* channel 0 */ channels[0].active = 1; atomic_store(&channels[0].state, STATE_IDLE); diff --git a/src/main.c b/src/main.c index 61220f7..321455a 100644 --- a/src/main.c +++ b/src/main.c @@ -1,5 +1,6 @@ // cppcheck-suppress missingIncludeSystem #include "looper.h" +#include "pipe.h" #include #include #include @@ -33,6 +34,12 @@ int main(int argc, char *argv[]) { return 1; } + if (pipe_start_reader() != 0) { + fprintf(stderr, "pipe reader initialisation failed\n"); + jack_client_close(client); + return 1; + } + if (jack_activate(client)) { fprintf(stderr, "Cannot activate client\n"); jack_client_close(client); diff --git a/src/midi.c b/src/midi.c index bac71c5..eee1bb3 100644 --- a/src/midi.c +++ b/src/midi.c @@ -1,6 +1,8 @@ // cppcheck-suppress missingIncludeSystem #include "midi.h" #include "channel.h" +#include "command.h" +#include "queue.h" #include #include #include @@ -9,6 +11,7 @@ extern atomic_int control_key_active; extern atomic_int cmd_add; extern atomic_int cmd_remove; extern atomic_int bind_channel; +extern spsc_queue_t cmd_queue; void midi_handle_events(void *port_buffer, jack_nframes_t nframes) { (void)nframes; @@ -34,7 +37,8 @@ void midi_handle_events(void *port_buffer, jack_nframes_t nframes) { if (ck) { atomic_store(&control_key_active, 0); if (note < 16) { - atomic_store(&bind_channel, note); + command_t cmd = { .type = CMD_BIND_CHANNEL, .channel = -1, .data = note }; + queue_push(&cmd_queue, cmd); } else { switch (note) { case 60: @@ -43,30 +47,19 @@ void midi_handle_events(void *port_buffer, jack_nframes_t nframes) { case 61: atomic_store(&cmd_remove, 1); break; - case 62: /* trigger looper – channel via bind_channel */ + case 62: { int bch = atomic_load(&bind_channel); if (bch >= 0 && bch < MAX_CHANNELS) { - int cur = atomic_load(&channels[bch].state); - switch (cur) { - case STATE_IDLE: - atomic_store(&channels[bch].state, STATE_RECORD); - break; - case STATE_RECORD: - atomic_store(&channels[bch].state, STATE_LOOPING); - break; - case STATE_LOOPING: - atomic_store(&channels[bch].state, STATE_PAUSED); - break; - case STATE_PAUSED: - atomic_store(&channels[bch].state, STATE_LOOPING); - break; - } + command_t cmd = { .type = CMD_CYCLE, .channel = bch, .data = 0 }; + queue_push(&cmd_queue, cmd); } } break; - case 63: /* unbind – reset bind to channel 0 */ - atomic_store(&bind_channel, 0); - break; + case 63: + { + command_t cmd = { .type = CMD_UNBIND, .channel = -1, .data = 0 }; + queue_push(&cmd_queue, cmd); + } break; default: break; } @@ -74,24 +67,11 @@ void midi_handle_events(void *port_buffer, jack_nframes_t nframes) { } else { /* direct mapping */ switch (note) { - case 1: /* toggle channel 0 */ - { - int cur0 = atomic_load(&channels[0].state); - switch (cur0) { - case STATE_IDLE: - atomic_store(&channels[0].state, STATE_RECORD); - break; - case STATE_RECORD: - atomic_store(&channels[0].state, STATE_LOOPING); - break; - case STATE_LOOPING: - atomic_store(&channels[0].state, STATE_PAUSED); - break; - case STATE_PAUSED: - atomic_store(&channels[0].state, STATE_LOOPING); - break; - } - } break; + case 1: + { + command_t cmd = { .type = CMD_CYCLE, .channel = 0, .data = 0 }; + queue_push(&cmd_queue, cmd); + } break; case 60: atomic_store(&cmd_add, 1); break; diff --git a/src/pipe.c b/src/pipe.c new file mode 100644 index 0000000..8062659 --- /dev/null +++ b/src/pipe.c @@ -0,0 +1,75 @@ +#include "pipe.h" +#include "queue.h" +#include "command.h" +#include +#include +#include +#include +#include +#include +#include +#include + +#define FIFO_PATH "/tmp/looper_cmd" +#define LINE_MAX 256 + +/* forward‑declare the global queue (defined in looper.c) */ +extern spsc_queue_t cmd_queue; + +/* external atomic flags for add/remove (defined in looper.c) */ +extern atomic_int cmd_add; +extern atomic_int cmd_remove; + +static void *pipe_thread_func(void *arg) { + (void)arg; + FILE *fifo = fopen(FIFO_PATH, "r"); + if (!fifo) { + perror("fopen fifo"); + return NULL; + } + char line[LINE_MAX]; + while (fgets(line, sizeof(line), fifo)) { + /* strip newline */ + size_t len = strlen(line); + if (len > 0 && line[len-1] == '\n') + line[len-1] = '\0'; + + if (strcmp(line, "add") == 0) { + atomic_store(&cmd_add, 1); + } else if (strcmp(line, "remove") == 0) { + atomic_store(&cmd_remove, 1); + } else if (strncmp(line, "record ", 7) == 0) { + int ch = atoi(line + 7); + command_t cmd = { .type = CMD_CYCLE, .channel = ch, .data = 0 }; + queue_push(&cmd_queue, cmd); + } else if (strcmp(line, "stop") == 0) { + command_t cmd = { .type = CMD_STOP, .channel = -1, .data = 0 }; + queue_push(&cmd_queue, cmd); + } else if (strncmp(line, "bind ", 5) == 0) { + int ch = atoi(line + 5); + command_t cmd = { .type = CMD_BIND_CHANNEL, .channel = -1, .data = ch }; + queue_push(&cmd_queue, cmd); + } else if (strcmp(line, "unbind") == 0) { + command_t cmd = { .type = CMD_UNBIND, .channel = -1, .data = 0 }; + queue_push(&cmd_queue, cmd); + } + /* ignore unknown lines */ + } + fclose(fifo); + return NULL; +} + +int pipe_start_reader(void) { + /* create FIFO if it doesn't exist */ + if (mkfifo(FIFO_PATH, 0666) != 0 && errno != EEXIST) { + perror("mkfifo"); + return -1; + } + pthread_t tid; + if (pthread_create(&tid, NULL, pipe_thread_func, NULL) != 0) { + perror("pthread_create"); + return -1; + } + pthread_detach(tid); /* we don't need to join */ + return 0; +} diff --git a/src/pipe.h b/src/pipe.h new file mode 100644 index 0000000..f1a8307 --- /dev/null +++ b/src/pipe.h @@ -0,0 +1,9 @@ +#ifndef PIPE_H +#define PIPE_H + +/* Start the FIFO reader thread. + * Creates /tmp/looper_cmd (or aborts on error). + * Returns 0 on success, -1 on failure. */ +int pipe_start_reader(void); + +#endif diff --git a/src/queue.c b/src/queue.c new file mode 100644 index 0000000..bca85c6 --- /dev/null +++ b/src/queue.c @@ -0,0 +1,30 @@ +#include "queue.h" +#include +#include + +void queue_init(spsc_queue_t *q) { + /* nothing to allocate, just ensure head/tail start at 0 */ + q->head = 0; + q->tail = 0; +} + +bool queue_push(spsc_queue_t *q, command_t cmd) { + int h = atomic_load_explicit(&q->head, memory_order_relaxed); + int t = atomic_load_explicit(&q->tail, memory_order_acquire); + int next = (h + 1) % QUEUE_CAPACITY; + if (next == t) + return false; /* queue full */ + q->buffer[h] = cmd; + atomic_store_explicit(&q->head, next, memory_order_release); + return true; +} + +bool queue_pop(spsc_queue_t *q, command_t *cmd) { + int t = atomic_load_explicit(&q->tail, memory_order_relaxed); + int h = atomic_load_explicit(&q->head, memory_order_acquire); + if (t == h) + return false; /* queue empty */ + *cmd = q->buffer[t]; + atomic_store_explicit(&q->tail, (t + 1) % QUEUE_CAPACITY, memory_order_release); + return true; +} diff --git a/src/queue.h b/src/queue.h new file mode 100644 index 0000000..e0da752 --- /dev/null +++ b/src/queue.h @@ -0,0 +1,31 @@ +#ifndef QUEUE_H +#define QUEUE_H + +#include "command.h" +#include + +/* Fixed‑size lock‑free SPSC queue (single producer, single consumer). + * The queue is safe for one thread writing (producer) and one thread + * reading (consumer). No locks, no dynamic memory allocation. + * Must be initialised before first use. All operations are RT‑safe. */ + +#define QUEUE_CAPACITY 256 + +typedef struct { + command_t buffer[QUEUE_CAPACITY]; + /* head: index where next element will be written (producer only) + * tail: index of next element to read (consumer only) */ + int head; + int tail; +} spsc_queue_t; + +/* Initialise queue (must be called once before any push/pop). */ +void queue_init(spsc_queue_t *q); + +/* Push a command. Returns true on success, false if queue full. */ +bool queue_push(spsc_queue_t *q, command_t cmd); + +/* Pop a command. Returns true if a command was retrieved, false if empty. */ +bool queue_pop(spsc_queue_t *q, command_t *cmd); + +#endif