diff --git a/Makefile.common b/Makefile.common index 41d7f859ad..a335a7c4c2 100644 --- a/Makefile.common +++ b/Makefile.common @@ -1965,7 +1965,9 @@ endif ifeq ($(HAVE_FFMPEG), 1) OBJ += record/drivers/record_ffmpeg.o \ - cores/libretro-ffmpeg/ffmpeg_core.o + cores/libretro-ffmpeg/ffmpeg_core.o \ + cores/libretro-ffmpeg/swsbuffer.o \ + cores/libretro-ffmpeg/tpool.o LIBS += $(AVCODEC_LIBS) $(AVFORMAT_LIBS) $(AVUTIL_LIBS) $(SWSCALE_LIBS) $(SWRESAMPLE_LIBS) $(FFMPEG_LIBS) DEFINES += -DHAVE_FFMPEG diff --git a/cores/libretro-ffmpeg/Makefile.common b/cores/libretro-ffmpeg/Makefile.common index 8fd4a6ed2e..87e2e5ef08 100644 --- a/cores/libretro-ffmpeg/Makefile.common +++ b/cores/libretro-ffmpeg/Makefile.common @@ -19,6 +19,8 @@ SWRESAMPLE_DIR := $(BASE_DIR)/libswresample INCFLAGS += -I$(BASE_DIR) -I$(CORE_DIR) -I$(LIBRETRO_COMM_DIR)/include -I$(LIBRETRO_COMM_DIR)/include/compat LIBRETRO_SOURCE += $(CORE_DIR)/ffmpeg_core.c \ + $(CORE_DIR)/swsbuffer.c \ + $(CORE_DIR)/tpool.c \ $(LIBRETRO_COMM_DIR)/queues/fifo_queue.c \ $(LIBRETRO_COMM_DIR)/rthreads/rthreads.c diff --git a/cores/libretro-ffmpeg/ffmpeg_core.c b/cores/libretro-ffmpeg/ffmpeg_core.c index f6a62040de..0627c623e8 100644 --- a/cores/libretro-ffmpeg/ffmpeg_core.c +++ b/cores/libretro-ffmpeg/ffmpeg_core.c @@ -44,9 +44,13 @@ extern "C" { #include #endif +#include +#include #include #include #include +#include "swsbuffer.h" +#include "tpool.h" #include #ifdef RARCH_INTERNAL @@ -56,10 +60,6 @@ extern "C" { #define CORE_PREFIX(s) s #endif -#ifndef PIX_FMT_RGB32 -#define PIX_FMT_RGB32 AV_PIX_FMT_RGB32 -#endif - #define PRINT_VERSION(s) log_cb(RETRO_LOG_INFO, "[FFMPEG] lib%s version:\t%d.%d.%d\n", #s, \ s ##_version() >> 16 & 0xFF, \ s ##_version() >> 8 & 0xFF, \ @@ -87,7 +87,11 @@ static AVFormatContext *fctx; static AVCodecContext *vctx; static int video_stream_index; static enum AVColorSpace colorspace; -static int sw_decoder_threads; + +static unsigned sw_decoder_threads; +static unsigned sw_sws_threads; +static swsbuffer_t *swsbuffer; +static tpool_t *tpool; #if LIBAVUTIL_VERSION_MAJOR > 55 static enum AVHWDeviceType hw_decoder; @@ -96,7 +100,6 @@ static enum AVPixelFormat pix_fmt; static bool force_sw_decoder; #endif - #define MAX_STREAMS 8 static AVCodecContext *actx[MAX_STREAMS]; static AVCodecContext *sctx[MAX_STREAMS]; @@ -277,10 +280,10 @@ void CORE_PREFIX(retro_set_environment)(retro_environment_t cb) { static const struct retro_variable vars[] = { #if LIBAVUTIL_VERSION_MAJOR > 55 - { "ffmpeg_hw_decoder", "Use Hardware decoder (restart); auto|off|" + { "ffmpeg_hw_decoder", "Use Hardware decoder (restart); off|auto|" "cuda|d3d11va|drm|dxva2|mediacodec|opencl|qsv|vaapi|vdpau|videotoolbox" }, #endif - { "ffmpeg_sw_decoder_threads", "Software decoder thread count (restart); 1|2|4|8|16" }, + { "ffmpeg_sw_decoder_threads", "Software decoder thread count (restart); auto|1|2|4|8|16" }, #if defined(HAVE_OPENGL) || defined(HAVE_OPENGLES) { "ffmpeg_temporal_interp", "Temporal Interpolation; disabled|enabled" }, #ifdef HAVE_GL_FFT @@ -443,12 +446,22 @@ static void check_variables(bool firststart) } #endif - sw_threads_var.key = "ffmpeg_sw_decoder_threads"; - if (CORE_PREFIX(environ_cb)(RETRO_ENVIRONMENT_GET_VARIABLE, &sw_threads_var) && sw_threads_var.value) + if (firststart) { - slock_lock(decode_thread_lock); - sw_decoder_threads = strtoul(sw_threads_var.value, NULL, 0); - slock_unlock(decode_thread_lock); + sw_threads_var.key = "ffmpeg_sw_decoder_threads"; + if (CORE_PREFIX(environ_cb)(RETRO_ENVIRONMENT_GET_VARIABLE, &sw_threads_var) && sw_threads_var.value) + { + if (string_is_equal(sw_threads_var.value, "auto")) + { + sw_decoder_threads = cpu_features_get_core_amount(); + } + else + { + sw_decoder_threads = strtoul(sw_threads_var.value, NULL, 0); + } + /* Scale the sws threads based on core count but use at min 2 and max 4 threads */ + sw_sws_threads = MIN(MAX(2, sw_decoder_threads / 2), 4); + } } } @@ -463,7 +476,6 @@ static void seek_frame(int seek_frames) frame_cnt += seek_frames; slock_lock(fifo_lock); - do_seek = true; seek_time = frame_cnt / media.interpolate_fps; @@ -487,8 +499,14 @@ static void seek_frame(int seek_frames) scond_signal(fifo_decode_cond); while (!decode_thread_dead && do_seek) + { + main_sleeping = true; scond_wait(fifo_cond, fifo_lock); + main_sleeping = false; + } + slock_unlock(fifo_lock); + } void CORE_PREFIX(retro_run)(void) @@ -826,7 +844,7 @@ static enum AVPixelFormat init_hw_decoder(struct AVCodecContext *ctx, const enum AVHWDeviceType type, const enum AVPixelFormat *pix_fmts) { - int ret; + int ret = 0; enum AVPixelFormat decoder_pix_fmt = AV_PIX_FMT_NONE; struct AVCodec *codec = avcodec_find_decoder(fctx->streams[video_stream_index]->codec->codec_id); @@ -891,7 +909,7 @@ exit: static enum AVPixelFormat auto_hw_decoder(AVCodecContext *ctx, const enum AVPixelFormat *pix_fmts) { - int ret; + int ret = 0; enum AVPixelFormat decoder_pix_fmt = AV_PIX_FMT_NONE; enum AVHWDeviceType type = AV_HWDEVICE_TYPE_NONE; @@ -932,6 +950,7 @@ static enum AVPixelFormat select_decoder(AVCodecContext *ctx, ctx->thread_type = FF_THREAD_FRAME; ctx->thread_count = sw_decoder_threads; + log_cb(RETRO_LOG_INFO, "[FFMPEG] Configured software decoding threads: %d\n", sw_decoder_threads); format = fctx->streams[video_stream_index]->codec->pix_fmt; @@ -965,7 +984,7 @@ static enum AVPixelFormat get_format(AVCodecContext *ctx, static bool open_codec(AVCodecContext **ctx, enum AVMediaType type, unsigned index) { - int ret; + int ret = 0; AVCodec *codec = avcodec_find_decoder(fctx->streams[index]->codec->codec_id); if (!codec) @@ -1001,13 +1020,8 @@ static bool codec_is_image(enum AVCodecID id) { switch (id) { -#ifdef OLD_FFMPEG_API - case CODEC_ID_MJPEG: - case CODEC_ID_PNG: -#else case AV_CODEC_ID_MJPEG: case AV_CODEC_ID_PNG: -#endif return true; default: @@ -1021,11 +1035,7 @@ static bool codec_id_is_ttf(enum AVCodecID id) { switch (id) { -#ifdef OLD_FFMPEG_API - case CODEC_ID_TTF: -#else case AV_CODEC_ID_TTF: -#endif return true; default: @@ -1040,12 +1050,8 @@ static bool codec_id_is_ass(enum AVCodecID id) { switch (id) { -#ifdef OLD_FFMPEG_API - case CODEC_ID_SSA: -#else case AV_CODEC_ID_ASS: case AV_CODEC_ID_SSA: -#endif return true; default: break; @@ -1088,7 +1094,7 @@ static bool open_codecs(void) break; case AVMEDIA_TYPE_VIDEO: - if ( !vctx + if (!vctx && !codec_is_image(fctx->streams[i]->codec->codec_id)) { if (!open_codec(&vctx, type, i)) @@ -1098,7 +1104,7 @@ static bool open_codecs(void) case AVMEDIA_TYPE_SUBTITLE: #ifdef HAVE_SSA - if ( subtitle_streams_num < MAX_STREAMS + if (subtitle_streams_num < MAX_STREAMS && codec_id_is_ass(fctx->streams[i]->codec->codec_id)) { int size; @@ -1271,23 +1277,114 @@ static void render_ass_img(AVFrame *conv_frame, ASS_Image *img) } #endif +static void sws_worker_thread(void *arg) +{ + int ret = 0; + AVFrame *tmp_frame = NULL; + sws_context_t *ctx = (sws_context_t*) arg; + +#if LIBAVUTIL_VERSION_MAJOR > 55 + if (hw_decoding_enabled) + tmp_frame = ctx->hw_source; + else +#endif + tmp_frame = ctx->source; + + ctx->sws = sws_getCachedContext(ctx->sws, + media.width, media.height, tmp_frame->format, + media.width, media.height, PIX_FMT_RGB32, + SWS_POINT, NULL, NULL, NULL); + + set_colorspace(ctx->sws, media.width, media.height, + av_frame_get_colorspace(tmp_frame), + av_frame_get_color_range(tmp_frame)); + + if ((ret = sws_scale(ctx->sws, (const uint8_t *const*)tmp_frame->data, + tmp_frame->linesize, 0, media.height, + (uint8_t * const*)ctx->target->data, ctx->target->linesize)) < 0) + { + log_cb(RETRO_LOG_ERROR, "[FFMPEG] Error while scaling image: %s\n", av_err2str(ret)); + } + + swsbuffer_finish_slot(swsbuffer, ctx); +} #ifdef HAVE_SSA -static void decode_video(AVCodecContext *ctx, AVPacket *pkt, AVFrame *conv_frame, size_t frame_size, struct SwsContext **sws, ASS_Track *ass_track_active) +static void add_frame_to_fifo(size_t frame_size, ASS_Track *ass_track_active) #else -static void decode_video(AVCodecContext *ctx, AVPacket *pkt, AVFrame *conv_frame, size_t frame_size, struct SwsContext **sws) +static void add_frame_to_fifo(size_t frame_size) #endif { - int ret; - AVFrame *frame = NULL; - AVFrame *sw_frame = NULL; - AVFrame *tmp_frame = NULL; + sws_context_t *ctx = NULL; - if (!(frame = av_frame_alloc()) || !(sw_frame = av_frame_alloc())) + swsbuffer_get_finished_slot(swsbuffer, &ctx); + size_t decoded_size; + int64_t pts = ctx->source->best_effort_timestamp; + double video_time = pts * av_q2d(fctx->streams[video_stream_index]->time_base); + +#ifdef HAVE_SSA + if (ass_render && ass_track_active) { - log_cb(RETRO_LOG_ERROR, "[FFMPEG] Can not alloc frames\n"); - return; + int change = 0; + ASS_Image *img = ass_render_frame(ass_render, ass_track_active, + 1000 * video_time, &change); + + /* + * Do it on CPU for now. + * We're in a thread anyways, so shouldn't really matter. + */ + render_ass_img(ctx->target, img); } +#endif + + decoded_size = frame_size + sizeof(pts); + + slock_lock(fifo_lock); + while (!decode_thread_dead && (video_decode_fifo != NULL) + && fifo_write_avail(video_decode_fifo) < decoded_size) + { + if (!main_sleeping) + scond_wait(fifo_decode_cond, fifo_lock); + else + { + fifo_clear(video_decode_fifo); + break; + } + } + + decode_last_video_time = video_time; + if (!decode_thread_dead) + { + int stride; + unsigned y; + const uint8_t *src = NULL; + + fifo_write(video_decode_fifo, &pts, sizeof(pts)); + src = ctx->target->data[0]; + stride = ctx->target->linesize[0]; + + for (y = 0; y < media.height; y++, src += stride) + fifo_write(video_decode_fifo, src, media.width * sizeof(uint32_t)); + } + scond_signal(fifo_cond); + slock_unlock(fifo_lock); + + av_frame_unref(ctx->source); +#if LIBAVUTIL_VERSION_MAJOR > 55 + av_frame_unref(ctx->hw_source); +#endif + + swsbuffer_open_slot(swsbuffer, ctx); +} + +#ifdef HAVE_SSA +static void decode_video(AVCodecContext *ctx, AVPacket *pkt, size_t frame_size, ASS_Track *ass_track_active) +#else +static void decode_video(AVCodecContext *ctx, AVPacket *pkt, size_t frame_size) +#endif +{ + int ret = 0; + sws_context_t *sws_ctx = NULL; if ((ret = avcodec_send_packet(ctx, pkt)) < 0) { @@ -1295,108 +1392,64 @@ static void decode_video(AVCodecContext *ctx, AVPacket *pkt, AVFrame *conv_frame return; } - while(true) + /* Stop decoding thread until swsbuffer is not full again */ + while (!swsbuffer_has_open_slot(swsbuffer)) { - ret = avcodec_receive_frame(ctx, frame); + while(swsbuffer_has_finished_slot(swsbuffer)) + { + #ifdef HAVE_SSA + add_frame_to_fifo(frame_size, ass_track_active); + #else + add_frame_to_fifo(frame_size); + #endif + } + } + + while (swsbuffer_has_open_slot(swsbuffer)) + { + swsbuffer_get_open_slot(swsbuffer, &sws_ctx); + + ret = avcodec_receive_frame(ctx, sws_ctx->source); if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) { - break; + ret = -42; + goto end; } else if (ret < 0) { log_cb(RETRO_LOG_ERROR, "[FFMPEG] Error while reading video frame: %s\n", av_err2str(ret)); - goto fail; + goto end; } #if LIBAVUTIL_VERSION_MAJOR > 55 if (hw_decoding_enabled) - { /* Copy data from VRAM to RAM */ - if ((ret = av_hwframe_transfer_data(sw_frame, frame, 0)) < 0) + if ((ret = av_hwframe_transfer_data(sws_ctx->hw_source, sws_ctx->source, 0)) < 0) { log_cb(RETRO_LOG_ERROR, "[FFMPEG] Error transferring the data to system memory: %s\n", av_err2str(ret)); - goto fail; + goto end; } - tmp_frame = sw_frame; - } - else #endif - tmp_frame = frame; - *sws = sws_getCachedContext(*sws, - media.width, media.height, tmp_frame->format, - media.width, media.height, PIX_FMT_RGB32, - SWS_BICUBIC, NULL, NULL, NULL); + tpool_add_work(tpool, sws_worker_thread, sws_ctx); - set_colorspace(*sws, media.width, media.height, - av_frame_get_colorspace(tmp_frame), av_frame_get_color_range(tmp_frame)); - - if ((ret = sws_scale(*sws, (const uint8_t * const*)tmp_frame->data, - tmp_frame->linesize, 0, media.height, - conv_frame->data, conv_frame->linesize)) < 0) + while(swsbuffer_has_finished_slot(swsbuffer)) { - log_cb(RETRO_LOG_ERROR, "[FFMPEG] Error while scaling image: %s\n", av_err2str(ret)); - goto fail; - } - - size_t decoded_size; - int64_t pts = frame->best_effort_timestamp; - double video_time = pts * av_q2d(fctx->streams[video_stream_index]->time_base); - #ifdef HAVE_SSA - if (ass_render && ass_track_active) - { - int change = 0; - ASS_Image *img = ass_render_frame(ass_render, ass_track_active, - 1000 * video_time, &change); - - /* Do it on CPU for now. - * We're in a thread anyways, so shouldn't really matter. */ - render_ass_img(conv_frame, img); - } + add_frame_to_fifo(frame_size, ass_track_active); +#else + add_frame_to_fifo(frame_size); #endif - - decoded_size = frame_size + sizeof(pts); - slock_lock(fifo_lock); - - while (!decode_thread_dead && (video_decode_fifo != NULL) - && fifo_write_avail(video_decode_fifo) < decoded_size) - { - if (!main_sleeping) - scond_wait(fifo_decode_cond, fifo_lock); - else - { - fifo_clear(video_decode_fifo); - break; - } } - decode_last_video_time = video_time; - if (!decode_thread_dead) - { - int stride; - unsigned y; - const uint8_t *src = NULL; - - fifo_write(video_decode_fifo, &pts, sizeof(pts)); - src = conv_frame->data[0]; - stride = conv_frame->linesize[0]; - - for (y = 0; y < media.height; y++, src += stride) - fifo_write(video_decode_fifo, src, media.width * sizeof(uint32_t)); - } - scond_signal(fifo_cond); - slock_unlock(fifo_lock); - - fail: - av_frame_unref(frame); - av_frame_unref(sw_frame); + end: if (ret < 0) + { + swsbuffer_return_open_slot(swsbuffer, sws_ctx); break; + } } - av_frame_free(&frame); - av_frame_free(&sw_frame); return; } @@ -1404,9 +1457,9 @@ static int16_t *decode_audio(AVCodecContext *ctx, AVPacket *pkt, AVFrame *frame, int16_t *buffer, size_t *buffer_cap, SwrContext *swr) { - int ret; - int64_t pts; - size_t required_buffer; + int ret = 0; + int64_t pts = 0; + size_t required_buffer = 0; if ((ret = avcodec_send_packet(ctx, pkt)) < 0) { @@ -1498,14 +1551,10 @@ static void decode_thread(void *data) { unsigned i; struct SwrContext *swr[audio_streams_num]; - struct SwsContext *sws = NULL; AVFrame *aud_frame = NULL; - void *conv_frame_buf = NULL; size_t frame_size = 0; int16_t *audio_buffer = NULL; size_t audio_buffer_cap = 0; - AVFrame *conv_frame = NULL; - (void)data; @@ -1527,10 +1576,9 @@ static void decode_thread(void *data) if (video_stream_index >= 0) { frame_size = avpicture_get_size(PIX_FMT_RGB32, media.width, media.height); - conv_frame = av_frame_alloc(); - conv_frame_buf = av_malloc(frame_size); - avpicture_fill((AVPicture*)conv_frame, (const uint8_t*)conv_frame_buf, - PIX_FMT_RGB32, media.width, media.height); + swsbuffer = swsbuffer_create(sw_sws_threads, frame_size, media.width, media.height); + tpool = tpool_create(sw_sws_threads); + log_cb(RETRO_LOG_INFO, "[FFMPEG] Configured filtering threads: %d\n", sw_sws_threads); } while (!decode_thread_dead) @@ -1553,6 +1601,12 @@ static void decode_thread(void *data) if (seek) { + if (video_stream_index >= 0) + { + tpool_wait(tpool); + swsbuffer_clear(swsbuffer); + } + decode_thread_seek(seek_time_thread); slock_lock(fifo_lock); @@ -1585,9 +1639,9 @@ static void decode_thread(void *data) if (pkt.stream_index == video_stream_index) #ifdef HAVE_SSA - decode_video(vctx, &pkt, conv_frame, frame_size, &sws, ass_track_active); + decode_video(vctx, &pkt, frame_size, ass_track_active); #else - decode_video(vctx, &pkt, conv_frame, frame_size, &sws); + decode_video(vctx, &pkt, frame_size); #endif else if (pkt.stream_index == audio_stream && actx_active) { @@ -1626,19 +1680,21 @@ static void decode_thread(void *data) av_free_packet(&pkt); } - sws_freeContext(sws); - for (i = 0; (int)i < audio_streams_num; i++) swr_free(&swr[i]); - if (vctx->hw_device_ctx) + if (vctx && vctx->hw_device_ctx) av_buffer_unref(&vctx->hw_device_ctx); av_frame_free(&aud_frame); - av_frame_free(&conv_frame); - av_freep(&conv_frame_buf); av_freep(&audio_buffer); + if (video_stream_index >= 0) + { + tpool_destroy(tpool); + swsbuffer_destroy(swsbuffer); + } + slock_lock(fifo_lock); decode_thread_dead = true; scond_signal(fifo_cond); @@ -1837,7 +1893,7 @@ void CORE_PREFIX(retro_unload_game)(void) bool CORE_PREFIX(retro_load_game)(const struct retro_game_info *info) { - int ret; + int ret = 0; bool is_fft = false; enum retro_pixel_format fmt = RETRO_PIXEL_FORMAT_XRGB8888; @@ -1900,8 +1956,10 @@ bool CORE_PREFIX(retro_load_game)(const struct retro_game_info *info) if (video_stream_index >= 0 || is_fft) { - video_decode_fifo = fifo_new(media.width - * media.height * sizeof(uint32_t) * 32); + /* video fifo is 2 frames deep */ + video_decode_fifo = fifo_new( + media.width * media.height * sizeof(uint32_t) * 2 + ); #if defined(HAVE_OPENGL) || defined(HAVE_OPENGLES) use_gl = true; @@ -1924,8 +1982,10 @@ bool CORE_PREFIX(retro_load_game)(const struct retro_game_info *info) } if (audio_streams_num > 0) { - unsigned buffer_seconds = video_stream_index >= 0 ? 20 : 1; - audio_decode_fifo = fifo_new(buffer_seconds * media.sample_rate * sizeof(int16_t) * 2); + /* audio fifo is 1 second deep */ + audio_decode_fifo = fifo_new( + media.sample_rate * sizeof(int16_t) * 2 + ); } fifo_cond = scond_new(); diff --git a/cores/libretro-ffmpeg/swsbuffer.c b/cores/libretro-ffmpeg/swsbuffer.c new file mode 100644 index 0000000000..d480c91c3b --- /dev/null +++ b/cores/libretro-ffmpeg/swsbuffer.c @@ -0,0 +1,185 @@ +#include + +#include + +#include "swsbuffer.h" + +enum kbStatus +{ + KB_OPEN, + KB_IN_PROGRESS, + KB_FINISHED +}; + +struct swsbuffer +{ + sws_context_t *buffer; + enum kbStatus *status; + size_t size; + slock_t *lock; + int64_t head; + int64_t tail; +}; + +swsbuffer_t *swsbuffer_create(size_t num, int frame_size, int width, int height) +{ + swsbuffer_t *b = malloc(sizeof (swsbuffer_t)); + if (b == NULL) + return NULL; + + b->status = malloc(sizeof(enum kbStatus) * num); + if (b->status == NULL) + return NULL; + for (int i = 0; i < num; i++) + b->status[i] = KB_OPEN; + + b->lock = slock_new(); + if (b->lock == NULL) + return NULL; + + b->buffer = malloc(sizeof(sws_context_t) * num); + if (b->buffer == NULL) + return NULL; + for (int i = 0; i < num; i++) + { + b->buffer[i].index = i; + b->buffer[i].sws = sws_alloc_context(); + b->buffer[i].source = av_frame_alloc(); +#if LIBAVUTIL_VERSION_MAJOR > 55 + b->buffer[i].hw_source = av_frame_alloc(); +#endif + b->buffer[i].target = av_frame_alloc(); + b->buffer[i].frame_buf = av_malloc(frame_size); + + avpicture_fill((AVPicture*)b->buffer[i].target, (const uint8_t*)b->buffer[i].frame_buf, + PIX_FMT_RGB32, width, height); + } + + b->size = num; + b->head = 0; + b->tail = 0; + return b; +} + +void swsbuffer_destroy(swsbuffer_t *swsbuffer) +{ + if (swsbuffer != NULL) { + slock_free(swsbuffer->lock); + free(swsbuffer->status); + for (int i = 0; i < swsbuffer->size; i++) + { +#if LIBAVUTIL_VERSION_MAJOR > 55 + av_frame_free(&swsbuffer->buffer[i].hw_source); +#endif + av_frame_free(&swsbuffer->buffer[i].source); + av_frame_free(&swsbuffer->buffer[i].target); + av_freep(&swsbuffer->buffer[i].frame_buf); + sws_freeContext(swsbuffer->buffer[i].sws); + } + free(swsbuffer->buffer); + free(swsbuffer); + } +} + +void swsbuffer_clear(swsbuffer_t *swsbuffer) +{ + slock_lock(swsbuffer->lock); + + swsbuffer->head = 0; + swsbuffer->tail = 0; + for (int i = 0; i < swsbuffer->size; i++) + swsbuffer->status[i] = KB_OPEN; + + slock_unlock(swsbuffer->lock); +} + +void swsbuffer_get_open_slot(swsbuffer_t *swsbuffer, sws_context_t **context) +{ + slock_lock(swsbuffer->lock); + + if (swsbuffer->status[swsbuffer->head] == KB_OPEN) + { + *context = &swsbuffer->buffer[swsbuffer->head]; + swsbuffer->status[swsbuffer->head] = KB_IN_PROGRESS; + swsbuffer->head++; + swsbuffer->head %= swsbuffer->size; + } + + slock_unlock(swsbuffer->lock); +} + +void swsbuffer_return_open_slot(swsbuffer_t *swsbuffer, sws_context_t *context) +{ + slock_lock(swsbuffer->lock); + + if (swsbuffer->status[context->index] == KB_IN_PROGRESS) + { + swsbuffer->status[context->index] = KB_OPEN; + swsbuffer->head--; + swsbuffer->head %= swsbuffer->size; + } + + slock_unlock(swsbuffer->lock); +} + +void swsbuffer_open_slot(swsbuffer_t *swsbuffer, sws_context_t *context) +{ + slock_lock(swsbuffer->lock); + + if (swsbuffer->status[context->index] == KB_FINISHED) + { + swsbuffer->status[context->index] = KB_OPEN; + swsbuffer->tail++; + swsbuffer->tail %= (swsbuffer->size); + } + + slock_unlock(swsbuffer->lock); +} + +void swsbuffer_get_finished_slot(swsbuffer_t *swsbuffer, sws_context_t **context) +{ + slock_lock(swsbuffer->lock); + + if (swsbuffer->status[swsbuffer->tail] == KB_FINISHED) + *context = &swsbuffer->buffer[swsbuffer->tail]; + + slock_unlock(swsbuffer->lock); +} + +void swsbuffer_finish_slot(swsbuffer_t *swsbuffer, sws_context_t *context) +{ + slock_lock(swsbuffer->lock); + + if (swsbuffer->status[context->index] == KB_IN_PROGRESS) + swsbuffer->status[context->index] = KB_FINISHED; + + slock_unlock(swsbuffer->lock); +} + +bool swsbuffer_has_open_slot(swsbuffer_t *swsbuffer) +{ + bool ret = false; + + slock_lock(swsbuffer->lock); + + if (swsbuffer->status[swsbuffer->head] == KB_OPEN) + ret = true; + + slock_unlock(swsbuffer->lock); + + return ret; +} + +bool swsbuffer_has_finished_slot(swsbuffer_t *swsbuffer) +{ + bool ret = false; + + slock_lock(swsbuffer->lock); + + if (swsbuffer->status[swsbuffer->tail] == KB_FINISHED) + ret = true; + + slock_unlock(swsbuffer->lock); + + return ret; +} diff --git a/cores/libretro-ffmpeg/swsbuffer.h b/cores/libretro-ffmpeg/swsbuffer.h new file mode 100644 index 0000000000..1e44d17521 --- /dev/null +++ b/cores/libretro-ffmpeg/swsbuffer.h @@ -0,0 +1,166 @@ +#ifndef __LIBRETRO_SDK_SWSBUFFER_H__ +#define __LIBRETRO_SDK_SWSBUFFER_H__ + +#include + +#include + +#include +#include + +#include +#include + +RETRO_BEGIN_DECLS + +#ifndef PIX_FMT_RGB32 +#define PIX_FMT_RGB32 AV_PIX_FMT_RGB32 +#endif + +/** + * sws_context + * + * Context object for the sws worker threads. + * + */ +struct sws_context +{ + int index; + struct SwsContext *sws; + AVFrame *source; +#if LIBAVUTIL_VERSION_MAJOR > 55 + AVFrame *hw_source; +#endif + AVFrame *target; + void *frame_buf; +}; +typedef struct sws_context sws_context_t; + +/** + * swsbuffer + * + * The swsbuffer is a ring buffer, that can be used as a + * buffer for many workers while keeping the order. + * + * It is thread safe in a sensem that it is designed to work + * with one work coordinator, that allocates work slots for + * workers threads to work on and later collect the work + * product in the same order, as the slots were allocated. + * + */ +struct swsbuffer; +typedef struct swsbuffer swsbuffer_t; + +/** + * swsbuffer_create: + * @num : Size of the buffer. + * @frame_size : Size of the target frame. + * @width : Width of the target frame. + * @height : Height of the target frame. + * + * Create a swsbuffer. + * + * Returns: swsbuffer. + */ +swsbuffer_t *swsbuffer_create(size_t num, int frame_size, int width, int height); + +/** + * swsbuffer_destroy: + * @swsbuffer : sswsbuffer. + * + * Destory a swsbuffer. + * + * Does also free the buffer allocated with swsbuffer_create(). + * User has to shut down any external worker threads that may have + * a reference to this swsbuffer. + * + **/ +void swsbuffer_destroy(swsbuffer_t *swsbuffer); + +/** + * swsbuffer_clear: + * @swsbuffer : sswsbuffer. + * + * Clears a swsbuffer. + * + **/ +void swsbuffer_clear(swsbuffer_t *swsbuffer); + +/** + * swsbuffer_get_open_slot: + * @swsbuffer : sswsbuffer. + * @contex : sws context. + * + * Returns the next open context inside the ring buffer + * and it's index. The status of the slot will be marked as + * 'in progress' until slot is marked as finished with + * swsbuffer_finish_slot(); + * + **/ +void swsbuffer_get_open_slot(swsbuffer_t *swsbuffer, sws_context_t **context); + +/** + * swsbuffer_return_open_slot: + * @swsbuffer : sswsbuffer. + * @contex : sws context. + * + * Marks the given sws context that is "in progress" as "open" again. + * + **/ +void swsbuffer_return_open_slot(swsbuffer_t *swsbuffer, sws_context_t *context); + +/** + * swsbuffer_open_slot: + * @swsbuffer : sswsbuffer. + * @context : sws context. + * + * Sets the status of the given context from "finished" to "open". + * The slot is then available for producers to claim again with swsbuffer_get_open_slot(). + **/ +void swsbuffer_open_slot(swsbuffer_t *swsbuffer, sws_context_t *context); + +/** + * swsbuffer_get_finished_slot: + * @swsbuffer : sswsbuffer. + * @context : sws context. + * + * Returns a reference for the next context inside + * the ring buffer. User needs to use swsbuffer_open_slot() + * to open the slot in the ringbuffer for the next + * work assignment. User is free to re-allocate or + * re-use the context. + * + */ +void swsbuffer_get_finished_slot(swsbuffer_t *swsbuffer, sws_context_t **context); + +/** + * swsbuffer_finish_slot: + * @swsbuffer : sswsbuffer. + * @context : sws context. + * + * Sets the status of the given context from "in progress" to "finished". + * This is normally done by a producer. User can then retrieve the finished work + * context by calling swsbuffer_get_finished_slot(). + */ +void swsbuffer_finish_slot(swsbuffer_t *swsbuffer, sws_context_t *context); + +/** + * swsbuffer_has_open_slot: + * @swsbuffer : sswsbuffer. + * + * Returns true if the buffer has a open slot available. + */ +bool swsbuffer_has_open_slot(swsbuffer_t *swsbuffer); + +/** + * swsbuffer_has_finished_slot: + * @swsbuffer : sswsbuffer. + * + * Returns true if the buffers next slot is finished and a + * context available. + */ +bool swsbuffer_has_finished_slot(swsbuffer_t *swsbuffer); + +RETRO_END_DECLS + +#endif diff --git a/cores/libretro-ffmpeg/tpool.c b/cores/libretro-ffmpeg/tpool.c new file mode 100644 index 0000000000..a729372b00 --- /dev/null +++ b/cores/libretro-ffmpeg/tpool.c @@ -0,0 +1,264 @@ +/* The MIT License + * + * Copyright (c) 2010-2019 The RetroArch team + * Copyright (c) 2017 John Schember + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE + */ + +#include +#include + +#include "tpool.h" + +/* Work object which will sit in a queue + * waiting for the pool to process it. + * + * It is a singly linked list acting as a FIFO queue. */ +struct tpool_work { + thread_func_t func; /* Function to be called. */ + void *arg; /* Data to be passed to func. */ + struct tpool_work *next; /* Next work item in the queue. */ +}; +typedef struct tpool_work tpool_work_t; + +struct tpool { + tpool_work_t *work_first; /* First work item in the work queue. */ + tpool_work_t *work_last; /* Last work item in the work queue. */ + slock_t *work_mutex; /* Mutex protecting inserting and removing work from the work queue. */ + scond_t *work_cond; /* Conditional to signal when there is work to process. */ + scond_t *working_cond; /* Conditional to signal when there is no work processing. + This will also signal when there are no threads running. */ + size_t working_cnt; /* The number of threads processing work (Not waiting for work). */ + size_t thread_cnt; /* Total number of threads within the pool. */ + bool stop; /* Marker to tell the work threads to exit. */ +}; + +static tpool_work_t *tpool_work_create(thread_func_t func, void *arg) +{ + tpool_work_t *work; + + if (func == NULL) + return NULL; + + work = calloc(1, sizeof(*work)); + work->func = func; + work->arg = arg; + work->next = NULL; + return work; +} + +static void tpool_work_destroy(tpool_work_t *work) +{ + if (work == NULL) + return; + free(work); +} + +/* Pull the first work item out of the queue. */ +static tpool_work_t *tpool_work_get(tpool_t *tp) +{ + tpool_work_t *work; + + if (tp == NULL) + return NULL; + + work = tp->work_first; + if (work == NULL) + return NULL; + + if (work->next == NULL) + { + tp->work_first = NULL; + tp->work_last = NULL; + } else { + tp->work_first = work->next; + } + + return work; +} + +static void tpool_worker(void *arg) +{ + tpool_t *tp = arg; + tpool_work_t *work; + + while (true) + { + slock_lock(tp->work_mutex); + /* Keep running until told to stop. */ + if (tp->stop) + break; + + /* If there is no work in the queue wait in the conditional until + * there is work to take. */ + if (tp->work_first == NULL) + scond_wait(tp->work_cond, tp->work_mutex); + + /* Try to pull work from the queue. */ + work = tpool_work_get(tp); + tp->working_cnt++; + slock_unlock(tp->work_mutex); + + /* Call the work function and let it process. + * + * work can legitimately be NULL. Since multiple threads from the pool + * will wake when there is work, a thread might not get any work. 1 + * piece of work and 2 threads, both will wake but with 1 only work 1 + * will get the work and the other won't. + * + * working_cnt has been increment and work could be NULL. While it's + * not true there is work processing the thread is considered working + * because it's not waiting in the conditional. Pedantic but... + */ + if (work != NULL) + { + work->func(work->arg); + tpool_work_destroy(work); + } + + slock_lock(tp->work_mutex); + tp->working_cnt--; + /* Since we're in a lock no work can be added or removed form the queue. + * Also, the working_cnt can't be changed (except the thread holding the lock). + * At this point if there isn't any work processing and if there is no work + * signal this is the case. */ + if (!tp->stop && tp->working_cnt == 0 && tp->work_first == NULL) + scond_signal(tp->working_cond); + slock_unlock(tp->work_mutex); + } + + tp->thread_cnt--; + if (tp->thread_cnt == 0) + scond_signal(tp->working_cond); + slock_unlock(tp->work_mutex); +} + +tpool_t *tpool_create(size_t num) +{ + tpool_t *tp; + sthread_t *thread; + size_t i; + + if (num == 0) + num = 2; + + tp = calloc(1, sizeof(*tp)); + tp->thread_cnt = num; + + tp->work_mutex = slock_new(); + tp->work_cond = scond_new(); + tp->working_cond = scond_new(); + + tp->work_first = NULL; + tp->work_last = NULL; + + /* Create the requested number of thread and detach them. */ + for (i = 0; i < num; i++) + { + thread = sthread_create(tpool_worker, tp); + sthread_detach(thread); + } + + return tp; +} + +void tpool_destroy(tpool_t *tp) +{ + tpool_work_t *work; + tpool_work_t *work2; + + if (tp == NULL) + return; + + /* Take all work out of the queue and destroy it. */ + slock_lock(tp->work_mutex); + work = tp->work_first; + while (work != NULL) + { + work2 = work->next; + tpool_work_destroy(work); + work = work2; + } + /* Tell the worker threads to stop. */ + tp->stop = true; + scond_broadcast(tp->work_cond); + slock_unlock(tp->work_mutex); + + /* Wait for all threads to stop. */ + tpool_wait(tp); + + slock_free(tp->work_mutex); + scond_free(tp->work_cond); + scond_free(tp->working_cond); + + free(tp); +} + +bool tpool_add_work(tpool_t *tp, thread_func_t func, void *arg) +{ + tpool_work_t *work; + + if (tp == NULL) + return false; + + work = tpool_work_create(func, arg); + if (work == NULL) + return false; + + slock_lock(tp->work_mutex); + if (tp->work_first == NULL) + { + tp->work_first = work; + tp->work_last = tp->work_first; + } + else + { + tp->work_last->next = work; + tp->work_last = work; + } + + scond_broadcast(tp->work_cond); + slock_unlock(tp->work_mutex); + + return true; +} + +void tpool_wait(tpool_t *tp) +{ + if (tp == NULL) + return; + + slock_lock(tp->work_mutex); + while (true) + { + /* working_cond is dual use. It signals when we're not stopping but the + * working_cnt is 0 indicating there isn't any work processing. If we + * are stopping it will trigger when there aren't any threads running. */ + if ((!tp->stop && tp->working_cnt != 0) || (tp->stop && tp->thread_cnt != 0)) + { + scond_wait(tp->working_cond, tp->work_mutex); + } + else + { + break; + } + } + slock_unlock(tp->work_mutex); +} diff --git a/cores/libretro-ffmpeg/tpool.h b/cores/libretro-ffmpeg/tpool.h new file mode 100644 index 0000000000..3fd4505b7d --- /dev/null +++ b/cores/libretro-ffmpeg/tpool.h @@ -0,0 +1,93 @@ +/* The MIT License + * + * Copyright (c) 2010-2019 The RetroArch team + * Copyright (c) 2017 John Schember + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE + */ + +#ifndef __LIBRETRO_SDK_TPOOL_H__ +#define __LIBRETRO_SDK_TPOOL_H__ + +#include + +#include + +#include +#include + +RETRO_BEGIN_DECLS + +struct tpool; +typedef struct tpool tpool_t; + +/** + * (*thread_func_t): + * @arg : Argument. + * + * Callback function that the pool will call to do work. + **/ +typedef void (*thread_func_t)(void *arg); + +/** + * tpool_create: + * @num : Number of threads the pool should have. + * If 0 defaults to 2. + * + * Create a thread pool. + * + * Returns: pool. + */ +tpool_t *tpool_create(size_t num); + +/** + * tpool_destroy: + * @tp : Thread pool. + * + * Destory a thread pool + * The pool can be destroyed while there is outstanding work to process. All + * outstanding unprocessed work will be discareded. There may be a delay before + * this function returns because it will block for work that is processing to + * complete. + **/ +void tpool_destroy(tpool_t *tp); + +/** + * tpool_add_work: + * @tp : Thread pool. + * @func : Function the pool should call. + * @arg : Argument to pass to func. + * + * Add work to a thread pool. + * + * Returns: true if work was added, otherwise false. + **/ +bool tpool_add_work(tpool_t *tp, thread_func_t func, void *arg); + +/** + * tpool_wait: + * @tp Thread pool. + * + * Wait for all work in the pool to be completed. + */ +void tpool_wait(tpool_t *tp); + +RETRO_END_DECLS + +#endif diff --git a/libretro-common/rthreads/rthreads.c b/libretro-common/rthreads/rthreads.c index 5b3e5d6557..6fa0b310a8 100644 --- a/libretro-common/rthreads/rthreads.c +++ b/libretro-common/rthreads/rthreads.c @@ -268,7 +268,9 @@ int sthread_detach(sthread_t *thread) free(thread); return 0; #else - return pthread_detach(thread->id); + int ret = pthread_detach(thread->id); + free(thread); + return ret; #endif }