Expand thread.c - replace pthread with thread.c wrappers

This commit is contained in:
twinaphex 2013-11-01 04:49:29 +01:00
parent 334fe12d75
commit e4c1ebf344
4 changed files with 108 additions and 104 deletions

View File

@ -108,8 +108,8 @@ static int rsnd_update_server_info(rsound_t *rd);
static int rsnd_poll(struct pollfd *fd, int numfd, int timeout);
static void rsnd_sleep(int msec);
static void* rsnd_cb_thread(void *thread_data);
static void* rsnd_thread(void *thread_data);
static void rsnd_cb_thread(void *thread_data);
static void rsnd_thread(void *thread_data);
#ifdef __CELLOS_LV2__
static int init_count = 0;
@ -747,15 +747,15 @@ static void rsnd_drain(rsound_t *rd)
delta *= rd->rate * rd->channels * rd->samplesize;
delta /= 1000000;
/* Calculates the amount of data we have in our virtual buffer. Only used to calculate delay. */
pthread_mutex_lock(&rd->thread.mutex);
slock_lock(rd->thread.mutex);
rd->bytes_in_buffer = (int)((int64_t)rd->total_written + (int64_t)fifo_read_avail(rd->fifo_buffer) - delta);
pthread_mutex_unlock(&rd->thread.mutex);
slock_unlock(rd->thread.mutex);
}
else
{
pthread_mutex_lock(&rd->thread.mutex);
slock_lock(rd->thread.mutex);
rd->bytes_in_buffer = fifo_read_avail(rd->fifo_buffer);
pthread_mutex_unlock(&rd->thread.mutex);
slock_unlock(rd->thread.mutex);
}
}
@ -771,44 +771,43 @@ static size_t rsnd_fill_buffer(rsound_t *rd, const char *buf, size_t size)
if ( !rd->thread_active )
return 0;
pthread_mutex_lock(&rd->thread.mutex);
slock_lock(rd->thread.mutex);
if ( fifo_write_avail(rd->fifo_buffer) >= size )
{
pthread_mutex_unlock(&rd->thread.mutex);
slock_unlock(rd->thread.mutex);
break;
}
pthread_mutex_unlock(&rd->thread.mutex);
slock_unlock(rd->thread.mutex);
/* Sleeps until we can write to the FIFO. */
pthread_mutex_lock(&rd->thread.cond_mutex);
pthread_cond_signal(&rd->thread.cond);
slock_lock(rd->thread.cond_mutex);
scond_signal(rd->thread.cond);
RSD_DEBUG("[RSound] rsnd_fill_buffer: Going to sleep.\n");
pthread_cond_wait(&rd->thread.cond, &rd->thread.cond_mutex);
scond_wait(rd->thread.cond, rd->thread.cond_mutex);
RSD_DEBUG("[RSound] rsnd_fill_buffer: Woke up.\n");
pthread_mutex_unlock(&rd->thread.cond_mutex);
slock_unlock(rd->thread.cond_mutex);
}
pthread_mutex_lock(&rd->thread.mutex);
slock_lock(rd->thread.mutex);
fifo_write(rd->fifo_buffer, buf, size);
pthread_mutex_unlock(&rd->thread.mutex);
slock_unlock(rd->thread.mutex);
//RSD_DEBUG("[RSound] fill_buffer: Wrote to buffer.\n");
/* Send signal to thread that buffer has been updated */
//RSD_DEBUG("[RSound] fill_buffer: Waking up thread.\n");
pthread_cond_signal(&rd->thread.cond);
scond_signal(rd->thread.cond);
return size;
}
static int rsnd_start_thread(rsound_t *rd)
{
int rc;
if ( !rd->thread_active )
{
rd->thread_active = 1;
rc = pthread_create(&rd->thread.threadId, NULL, rd->audio_callback ? rsnd_cb_thread : rsnd_thread, rd);
if ( rc < 0 )
rd->thread.thread = (sthread_t*)sthread_create(rd->audio_callback ? rsnd_cb_thread : rsnd_thread, rd);
if ( !rd->thread.thread )
{
rd->thread_active = 0;
RSD_ERR("[RSound] Failed to create thread.");
@ -828,15 +827,13 @@ static int rsnd_stop_thread(rsound_t *rd)
RSD_DEBUG("[RSound] Shutting down thread.\n");
pthread_mutex_lock(&rd->thread.cond_mutex);
slock_lock(rd->thread.cond_mutex);
rd->thread_active = 0;
pthread_cond_signal(&rd->thread.cond);
pthread_mutex_unlock(&rd->thread.cond_mutex);
scond_signal(rd->thread.cond);
slock_unlock(rd->thread.cond_mutex);
if ( pthread_join(rd->thread.threadId, NULL) < 0 )
RSD_WARN("[RSound] *** Warning, did not terminate thread. ***\n");
else
RSD_DEBUG("[RSound] Thread joined successfully.\n");
sthread_join(rd->thread.thread);
RSD_DEBUG("[RSound] Thread joined successfully.\n");
return 0;
}
@ -857,10 +854,10 @@ static size_t rsnd_get_delay(rsound_t *rd)
/* Adds the backend latency to the calculated latency. */
ptr += (int)rd->backend_info.latency;
pthread_mutex_lock(&rd->thread.mutex);
slock_lock(rd->thread.mutex);
ptr += rd->delay_offset;
RSD_DEBUG("Offset: %d.\n", rd->delay_offset);
pthread_mutex_unlock(&rd->thread.mutex);
slock_unlock(rd->thread.mutex);
if ( ptr < 0 )
ptr = 0;
@ -871,9 +868,9 @@ static size_t rsnd_get_delay(rsound_t *rd)
static size_t rsnd_get_ptr(rsound_t *rd)
{
int ptr;
pthread_mutex_lock(&rd->thread.mutex);
slock_lock(rd->thread.mutex);
ptr = fifo_read_avail(rd->fifo_buffer);
pthread_mutex_unlock(&rd->thread.mutex);
slock_unlock(rd->thread.mutex);
return ptr;
}
@ -1049,9 +1046,9 @@ static int rsnd_update_server_info(rsound_t *rd)
int delay = rsd_delay(rd);
int delta = (int)(client_ptr - serv_ptr);
pthread_mutex_lock(&rd->thread.mutex);
slock_lock(rd->thread.mutex);
delta += fifo_read_avail(rd->fifo_buffer);
pthread_mutex_unlock(&rd->thread.mutex);
slock_unlock(rd->thread.mutex);
RSD_DEBUG("[RSound] Delay: %d, Delta: %d.\n", delay, delta);
@ -1065,9 +1062,9 @@ static int rsnd_update_server_info(rsound_t *rd)
else if ( offset_delta > max_offset )
offset_delta = max_offset;
pthread_mutex_lock(&rd->thread.mutex);
slock_lock(rd->thread.mutex);
rd->delay_offset += offset_delta;
pthread_mutex_unlock(&rd->thread.mutex);
slock_unlock(rd->thread.mutex);
RSD_DEBUG("[RSound] Changed offset-delta: %d.\n", offset_delta);
}
}
@ -1081,7 +1078,7 @@ static int rsnd_update_server_info(rsound_t *rd)
break
/* The blocking thread */
static void* rsnd_thread ( void * thread_data )
static void rsnd_thread ( void * thread_data )
{
/* We share data between thread and callable functions */
rsound_t *rd = thread_data;
@ -1105,18 +1102,18 @@ static void* rsnd_thread ( void * thread_data )
}
/* If the buffer is empty or we've stopped the stream, jump out of this for loop */
pthread_mutex_lock(&rd->thread.mutex);
slock_lock(rd->thread.mutex);
if ( fifo_read_avail(rd->fifo_buffer) < rd->backend_info.chunk_size || !rd->thread_active )
{
pthread_mutex_unlock(&rd->thread.mutex);
slock_unlock(rd->thread.mutex);
break;
}
pthread_mutex_unlock(&rd->thread.mutex);
slock_unlock(rd->thread.mutex);
_TEST_CANCEL();
pthread_mutex_lock(&rd->thread.mutex);
slock_lock(rd->thread.mutex);
fifo_read(rd->fifo_buffer, buffer, sizeof(buffer));
pthread_mutex_unlock(&rd->thread.mutex);
slock_unlock(rd->thread.mutex);
rc = rsnd_send_chunk(rd->conn.socket, buffer, sizeof(buffer), 1);
/* If this happens, we should make sure that subsequent and current calls to rsd_write() will fail. */
@ -1126,29 +1123,29 @@ static void* rsnd_thread ( void * thread_data )
rsnd_reset(rd);
/* Wakes up a potentially sleeping fill_buffer() */
pthread_cond_signal(&rd->thread.cond);
scond_signal(rd->thread.cond);
/* This thread will not be joined, so detach. */
pthread_detach(pthread_self());
pthread_exit(NULL);
sthread_detach(rd->thread.thread);
sthread_exit(rd->thread.thread);
}
/* If this was the first write, set the start point for the timer. */
if ( !rd->has_written )
{
pthread_mutex_lock(&rd->thread.mutex);
slock_lock(rd->thread.mutex);
rd->start_time = rsnd_get_time_usec();
rd->has_written = 1;
pthread_mutex_unlock(&rd->thread.mutex);
slock_unlock(rd->thread.mutex);
}
/* Increase the total_written counter. Used in rsnd_drain() */
pthread_mutex_lock(&rd->thread.mutex);
slock_lock(rd->thread.mutex);
rd->total_written += rc;
pthread_mutex_unlock(&rd->thread.mutex);
slock_unlock(rd->thread.mutex);
/* Buffer has decreased, signal fill_buffer() */
pthread_cond_signal(&rd->thread.cond);
scond_signal(rd->thread.cond);
}
@ -1159,32 +1156,31 @@ static void* rsnd_thread ( void * thread_data )
// There is a very slim change of getting a deadlock using the cond_wait scheme.
// This solution is rather dirty, but avoids complete deadlocks at the very least.
pthread_mutex_lock(&rd->thread.cond_mutex);
pthread_cond_signal(&rd->thread.cond);
slock_lock(rd->thread.cond_mutex);
scond_signal(rd->thread.cond);
if ( rd->thread_active )
{
RSD_DEBUG("[RSound] Thread going to sleep.\n");
pthread_cond_wait(&rd->thread.cond, &rd->thread.cond_mutex);
scond_wait(rd->thread.cond, rd->thread.cond_mutex);
RSD_DEBUG("[RSound] Thread woke up.\n");
}
pthread_mutex_unlock(&rd->thread.cond_mutex);
slock_unlock(rd->thread.cond_mutex);
RSD_DEBUG("[RSound] Thread unlocked cond_mutex.\n");
}
/* Abort request, chap. */
else
{
pthread_cond_signal(&rd->thread.cond);
pthread_exit(NULL);
scond_signal(rd->thread.cond);
sthread_exit(rd->thread.thread);
}
}
return NULL;
}
/* Callback thread */
static void* rsnd_cb_thread(void *thread_data)
static void rsnd_cb_thread(void *thread_data)
{
rsound_t *rd = thread_data;
size_t read_size = rd->backend_info.chunk_size;
@ -1208,9 +1204,9 @@ static void* rsnd_cb_thread(void *thread_data)
if (ret < 0)
{
rsnd_reset(rd);
pthread_detach(pthread_self());
sthread_detach(rd->thread.thread);
rd->error_callback(rd->cb_data);
pthread_exit(NULL);
sthread_exit(rd->thread.thread);
}
has_read += ret;
@ -1237,9 +1233,9 @@ static void* rsnd_cb_thread(void *thread_data)
if (ret != (ssize_t)rd->backend_info.chunk_size)
{
rsnd_reset(rd);
pthread_detach(pthread_self());
sthread_detach(rd->thread.thread);
rd->error_callback(rd->cb_data);
pthread_exit(NULL);
sthread_exit(rd->thread.thread);
}
/* If this was the first write, set the start point for the timer. */
@ -1260,8 +1256,7 @@ static void* rsnd_cb_thread(void *thread_data)
if (rd->has_written)
rsd_delay_wait(rd);
}
pthread_exit(NULL);
return NULL;
sthread_exit(rd->thread.thread);
}
static int rsnd_reset(rsound_t *rd)
@ -1273,7 +1268,7 @@ static int rsnd_reset(rsound_t *rd)
socketclose(rd->conn.ctl_socket);
/* Pristine stuff, baby! */
pthread_mutex_lock(&rd->thread.mutex);
slock_lock(rd->thread.mutex);
rd->conn.socket = -1;
rd->conn.ctl_socket = -1;
rd->total_written = 0;
@ -1282,8 +1277,8 @@ static int rsnd_reset(rsound_t *rd)
rd->bytes_in_buffer = 0;
rd->thread_active = 0;
rd->delay_offset = 0;
pthread_mutex_unlock(&rd->thread.mutex);
pthread_cond_signal(&rd->thread.cond);
slock_unlock(rd->thread.mutex);
scond_signal(rd->thread.cond);
return 0;
}
@ -1308,9 +1303,7 @@ size_t rsd_write( rsound_t *rsound, const void* buf, size_t size)
{
assert(rsound != NULL);
if ( !rsound->ready_for_data )
{
return 0;
}
size_t result;
size_t max_write = (rsound->buffer_size - rsound->backend_info.chunk_size)/2;
@ -1559,10 +1552,10 @@ int rsd_init(rsound_t** rsound)
(*rsound)->conn.socket = -1;
(*rsound)->conn.ctl_socket = -1;
pthread_mutex_init(&(*rsound)->thread.mutex, NULL);
pthread_mutex_init(&(*rsound)->thread.cond_mutex, NULL);
pthread_mutex_init(&(*rsound)->cb_lock, NULL);
pthread_cond_init(&(*rsound)->thread.cond, NULL);
(*rsound)->thread.mutex = slock_new();
(*rsound)->thread.cond_mutex = slock_new();
(*rsound)->cb_lock = slock_new();
(*rsound)->thread.cond = scond_new();
// Assumes default of S16_LE samples.
int format = RSD_S16_LE;
@ -1630,12 +1623,12 @@ void rsd_set_callback(rsound_t *rsound, rsd_audio_callback_t audio_cb, rsd_error
void rsd_callback_lock(rsound_t *rsound)
{
pthread_mutex_lock(&rsound->cb_lock);
slock_lock(rsound->cb_lock);
}
void rsd_callback_unlock(rsound_t *rsound)
{
pthread_mutex_unlock(&rsound->cb_lock);
slock_unlock(rsound->cb_lock);
}
int rsd_free(rsound_t *rsound)
@ -1648,31 +1641,10 @@ int rsd_free(rsound_t *rsound)
if (rsound->port)
free(rsound->port);
int err = pthread_mutex_destroy(&rsound->thread.mutex);
if (err != 0 )
{
RSD_WARN("Error: %s\n", strerror(err));
return -1;
}
if ( (err = pthread_mutex_destroy(&rsound->thread.cond_mutex)) != 0 )
{
RSD_WARN("Error: %s\n", strerror(err));
return -1;
}
if ( (err = pthread_mutex_destroy(&rsound->cb_lock)) != 0 )
{
RSD_WARN("Error: %s\n", strerror(err));
return -1;
}
if ( (err = pthread_cond_destroy(&rsound->thread.cond)) != 0 )
{
RSD_WARN("Error: %s\n", strerror(err));
return -1;
}
slock_free(rsound->thread.mutex);
slock_free(rsound->thread.cond_mutex);
slock_free(rsound->cb_lock);
scond_free(rsound->thread.cond);
free(rsound);

View File

@ -20,9 +20,15 @@
extern "C" {
#endif
//#define STANDALONE
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#ifdef STANDALONE
#include "thread.h"
#else
#include "../thread.h"
#endif
#include <sys/time.h>
#include <time.h>
#include <stdint.h>
@ -169,10 +175,10 @@ extern "C" {
int samplesize;
struct {
pthread_t threadId;
pthread_mutex_t mutex;
pthread_mutex_t cond_mutex;
pthread_cond_t cond;
sthread_t *thread;
slock_t *mutex;
slock_t *cond_mutex;
scond_t *cond;
} thread;
char identity[256];
@ -181,7 +187,7 @@ extern "C" {
rsd_error_callback_t error_callback;
size_t cb_max_size;
void *cb_data;
pthread_mutex_t cb_lock;
slock_t *cb_lock;
} rsound_t;
/* -- API --

View File

@ -1,5 +1,6 @@
/* RetroArch - A frontend for libretro.
* Copyright (C) 2010-2013 - Hans-Kristian Arntzen
* Copyright (C) 2011-2013 - Daniel De Matteis
*
* RetroArch is free software: you can redistribute it and/or modify it under the terms
* of the GNU General Public License as published by the Free Software Found-
@ -83,6 +84,12 @@ sthread_t *sthread_create(void (*thread_func)(void*), void *userdata)
return thread;
}
int sthread_detach(sthread_t *thread)
{
CloseHandle(thread->thread);
return 0;
}
void sthread_join(sthread_t *thread)
{
WaitForSingleObject(thread->thread, INFINITE);
@ -90,6 +97,12 @@ void sthread_join(sthread_t *thread)
free(thread);
}
/* FIXME - what to do here? */
void sthread_exit(sthread_t *thread)
{
(void)thread;
}
struct slock
{
CRITICAL_SECTION lock;
@ -222,12 +235,23 @@ sthread_t *sthread_create(void (*thread_func)(void*), void *userdata)
return thr;
}
int sthread_detach(sthread_t *thread)
{
(void)thread;
return pthread_detach(pthread_self());
}
void sthread_join(sthread_t *thread)
{
pthread_join(thread->id, NULL);
free(thread);
}
void sthread_exit(sthread_t *thread)
{
pthread_exit(NULL);
}
struct slock
{
pthread_mutex_t lock;

View File

@ -25,7 +25,9 @@ typedef struct sthread sthread_t;
// Threading
sthread_t *sthread_create(void (*thread_func)(void*), void *userdata);
int sthread_detach(sthread_t *thread);
void sthread_join(sthread_t *thread);
void sthread_exit(sthread_t *thread);
// Mutexes
typedef struct slock slock_t;