2 * Permission is hereby granted, free of charge, to any person obtaining a copy
3 * of this software and associated documentation files (the "Software"), to deal
4 * in the Software without restriction, including without limitation the rights
5 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
6 * copies of the Software, and to permit persons to whom the Software is
7 * furnished to do so, subject to the following conditions:
9 * The above copyright notice and this permission notice shall be included in
10 * all copies or substantial portions of the Software.
12 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
13 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
14 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
15 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
16 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
17 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 * Thread message API test
25 #include "libavutil/avassert.h"
26 #include "libavutil/avstring.h"
27 #include "libavutil/frame.h"
28 #include "libavutil/threadmessage.h"
29 #include "libavutil/thread.h" // not public
35 AVThreadMessageQueue *queue;
/*
 * Per-thread context handed to receiver_thread() as its argument.
 * NOTE(review): only the queue member is visible in this listing; code further
 * down also reads rd->id and rd->workload and the thread macros use td->tid,
 * so those members are presumably declared on the skipped lines - confirm.
 */
38 /* same as sender_data but shuffled for testing purpose */
39 struct receiver_data {
/* queue this receiver flushes and receives from */
43 AVThreadMessageQueue *queue;
48 // we add some junk in the message to make sure the message size is >
/* Sentinel compared against the magic field of a message by free_frame() and receiver_thread(). */
53 #define MAGIC 0xdeadc0de
/*
 * Free callback for messages: main() registers it on the queue through
 * av_thread_message_queue_set_free_func().
 *
 * arg is treated as a pointer to a struct message. Its magic field is asserted
 * against MAGIC, then the frame it carries is released with av_frame_free().
 */
55 static void free_frame(void *arg)
57 struct message *msg = arg;
/* sanity check that arg really points at one of the messages of this test */
58 av_assert0(msg->magic == MAGIC);
59 av_frame_free(&msg->frame);
/*
 * Sender thread entry point; arg is used as a struct sender_data pointer.
 *
 * Loops up to wd->workload times. The visible statements show two kinds of
 * iteration: flushing the shared queue, or allocating a frame, tagging it with
 * a sig metadata entry, giving it a 320x240 RGBA buffer and sending it on
 * wd->queue. After the loop the receive side of the queue is given ret when
 * it is negative, AVERROR_EOF otherwise.
 *
 * NOTE(review): this listing skips short lines (braces, the else keyword,
 * declarations of i, ret and val, the guard conditions and break statements of
 * the error paths, the return statement). Comments below describe visible
 * lines only; guards are presumed, not shown.
 */
62 static void *sender_thread(void *arg)
65 struct sender_data *wd = arg;
67 av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload);
68 for (i = 0; i < wd->workload; i++) {
/* flush when the remainder falls below a tenth of the workload (integer division, so never for workloads under 10) */
69 if (rand() % wd->workload < wd->workload / 10) {
70 av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id);
71 av_thread_message_flush(wd->queue);
74 AVDictionary *meta = NULL;
75 struct message msg = {
77 .frame = av_frame_alloc(),
/* presumably the failed-allocation path for msg.frame; its guard is on a skipped line */
81 ret = AVERROR(ENOMEM);
85 /* we add some metadata to identify the frames */
86 val = av_asprintf("frame %d/%d from sender %d",
87 i + 1, wd->workload, wd->id);
/* error path (guard not shown, presumably a NULL val): drop the frame and report ENOMEM */
89 av_frame_free(&msg.frame);
90 ret = AVERROR(ENOMEM);
/* AV_DICT_DONT_STRDUP_VAL: the dictionary takes val itself instead of copying it */
93 ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL);
/* error path (guard not shown): drop the frame */
95 av_frame_free(&msg.frame);
/* attach the dictionary to the frame so it travels with the message */
98 msg.frame->metadata = meta;
100 /* allocate a real frame in order to simulate "real" work */
101 msg.frame->format = AV_PIX_FMT_RGBA;
102 msg.frame->width = 320;
103 msg.frame->height = 240;
/* second argument is the requested buffer alignment */
104 ret = av_frame_get_buffer(msg.frame, 32);
/* error path (guard not shown): drop the frame */
106 av_frame_free(&msg.frame);
110 /* push the frame in the common queue */
111 av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n",
112 wd->id, i + 1, wd->workload, msg.frame);
/* the message struct is passed by address; flags are 0 */
113 ret = av_thread_message_queue_send(wd->queue, &msg, 0);
/* error path (guard not shown): the frame is released here, presumably because the send failed */
115 av_frame_free(&msg.frame);
120 av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n",
121 wd->id, av_err2str(ret));
/* publish the outcome to the receive side: the error if any, end-of-stream otherwise */
122 av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF);
/*
 * Receiver thread entry point; arg is used as a struct receiver_data pointer.
 *
 * Loops up to rd->workload times. The visible statements show two kinds of
 * iteration: flushing the queue (after logging how many elements it held), or
 * receiving one message, asserting its magic, logging the sig metadata value
 * of its frame and freeing that frame. After the loop the send side of the
 * queue is given ret when it is negative, AVERROR_EOF otherwise.
 *
 * NOTE(review): this listing skips short lines (braces, the else keyword,
 * declarations of i, ret, msg and meta, the check on the recv result, the
 * return statement). Comments below describe visible lines only.
 */
126 static void *receiver_thread(void *arg)
129 struct receiver_data *rd = arg;
131 for (i = 0; i < rd->workload; i++) {
/* same flush rule as the sender: remainder below a tenth of the workload */
132 if (rand() % rd->workload < rd->workload / 10) {
/* the element count is sampled for the log before the flush call is made */
133 av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, "
134 "discarding %d message(s)\n", rd->id,
135 av_thread_message_queue_nb_elems(rd->queue));
136 av_thread_message_flush(rd->queue);
140 AVDictionaryEntry *e;
/* the message is copied into msg; flags are 0 */
142 ret = av_thread_message_queue_recv(rd->queue, &msg, 0);
/* detect a corrupted or foreign message before touching its frame */
145 av_assert0(msg.magic == MAGIC);
146 meta = msg.frame->metadata;
147 e = av_dict_get(meta, "sig", NULL, 0);
/* NOTE(review): e is dereferenced with no NULL check after av_dict_get(); this relies on every received frame carrying a sig entry, as set by sender_thread() */
148 av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame);
/* the receiver releases the frame it took out of the queue */
149 av_frame_free(&msg.frame);
153 av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);
/* publish the outcome to the send side: the error if any, end-of-stream otherwise */
154 av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF);
/**
 * Pick a pseudo-random workload between two bounds.
 *
 * The value is drawn with rand(), so the sequence follows the C library
 * generator state. The bounds may be given in either order: the result lies
 * in the half-open range [low, high), where low and high are the smaller and
 * larger argument. Equal bounds return that value without calling rand().
 *
 * @param minv one end of the range
 * @param maxv the other end of the range
 * @return a value v with low <= v < high, or the bound itself when both are equal
 */
static int get_workload(int minv, int maxv)
{
    if (maxv < minv) {
        /* Inverted bounds used to yield values above both of them
         * (modulo by a negative span, then offset by the larger bound). */
        const int swapped = minv;

        minv = maxv;
        maxv = swapped;
    }

    if (maxv == minv)
        return maxv;

    return rand() % (maxv - minv) + minv;
}
/*
 * Test driver.
 *
 * Visible steps: read seven integer arguments from the command line, reject
 * any that is not strictly positive, allocate zeroed sender and receiver
 * descriptor arrays plus the message queue, register free_frame() as the queue
 * free function, start the receiver threads and then the sender threads, join
 * the senders and then the receivers, free the queue, and log ret as an error
 * when it is negative and not AVERROR_EOF.
 *
 * NOTE(review): this listing skips short lines (braces, a few declarations,
 * guard conditions, jump and return statements, labels), so the control flow
 * between the visible statements is not fully shown here.
 */
164 int main(int ac, char **av)
168 int nb_senders, sender_min_load, sender_max_load;
169 int nb_receivers, receiver_min_load, receiver_max_load;
170 struct sender_data *senders;
171 struct receiver_data *receivers;
/* starts as NULL so the queue can be passed to the free call before it is allocated */
172 AVThreadMessageQueue *queue = NULL;
/* usage text; its triggering condition is on a skipped line (av[1] to av[7] are read below, so seven arguments are expected) */
175 av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> "
176 "<nb_senders> <sender_min_send> <sender_max_send> "
177 "<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]);
/* NOTE(review): atoi() cannot report malformed input; text without a leading number converts to 0 and is caught by the <= 0 check below, but trailing garbage and out-of-range values are not diagnosed */
181 max_queue_size = atoi(av[1]);
182 nb_senders = atoi(av[2]);
183 sender_min_load = atoi(av[3]);
184 sender_max_load = atoi(av[4]);
185 nb_receivers = atoi(av[5]);
186 receiver_min_load = atoi(av[6]);
187 receiver_max_load = atoi(av[7]);
/* zero is rejected as well as negative values, although the message only mentions negative ones */
/* NOTE(review): nothing visible checks that each min load is not above its max load */
189 if (max_queue_size <= 0 ||
190 nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 ||
191 nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) {
192 av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n");
196 av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / "
197 "%d receivers receiving [%d-%d]\n", max_queue_size,
198 nb_senders, sender_min_load, sender_max_load,
199 nb_receivers, receiver_min_load, receiver_max_load);
/* one zero-initialised descriptor per thread */
/* NOTE(review): av_mallocz_array() is deprecated in newer FFmpeg releases in favour of av_calloc() - confirm against the targeted version */
201 senders = av_mallocz_array(nb_senders, sizeof(*senders));
202 receivers = av_mallocz_array(nb_receivers, sizeof(*receivers));
/* both allocations are checked together */
203 if (!senders || !receivers) {
204 ret = AVERROR(ENOMEM);
/* queue of max_queue_size elements, each the size of one struct message */
208 ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message));
/* messages handled by the queue free function are released through free_frame() */
212 av_thread_message_queue_set_free_func(queue, free_frame);
/*
 * SPAWN_THREADS(type) and WAIT_THREADS(type): type is pasted into identifiers,
 * so SPAWN_THREADS(sender) walks the senders array up to nb_senders, draws a
 * workload with get_workload(sender_min_load, sender_max_load) and starts
 * sender_thread with the descriptor as its argument; WAIT_THREADS joins each
 * td->tid. The pthread_create() and pthread_join() results are stored in ret
 * and converted with AVERROR() only for the log message.
 * NOTE(review): ret itself keeps the raw pthread result while the final check
 * in main only reports ret < 0 - confirm that a thread start or join failure
 * still produces a failing exit status.
 * Receivers are started before senders, and senders are joined before
 * receivers.
 */
214 #define SPAWN_THREADS(type) do { \
215 for (i = 0; i < nb_##type##s; i++) { \
216 struct type##_data *td = &type##s[i]; \
220 td->workload = get_workload(type##_min_load, type##_max_load); \
222 ret = pthread_create(&td->tid, NULL, type##_thread, td); \
224 const int err = AVERROR(ret); \
225 av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \
226 " thread: %s\n", av_err2str(err)); \
232 #define WAIT_THREADS(type) do { \
233 for (i = 0; i < nb_##type##s; i++) { \
234 struct type##_data *td = &type##s[i]; \
236 ret = pthread_join(td->tid, NULL); \
238 const int err = AVERROR(ret); \
239 av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \
240 " thread: %s\n", av_err2str(err)); \
246 SPAWN_THREADS(receiver);
247 SPAWN_THREADS(sender);
249 WAIT_THREADS(sender);
250 WAIT_THREADS(receiver);
/* release the queue */
253 av_thread_message_queue_free(&queue);
/* NOTE(review): only receivers is freed on the visible lines; the matching release of senders is presumably on a skipped line - confirm */
255 av_freep(&receivers);
/* AVERROR_EOF is deliberately not reported as an error */
257 if (ret < 0 && ret != AVERROR_EOF) {
258 av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));