Compare commits

..

19 Commits
v0.69 ... v0.75

Author SHA1 Message Date
Devaev Maxim
5672d1aa75 Bump version: 0.74 → 0.75 2019-05-18 18:24:32 +03:00
Devaev Maxim
078097efec Added links to FreeBSD port 2019-05-18 18:22:37 +03:00
Devaev Maxim
3cd8338886 Issue #6: make install-strip and make uninstall 2019-05-18 17:58:23 +03:00
Maxim Devaev
d2b57cc7d5 Update README.md 2019-05-18 17:25:42 +03:00
Maxim Devaev
2c4f59c87a Update README.ru.md 2019-05-18 17:24:53 +03:00
Devaev Maxim
36eb7eeb76 minor help fix 2019-05-18 16:35:10 +03:00
Devaev Maxim
3b6544db8a cpu/encoder: removed TODO about jpeg error handling 2019-05-18 02:12:18 +03:00
Devaev Maxim
8033ab23ed Bump version: 0.73 → 0.74 2019-05-17 17:15:17 +03:00
Devaev Maxim
ddb3db8b20 Issue #5: Added libevent version check 2019-05-17 16:57:53 +03:00
Devaev Maxim
b3c8071edb Bump version: 0.72 → 0.73 2019-05-09 18:51:27 +03:00
Devaev Maxim
bc70faae09 fix 2019-05-05 04:26:25 +03:00
Devaev Maxim
ae7c4c91e0 Bump version: 0.71 → 0.72 2019-05-03 01:49:44 +03:00
Devaev Maxim
613baa4e1e better unix socket handling 2019-05-03 00:10:48 +03:00
Devaev Maxim
33a101b4f7 Bump version: 0.70 → 0.71 2019-05-01 21:00:22 +03:00
Devaev Maxim
9e9039c4e6 y != yes 2019-05-01 20:59:30 +03:00
Maxim Devaev
b3ceec51de Update README.md 2019-04-26 02:24:16 +03:00
Maxim Devaev
e3ac4ba6f5 Update README.ru.md 2019-04-26 02:23:54 +03:00
Devaev Maxim
fa6b8b44c1 Bump version: 0.69 → 0.70 2019-04-18 05:17:00 +03:00
Devaev Maxim
8cb7574af2 encoder refactoring 2019-04-18 05:09:32 +03:00
20 changed files with 126 additions and 111 deletions

View File

@@ -1,7 +1,7 @@
[bumpversion]
commit = True
tag = True
current_version = 0.69
current_version = 0.75
parse = (?P<major>\d+)\.(?P<minor>\d+)(\.(?P<patch>\d+)(\-(?P<release>[a-z]+))?)?
serialize =
{major}.{minor}

View File

@@ -28,6 +28,14 @@ install: $(PROG)
install -Dm755 $(PROG) $(DESTDIR)$(PREFIX)/bin/$(PROG)
install-strip: install
strip $(DESTDIR)$(PREFIX)/bin/$(PROG)
uninstall:
rm $(DESTDIR)$(PREFIX)/bin/$(PROG)
regen:
tools/make-jpeg-h.py src/http/data/blank.jpeg src/http/data/blank_jpeg.h BLANK
tools/make-html-h.py src/http/data/index.html src/http/data/index_html.h INDEX

View File

@@ -40,7 +40,8 @@ $ make
$ ./ustreamer --help
```
AUR has a package for Arch Linux: https://aur.archlinux.org/packages/ustreamer
AUR has a package for Arch Linux: https://aur.archlinux.org/packages/ustreamer
FreeBSD port: https://www.freshports.org/multimedia/ustreamer
-----
# Usage

View File

@@ -40,7 +40,8 @@ $ make
$ ./ustreamer --help
```
Для Arch Linux в AUR есть готовый пакет: https://aur.archlinux.org/packages/ustreamer
Для Arch Linux в AUR есть готовый пакет: https://aur.archlinux.org/packages/ustreamer
Порт для FreeBSD: https://www.freshports.org/multimedia/ustreamer
-----
# Использование

View File

@@ -3,7 +3,7 @@
pkgname=ustreamer
pkgver=0.69
pkgver=0.75
pkgrel=1
pkgdesc="Lightweight and fast MJPG-HTTP streamer"
url="https://github.com/pi-kvm/ustreamer"

View File

@@ -6,7 +6,7 @@
include $(TOPDIR)/rules.mk
PKG_NAME:=ustreamer
PKG_VERSION:=0.69
PKG_VERSION:=0.75
PKG_RELEASE:=1
PKG_MAINTAINER:=Maxim Devaev <mdevaev@gmail.com>

View File

@@ -22,4 +22,4 @@
#pragma once
#define VERSION "0.69"
#define VERSION "0.75"

View File

@@ -156,6 +156,8 @@ int device_open(struct device_t *dev) {
_device_open_alloc_picbufs(dev);
_device_apply_controls(dev);
dev->run->n_workers = min_u(dev->run->n_buffers, dev->n_workers);
LOG_DEBUG("Device fd=%d initialized", dev->run->fd);
return 0;
@@ -165,6 +167,8 @@ int device_open(struct device_t *dev) {
}
void device_close(struct device_t *dev) {
dev->run->n_workers = 0;
if (dev->run->pictures) {
LOG_DEBUG("Releasing picture buffers ...");
for (unsigned index = 0; index < dev->run->n_buffers && dev->run->pictures[index].data; ++index) {
@@ -318,7 +322,7 @@ static int _device_open_check_cap(struct device_t *dev) {
static int _device_open_dv_timings(struct device_t *dev) {
_device_apply_resolution(dev, dev->width, dev->height);
if (dev->dv_timings) {
LOG_DEBUG("Using DV-timings");
LOG_DEBUG("Using DV timings");
if (_device_apply_dv_timings(dev) < 0) {
return -1;

View File

@@ -63,7 +63,7 @@ struct device_runtime_t {
unsigned height;
unsigned format;
unsigned n_buffers;
// unsigned n_workers; // FIXME
unsigned n_workers;
struct hw_buffer_t *hw_buffers;
struct picture_t *pictures;
size_t max_raw_image_size;

View File

@@ -20,6 +20,7 @@
*****************************************************************************/
#include <stdbool.h>
#include <strings.h>
#include <assert.h>
@@ -66,48 +67,6 @@ struct encoder_t *encoder_init() {
return encoder;
}
#pragma GCC diagnostic ignored "-Wunused-parameter"
#pragma GCC diagnostic push
void encoder_prepare(struct encoder_t *encoder, struct device_t *dev) {
#pragma GCC diagnostic pop
assert(encoder->type != ENCODER_TYPE_UNKNOWN);
// XXX: Тут нет гонки, потому что encoder_prepare() запускается еще до существования других потоков
encoder->run->type = encoder->type;
encoder->run->quality = encoder->quality;
LOG_INFO("Using JPEG quality: %u%%", encoder->quality);
# ifdef WITH_OMX_ENCODER
if (encoder->run->type == ENCODER_TYPE_OMX) {
LOG_DEBUG("Preparing OMX JPEG encoder ...");
if (dev->n_workers > OMX_MAX_ENCODERS) {
LOG_INFO("OMX JPEG encoder sets limit for worker threads: %u", OMX_MAX_ENCODERS);
dev->n_workers = OMX_MAX_ENCODERS;
}
encoder->run->n_omxs = dev->n_workers;
A_CALLOC(encoder->run->omxs, encoder->run->n_omxs);
for (unsigned index = 0; index < encoder->run->n_omxs; ++index) {
if ((encoder->run->omxs[index] = omx_encoder_init()) == NULL) {
goto use_fallback;
}
}
}
# endif
return;
# pragma GCC diagnostic ignored "-Wunused-label"
# pragma GCC diagnostic push
use_fallback:
LOG_ERROR("Can't initialize selected encoder, using CPU instead it");
encoder->run->type = ENCODER_TYPE_CPU;
encoder->run->quality = encoder->quality;
# pragma GCC diagnostic pop
}
void encoder_destroy(struct encoder_t *encoder) {
# ifdef WITH_OMX_ENCODER
if (encoder->run->omxs) {
@@ -142,52 +101,87 @@ const char *encoder_type_to_string(enum encoder_type_t type) {
return _ENCODER_TYPES[0].name;
}
void encoder_prepare_live(struct encoder_t *encoder, struct device_t *dev) {
assert(encoder->run->type != ENCODER_TYPE_UNKNOWN);
void encoder_prepare(struct encoder_t *encoder, struct device_t *dev) {
enum encoder_type_t type = (encoder->run->cpu_forced ? ENCODER_TYPE_CPU : encoder->type);
unsigned quality = encoder->quality;
bool cpu_forced = false;
if (
(dev->run->format == V4L2_PIX_FMT_MJPEG || dev->run->format == V4L2_PIX_FMT_JPEG)
&& encoder->run->type != ENCODER_TYPE_HW
) {
if ((dev->run->format == V4L2_PIX_FMT_MJPEG || dev->run->format == V4L2_PIX_FMT_JPEG) && type != ENCODER_TYPE_HW) {
LOG_INFO("Switching to HW JPEG encoder because the input format is (M)JPEG");
A_MUTEX_LOCK(&encoder->run->mutex);
encoder->run->type = ENCODER_TYPE_HW;
A_MUTEX_UNLOCK(&encoder->run->mutex);
type = ENCODER_TYPE_HW;
}
if (encoder->run->type == ENCODER_TYPE_HW) {
if (type == ENCODER_TYPE_HW) {
if (dev->run->format != V4L2_PIX_FMT_MJPEG && dev->run->format != V4L2_PIX_FMT_JPEG) {
LOG_INFO("Switching to CPU JPEG encoder because the input format is not (M)JPEG");
goto use_fallback;
goto use_cpu;
}
if (hw_encoder_prepare_live(dev, encoder->quality) < 0) {
A_MUTEX_LOCK(&encoder->run->mutex);
encoder->run->quality = 0;
A_MUTEX_UNLOCK(&encoder->run->mutex);
LOG_INFO("Using JPEG quality: HW-default");
if (hw_encoder_prepare(dev, quality) < 0) {
quality = 0;
}
dev->run->n_workers = 1;
}
# ifdef WITH_OMX_ENCODER
else if (encoder->run->type == ENCODER_TYPE_OMX) {
else if (type == ENCODER_TYPE_OMX) {
LOG_DEBUG("Preparing OMX JPEG encoder ...");
if (dev->run->n_workers > OMX_MAX_ENCODERS) {
LOG_INFO("OMX JPEG encoder sets limit for worker threads: %u", OMX_MAX_ENCODERS);
dev->run->n_workers = OMX_MAX_ENCODERS;
}
if (encoder->run->omxs == NULL) {
A_CALLOC(encoder->run->omxs, OMX_MAX_ENCODERS);
}
// Начинаем с нуля и доинициализируем на следующих заходах при необходимости
for (; encoder->run->n_omxs < dev->run->n_workers; ++encoder->run->n_omxs) {
if ((encoder->run->omxs[encoder->run->n_omxs] = omx_encoder_init()) == NULL) {
LOG_ERROR("Can't initialize OMX JPEG encoder, falling back to CPU");
goto force_cpu;
}
}
for (unsigned index = 0; index < encoder->run->n_omxs; ++index) {
if (omx_encoder_prepare_live(encoder->run->omxs[index], dev, encoder->quality) < 0) {
if (omx_encoder_prepare(encoder->run->omxs[index], dev, quality) < 0) {
LOG_ERROR("Can't prepare OMX JPEG encoder, falling back to CPU");
goto use_fallback;
goto force_cpu;
}
}
}
# endif
return;
goto ok;
# pragma GCC diagnostic ignored "-Wunused-label"
# pragma GCC diagnostic push
force_cpu:
cpu_forced = true;
# pragma GCC diagnostic pop
use_cpu:
type = ENCODER_TYPE_CPU;
quality = encoder->quality;
ok:
if (quality == 0) {
LOG_INFO("Using JPEG quality: encoder default");
} else {
LOG_INFO("Using JPEG quality: %u%%", quality);
}
use_fallback:
A_MUTEX_LOCK(&encoder->run->mutex);
encoder->run->type = ENCODER_TYPE_CPU;
encoder->run->quality = encoder->quality;
encoder->run->type = type;
encoder->run->quality = quality;
if (cpu_forced) {
encoder->run->cpu_forced = true;
}
A_MUTEX_UNLOCK(&encoder->run->mutex);
}
#pragma GCC diagnostic ignored "-Wunused-label"
#pragma GCC diagnostic ignored "-Wunused-parameter"
#pragma GCC diagnostic push
int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, unsigned worker_number, unsigned buf_index) {
#pragma GCC diagnostic pop
@@ -195,14 +189,14 @@ int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, uns
assert(encoder->run->type != ENCODER_TYPE_UNKNOWN);
if (encoder->run->type == ENCODER_TYPE_CPU) {
cpu_encoder_compress_buffer(dev, buf_index, encoder->quality);
cpu_encoder_compress_buffer(dev, buf_index, encoder->run->quality);
} else if (encoder->run->type == ENCODER_TYPE_HW) {
hw_encoder_compress_buffer(dev, buf_index);
}
# ifdef WITH_OMX_ENCODER
else if (encoder->run->type == ENCODER_TYPE_OMX) {
if (omx_encoder_compress_buffer(encoder->run->omxs[worker_number], dev, buf_index) < 0) {
goto use_fallback;
goto error;
}
}
# endif
@@ -211,11 +205,10 @@ int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, uns
# pragma GCC diagnostic ignored "-Wunused-label"
# pragma GCC diagnostic push
use_fallback:
LOG_INFO("Error while compressing, falling back to CPU");
error:
LOG_INFO("Error while compressing buffer, falling back to CPU");
A_MUTEX_LOCK(&encoder->run->mutex);
encoder->run->type = ENCODER_TYPE_CPU;
encoder->run->quality = encoder->quality;
encoder->run->cpu_forced = true;
A_MUTEX_UNLOCK(&encoder->run->mutex);
return -1;
# pragma GCC diagnostic pop

View File

@@ -22,6 +22,8 @@
#pragma once
#include <stdbool.h>
#include "pthread.h"
#include "tools.h"
@@ -51,6 +53,7 @@ enum encoder_type_t {
struct encoder_runtime_t {
enum encoder_type_t type;
unsigned quality;
bool cpu_forced;
pthread_mutex_t mutex;
# ifdef WITH_OMX_ENCODER
@@ -74,6 +77,4 @@ enum encoder_type_t encoder_parse_type(const char *str);
const char *encoder_type_to_string(enum encoder_type_t type);
void encoder_prepare(struct encoder_t *encoder, struct device_t *dev);
void encoder_prepare_live(struct encoder_t *encoder, struct device_t *dev);
int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, unsigned worker_number, unsigned buf_index);

View File

@@ -105,8 +105,6 @@ void cpu_encoder_compress_buffer(struct device_t *dev, unsigned index, unsigned
# undef WRITE_SCANLINES
// TODO: process jpeg errors:
// https://stackoverflow.com/questions/19857766/error-handling-in-libjpeg
jpeg_finish_compress(&jpeg);
jpeg_destroy_compress(&jpeg);

View File

@@ -44,7 +44,7 @@ static bool _is_huffman(const unsigned char *data);
static size_t _memcpy_with_huffman(unsigned char *dest, const unsigned char *src, size_t size);
int hw_encoder_prepare_live(struct device_t *dev, unsigned quality) {
int hw_encoder_prepare(struct device_t *dev, unsigned quality) {
struct v4l2_jpegcompression comp;
MEMSET_ZERO(comp);

View File

@@ -25,5 +25,5 @@
#include "../../device.h"
int hw_encoder_prepare_live(struct device_t *dev, unsigned quality);
int hw_encoder_prepare(struct device_t *dev, unsigned quality);
void hw_encoder_compress_buffer(struct device_t *dev, unsigned index);

View File

@@ -153,7 +153,7 @@ void omx_encoder_destroy(struct omx_encoder_t *omx) {
free(omx);
}
int omx_encoder_prepare_live(struct omx_encoder_t *omx, struct device_t *dev, unsigned quality) {
int omx_encoder_prepare(struct omx_encoder_t *omx, struct device_t *dev, unsigned quality) {
if (component_set_state(&omx->encoder, OMX_StateIdle) < 0) {
return -1;
}

View File

@@ -53,5 +53,5 @@ struct omx_encoder_t {
struct omx_encoder_t *omx_encoder_init();
void omx_encoder_destroy(struct omx_encoder_t *omx);
int omx_encoder_prepare_live(struct omx_encoder_t *omx, struct device_t *dev, unsigned quality);
int omx_encoder_prepare(struct omx_encoder_t *omx, struct device_t *dev, unsigned quality);
int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, unsigned index);

View File

@@ -145,6 +145,7 @@ static int _jpeg_read_geometry(FILE *fp, unsigned *width, unsigned *height) {
jpeg_create_decompress(&jpeg);
// https://stackoverflow.com/questions/19857766/error-handling-in-libjpeg
jpeg.err = jpeg_std_error((struct jpeg_error_mgr *)&jpeg_error);
jpeg_error.mgr.error_exit = _jpeg_error_handler;
if (setjmp(jpeg_error.jmp) < 0) {

View File

@@ -121,7 +121,10 @@ void http_server_destroy(struct http_server_t *server) {
close(server->run->unix_fd);
}
event_base_free(server->run->base);
# if LIBEVENT_VERSION_NUMBER >= 0x02010100
libevent_global_shutdown();
# endif
for (struct stream_client_t *client = server->run->stream_clients; client != NULL;) {
struct stream_client_t *next = client->next;
@@ -199,19 +202,25 @@ int http_server_listen(struct http_server_t *server) {
if (server->unix_path[0] != '\0') {
struct sockaddr_un unix_addr;
int unix_fd_flags;
LOG_DEBUG("Binding HTTP to UNIX socket '%s' ...", server->unix_path);
assert((server->run->unix_fd = socket(AF_UNIX, SOCK_STREAM, 0)));
assert((unix_fd_flags = fcntl(server->run->unix_fd, F_GETFL)) >= 0);
unix_fd_flags |= O_NONBLOCK;
assert(fcntl(server->run->unix_fd, F_SETFL, unix_fd_flags) >= 0);
# define MAX_SUN_PATH (sizeof(unix_addr.sun_path) - 1)
strncpy(unix_addr.sun_path, server->unix_path, 107);
unix_addr.sun_path[107] = '\0';
if (strlen(server->unix_path) > MAX_SUN_PATH) {
LOG_ERROR("UNIX socket path is too long, max=%zu", MAX_SUN_PATH);
return -1;
}
MEMSET_ZERO(unix_addr);
strncpy(unix_addr.sun_path, server->unix_path, MAX_SUN_PATH);
unix_addr.sun_family = AF_UNIX;
# undef MAX_SUN_PATH
assert((server->run->unix_fd = socket(AF_UNIX, SOCK_STREAM, 0)) >= 0);
assert(!evutil_make_socket_nonblocking(server->run->unix_fd));
if (server->unix_rm && unlink(server->unix_path) < 0) {
if (errno != ENOENT) {
LOG_PERROR("Can't remove old UNIX socket '%s'", server->unix_path);
@@ -266,7 +275,6 @@ static bool _http_get_param_true(struct evkeyvalq *params, const char *key) {
if ((value_str = evhttp_find_header(params, key)) != NULL) {
if (
value_str[0] == '1'
|| value_str[0] == 'y'
|| !evutil_ascii_strcasecmp(value_str, "true")
|| !evutil_ascii_strcasecmp(value_str, "yes")
) {

View File

@@ -129,18 +129,19 @@ static void _help(struct device_t *dev, struct encoder_t *encoder, struct http_s
printf(" -z|--min-frame-size <N> ──────── Drop frames smaller then this limit. Useful if the device\n");
printf(" produces small-sized garbage frames. Default: disabled.\n\n");
printf(" -n|--persistent ──────────────── Don't re-initialize device on timeout. Default: disabled.\n\n");
printf(" -t|--dv-timings ──────────────── Enable DV timings queriyng and events processing.\n");
printf(" Supports automatic resolution changing. Default: disabled.\n\n");
printf(" -t|--dv-timings ──────────────── Enable DV timings queriyng and events processing\n");
printf(" to automatic resolution change. Default: disabled.\n\n");
printf(" -b|--buffers <N> ─────────────── The number of buffers to receive data from the device.\n");
printf(" Each buffer may processed using an intermediate thread.\n");
printf(" Default: %u (the number of CPU cores (but not more 4) + 1).\n\n", dev->n_buffers);
printf(" -w|--workers <N> ─────────────── The number of worker threads. Default: %u (== --buffers).\n\n", dev->n_workers);
printf(" -w|--workers <N> ─────────────── The number of worker threads but not more than buffers.\n");
printf(" Default: %u (== --buffers).\n\n", dev->n_workers);
printf(" -q|--quality <N> ─────────────── Set quality of JPEG encoding from 1 to 100 (best). Default: %u.\n\n", encoder->quality);
printf(" -c|--encoder <type> ──────────── Use specified encoder. It may affects to workers number.\n");
printf(" Available: %s; default: CPU.\n\n", ENCODER_TYPES_STR);
printf(" --device-timeout <seconds> ───── Timeout for device querying. Default: %u\n\n", dev->timeout);
printf(" --device-timeout <seconds> ───── Timeout for device querying. Default: %u.\n\n", dev->timeout);
printf(" --device-error-delay <seconds> ─ Delay before trying to connect to the device again\n");
printf(" after an error (timeout for example). Default: %u\n\n", dev->error_delay);
printf(" after an error (timeout for example). Default: %u.\n\n", dev->error_delay);
printf("Image control options:\n");
printf("══════════════════════\n");
printf(" --brightness <N> ───────────── Set brightness. Default: no change.\n\n");
@@ -383,7 +384,6 @@ int main(int argc, char *argv[]) {
if ((exit_code = _parse_options(argc, argv, dev, encoder, server)) == 0) {
_install_signal_handlers();
encoder_prepare(encoder, dev);
pthread_t stream_loop_tid;
pthread_t server_loop_tid;

View File

@@ -119,7 +119,7 @@ void stream_loop(struct stream_t *stream) {
LOG_PERF("##### Raw frame accepted; worker = %u", free_worker_number);
} else {
for (unsigned number = 0; number < stream->dev->n_workers; ++number) {
for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) {
if (
!atomic_load(&pool.workers[number].has_job) && (
free_worker_number == -1
@@ -336,7 +336,7 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker
long double min_delay;
long double soft_delay;
for (unsigned number = 0; number < dev->n_workers; ++number) {
for (unsigned number = 0; number < dev->run->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next
A_MUTEX_LOCK(&WORKER(last_comp_time_mutex));
@@ -347,9 +347,9 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker
# undef WORKER
}
avg_comp_time = sum_comp_time / dev->n_workers; // Среднее время работы воркеров
avg_comp_time = sum_comp_time / dev->run->n_workers; // Среднее время работы воркеров
min_delay = avg_comp_time / dev->n_workers; // Среднее время работы размазывается на N воркеров
min_delay = avg_comp_time / dev->run->n_workers; // Среднее время работы размазывается на N воркеров
if (dev->desired_fps > 0 && min_delay > 0) {
// Искусственное время задержки на основе желаемого FPS, если включен --desired-fps
@@ -389,7 +389,7 @@ static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool) {
goto error;
}
encoder_prepare_live(pool->encoder, stream->dev);
encoder_prepare(pool->encoder, stream->dev);
_stream_init_workers(stream, pool);
@@ -401,15 +401,15 @@ static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool) {
}
static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool) {
LOG_INFO("Spawning %u workers ...", stream->dev->n_workers);
LOG_INFO("Spawning %u workers ...", stream->dev->run->n_workers);
atomic_store(&pool->workers_stop, false);
A_CALLOC(pool->workers, stream->dev->n_workers);
A_CALLOC(pool->workers, stream->dev->run->n_workers);
A_MUTEX_INIT(&pool->free_workers_mutex);
A_COND_INIT(&pool->free_workers_cond);
for (unsigned number = 0; number < stream->dev->n_workers; ++number) {
for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next
pool->free_workers += 1;
@@ -492,7 +492,7 @@ static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool
LOG_INFO("Destroying workers ...");
atomic_store(&pool->workers_stop, true);
for (unsigned number = 0; number < stream->dev->n_workers; ++number) {
for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next
A_MUTEX_LOCK(&WORKER(has_job_mutex));