Merge pull request #9869 from hasenbanck/frame-conversion

FFMPEG core frame based color conversion
This commit is contained in:
Twinaphex 2019-12-18 15:55:24 +01:00 committed by GitHub
commit 4db4d5e8bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 915 additions and 141 deletions

View File

@ -1965,7 +1965,9 @@ endif
ifeq ($(HAVE_FFMPEG), 1)
OBJ += record/drivers/record_ffmpeg.o \
cores/libretro-ffmpeg/ffmpeg_core.o
cores/libretro-ffmpeg/ffmpeg_core.o \
cores/libretro-ffmpeg/swsbuffer.o \
cores/libretro-ffmpeg/tpool.o
LIBS += $(AVCODEC_LIBS) $(AVFORMAT_LIBS) $(AVUTIL_LIBS) $(SWSCALE_LIBS) $(SWRESAMPLE_LIBS) $(FFMPEG_LIBS)
DEFINES += -DHAVE_FFMPEG

View File

@ -19,6 +19,8 @@ SWRESAMPLE_DIR := $(BASE_DIR)/libswresample
INCFLAGS += -I$(BASE_DIR) -I$(CORE_DIR) -I$(LIBRETRO_COMM_DIR)/include -I$(LIBRETRO_COMM_DIR)/include/compat
LIBRETRO_SOURCE += $(CORE_DIR)/ffmpeg_core.c \
$(CORE_DIR)/swsbuffer.c \
$(CORE_DIR)/tpool.c \
$(LIBRETRO_COMM_DIR)/queues/fifo_queue.c \
$(LIBRETRO_COMM_DIR)/rthreads/rthreads.c

View File

@ -44,9 +44,13 @@ extern "C" {
#include <glsym/glsym.h>
#endif
#include <features/features_cpu.h>
#include <retro_miscellaneous.h>
#include <rthreads/rthreads.h>
#include <queues/fifo_queue.h>
#include <string/stdstring.h>
#include "swsbuffer.h"
#include "tpool.h"
#include <libretro.h>
#ifdef RARCH_INTERNAL
@ -56,10 +60,6 @@ extern "C" {
#define CORE_PREFIX(s) s
#endif
#ifndef PIX_FMT_RGB32
#define PIX_FMT_RGB32 AV_PIX_FMT_RGB32
#endif
#define PRINT_VERSION(s) log_cb(RETRO_LOG_INFO, "[FFMPEG] lib%s version:\t%d.%d.%d\n", #s, \
s ##_version() >> 16 & 0xFF, \
s ##_version() >> 8 & 0xFF, \
@ -87,7 +87,11 @@ static AVFormatContext *fctx;
static AVCodecContext *vctx;
static int video_stream_index;
static enum AVColorSpace colorspace;
static int sw_decoder_threads;
static unsigned sw_decoder_threads;
static unsigned sw_sws_threads;
static swsbuffer_t *swsbuffer;
static tpool_t *tpool;
#if LIBAVUTIL_VERSION_MAJOR > 55
static enum AVHWDeviceType hw_decoder;
@ -96,7 +100,6 @@ static enum AVPixelFormat pix_fmt;
static bool force_sw_decoder;
#endif
#define MAX_STREAMS 8
static AVCodecContext *actx[MAX_STREAMS];
static AVCodecContext *sctx[MAX_STREAMS];
@ -277,10 +280,10 @@ void CORE_PREFIX(retro_set_environment)(retro_environment_t cb)
{
static const struct retro_variable vars[] = {
#if LIBAVUTIL_VERSION_MAJOR > 55
{ "ffmpeg_hw_decoder", "Use Hardware decoder (restart); auto|off|"
{ "ffmpeg_hw_decoder", "Use Hardware decoder (restart); off|auto|"
"cuda|d3d11va|drm|dxva2|mediacodec|opencl|qsv|vaapi|vdpau|videotoolbox" },
#endif
{ "ffmpeg_sw_decoder_threads", "Software decoder thread count (restart); 1|2|4|8|16" },
{ "ffmpeg_sw_decoder_threads", "Software decoder thread count (restart); auto|1|2|4|8|16" },
#if defined(HAVE_OPENGL) || defined(HAVE_OPENGLES)
{ "ffmpeg_temporal_interp", "Temporal Interpolation; disabled|enabled" },
#ifdef HAVE_GL_FFT
@ -443,12 +446,22 @@ static void check_variables(bool firststart)
}
#endif
sw_threads_var.key = "ffmpeg_sw_decoder_threads";
if (CORE_PREFIX(environ_cb)(RETRO_ENVIRONMENT_GET_VARIABLE, &sw_threads_var) && sw_threads_var.value)
if (firststart)
{
slock_lock(decode_thread_lock);
sw_decoder_threads = strtoul(sw_threads_var.value, NULL, 0);
slock_unlock(decode_thread_lock);
sw_threads_var.key = "ffmpeg_sw_decoder_threads";
if (CORE_PREFIX(environ_cb)(RETRO_ENVIRONMENT_GET_VARIABLE, &sw_threads_var) && sw_threads_var.value)
{
if (string_is_equal(sw_threads_var.value, "auto"))
{
sw_decoder_threads = cpu_features_get_core_amount();
}
else
{
sw_decoder_threads = strtoul(sw_threads_var.value, NULL, 0);
}
/* Scale the sws threads based on core count but use at min 2 and max 4 threads */
sw_sws_threads = MIN(MAX(2, sw_decoder_threads / 2), 4);
}
}
}
@ -463,7 +476,6 @@ static void seek_frame(int seek_frames)
frame_cnt += seek_frames;
slock_lock(fifo_lock);
do_seek = true;
seek_time = frame_cnt / media.interpolate_fps;
@ -487,8 +499,14 @@ static void seek_frame(int seek_frames)
scond_signal(fifo_decode_cond);
while (!decode_thread_dead && do_seek)
{
main_sleeping = true;
scond_wait(fifo_cond, fifo_lock);
main_sleeping = false;
}
slock_unlock(fifo_lock);
}
void CORE_PREFIX(retro_run)(void)
@ -826,7 +844,7 @@ static enum AVPixelFormat init_hw_decoder(struct AVCodecContext *ctx,
const enum AVHWDeviceType type,
const enum AVPixelFormat *pix_fmts)
{
int ret;
int ret = 0;
enum AVPixelFormat decoder_pix_fmt = AV_PIX_FMT_NONE;
struct AVCodec *codec = avcodec_find_decoder(fctx->streams[video_stream_index]->codec->codec_id);
@ -891,7 +909,7 @@ exit:
static enum AVPixelFormat auto_hw_decoder(AVCodecContext *ctx,
const enum AVPixelFormat *pix_fmts)
{
int ret;
int ret = 0;
enum AVPixelFormat decoder_pix_fmt = AV_PIX_FMT_NONE;
enum AVHWDeviceType type = AV_HWDEVICE_TYPE_NONE;
@ -932,6 +950,7 @@ static enum AVPixelFormat select_decoder(AVCodecContext *ctx,
ctx->thread_type = FF_THREAD_FRAME;
ctx->thread_count = sw_decoder_threads;
log_cb(RETRO_LOG_INFO, "[FFMPEG] Configured software decoding threads: %d\n", sw_decoder_threads);
format = fctx->streams[video_stream_index]->codec->pix_fmt;
@ -965,7 +984,7 @@ static enum AVPixelFormat get_format(AVCodecContext *ctx,
static bool open_codec(AVCodecContext **ctx, enum AVMediaType type, unsigned index)
{
int ret;
int ret = 0;
AVCodec *codec = avcodec_find_decoder(fctx->streams[index]->codec->codec_id);
if (!codec)
@ -1001,13 +1020,8 @@ static bool codec_is_image(enum AVCodecID id)
{
switch (id)
{
#ifdef OLD_FFMPEG_API
case CODEC_ID_MJPEG:
case CODEC_ID_PNG:
#else
case AV_CODEC_ID_MJPEG:
case AV_CODEC_ID_PNG:
#endif
return true;
default:
@ -1021,11 +1035,7 @@ static bool codec_id_is_ttf(enum AVCodecID id)
{
switch (id)
{
#ifdef OLD_FFMPEG_API
case CODEC_ID_TTF:
#else
case AV_CODEC_ID_TTF:
#endif
return true;
default:
@ -1040,12 +1050,8 @@ static bool codec_id_is_ass(enum AVCodecID id)
{
switch (id)
{
#ifdef OLD_FFMPEG_API
case CODEC_ID_SSA:
#else
case AV_CODEC_ID_ASS:
case AV_CODEC_ID_SSA:
#endif
return true;
default:
break;
@ -1088,7 +1094,7 @@ static bool open_codecs(void)
break;
case AVMEDIA_TYPE_VIDEO:
if ( !vctx
if (!vctx
&& !codec_is_image(fctx->streams[i]->codec->codec_id))
{
if (!open_codec(&vctx, type, i))
@ -1098,7 +1104,7 @@ static bool open_codecs(void)
case AVMEDIA_TYPE_SUBTITLE:
#ifdef HAVE_SSA
if ( subtitle_streams_num < MAX_STREAMS
if (subtitle_streams_num < MAX_STREAMS
&& codec_id_is_ass(fctx->streams[i]->codec->codec_id))
{
int size;
@ -1271,23 +1277,114 @@ static void render_ass_img(AVFrame *conv_frame, ASS_Image *img)
}
#endif
static void sws_worker_thread(void *arg)
{
int ret = 0;
AVFrame *tmp_frame = NULL;
sws_context_t *ctx = (sws_context_t*) arg;
#if LIBAVUTIL_VERSION_MAJOR > 55
if (hw_decoding_enabled)
tmp_frame = ctx->hw_source;
else
#endif
tmp_frame = ctx->source;
ctx->sws = sws_getCachedContext(ctx->sws,
media.width, media.height, tmp_frame->format,
media.width, media.height, PIX_FMT_RGB32,
SWS_POINT, NULL, NULL, NULL);
set_colorspace(ctx->sws, media.width, media.height,
av_frame_get_colorspace(tmp_frame),
av_frame_get_color_range(tmp_frame));
if ((ret = sws_scale(ctx->sws, (const uint8_t *const*)tmp_frame->data,
tmp_frame->linesize, 0, media.height,
(uint8_t * const*)ctx->target->data, ctx->target->linesize)) < 0)
{
log_cb(RETRO_LOG_ERROR, "[FFMPEG] Error while scaling image: %s\n", av_err2str(ret));
}
swsbuffer_finish_slot(swsbuffer, ctx);
}
#ifdef HAVE_SSA
static void decode_video(AVCodecContext *ctx, AVPacket *pkt, AVFrame *conv_frame, size_t frame_size, struct SwsContext **sws, ASS_Track *ass_track_active)
static void add_frame_to_fifo(size_t frame_size, ASS_Track *ass_track_active)
#else
static void decode_video(AVCodecContext *ctx, AVPacket *pkt, AVFrame *conv_frame, size_t frame_size, struct SwsContext **sws)
static void add_frame_to_fifo(size_t frame_size)
#endif
{
int ret;
AVFrame *frame = NULL;
AVFrame *sw_frame = NULL;
AVFrame *tmp_frame = NULL;
sws_context_t *ctx = NULL;
if (!(frame = av_frame_alloc()) || !(sw_frame = av_frame_alloc()))
swsbuffer_get_finished_slot(swsbuffer, &ctx);
size_t decoded_size;
int64_t pts = ctx->source->best_effort_timestamp;
double video_time = pts * av_q2d(fctx->streams[video_stream_index]->time_base);
#ifdef HAVE_SSA
if (ass_render && ass_track_active)
{
log_cb(RETRO_LOG_ERROR, "[FFMPEG] Can not alloc frames\n");
return;
int change = 0;
ASS_Image *img = ass_render_frame(ass_render, ass_track_active,
1000 * video_time, &change);
/*
* Do it on CPU for now.
* We're in a thread anyways, so shouldn't really matter.
*/
render_ass_img(ctx->target, img);
}
#endif
decoded_size = frame_size + sizeof(pts);
slock_lock(fifo_lock);
while (!decode_thread_dead && (video_decode_fifo != NULL)
&& fifo_write_avail(video_decode_fifo) < decoded_size)
{
if (!main_sleeping)
scond_wait(fifo_decode_cond, fifo_lock);
else
{
fifo_clear(video_decode_fifo);
break;
}
}
decode_last_video_time = video_time;
if (!decode_thread_dead)
{
int stride;
unsigned y;
const uint8_t *src = NULL;
fifo_write(video_decode_fifo, &pts, sizeof(pts));
src = ctx->target->data[0];
stride = ctx->target->linesize[0];
for (y = 0; y < media.height; y++, src += stride)
fifo_write(video_decode_fifo, src, media.width * sizeof(uint32_t));
}
scond_signal(fifo_cond);
slock_unlock(fifo_lock);
av_frame_unref(ctx->source);
#if LIBAVUTIL_VERSION_MAJOR > 55
av_frame_unref(ctx->hw_source);
#endif
swsbuffer_open_slot(swsbuffer, ctx);
}
#ifdef HAVE_SSA
static void decode_video(AVCodecContext *ctx, AVPacket *pkt, size_t frame_size, ASS_Track *ass_track_active)
#else
static void decode_video(AVCodecContext *ctx, AVPacket *pkt, size_t frame_size)
#endif
{
int ret = 0;
sws_context_t *sws_ctx = NULL;
if ((ret = avcodec_send_packet(ctx, pkt)) < 0)
{
@ -1295,108 +1392,64 @@ static void decode_video(AVCodecContext *ctx, AVPacket *pkt, AVFrame *conv_frame
return;
}
while(true)
/* Stop decoding thread until swsbuffer is not full again */
while (!swsbuffer_has_open_slot(swsbuffer))
{
ret = avcodec_receive_frame(ctx, frame);
while(swsbuffer_has_finished_slot(swsbuffer))
{
#ifdef HAVE_SSA
add_frame_to_fifo(frame_size, ass_track_active);
#else
add_frame_to_fifo(frame_size);
#endif
}
}
while (swsbuffer_has_open_slot(swsbuffer))
{
swsbuffer_get_open_slot(swsbuffer, &sws_ctx);
ret = avcodec_receive_frame(ctx, sws_ctx->source);
if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF)
{
break;
ret = -42;
goto end;
}
else if (ret < 0)
{
log_cb(RETRO_LOG_ERROR, "[FFMPEG] Error while reading video frame: %s\n", av_err2str(ret));
goto fail;
goto end;
}
#if LIBAVUTIL_VERSION_MAJOR > 55
if (hw_decoding_enabled)
{
/* Copy data from VRAM to RAM */
if ((ret = av_hwframe_transfer_data(sw_frame, frame, 0)) < 0)
if ((ret = av_hwframe_transfer_data(sws_ctx->hw_source, sws_ctx->source, 0)) < 0)
{
log_cb(RETRO_LOG_ERROR, "[FFMPEG] Error transferring the data to system memory: %s\n", av_err2str(ret));
goto fail;
goto end;
}
tmp_frame = sw_frame;
}
else
#endif
tmp_frame = frame;
*sws = sws_getCachedContext(*sws,
media.width, media.height, tmp_frame->format,
media.width, media.height, PIX_FMT_RGB32,
SWS_BICUBIC, NULL, NULL, NULL);
tpool_add_work(tpool, sws_worker_thread, sws_ctx);
set_colorspace(*sws, media.width, media.height,
av_frame_get_colorspace(tmp_frame), av_frame_get_color_range(tmp_frame));
if ((ret = sws_scale(*sws, (const uint8_t * const*)tmp_frame->data,
tmp_frame->linesize, 0, media.height,
conv_frame->data, conv_frame->linesize)) < 0)
while(swsbuffer_has_finished_slot(swsbuffer))
{
log_cb(RETRO_LOG_ERROR, "[FFMPEG] Error while scaling image: %s\n", av_err2str(ret));
goto fail;
}
size_t decoded_size;
int64_t pts = frame->best_effort_timestamp;
double video_time = pts * av_q2d(fctx->streams[video_stream_index]->time_base);
#ifdef HAVE_SSA
if (ass_render && ass_track_active)
{
int change = 0;
ASS_Image *img = ass_render_frame(ass_render, ass_track_active,
1000 * video_time, &change);
/* Do it on CPU for now.
* We're in a thread anyways, so shouldn't really matter. */
render_ass_img(conv_frame, img);
}
add_frame_to_fifo(frame_size, ass_track_active);
#else
add_frame_to_fifo(frame_size);
#endif
decoded_size = frame_size + sizeof(pts);
slock_lock(fifo_lock);
while (!decode_thread_dead && (video_decode_fifo != NULL)
&& fifo_write_avail(video_decode_fifo) < decoded_size)
{
if (!main_sleeping)
scond_wait(fifo_decode_cond, fifo_lock);
else
{
fifo_clear(video_decode_fifo);
break;
}
}
decode_last_video_time = video_time;
if (!decode_thread_dead)
{
int stride;
unsigned y;
const uint8_t *src = NULL;
fifo_write(video_decode_fifo, &pts, sizeof(pts));
src = conv_frame->data[0];
stride = conv_frame->linesize[0];
for (y = 0; y < media.height; y++, src += stride)
fifo_write(video_decode_fifo, src, media.width * sizeof(uint32_t));
}
scond_signal(fifo_cond);
slock_unlock(fifo_lock);
fail:
av_frame_unref(frame);
av_frame_unref(sw_frame);
end:
if (ret < 0)
{
swsbuffer_return_open_slot(swsbuffer, sws_ctx);
break;
}
}
av_frame_free(&frame);
av_frame_free(&sw_frame);
return;
}
@ -1404,9 +1457,9 @@ static int16_t *decode_audio(AVCodecContext *ctx, AVPacket *pkt,
AVFrame *frame, int16_t *buffer, size_t *buffer_cap,
SwrContext *swr)
{
int ret;
int64_t pts;
size_t required_buffer;
int ret = 0;
int64_t pts = 0;
size_t required_buffer = 0;
if ((ret = avcodec_send_packet(ctx, pkt)) < 0)
{
@ -1498,14 +1551,10 @@ static void decode_thread(void *data)
{
unsigned i;
struct SwrContext *swr[audio_streams_num];
struct SwsContext *sws = NULL;
AVFrame *aud_frame = NULL;
void *conv_frame_buf = NULL;
size_t frame_size = 0;
int16_t *audio_buffer = NULL;
size_t audio_buffer_cap = 0;
AVFrame *conv_frame = NULL;
(void)data;
@ -1527,10 +1576,9 @@ static void decode_thread(void *data)
if (video_stream_index >= 0)
{
frame_size = avpicture_get_size(PIX_FMT_RGB32, media.width, media.height);
conv_frame = av_frame_alloc();
conv_frame_buf = av_malloc(frame_size);
avpicture_fill((AVPicture*)conv_frame, (const uint8_t*)conv_frame_buf,
PIX_FMT_RGB32, media.width, media.height);
swsbuffer = swsbuffer_create(sw_sws_threads, frame_size, media.width, media.height);
tpool = tpool_create(sw_sws_threads);
log_cb(RETRO_LOG_INFO, "[FFMPEG] Configured filtering threads: %d\n", sw_sws_threads);
}
while (!decode_thread_dead)
@ -1553,6 +1601,12 @@ static void decode_thread(void *data)
if (seek)
{
if (video_stream_index >= 0)
{
tpool_wait(tpool);
swsbuffer_clear(swsbuffer);
}
decode_thread_seek(seek_time_thread);
slock_lock(fifo_lock);
@ -1585,9 +1639,9 @@ static void decode_thread(void *data)
if (pkt.stream_index == video_stream_index)
#ifdef HAVE_SSA
decode_video(vctx, &pkt, conv_frame, frame_size, &sws, ass_track_active);
decode_video(vctx, &pkt, frame_size, ass_track_active);
#else
decode_video(vctx, &pkt, conv_frame, frame_size, &sws);
decode_video(vctx, &pkt, frame_size);
#endif
else if (pkt.stream_index == audio_stream && actx_active)
{
@ -1626,19 +1680,21 @@ static void decode_thread(void *data)
av_free_packet(&pkt);
}
sws_freeContext(sws);
for (i = 0; (int)i < audio_streams_num; i++)
swr_free(&swr[i]);
if (vctx->hw_device_ctx)
if (vctx && vctx->hw_device_ctx)
av_buffer_unref(&vctx->hw_device_ctx);
av_frame_free(&aud_frame);
av_frame_free(&conv_frame);
av_freep(&conv_frame_buf);
av_freep(&audio_buffer);
if (video_stream_index >= 0)
{
tpool_destroy(tpool);
swsbuffer_destroy(swsbuffer);
}
slock_lock(fifo_lock);
decode_thread_dead = true;
scond_signal(fifo_cond);
@ -1837,7 +1893,7 @@ void CORE_PREFIX(retro_unload_game)(void)
bool CORE_PREFIX(retro_load_game)(const struct retro_game_info *info)
{
int ret;
int ret = 0;
bool is_fft = false;
enum retro_pixel_format fmt = RETRO_PIXEL_FORMAT_XRGB8888;
@ -1900,8 +1956,10 @@ bool CORE_PREFIX(retro_load_game)(const struct retro_game_info *info)
if (video_stream_index >= 0 || is_fft)
{
video_decode_fifo = fifo_new(media.width
* media.height * sizeof(uint32_t) * 32);
/* video fifo is 2 frames deep */
video_decode_fifo = fifo_new(
media.width * media.height * sizeof(uint32_t) * 2
);
#if defined(HAVE_OPENGL) || defined(HAVE_OPENGLES)
use_gl = true;
@ -1924,8 +1982,10 @@ bool CORE_PREFIX(retro_load_game)(const struct retro_game_info *info)
}
if (audio_streams_num > 0)
{
unsigned buffer_seconds = video_stream_index >= 0 ? 20 : 1;
audio_decode_fifo = fifo_new(buffer_seconds * media.sample_rate * sizeof(int16_t) * 2);
/* audio fifo is 1 second deep */
audio_decode_fifo = fifo_new(
media.sample_rate * sizeof(int16_t) * 2
);
}
fifo_cond = scond_new();

View File

@ -0,0 +1,185 @@
#include <libavformat/avformat.h>
#include <rthreads/rthreads.h>
#include "swsbuffer.h"
/* Lifecycle state of a single ring buffer slot. */
enum kbStatus
{
/* Slot is free; swsbuffer_get_open_slot() may hand it out. */
KB_OPEN,
/* Slot was handed out by swsbuffer_get_open_slot() and
 * has not been marked as finished yet. */
KB_IN_PROGRESS,
/* swsbuffer_finish_slot() marked the slot as done; it keeps this
 * state until swsbuffer_open_slot() releases it again. */
KB_FINISHED
};
/* Ring buffer of sws contexts together with a per-slot status. */
struct swsbuffer
{
/* Array of `size` slots. */
sws_context_t *buffer;
/* Array of `size` states; status[i] describes buffer[i]. */
enum kbStatus *status;
/* Number of slots in `buffer` and `status`. */
size_t size;
/* Held by every accessor while it reads or writes status, head or tail. */
slock_t *lock;
/* Index of the slot that swsbuffer_get_open_slot() hands out next. */
int64_t head;
/* Index of the slot that swsbuffer_get_finished_slot() returns next. */
int64_t tail;
};
swsbuffer_t *swsbuffer_create(size_t num, int frame_size, int width, int height)
{
swsbuffer_t *b = malloc(sizeof (swsbuffer_t));
if (b == NULL)
return NULL;
b->status = malloc(sizeof(enum kbStatus) * num);
if (b->status == NULL)
return NULL;
for (int i = 0; i < num; i++)
b->status[i] = KB_OPEN;
b->lock = slock_new();
if (b->lock == NULL)
return NULL;
b->buffer = malloc(sizeof(sws_context_t) * num);
if (b->buffer == NULL)
return NULL;
for (int i = 0; i < num; i++)
{
b->buffer[i].index = i;
b->buffer[i].sws = sws_alloc_context();
b->buffer[i].source = av_frame_alloc();
#if LIBAVUTIL_VERSION_MAJOR > 55
b->buffer[i].hw_source = av_frame_alloc();
#endif
b->buffer[i].target = av_frame_alloc();
b->buffer[i].frame_buf = av_malloc(frame_size);
avpicture_fill((AVPicture*)b->buffer[i].target, (const uint8_t*)b->buffer[i].frame_buf,
PIX_FMT_RGB32, width, height);
}
b->size = num;
b->head = 0;
b->tail = 0;
return b;
}
/* Releases the lock, the status array, every slot's frames, pixel buffer
 * and scaler context, and finally the buffer object itself.
 * Passing NULL is a no-op. */
void swsbuffer_destroy(swsbuffer_t *swsbuffer)
{
   size_t slot;

   if (swsbuffer == NULL)
      return;

   slock_free(swsbuffer->lock);
   free(swsbuffer->status);

   for (slot = 0; slot < swsbuffer->size; slot++)
   {
      sws_context_t *entry = &swsbuffer->buffer[slot];

#if LIBAVUTIL_VERSION_MAJOR > 55
      av_frame_free(&entry->hw_source);
#endif
      av_frame_free(&entry->source);
      av_frame_free(&entry->target);
      av_freep(&entry->frame_buf);
      sws_freeContext(entry->sws);
   }

   free(swsbuffer->buffer);
   free(swsbuffer);
}
/* Resets the ring: every slot becomes open again and both
 * cursors go back to slot 0. Done under the buffer lock. */
void swsbuffer_clear(swsbuffer_t *swsbuffer)
{
   size_t slot = 0;

   slock_lock(swsbuffer->lock);

   swsbuffer->tail = 0;
   swsbuffer->head = 0;

   while (slot < swsbuffer->size)
   {
      swsbuffer->status[slot] = KB_OPEN;
      slot++;
   }

   slock_unlock(swsbuffer->lock);
}
/* Hands out the slot at the head of the ring if it is open: stores its
 * address in *context, marks it as in progress and advances the head.
 * If the head slot is not open, *context is left untouched. */
void swsbuffer_get_open_slot(swsbuffer_t *swsbuffer, sws_context_t **context)
{
   int64_t claimed;

   slock_lock(swsbuffer->lock);

   claimed = swsbuffer->head;

   if (swsbuffer->status[claimed] == KB_OPEN)
   {
      swsbuffer->status[claimed] = KB_IN_PROGRESS;
      *context                   = &swsbuffer->buffer[claimed];
      swsbuffer->head            = (claimed + 1) % swsbuffer->size;
   }

   slock_unlock(swsbuffer->lock);
}
/* Gives back a slot that was claimed with swsbuffer_get_open_slot() but
 * is not going to be used: the slot becomes open again and the head
 * cursor moves one step back.
 *
 * Nothing happens if the slot is not in progress. */
void swsbuffer_return_open_slot(swsbuffer_t *swsbuffer, sws_context_t *context)
{
   slock_lock(swsbuffer->lock);

   if (swsbuffer->status[context->index] == KB_IN_PROGRESS)
   {
      const int64_t slots = (int64_t)swsbuffer->size;

      swsbuffer->status[context->index] = KB_OPEN;

      /* Step back with wrap-around. Adding the slot count first keeps
       * the value non-negative: decrementing a head of 0 and reducing it
       * modulo an unsigned size gives a wrong (or negative) index
       * whenever the slot count is not a power of two. */
      swsbuffer->head = (swsbuffer->head + slots - 1) % slots;
   }

   slock_unlock(swsbuffer->lock);
}
/* Releases a finished slot: its status goes back to open and the tail
 * cursor advances by one (wrapping at the end of the ring).
 * Nothing happens if the slot is not in the finished state. */
void swsbuffer_open_slot(swsbuffer_t *swsbuffer, sws_context_t *context)
{
   enum kbStatus *state;

   slock_lock(swsbuffer->lock);

   state = &swsbuffer->status[context->index];

   if (*state == KB_FINISHED)
   {
      *state          = KB_OPEN;
      swsbuffer->tail = (swsbuffer->tail + 1) % (swsbuffer->size);
   }

   slock_unlock(swsbuffer->lock);
}
/* Stores the address of the slot at the tail of the ring in *context if
 * that slot is finished. Otherwise *context is left untouched.
 * The slot's status and the tail cursor are not modified. */
void swsbuffer_get_finished_slot(swsbuffer_t *swsbuffer, sws_context_t **context)
{
   int64_t oldest;

   slock_lock(swsbuffer->lock);

   oldest = swsbuffer->tail;

   if (swsbuffer->status[oldest] == KB_FINISHED)
   {
      *context = &swsbuffer->buffer[oldest];
   }

   slock_unlock(swsbuffer->lock);
}
/* Marks a slot that is in progress as finished.
 * Slots in any other state are left as they are. */
void swsbuffer_finish_slot(swsbuffer_t *swsbuffer, sws_context_t *context)
{
   enum kbStatus *state;

   slock_lock(swsbuffer->lock);

   state = &swsbuffer->status[context->index];

   if (*state == KB_IN_PROGRESS)
   {
      *state = KB_FINISHED;
   }

   slock_unlock(swsbuffer->lock);
}
/* Reports whether the slot at the head of the ring is currently open. */
bool swsbuffer_has_open_slot(swsbuffer_t *swsbuffer)
{
   bool head_is_open;

   slock_lock(swsbuffer->lock);
   head_is_open = (swsbuffer->status[swsbuffer->head] == KB_OPEN);
   slock_unlock(swsbuffer->lock);

   return head_is_open;
}
/* Reports whether the slot at the tail of the ring is currently finished. */
bool swsbuffer_has_finished_slot(swsbuffer_t *swsbuffer)
{
   bool tail_is_finished;

   slock_lock(swsbuffer->lock);
   tail_is_finished = (swsbuffer->status[swsbuffer->tail] == KB_FINISHED);
   slock_unlock(swsbuffer->lock);

   return tail_is_finished;
}

View File

@ -0,0 +1,166 @@
#ifndef __LIBRETRO_SDK_SWSBUFFER_H__
#define __LIBRETRO_SDK_SWSBUFFER_H__
#include <retro_common_api.h>
#include <boolean.h>
#include <libavutil/frame.h>
#include <libswscale/swscale.h>
#include <retro_inline.h>
#include <retro_miscellaneous.h>
RETRO_BEGIN_DECLS
#ifndef PIX_FMT_RGB32
#define PIX_FMT_RGB32 AV_PIX_FMT_RGB32
#endif
/**
* sws_context
*
* Context object for the sws worker threads.
* One instance exists per swsbuffer slot.
*
*/
struct sws_context
{
/* Position of this slot inside the swsbuffer. */
int index;
/* Scaler context owned by this slot. */
struct SwsContext *sws;
/* Frame as received from the decoder. */
AVFrame *source;
#if LIBAVUTIL_VERSION_MAJOR > 55
/* Destination of av_hwframe_transfer_data() when hardware decoding
 * is enabled; used as the scaler input in that case. */
AVFrame *hw_source;
#endif
/* RGB32 output frame; its planes point into frame_buf. */
AVFrame *target;
/* Pixel memory backing the target frame. */
void *frame_buf;
};
typedef struct sws_context sws_context_t;
/**
* swsbuffer
*
* The swsbuffer is a ring buffer that can be used as a
* buffer for many workers while keeping the order.
*
* It is thread safe in the sense that it is designed to work
* with one work coordinator, which allocates work slots for
* worker threads to work on and later collects the work
* product in the same order as the slots were allocated.
*
*/
struct swsbuffer;
typedef struct swsbuffer swsbuffer_t;
/**
* swsbuffer_create:
* @num : Size of the buffer.
* @frame_size : Size of the target frame.
* @width : Width of the target frame.
* @height : Height of the target frame.
*
* Create a swsbuffer.
*
* Returns: swsbuffer.
*/
swsbuffer_t *swsbuffer_create(size_t num, int frame_size, int width, int height);
/**
* swsbuffer_destroy:
* @swsbuffer : swsbuffer.
*
* Destroy a swsbuffer.
*
* Does also free the buffer allocated with swsbuffer_create().
* User has to shut down any external worker threads that may have
* a reference to this swsbuffer.
*
**/
void swsbuffer_destroy(swsbuffer_t *swsbuffer);
/**
* swsbuffer_clear:
* @swsbuffer : swsbuffer.
*
* Clears a swsbuffer.
*
**/
void swsbuffer_clear(swsbuffer_t *swsbuffer);
/**
* swsbuffer_get_open_slot:
* @swsbuffer : swsbuffer.
* @context : sws context.
*
* Returns the next open context inside the ring buffer
* and its index. The status of the slot will be marked as
* 'in progress' until the slot is marked as finished with
* swsbuffer_finish_slot().
*
**/
void swsbuffer_get_open_slot(swsbuffer_t *swsbuffer, sws_context_t **context);
/**
* swsbuffer_return_open_slot:
* @swsbuffer : swsbuffer.
* @context : sws context.
*
* Marks the given sws context that is "in progress" as "open" again.
*
**/
void swsbuffer_return_open_slot(swsbuffer_t *swsbuffer, sws_context_t *context);
/**
* swsbuffer_open_slot:
* @swsbuffer : swsbuffer.
* @context : sws context.
*
* Sets the status of the given context from "finished" to "open".
* The slot is then available for producers to claim again with swsbuffer_get_open_slot().
**/
void swsbuffer_open_slot(swsbuffer_t *swsbuffer, sws_context_t *context);
/**
* swsbuffer_get_finished_slot:
* @swsbuffer : swsbuffer.
* @context : sws context.
*
* Returns a reference for the next context inside
* the ring buffer. User needs to use swsbuffer_open_slot()
* to open the slot in the ringbuffer for the next
* work assignment. User is free to re-allocate or
* re-use the context.
*
*/
void swsbuffer_get_finished_slot(swsbuffer_t *swsbuffer, sws_context_t **context);
/**
* swsbuffer_finish_slot:
* @swsbuffer : swsbuffer.
* @context : sws context.
*
* Sets the status of the given context from "in progress" to "finished".
* This is normally done by a producer. User can then retrieve the finished work
* context by calling swsbuffer_get_finished_slot().
*/
void swsbuffer_finish_slot(swsbuffer_t *swsbuffer, sws_context_t *context);
/**
* swsbuffer_has_open_slot:
* @swsbuffer : swsbuffer.
*
* Returns true if the buffer has an open slot available.
*/
bool swsbuffer_has_open_slot(swsbuffer_t *swsbuffer);
/**
* swsbuffer_has_finished_slot:
* @swsbuffer : swsbuffer.
*
* Returns true if the buffer's next slot is finished and a
* context is available.
*/
bool swsbuffer_has_finished_slot(swsbuffer_t *swsbuffer);
RETRO_END_DECLS
#endif

View File

@ -0,0 +1,264 @@
/* The MIT License
*
* Copyright (c) 2010-2019 The RetroArch team
* Copyright (c) 2017 John Schember <john@nachtimwald.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE
*/
#include <stdlib.h>
#include <rthreads/rthreads.h>
#include "tpool.h"
/* Work object which sits in the pool's queue
 * until a worker thread processes it.
 *
 * The objects form a singly linked list acting as a FIFO queue:
 * tpool_add_work() appends at the tail, tpool_work_get() removes
 * from the front. */
struct tpool_work {
thread_func_t func; /* Function to be called. */
void *arg; /* Data to be passed to func. */
struct tpool_work *next; /* Next work item in the queue. */
};
typedef struct tpool_work tpool_work_t;
/* State of a thread pool: the work queue, the synchronisation
 * objects the worker threads use, and the bookkeeping counters. */
struct tpool {
tpool_work_t *work_first; /* First work item in the work queue. */
tpool_work_t *work_last; /* Last work item in the work queue. */
slock_t *work_mutex; /* Mutex protecting inserting and removing work from the work queue. */
scond_t *work_cond; /* Condition variable to signal when there is work to process. */
scond_t *working_cond; /* Condition variable to signal when there is no work processing.
This will also signal when there are no threads running. */
size_t working_cnt; /* The number of threads processing work (Not waiting for work). */
size_t thread_cnt; /* Total number of threads within the pool. */
bool stop; /* Marker to tell the work threads to exit. */
};
/* Allocates a work item that calls func(arg).
 *
 * Returns NULL if func is NULL or if the allocation fails. */
static tpool_work_t *tpool_work_create(thread_func_t func, void *arg)
{
   tpool_work_t *work;

   if (func == NULL)
      return NULL;

   work = calloc(1, sizeof(*work));
   if (work == NULL)
      return NULL;

   work->func = func;
   work->arg  = arg;
   work->next = NULL;

   return work;
}
/* Releases a work item. A NULL item is accepted and ignored. */
static void tpool_work_destroy(tpool_work_t *work)
{
   /* free() is a no-op for NULL, so no explicit guard is needed. */
   free(work);
}
/* Detaches and returns the item at the front of the queue.
 * Returns NULL when tp is NULL or the queue is empty. */
static tpool_work_t *tpool_work_get(tpool_t *tp)
{
   tpool_work_t *front = (tp != NULL) ? tp->work_first : NULL;

   if (front == NULL)
      return NULL;

   tp->work_first = front->next;

   /* Removing the only item also empties the tail pointer. */
   if (tp->work_first == NULL)
      tp->work_last = NULL;

   return front;
}
/* Main loop of a pool thread.
 *
 * Sleeps until work is queued or the pool is stopped, runs one work item
 * at a time outside the lock and signals working_cond once the pool is
 * idle. On stop it removes itself from thread_cnt and signals
 * working_cond when it was the last thread.
 *
 * arg is the tpool_t the thread belongs to. */
static void tpool_worker(void *arg)
{
   tpool_t *tp = arg;
   tpool_work_t *work;

   while (true)
   {
      slock_lock(tp->work_mutex);

      /* Wait in the condition until there is work to take or the pool is
       * stopping. Looping covers spurious wake-ups and the case where
       * another thread already took the work we were woken for. */
      while (!tp->stop && tp->work_first == NULL)
         scond_wait(tp->work_cond, tp->work_mutex);

      /* The stop flag has to be checked after waking up as well:
       * tpool_destroy() frees the queued items before it wakes the
       * workers, so the queue must not be touched any more. */
      if (tp->stop)
         break;

      /* Pull work from the queue. */
      work = tpool_work_get(tp);
      tp->working_cnt++;
      slock_unlock(tp->work_mutex);

      /* Call the work function without holding the lock. */
      if (work != NULL)
      {
         work->func(work->arg);
         tpool_work_destroy(work);
      }

      slock_lock(tp->work_mutex);
      tp->working_cnt--;
      /* Since we're in a lock no work can be added or removed from the queue
       * and working_cnt can't change. If nothing is processing and nothing
       * is queued, tell tpool_wait() that the pool is idle. */
      if (!tp->stop && tp->working_cnt == 0 && tp->work_first == NULL)
         scond_signal(tp->working_cond);
      slock_unlock(tp->work_mutex);
   }

   /* Reached via break, so work_mutex is still held here. */
   tp->thread_cnt--;
   if (tp->thread_cnt == 0)
      scond_signal(tp->working_cond);
   slock_unlock(tp->work_mutex);
}
/* Creates a thread pool with num detached worker threads
 * (2 if num is 0).
 *
 * Returns the pool, or NULL if the pool object, its mutex or condition
 * variables could not be created or if not a single worker thread could
 * be started. If only some threads fail to start, the pool runs with the
 * ones that did. */
tpool_t *tpool_create(size_t num)
{
   tpool_t *tp;
   sthread_t *thread;
   size_t i;
   size_t started = 0;

   if (num == 0)
      num = 2;

   tp = calloc(1, sizeof(*tp));
   if (tp == NULL)
      return NULL;

   tp->work_mutex   = slock_new();
   tp->work_cond    = scond_new();
   tp->working_cond = scond_new();
   tp->work_first   = NULL;
   tp->work_last    = NULL;

   if (!tp->work_mutex || !tp->work_cond || !tp->working_cond)
      goto error;

   tp->thread_cnt = num;

   /* Create the requested number of threads and detach them. */
   for (i = 0; i < num; i++)
   {
      thread = sthread_create(tpool_worker, tp);

      if (thread == NULL)
      {
         /* This worker never started, so it must not be counted:
          * tpool_destroy() waits until thread_cnt drops to 0. The lock
          * is needed because workers that are already running may be
          * accessing the pool. */
         slock_lock(tp->work_mutex);
         tp->thread_cnt--;
         slock_unlock(tp->work_mutex);
         continue;
      }

      sthread_detach(thread);
      started++;
   }

   /* No thread is running, so nobody else references the pool. */
   if (started == 0)
      goto error;

   return tp;

error:
   if (tp->work_mutex != NULL)
      slock_free(tp->work_mutex);
   if (tp->work_cond != NULL)
      scond_free(tp->work_cond);
   if (tp->working_cond != NULL)
      scond_free(tp->working_cond);
   free(tp);

   return NULL;
}
/* Destroys a thread pool.
 *
 * Work that is still queued is discarded, the worker threads are told to
 * stop, and the call blocks until all of them have exited. Afterwards the
 * pool's synchronisation objects and the pool itself are freed.
 * Passing NULL is a no-op. */
void tpool_destroy(tpool_t *tp)
{
   tpool_work_t *work;
   tpool_work_t *next;

   if (tp == NULL)
      return;

   /* Take all work out of the queue and destroy it. */
   slock_lock(tp->work_mutex);
   work = tp->work_first;
   while (work != NULL)
   {
      next = work->next;
      tpool_work_destroy(work);
      work = next;
   }

   /* The items are gone; leave no dangling pointers behind for the
    * worker threads that are about to be woken up. */
   tp->work_first = NULL;
   tp->work_last  = NULL;

   /* Tell the worker threads to stop. */
   tp->stop = true;
   scond_broadcast(tp->work_cond);
   slock_unlock(tp->work_mutex);

   /* Wait for all threads to stop. */
   tpool_wait(tp);

   slock_free(tp->work_mutex);
   scond_free(tp->work_cond);
   scond_free(tp->working_cond);

   free(tp);
}
/* Appends a work item calling func(arg) to the pool's queue and wakes
 * the worker threads.
 *
 * Returns false if tp is NULL or the work item could not be created,
 * true once the item has been queued. */
bool tpool_add_work(tpool_t *tp, thread_func_t func, void *arg)
{
   tpool_work_t *item = (tp != NULL) ? tpool_work_create(func, arg) : NULL;

   if (item == NULL)
      return false;

   slock_lock(tp->work_mutex);

   /* Link behind the current tail, or start a new queue. */
   if (tp->work_first != NULL)
      tp->work_last->next = item;
   else
      tp->work_first = item;
   tp->work_last = item;

   scond_broadcast(tp->work_cond);
   slock_unlock(tp->work_mutex);

   return true;
}
/* Blocks until the pool is idle.
 *
 * While the pool is running this means: no work is queued and no thread
 * is processing work. Once the pool is stopping it means: all worker
 * threads have exited. Passing NULL is a no-op. */
void tpool_wait(tpool_t *tp)
{
   if (tp == NULL)
      return;

   slock_lock(tp->work_mutex);

   /* working_cond is dual use. While running it is signalled when the last
    * busy worker finds the queue empty; while stopping it is signalled by
    * the last thread that exits.
    *
    * Queued work has to be taken into account as well: right after
    * tpool_add_work() an item may sit in the queue while working_cnt is
    * still 0 because no worker has picked it up yet. The queue is ignored
    * once the pool is stopping, since remaining work is discarded then. */
   while (tp->stop
         ? (tp->thread_cnt != 0)
         : (tp->working_cnt != 0 || tp->work_first != NULL))
      scond_wait(tp->working_cond, tp->work_mutex);

   slock_unlock(tp->work_mutex);
}

View File

@ -0,0 +1,93 @@
/* The MIT License
*
* Copyright (c) 2010-2019 The RetroArch team
* Copyright (c) 2017 John Schember <john@nachtimwald.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE
*/
#ifndef __LIBRETRO_SDK_TPOOL_H__
#define __LIBRETRO_SDK_TPOOL_H__
#include <retro_common_api.h>
#include <boolean.h>
#include <retro_inline.h>
#include <retro_miscellaneous.h>
RETRO_BEGIN_DECLS
struct tpool;
typedef struct tpool tpool_t;
/**
* (*thread_func_t):
* @arg : Argument.
*
* Callback function that the pool will call to do work.
**/
typedef void (*thread_func_t)(void *arg);
/**
* tpool_create:
* @num : Number of threads the pool should have.
* If 0 defaults to 2.
*
* Create a thread pool.
*
* Returns: pool.
*/
tpool_t *tpool_create(size_t num);
/**
* tpool_destroy:
* @tp : Thread pool.
*
* Destroy a thread pool.
* The pool can be destroyed while there is outstanding work to process. All
* outstanding unprocessed work will be discarded. There may be a delay before
* this function returns because it will block until work that is currently
* processing has completed.
**/
void tpool_destroy(tpool_t *tp);
/**
* tpool_add_work:
* @tp : Thread pool.
* @func : Function the pool should call.
* @arg : Argument to pass to func.
*
* Add work to a thread pool.
*
* Returns: true if work was added, otherwise false.
**/
bool tpool_add_work(tpool_t *tp, thread_func_t func, void *arg);
/**
* tpool_wait:
* @tp : Thread pool.
*
* Wait for all work in the pool to be completed.
*/
void tpool_wait(tpool_t *tp);
RETRO_END_DECLS
#endif

View File

@ -268,7 +268,9 @@ int sthread_detach(sthread_t *thread)
free(thread);
return 0;
#else
return pthread_detach(thread->id);
int ret = pthread_detach(thread->id);
free(thread);
return ret;
#endif
}