Skip to content

Commit 0c5b2ee

Browse files
authored
[Proxying][NFC] Move task_queue to a separate header (emscripten-core#17954)
In preparation for future work adding a fast path for notifying proxying queues using `Atomics.waitAsync` instead of `postMessage`, move the `task_queue` implementation out of proxying.c and into its own header and implementation file. Using `Atomics.waitAsync` in place of `postMessage` will require threads to have new message queues to replace the browser's `postMessage` queues, and there is no reason to implement a new kind of message queue when we can just reuse the existing `task_queue`. Now that `task_queue` is not just a private implementation detail of `em_proxying_queue`, also change its name (and the name of its associated functions) to use the `em_` prefix to avoid name collisions with potential user-level task queues.
1 parent 91926db commit 0c5b2ee

8 files changed

Lines changed: 207 additions & 163 deletions

File tree

system/lib/libc/musl/src/internal/proxying_notification_state.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
* found in the LICENSE file.
66
*/
77

8+
#pragma once
9+
810
// Flag values used when creating postMessage notifications and when freeing
911
// proxying queues. New postMessages are created for new work unless the
1012
// relevant task queue is in state NOTIFICATION_PENDING and proxying queues can

system/lib/pthread/em_task_queue.c

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2021 The Emscripten Authors. All rights reserved.
3+
* Emscripten is available under two separate licenses, the MIT license and the
4+
* University of Illinois/NCSA Open Source License. Both these licenses can be
5+
* found in the LICENSE file.
6+
*/
7+
8+
#include <stdlib.h>
9+
#include <string.h>
10+
11+
#include "em_task_queue.h"
12+
#include "proxying_notification_state.h"
13+
14+
#define EM_TASK_QUEUE_INITIAL_CAPACITY 128
15+
16+
em_task_queue* em_task_queue_create(pthread_t thread) {
17+
em_task_queue* queue = malloc(sizeof(em_task_queue));
18+
if (queue == NULL) {
19+
return NULL;
20+
}
21+
task* tasks = malloc(sizeof(task) * EM_TASK_QUEUE_INITIAL_CAPACITY);
22+
if (tasks == NULL) {
23+
free(queue);
24+
return NULL;
25+
}
26+
*queue = (em_task_queue){.notification = NOTIFICATION_NONE,
27+
.mutex = PTHREAD_MUTEX_INITIALIZER,
28+
.thread = thread,
29+
.processing = 0,
30+
.tasks = tasks,
31+
.capacity = EM_TASK_QUEUE_INITIAL_CAPACITY,
32+
.head = 0,
33+
.tail = 0};
34+
return queue;
35+
}
36+
37+
void em_task_queue_destroy(em_task_queue* queue) {
38+
pthread_mutex_destroy(&queue->mutex);
39+
free(queue->tasks);
40+
free(queue);
41+
}
42+
43+
// Not thread safe. Returns 1 on success and 0 on failure.
44+
static int em_task_queue_grow(em_task_queue* queue) {
45+
// Allocate a larger task queue.
46+
int new_capacity = queue->capacity * 2;
47+
task* new_tasks = malloc(sizeof(task) * new_capacity);
48+
if (new_tasks == NULL) {
49+
return 0;
50+
}
51+
// Copy the tasks such that the head of the queue is at the beginning of the
52+
// buffer. There are two cases to handle: either the queue wraps around the
53+
// end of the old buffer or it does not.
54+
int queued_tasks;
55+
if (queue->head <= queue->tail) {
56+
// No wrap. Copy the tasks in one chunk.
57+
queued_tasks = queue->tail - queue->head;
58+
memcpy(new_tasks, &queue->tasks[queue->head], sizeof(task) * queued_tasks);
59+
} else {
60+
// Wrap. Copy `first_queued` tasks up to the end of the old buffer and
61+
// `last_queued` tasks at the beginning of the old buffer.
62+
int first_queued = queue->capacity - queue->head;
63+
int last_queued = queue->tail;
64+
queued_tasks = first_queued + last_queued;
65+
memcpy(new_tasks, &queue->tasks[queue->head], sizeof(task) * first_queued);
66+
memcpy(new_tasks + first_queued, queue->tasks, sizeof(task) * last_queued);
67+
}
68+
free(queue->tasks);
69+
queue->tasks = new_tasks;
70+
queue->capacity = new_capacity;
71+
queue->head = 0;
72+
queue->tail = queued_tasks;
73+
return 1;
74+
}
75+
76+
void em_task_queue_execute(em_task_queue* queue) {
77+
queue->processing = 1;
78+
pthread_mutex_lock(&queue->mutex);
79+
while (!em_task_queue_is_empty(queue)) {
80+
task t = em_task_queue_dequeue(queue);
81+
// Unlock while the task is running to allow more work to be queued in
82+
// parallel.
83+
pthread_mutex_unlock(&queue->mutex);
84+
t.func(t.arg);
85+
pthread_mutex_lock(&queue->mutex);
86+
}
87+
pthread_mutex_unlock(&queue->mutex);
88+
queue->processing = 0;
89+
}
90+
91+
int em_task_queue_enqueue(em_task_queue* queue, task t) {
92+
if (em_task_queue_is_full(queue) && !em_task_queue_grow(queue)) {
93+
return 0;
94+
}
95+
queue->tasks[queue->tail] = t;
96+
queue->tail = (queue->tail + 1) % queue->capacity;
97+
return 1;
98+
}
99+
100+
task em_task_queue_dequeue(em_task_queue* queue) {
101+
task t = queue->tasks[queue->head];
102+
queue->head = (queue->head + 1) % queue->capacity;
103+
return t;
104+
}

system/lib/pthread/em_task_queue.h

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2021 The Emscripten Authors. All rights reserved.
3+
* Emscripten is available under two separate licenses, the MIT license and the
4+
* University of Illinois/NCSA Open Source License. Both these licenses can be
5+
* found in the LICENSE file.
6+
*/
7+
8+
#pragma once
9+
10+
#include <pthread.h>
11+
12+
#include "proxying_notification_state.h"
13+
14+
// A task is an arbitrary function combined with some arbitrary state.
15+
typedef struct task {
16+
void (*func)(void*);
17+
void* arg;
18+
} task;
19+
20+
// A task queue holding tasks to be processed by a particular thread.
21+
typedef struct em_task_queue {
22+
// Flag encoding the state of postMessage notifications for this task queue.
23+
// Accessed directly from JS, so must be the first member.
24+
_Atomic notification_state notification;
25+
// Protects all modifications to mutable `em_task_queue` state.
26+
pthread_mutex_t mutex;
27+
// The target thread for this em_task_queue. Immutable and accessible without
28+
// acquiring the mutex.
29+
pthread_t thread;
30+
// Recursion guard. Only accessed on the target thread, so there's no need to
31+
// hold the lock when accessing it. TODO: We disallow recursive processing
32+
// because that's what the old proxying API does, so it is safer to start with
33+
// the same behavior. Experiment with relaxing this restriction once the old
34+
// API uses these queues as well.
35+
int processing;
36+
// Ring buffer of tasks of size `capacity`. New tasks are enqueued at
37+
// `tail` and dequeued at `head`.
38+
task* tasks;
39+
int capacity;
40+
int head;
41+
int tail;
42+
} em_task_queue;
43+
44+
// Send a postMessage notification containing the em_task_queue pointer to the
45+
// target thread so it will execute the queue when it returns to the event loop.
46+
// Also pass in the current thread and main thread ids to minimize calls back
47+
// into Wasm.
48+
extern int _emscripten_notify_task_queue(pthread_t target_thread,
49+
pthread_t curr_thread,
50+
pthread_t main_thread,
51+
em_task_queue* queue);
52+
53+
em_task_queue* em_task_queue_create(pthread_t thread);
54+
55+
void em_task_queue_destroy(em_task_queue* queue);
56+
57+
// Execute tasks until an empty queue is observed.
58+
void em_task_queue_execute(em_task_queue* queue);
59+
60+
// Not thread safe.
61+
static inline int em_task_queue_is_empty(em_task_queue* queue) {
62+
return queue->head == queue->tail;
63+
}
64+
65+
// Not thread safe.
66+
static inline int em_task_queue_is_full(em_task_queue* queue) {
67+
return queue->head == (queue->tail + 1) % queue->capacity;
68+
}
69+
70+
// Not thread safe. Returns 1 on success and 0 on failure.
71+
int em_task_queue_enqueue(em_task_queue* queue, task t);
72+
73+
// Not thread safe. Assumes the queue is not empty.
74+
task em_task_queue_dequeue(em_task_queue* queue);

0 commit comments

Comments
 (0)