guard task struct read/writes across threads with a mutex, fixes several data races found by ThreadSanitizer and helgrind

This commit is contained in:
Brad Parker 2016-12-29 00:50:18 -05:00
parent 67f1cca625
commit 41f40acfc4
14 changed files with 290 additions and 106 deletions

View File

@ -202,6 +202,36 @@ void task_queue_retriever_info_free(task_retriever_info_t *list);
void task_queue_cancel_task(void *task);
void task_set_finished(retro_task_t *task, bool finished);
void task_set_mute(retro_task_t *task, bool mute);
void task_set_error(retro_task_t *task, char *error);
void task_set_progress(retro_task_t *task, int8_t progress);
void task_set_title(retro_task_t *task, char *title);
void task_set_data(retro_task_t *task, void *data);
void task_set_cancelled(retro_task_t *task, bool cancelled);
void task_free_title(retro_task_t *task);
bool task_get_cancelled(retro_task_t *task);
bool task_get_finished(retro_task_t *task);
bool task_get_mute(retro_task_t *task);
char* task_get_error(retro_task_t *task);
int8_t task_get_progress(retro_task_t *task);
char* task_get_title(retro_task_t *task);
void* task_get_data(retro_task_t *task);
RETRO_END_DECLS
#endif

View File

@ -28,6 +28,11 @@
#ifdef HAVE_THREADS
#include <rthreads/rthreads.h>
#define SLOCK_LOCK(x) slock_lock(x)
#define SLOCK_UNLOCK(x) slock_unlock(x)
#else
#define SLOCK_LOCK(x)
#define SLOCK_UNLOCK(x)
#endif
typedef struct
@ -280,6 +285,8 @@ static struct retro_task_impl impl_regular = {
#ifdef HAVE_THREADS
static slock_t *running_lock = NULL;
static slock_t *finished_lock = NULL;
static slock_t *property_lock = NULL;
static slock_t *queue_lock = NULL;
static scond_t *worker_cond = NULL;
static sthread_t *worker_thread = NULL;
static bool worker_continue = true; /* use running_lock when touching it */
@ -287,17 +294,25 @@ static bool worker_continue = true; /* use running_lock when touching it */
static void task_queue_remove(task_queue_t *queue, retro_task_t *task)
{
retro_task_t *t = NULL;
retro_task_t *front = NULL;
slock_lock(queue_lock);
front = queue->front;
slock_unlock(queue_lock);
/* Remove first element if needed */
if (task == queue->front)
if (task == front)
{
slock_lock(queue_lock);
queue->front = task->next;
front = task->next;
slock_unlock(queue_lock);
task->next = NULL;
return;
}
/* Parse queue */
t = queue->front;
t = front;
while (t && t->next)
{
/* Remove task and update queue */
@ -343,6 +358,7 @@ static void retro_task_threaded_gather(void)
{
retro_task_t *task = NULL;
slock_lock(property_lock);
slock_lock(running_lock);
for (task = tasks_running.front; task; task = task->next)
task_queue_push_progress(task);
@ -352,6 +368,7 @@ static void retro_task_threaded_gather(void)
slock_lock(finished_lock);
retro_task_internal_gather();
slock_unlock(finished_lock);
slock_unlock(property_lock);
}
static void retro_task_threaded_wait(void)
@ -418,6 +435,7 @@ static void threaded_worker(void *userdata)
for (;;)
{
retro_task_t *task = NULL;
bool finished = false;
if (!worker_continue)
break; /* should we keep running until all tasks finished? */
@ -437,12 +455,16 @@ static void threaded_worker(void *userdata)
task->handler(task);
slock_lock(property_lock);
finished = task->finished;
slock_unlock(property_lock);
slock_lock(running_lock);
task_queue_remove(&tasks_running, task);
slock_unlock(running_lock);
/* Update queue */
if (!task->finished)
if (!finished)
{
/* Re-add task to running queue */
retro_task_threaded_push_running(task);
@ -461,6 +483,8 @@ static void retro_task_threaded_init(void)
{
running_lock = slock_new();
finished_lock = slock_new();
property_lock = slock_new();
queue_lock = slock_new();
worker_cond = scond_new();
slock_lock(running_lock);
@ -482,11 +506,15 @@ static void retro_task_threaded_deinit(void)
scond_free(worker_cond);
slock_free(running_lock);
slock_free(finished_lock);
slock_free(property_lock);
slock_free(queue_lock);
worker_thread = NULL;
worker_cond = NULL;
running_lock = NULL;
finished_lock = NULL;
property_lock = NULL;
queue_lock = NULL;
}
static struct retro_task_impl impl_threaded = {
@ -574,9 +602,13 @@ bool task_queue_ctl(enum task_queue_ctl_state state, void *data)
/* Ignore this task if a related one is already running */
if (task->type == TASK_TYPE_BLOCKING)
{
retro_task_t *running = tasks_running.front;
retro_task_t *running = NULL;
bool found = false;
SLOCK_LOCK(queue_lock);
running = tasks_running.front;
SLOCK_UNLOCK(queue_lock);
for (; running; running = running->next)
{
if (running->type == TASK_TYPE_BLOCKING)
@ -646,3 +678,137 @@ void task_queue_retriever_info_free(task_retriever_info_t *list)
}
}
void task_set_finished(retro_task_t *task, bool finished)
{
SLOCK_LOCK(property_lock);
task->finished = finished;
SLOCK_UNLOCK(property_lock);
}
void task_set_mute(retro_task_t *task, bool mute)
{
SLOCK_LOCK(property_lock);
task->mute = mute;
SLOCK_UNLOCK(property_lock);
}
void task_set_error(retro_task_t *task, char *error)
{
SLOCK_LOCK(property_lock);
task->error = error;
SLOCK_UNLOCK(property_lock);
}
void task_set_progress(retro_task_t *task, int8_t progress)
{
SLOCK_LOCK(property_lock);
task->progress = progress;
SLOCK_UNLOCK(property_lock);
}
void task_set_title(retro_task_t *task, char *title)
{
SLOCK_LOCK(property_lock);
task->title = title;
SLOCK_UNLOCK(property_lock);
}
void task_set_data(retro_task_t *task, void *data)
{
SLOCK_LOCK(running_lock);
task->task_data = data;
SLOCK_UNLOCK(running_lock);
}
void task_set_cancelled(retro_task_t *task, bool cancelled)
{
SLOCK_LOCK(running_lock);
task->cancelled = cancelled;
SLOCK_UNLOCK(running_lock);
}
void task_free_title(retro_task_t *task)
{
SLOCK_LOCK(property_lock);
if (task->title)
free(task->title);
task->title = NULL;
SLOCK_UNLOCK(property_lock);
}
void* task_get_data(retro_task_t *task)
{
void *data = NULL;
SLOCK_LOCK(running_lock);
data = task->task_data;
SLOCK_UNLOCK(running_lock);
return data;
}
bool task_get_cancelled(retro_task_t *task)
{
bool cancelled = false;
SLOCK_LOCK(running_lock);
cancelled = task->cancelled;
SLOCK_UNLOCK(running_lock);
return cancelled;
}
bool task_get_finished(retro_task_t *task)
{
bool finished = false;
SLOCK_LOCK(property_lock);
finished = task->finished;
SLOCK_UNLOCK(property_lock);
return finished;
}
bool task_get_mute(retro_task_t *task)
{
bool mute = false;
SLOCK_LOCK(property_lock);
mute = task->mute;
SLOCK_UNLOCK(property_lock);
return mute;
}
char* task_get_error(retro_task_t *task)
{
char *error = NULL;
SLOCK_LOCK(property_lock);
error = task->error;
SLOCK_UNLOCK(property_lock);
return error;
}
int8_t task_get_progress(retro_task_t *task)
{
int8_t progress = 0;
SLOCK_LOCK(property_lock);
progress = task->progress;
SLOCK_UNLOCK(property_lock);
return progress;
}
char* task_get_title(retro_task_t *task)
{
char *title = NULL;
SLOCK_LOCK(property_lock);
title = task->title;
SLOCK_UNLOCK(property_lock);
return title;
}

View File

@ -156,7 +156,7 @@ static void input_autoconfigure_joypad_add(config_file_t *conf,
string_is_empty(display_name) ? params->name : display_name);
if(!remote_is_bound)
task->title = strdup(msg);
task_set_title(task, strdup(msg));
remote_is_bound = true;
}
else
@ -167,7 +167,7 @@ static void input_autoconfigure_joypad_add(config_file_t *conf,
params->idx);
if (!block_osd_spam)
task->title = strdup(msg);
task_set_title(task, strdup(msg));
}
if (!string_is_empty(params->name))
@ -296,7 +296,7 @@ static void input_autoconfigure_connect_handler(retro_task_t *task)
if (!params || string_is_empty(params->name))
{
free(params);
task->finished = true;
task_set_finished(task, true);
return;
}
@ -313,19 +313,22 @@ static void input_autoconfigure_connect_handler(retro_task_t *task)
snprintf(msg, sizeof(msg), "%s (%ld/%ld) %s.",
params->name, (long)params->vid, (long)params->pid,
msg_hash_to_str(MSG_DEVICE_NOT_CONFIGURED));
task->title = strdup(msg);
task_set_title(task, strdup(msg));
}
free(params);
task->finished = true;
task_set_finished(task, true);
}
static void input_autoconfigure_disconnect_handler(retro_task_t *task)
{
autoconfig_disconnect_t *params = (autoconfig_disconnect_t*)task->state;
task->title = strdup(params->msg);
task->finished = true;
task_set_title(task, strdup(params->msg));
task_set_finished(task, true);
RARCH_LOG("%s: %s\n", msg_hash_to_str(MSG_AUTODETECT), params->msg);

View File

@ -617,7 +617,7 @@ static void task_database_handler(retro_task_t *task)
dbinfo = db->handle;
dbstate = &db->state;
if (!dbinfo || task->cancelled)
if (!dbinfo || task_get_cancelled(task))
goto task_finished;
switch (dbinfo->status)
@ -669,7 +669,7 @@ static void task_database_handler(retro_task_t *task)
return;
task_finished:
if (task)
task->finished = true;
task_set_finished(task, true);
if (dbstate)
{

View File

@ -128,12 +128,12 @@ error:
static void task_decompress_handler_finished(retro_task_t *task,
decompress_state_t *dec)
{
task->finished = true;
task_set_finished(task, true);
if (!task->error && task->cancelled)
task->error = strdup("Task canceled");
if (!task_get_error(task) && task_get_cancelled(task))
task_set_error(task, strdup("Task canceled"));
if (task->error)
if (task_get_error(task))
free(dec->source_file);
else
{
@ -141,7 +141,7 @@ static void task_decompress_handler_finished(retro_task_t *task,
(decompress_task_data_t*)calloc(1, sizeof(*data));
data->source_file = dec->source_file;
task->task_data = data;
task_set_data(task, data);
}
if (dec->subdir)
@ -166,11 +166,11 @@ static void task_decompress_handler(retro_task_t *task)
&retdec, dec->source_file,
dec->valid_ext, file_decompressed, &userdata);
task->progress = file_archive_parse_file_progress(&dec->archive);
task_set_progress(task, file_archive_parse_file_progress(&dec->archive));
if (task->cancelled || ret != 0)
if (task_get_cancelled(task) || ret != 0)
{
task->error = dec->callback_error;
task_set_error(task, dec->callback_error);
file_archive_parse_file_iterate_stop(&dec->archive);
task_decompress_handler_finished(task, dec);
@ -191,11 +191,11 @@ static void task_decompress_handler_target_file(retro_task_t *task)
&retdec, dec->source_file,
dec->valid_ext, file_decompressed_target_file, &userdata);
task->progress = file_archive_parse_file_progress(&dec->archive);
task_set_progress(task, file_archive_parse_file_progress(&dec->archive));
if (task->cancelled || ret != 0)
if (task_get_cancelled(task) || ret != 0)
{
task->error = dec->callback_error;
task_set_error(task, dec->callback_error);
file_archive_parse_file_iterate_stop(&dec->archive);
task_decompress_handler_finished(task, dec);
@ -216,11 +216,11 @@ static void task_decompress_handler_subdir(retro_task_t *task)
&retdec, dec->source_file,
dec->valid_ext, file_decompressed_subdir, &userdata);
task->progress = file_archive_parse_file_progress(&dec->archive);
task_set_progress(task, file_archive_parse_file_progress(&dec->archive));
if (task->cancelled || ret != 0)
if (task_get_cancelled(task) || ret != 0)
{
task->error = dec->callback_error;
task_set_error(task, dec->callback_error);
file_archive_parse_file_iterate_stop(&dec->archive);
task_decompress_handler_finished(task, dec);

View File

@ -70,7 +70,7 @@ void task_file_load_handler(retro_task_t *task)
{
case NBIO_STATUS_TRANSFER_PARSE:
if (task_file_transfer_iterate_parse(nbio) == -1)
task->cancelled = true;
task_set_cancelled(task, true);
nbio->status = NBIO_STATUS_TRANSFER_PARSE_FREE;
break;
case NBIO_STATUS_TRANSFER:
@ -90,17 +90,17 @@ void task_file_load_handler(retro_task_t *task)
case IMAGE_TYPE_TGA:
case IMAGE_TYPE_BMP:
if (!task_image_load_handler(task))
task->finished = true;
task_set_finished(task, true);
break;
case 0:
if (nbio->is_finished)
task->finished = true;
task_set_finished(task, true);
break;
}
if (task->cancelled)
if (task_get_cancelled(task))
{
task->error = strdup("Task canceled.");
task->finished = true;
task_set_error(task, strdup("Task canceled."));
task_set_finished(task, true);
}
}

View File

@ -124,10 +124,7 @@ static int task_http_iterate_transfer(retro_task_t *task)
if (!net_http_update(http->handle, &pos, &tot))
{
#if 0
/* TODO/FIXME - race issue */
task->progress = (tot == 0) ? -1 : (signed)(pos * 100 / tot);
#endif
task_set_progress(task, (tot == 0) ? -1 : (signed)(pos * 100 / tot));
return -1;
}
@ -139,7 +136,7 @@ static void task_http_transfer_handler(retro_task_t *task)
http_transfer_data_t *data = NULL;
http_handle_t *http = (http_handle_t*)task->state;
if (task->cancelled)
if (task_get_cancelled(task))
goto task_finished;
switch (http->status)
@ -168,7 +165,7 @@ static void task_http_transfer_handler(retro_task_t *task)
return;
task_finished:
task->finished = true;
task_set_finished(task, true);
if (http->handle)
{
@ -178,17 +175,17 @@ task_finished:
if (tmp && http->cb)
http->cb(tmp, len);
if (net_http_error(http->handle) || task->cancelled)
if (net_http_error(http->handle) || task_get_cancelled(task))
{
tmp = (char*)net_http_data(http->handle, &len, true);
if (tmp)
free(tmp);
if (task->cancelled)
task->error = strdup("Task cancelled.");
if (task_get_cancelled(task))
task_set_error(task, strdup("Task cancelled."));
else
task->error = strdup("Download failed.");
task_set_error(task, strdup("Download failed."));
}
else
{
@ -196,12 +193,12 @@ task_finished:
data->data = tmp;
data->len = len;
task->task_data = data;
task_set_data(task, data);
}
net_http_delete(http->handle);
} else if (http->error)
task->error = strdup("Internal error.");
task_set_error(task, strdup("Internal error."));
free(http);
}
@ -234,9 +231,7 @@ static bool task_http_retriever(retro_task_t *task, void *data)
/* Fill HTTP info link */
strlcpy(info->url, http->connection.url, sizeof(info->url));
#if 0
info->progress = task->progress;
#endif
info->progress = task_get_progress(task);
return true;
}

View File

@ -309,12 +309,14 @@ bool task_image_load_handler(retro_task_t *task)
if ( (nbio && nbio->is_finished )
&& (image && image->is_finished )
&& (task && !task->cancelled))
&& (task && !task_get_cancelled(task)))
{
task->task_data = malloc(sizeof(image->ti));
void *data = malloc(sizeof(image->ti));
if (task->task_data)
memcpy(task->task_data, &image->ti, sizeof(image->ti));
if (data)
memcpy(data, &image->ti, sizeof(image->ti));
task_set_data(task, data);
return false;
}

View File

@ -72,9 +72,9 @@ static void task_netplay_lan_scan_handler(retro_task_t *task)
RARCH_NETPLAY_DISCOVERY_CTL_LAN_SEND_QUERY, NULL);
}
task->progress = 100;
task->title = strdup(msg_hash_to_str(MSG_NETPLAY_LAN_SCANNING));
task->finished = true;
task_set_progress(task, 100);
task_set_title(task, strdup(msg_hash_to_str(MSG_NETPLAY_LAN_SCANNING)));
task_set_finished(task, true);
return;
}

View File

@ -373,7 +373,7 @@ static void task_overlay_resolve_iterate(retro_task_t *task)
loader->resolve_pos, loader->size))
{
RARCH_ERR("[Overlay]: Failed to resolve next targets.\n");
task->cancelled = true;
task_set_cancelled(task, true);
loader->state = OVERLAY_STATUS_DEFERRED_ERROR;
return;
}
@ -450,7 +450,7 @@ static void task_overlay_deferred_loading(retro_task_t *task)
{
RARCH_ERR("[Overlay]: Failed to load overlay descs for overlay #%u.\n",
(unsigned)overlay->pos);
task->cancelled = true;
task_set_cancelled(task, true);
loader->state = OVERLAY_STATUS_DEFERRED_ERROR;
break;
}
@ -471,7 +471,7 @@ static void task_overlay_deferred_loading(retro_task_t *task)
loader->loading_status = OVERLAY_IMAGE_TRANSFER_NONE;
break;
case OVERLAY_IMAGE_TRANSFER_ERROR:
task->cancelled = true;
task_set_cancelled(task, true);
loader->state = OVERLAY_STATUS_DEFERRED_ERROR;
break;
}
@ -638,7 +638,7 @@ static void task_overlay_deferred_load(retro_task_t *task)
return;
error:
task->cancelled = true;
task_set_cancelled(task, true);
loader->pos = 0;
loader->state = OVERLAY_STATUS_DEFERRED_ERROR;
}
@ -652,7 +652,7 @@ static void task_overlay_free(retro_task_t *task)
if (loader->overlay_path)
free(loader->overlay_path);
if (task->cancelled)
if (task_get_cancelled(task))
{
for (i = 0; i < overlay->load_images_size; i++)
{
@ -688,16 +688,16 @@ static void task_overlay_handler(retro_task_t *task)
task_overlay_resolve_iterate(task);
break;
case OVERLAY_STATUS_DEFERRED_ERROR:
task->cancelled = true;
task_set_cancelled(task, true);
break;
case OVERLAY_STATUS_DEFERRED_DONE:
default:
case OVERLAY_STATUS_NONE:
task->finished = true;
task_set_finished(task, true);
break;
}
if (task->finished && !task->cancelled)
if (task_get_finished(task) && !task_get_cancelled(task))
{
overlay_task_data_t *data = (overlay_task_data_t*)
calloc(1, sizeof(*data));
@ -706,7 +706,7 @@ static void task_overlay_handler(retro_task_t *task)
data->size = loader->size;
data->active = loader->active;
task->task_data = data;
task_set_data(task, data);
}
}

View File

@ -58,8 +58,8 @@ static void task_powerstate_handler(retro_task_t *task)
powerstate->state = frontend->get_powerstate(&seconds, &powerstate->percent);
}
task->task_data = powerstate;
task->finished = true;
task_set_data(task, powerstate);
task_set_finished(task, true);
}
void task_push_get_powerstate(void)

View File

@ -515,17 +515,17 @@ static void task_save_handler_finished(retro_task_t *task,
{
save_task_state_t *task_data = NULL;
task->finished = true;
task_set_finished(task, true);
filestream_close(state->file);
if (!task->error && task->cancelled)
task->error = strdup("Task canceled");
if (!task_get_error(task) && task_get_cancelled(task))
task_set_error(task, strdup("Task canceled"));
task_data = (save_task_state_t*)calloc(1, sizeof(*task_data));
memcpy(task_data, state, sizeof(*state));
task->task_data = task_data;
task_set_data(task, task_data);
if (state->data)
{
@ -564,12 +564,9 @@ static void task_save_handler(retro_task_t *task)
state->written += written;
#if 0
/* TODO/FIXME - data race */
task->progress = (state->written / (float)state->size) * 100;
#endif
task_set_progress(task, (state->written / (float)state->size) * 100);
if (task->cancelled || written != remaining)
if (task_get_cancelled(task) || written != remaining)
{
char err[PATH_MAX_LENGTH];
@ -588,7 +585,7 @@ static void task_save_handler(retro_task_t *task)
else
snprintf(err, sizeof(err), "%s %s", msg_hash_to_str(MSG_FAILED_TO_SAVE_STATE_TO), state->path);
task->error = strdup(err);
task_set_error(task, strdup(err));
task_save_handler_finished(task, state);
return;
}
@ -597,10 +594,7 @@ static void task_save_handler(retro_task_t *task)
{
char *msg = NULL;
if (task->title)
free(task->title);
task->title = NULL;
task_free_title(task);
if (state->undo_save)
msg = strdup(msg_hash_to_str(MSG_RESTORED_OLD_SAVE_STATE));
@ -616,8 +610,8 @@ static void task_save_handler(retro_task_t *task)
msg = strdup(new_msg);
}
if (!task->mute && msg)
task->title = msg;
if (!task_get_mute(task) && msg)
task_set_title(task, msg);
task_save_handler_finished(task, state);
@ -694,18 +688,18 @@ static void task_load_handler_finished(retro_task_t *task,
{
load_task_data_t *task_data = NULL;
task->finished = true;
task_set_finished(task, true);
if (state->file)
filestream_close(state->file);
if (!task->error && task->cancelled)
task->error = strdup("Task canceled");
if (!task_get_error(task) && task_get_cancelled(task))
task_set_error(task, strdup("Task canceled"));
task_data = (load_task_data_t*)calloc(1, sizeof(*task_data));
memcpy(task_data, state, sizeof(*task_data));
task->task_data = task_data;
task_set_data(task, task_data);
free(state);
}
@ -749,13 +743,10 @@ static void task_load_handler(retro_task_t *task)
(uint8_t*)state->data + state->bytes_read, remaining);
state->bytes_read += bytes_read;
#if 0
/* TODO/FIXME - data race */
if (state->size > 0)
task->progress = (state->bytes_read / (float)state->size) * 100;
#endif
task_set_progress(task, (state->bytes_read / (float)state->size) * 100);
if (task->cancelled || bytes_read != remaining)
if (task_get_cancelled(task) || bytes_read != remaining)
{
if (state->autoload)
{
@ -767,10 +758,10 @@ static void task_load_handler(retro_task_t *task)
msg_hash_to_str(MSG_AUTOLOADING_SAVESTATE_FROM),
state->path,
msg_hash_to_str(MSG_FAILED));
task->error = strdup(msg);
task_set_error(task, strdup(msg));
}
else
task->error = strdup(msg_hash_to_str(MSG_FAILED_TO_LOAD_STATE));
task_set_error(task, strdup(msg_hash_to_str(MSG_FAILED_TO_LOAD_STATE)));
free(state->data);
state->data = NULL;
@ -784,10 +775,7 @@ static void task_load_handler(retro_task_t *task)
msg[0] = '\0';
if (task->title)
free(task->title);
task->title = NULL;
task_free_title(task);
if (state->autoload)
{
@ -806,8 +794,8 @@ static void task_load_handler(retro_task_t *task)
}
if (!task->mute)
task->title = strdup(msg);
if (!task_get_mute(task))
task_set_title(task, strdup(msg));
task_load_handler_finished(task, state);

View File

@ -90,9 +90,9 @@ static void task_screenshot_handler(retro_task_t *task)
bool is_paused = runloop_ctl(RUNLOOP_CTL_IS_PAUSED, NULL);
bool ret = false;
if (task->progress == 100)
if (task_get_progress(task) == 100)
{
task->finished = true;
task_set_finished(task, true);
if (state->userbuf)
free(state->userbuf);
@ -173,7 +173,7 @@ static void task_screenshot_handler(retro_task_t *task)
}
#endif
task->progress = 100;
task_set_progress(task, 100);
if (!ret)
{

View File

@ -76,10 +76,10 @@ static void wifi_scan_callback(void *task_data,
static void task_wifi_scan_handler(retro_task_t *task)
{
driver_wifi_scan();
task->progress = 100;
task->title = strdup(msg_hash_to_str(MSG_WIFI_SCAN_COMPLETE));
task->finished = true;
task_set_progress(task, 100);
task_set_title(task, strdup(msg_hash_to_str(MSG_WIFI_SCAN_COMPLETE)));
task_set_finished(task, true);
return;
}