diff --git a/python/ustreamer.c b/python/ustreamer.c index e8a5b8f..8c5a274 100644 --- a/python/ustreamer.c +++ b/python/ustreamer.c @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -17,27 +18,50 @@ #include "../src/libs/memsinksh.h" // No sources again +typedef struct { + uint64_t id; + long double ts; + + uint8_t *data; + size_t used; + size_t allocated; + + unsigned width; + unsigned height; + unsigned format; + unsigned stride; + + bool online; + + long double grab_ts; + long double encode_begin_ts; + long double encode_end_ts; +} tmp_frame_s; + typedef struct { PyObject_HEAD char *obj; double lock_timeout; double wait_timeout; + double drop_same_frames; int fd; memsink_shared_s *mem; - uint8_t *tmp_data; - size_t tmp_data_allocated; - uint64_t last_id; - PyObject *frame; // PyDict + tmp_frame_s *tmp_frame; + PyObject *dict_frame; // PyDict } MemsinkObject; +#define MEM(_next) self->mem->_next +#define TMP(_next) self->tmp_frame->_next + + static void MemsinkObject_destroy_internals(MemsinkObject *self) { - if (self->frame != NULL) { - Py_DECREF(self->frame); - self->frame = NULL; + if (self->dict_frame != NULL) { + Py_DECREF(self->dict_frame); + self->dict_frame = NULL; } if (self->mem != NULL) { munmap(self->mem, sizeof(memsink_shared_s)); @@ -47,10 +71,12 @@ static void MemsinkObject_destroy_internals(MemsinkObject *self) { close(self->fd); self->fd = -1; } - if (self->tmp_data) { - free(self->tmp_data); - self->tmp_data = NULL; - self->tmp_data_allocated = 0; + if (self->tmp_frame) { + if (TMP(data)) { + free(TMP(data)); + } + free(self->tmp_frame); + self->tmp_frame = NULL; } } @@ -58,27 +84,29 @@ static int MemsinkObject_init(MemsinkObject *self, PyObject *args, PyObject *kwa self->lock_timeout = 1; self->wait_timeout = 1; - static char *kws[] = {"obj", "lock_timeout", "wait_timeout", NULL}; + static char *kws[] = {"obj", "lock_timeout", "wait_timeout", "drop_same_frames", NULL}; if (!PyArg_ParseTupleAndKeywords( - args, kwargs, "s|dd", kws, - &self->obj, &self->lock_timeout, &self->wait_timeout)) { + args, kwargs, "s|ddd", kws, + &self->obj, &self->lock_timeout, &self->wait_timeout, &self->drop_same_frames)) { return -1; } -# define SET_TIMEOUT(_timeout) { \ - if (self->_timeout <= 0) { \ - PyErr_SetString(PyExc_ValueError, #_timeout " must be > 0"); \ +# define SET_DOUBLE(_field, _cond) { \ + if (!(self->_field _cond)) { \ + PyErr_SetString(PyExc_ValueError, #_field " must be " #_cond); \ return -1; \ } \ } - SET_TIMEOUT(lock_timeout); - SET_TIMEOUT(wait_timeout); + SET_DOUBLE(lock_timeout, > 0); + SET_DOUBLE(wait_timeout, > 0); + SET_DOUBLE(drop_same_frames, >= 0); -# undef CHECK_TIMEOUT +# undef SET_DOUBLE - self->tmp_data_allocated = 512 * 1024; - A_REALLOC(self->tmp_data, self->tmp_data_allocated); + A_CALLOC(self->tmp_frame, 1); + TMP(allocated) = 512 * 1024; + A_REALLOC(TMP(data), TMP(allocated)); if ((self->fd = shm_open(self->obj, O_RDWR, 0)) == -1) { PyErr_SetFromErrno(PyExc_OSError); @@ -102,7 +130,7 @@ static int MemsinkObject_init(MemsinkObject *self, PyObject *args, PyObject *kwa goto error; } - if ((self->frame = PyDict_New()) == NULL) { + if ((self->dict_frame = PyDict_New()) == NULL) { goto error; } @@ -138,8 +166,6 @@ static PyObject *MemsinkObject_exit(MemsinkObject *self, PyObject *Py_UNUSED(ign return PyObject_CallMethod((PyObject *)self, "close", ""); } -#define MEM(_next) self->mem->_next - static int wait_frame(MemsinkObject *self) { long double deadline_ts = get_now_monotonic() + self->wait_timeout; @@ -149,21 +175,47 @@ static int wait_frame(MemsinkObject *self) { return -1; \ } + long double now; do { Py_BEGIN_ALLOW_THREADS int retval = flock_timedwait_monotonic(self->fd, self->lock_timeout); + now = get_now_monotonic(); + if (retval < 0 && errno != EWOULDBLOCK) { RETURN_OS_ERROR; + } else if (retval == 0) { - if (MEM(magic) == MEMSINK_MAGIC && MEM(version) == MEMSINK_VERSION && MEM(id) != self->last_id) { + if (MEM(magic) == MEMSINK_MAGIC && MEM(version) == MEMSINK_VERSION && TMP(id) != MEM(id)) { + if (self->drop_same_frames > 0) { +# define CMP(_field) (TMP(_field) == MEM(_field)) + if ( + CMP(used) + && CMP(width) + && CMP(height) + && CMP(format) + && CMP(stride) + && CMP(online) + && (TMP(ts) + self->drop_same_frames > now) + && !memcmp(TMP(data), MEM(data), MEM(used)) + ) { + TMP(id) = MEM(id); + goto drop; + } +# undef CMP + } + Py_BLOCK_THREADS return 0; } + if (flock(self->fd, LOCK_UN) < 0) { RETURN_OS_ERROR; } } + + drop: + if (usleep(1000) < 0) { RETURN_OS_ERROR; } @@ -173,7 +225,7 @@ static int wait_frame(MemsinkObject *self) { if (PyErr_CheckSignals() < 0) { return -1; } - } while (get_now_monotonic() < deadline_ts); + } while (now < deadline_ts); # undef RETURN_OS_ERROR @@ -192,65 +244,66 @@ static PyObject *MemsinkObject_wait_frame(MemsinkObject *self, PyObject *Py_UNUS default: return NULL; } -# define COPY(_type, _field) _type tmp_##_field = MEM(_field) - COPY(unsigned, width); - COPY(unsigned, height); - COPY(unsigned, format); - COPY(unsigned, stride); - COPY(bool, online); - COPY(double, grab_ts); - COPY(double, encode_begin_ts); - COPY(double, encode_end_ts); - COPY(unsigned, used); +# define COPY(_field) TMP(_field) = MEM(_field) + COPY(width); + COPY(height); + COPY(format); + COPY(stride); + COPY(online); + COPY(grab_ts); + COPY(encode_begin_ts); + COPY(encode_end_ts); + COPY(used); # undef COPY - // Временный буффер используется для скорейшего разблокирования синка - if (self->tmp_data_allocated < MEM(used)) { + if (TMP(allocated) < MEM(used)) { size_t size = MEM(used) + (512 * 1024); - A_REALLOC(self->tmp_data, size); - self->tmp_data_allocated = size; + A_REALLOC(TMP(data), size); + TMP(allocated) = size; } - memcpy(self->tmp_data, MEM(data), MEM(used)); + memcpy(TMP(data), MEM(data), MEM(used)); + TMP(used) = MEM(used); - MEM(last_client_ts) = get_now_monotonic(); - self->last_id = MEM(id); + TMP(id) = MEM(id); + TMP(ts) = get_now_monotonic(); + MEM(last_client_ts) = TMP(ts); if (flock(self->fd, LOCK_UN) < 0) { return PyErr_SetFromErrno(PyExc_OSError); } - PyDict_Clear(self->frame); + PyDict_Clear(self->dict_frame); # define SET_VALUE(_key, _maker) { \ PyObject *_tmp = _maker; \ if (_tmp == NULL) { \ return NULL; \ } \ - if (PyDict_SetItemString(self->frame, _key, _tmp) < 0) { \ + if (PyDict_SetItemString(self->dict_frame, _key, _tmp) < 0) { \ Py_DECREF(_tmp); \ return NULL; \ } \ Py_DECREF(_tmp); \ } +# define SET_NUMBER(_key, _from, _to) SET_VALUE(#_key, Py##_to##_From##_from(TMP(_key))) - SET_VALUE("width", PyLong_FromLong(tmp_width)); - SET_VALUE("height", PyLong_FromLong(tmp_height)); - SET_VALUE("format", PyLong_FromLong(tmp_format)); - SET_VALUE("stride", PyLong_FromLong(tmp_stride)); - SET_VALUE("online", PyBool_FromLong(tmp_online)); - SET_VALUE("grab_ts", PyFloat_FromDouble(tmp_grab_ts)); - SET_VALUE("encode_begin_ts", PyFloat_FromDouble(tmp_encode_begin_ts)); - SET_VALUE("encode_end_ts", PyFloat_FromDouble(tmp_encode_end_ts)); - SET_VALUE("data", PyBytes_FromStringAndSize((const char *)self->tmp_data, tmp_used)); + SET_NUMBER(width, Long, Long); + SET_NUMBER(height, Long, Long); + SET_NUMBER(format, Long, Long); + SET_NUMBER(stride, Long, Long); + SET_NUMBER(online, Long, Bool); + SET_NUMBER(grab_ts, Double, Float); + SET_NUMBER(encode_begin_ts, Double, Float); + SET_NUMBER(encode_end_ts, Double, Float); + SET_VALUE("data", PyBytes_FromStringAndSize((const char *)TMP(data), TMP(used))); +# undef SET_NUMBER # undef SET_VALUE - Py_INCREF(self->frame); - return self->frame; + Py_INCREF(self->dict_frame); + return self->dict_frame; } -#undef MEM - static PyObject *MemsinkObject_is_opened(MemsinkObject *self, PyObject *Py_UNUSED(ignored)) { return PyBool_FromLong(self->mem != NULL && self->fd > 0); } @@ -263,6 +316,7 @@ static PyObject *MemsinkObject_is_opened(MemsinkObject *self, PyObject *Py_UNUSE FIELD_GETTER(obj, String, Unicode) FIELD_GETTER(lock_timeout, Double, Float) FIELD_GETTER(wait_timeout, Double, Float) +FIELD_GETTER(drop_same_frames, Double, Float) #undef FIELD_GETTER @@ -283,6 +337,7 @@ static PyGetSetDef MemsinkObject_getsets[] = { ADD_GETTER(obj), ADD_GETTER(lock_timeout), ADD_GETTER(wait_timeout), + ADD_GETTER(drop_same_frames), {}, # undef ADD_GETTER }; @@ -324,3 +379,6 @@ PyMODINIT_FUNC PyInit_ustreamer(void) { // cppcheck-suppress unusedFunction return module; } + +#undef TMP +#undef MEM