cloud sync: speed up sync a bit by uploading/downloading in parallel (#16871)

This commit is contained in:
Eric Warmenhoven 2024-08-11 16:12:27 -04:00 committed by GitHub
parent d4920e76a9
commit 9c5a95a126
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -54,7 +54,7 @@ enum task_cloud_sync_phase
typedef struct
{
enum task_cloud_sync_phase phase;
bool waiting;
uint32_t waiting;
file_list_t *server_manifest;
size_t server_idx;
file_list_t *local_manifest;
@ -67,6 +67,9 @@ typedef struct
bool need_manifest_uploaded;
bool failures;
bool conflicts;
uint32_t uploads;
uint32_t downloads;
retro_time_t start_time;
} task_cloud_sync_state_t;
static slock_t *tcs_running_lock = NULL;
@ -94,7 +97,7 @@ static void task_cloud_sync_begin_handler(void *user_data, const char *path, boo
task_set_finished(task, true);
}
slock_lock(tcs_running_lock);
sync_state->waiting = false;
sync_state->waiting = 0;
slock_unlock(tcs_running_lock);
}
@ -195,7 +198,7 @@ static void task_cloud_sync_manifest_handler(void *user_data, const char *path,
sync_state->failures = true;
sync_state->phase = CLOUD_SYNC_PHASE_END;
slock_lock(tcs_running_lock);
sync_state->waiting = false;
sync_state->waiting = 0;
slock_unlock(tcs_running_lock);
return;
}
@ -209,7 +212,7 @@ static void task_cloud_sync_manifest_handler(void *user_data, const char *path,
}
sync_state->phase = CLOUD_SYNC_PHASE_READ_LOCAL_MANIFEST;
slock_lock(tcs_running_lock);
sync_state->waiting = false;
sync_state->waiting = 0;
slock_unlock(tcs_running_lock);
}
@ -219,11 +222,11 @@ static void task_cloud_sync_fetch_server_manifest(task_cloud_sync_state_t *sync_
task_cloud_sync_manifest_filename(manifest_path, sizeof(manifest_path), true);
sync_state->waiting = true;
sync_state->waiting = 1;
if (!cloud_sync_read(MANIFEST_FILENAME_SERVER, manifest_path, task_cloud_sync_manifest_handler, sync_state))
{
RARCH_WARN(CSPFX "could not read server manifest\n");
sync_state->waiting = false;
sync_state->waiting = 0;
sync_state->phase = CLOUD_SYNC_PHASE_END;
}
}
@ -385,12 +388,10 @@ static void task_cloud_sync_update_progress(retro_task_t *task)
if (!(sync_state = (task_cloud_sync_state_t *)task->state))
return;
val = sync_state->server_idx + sync_state->local_idx + sync_state->current_idx;
val = sync_state->server_idx + sync_state->current_idx;
if (sync_state->server_manifest)
count += sync_state->server_manifest->size;
if (sync_state->local_manifest)
count += sync_state->local_manifest->size;
if (sync_state->current_manifest)
count += sync_state->current_manifest->size;
@ -402,11 +403,15 @@ static void task_cloud_sync_update_progress(retro_task_t *task)
static void task_cloud_sync_add_to_updated_manifest(task_cloud_sync_state_t *sync_state, const char *key, char *hash, bool server)
{
file_list_t *list = server ? sync_state->updated_server_manifest : sync_state->updated_local_manifest;
size_t idx = list->size;
file_list_t *list;
size_t idx;
slock_lock(tcs_running_lock);
list = server ? sync_state->updated_server_manifest : sync_state->updated_local_manifest;
idx = list->size;
file_list_append(list, NULL, NULL, 0, 0, 0);
file_list_set_alt_at_offset(list, idx, key);
list->list[idx].userdata = hash;
slock_unlock(tcs_running_lock);
}
static INLINE int task_cloud_sync_key_cmp(struct item_file *left, struct item_file *right)
@ -495,6 +500,7 @@ static void task_cloud_sync_fetch_cb(void *user_data, const char *path, bool suc
filestream_close(file);
RARCH_LOG(CSPFX "successfully fetched %s\n", path);
task_cloud_sync_add_to_updated_manifest(sync_state, path, hash, false);
sync_state->downloads++;
}
else
{
@ -507,7 +513,7 @@ static void task_cloud_sync_fetch_cb(void *user_data, const char *path, bool suc
}
slock_lock(tcs_running_lock);
sync_state->waiting = false;
sync_state->waiting--;
slock_unlock(tcs_running_lock);
}
@ -559,7 +565,7 @@ static void task_cloud_sync_fetch_server_file(task_cloud_sync_state_t *sync_stat
fill_pathname_basedir(directory, filename, sizeof(directory));
path_mkdir(directory);
if (cloud_sync_read(key, filename, task_cloud_sync_fetch_cb, sync_state))
sync_state->waiting = true;
sync_state->waiting++;
else
{
RARCH_WARN(CSPFX "wanted to fetch %s but failed\n", key);
@ -606,6 +612,7 @@ static void task_cloud_sync_upload_cb(void *user_data, const char *path, bool su
sync_state->need_manifest_uploaded = true;
}
RARCH_LOG(CSPFX "uploading %s succeeded\n", path);
sync_state->uploads++;
}
else
{
@ -620,7 +627,7 @@ static void task_cloud_sync_upload_cb(void *user_data, const char *path, bool su
}
slock_lock(tcs_running_lock);
sync_state->waiting = false;
sync_state->waiting++;
slock_unlock(tcs_running_lock);
}
@ -647,7 +654,7 @@ static void task_cloud_sync_upload_current_file(task_cloud_sync_state_t *sync_st
item->userdata = task_cloud_sync_md5_rfile(file);
filestream_seek(file, 0, SEEK_SET);
sync_state->waiting = true;
sync_state->waiting++;
if (!cloud_sync_update(path, file, task_cloud_sync_upload_cb, sync_state))
{
/* if the upload fails, try to resurrect the hash from the last sync */
@ -658,7 +665,7 @@ static void task_cloud_sync_upload_current_file(task_cloud_sync_state_t *sync_st
task_cloud_sync_add_to_updated_manifest(sync_state, path, CS_FILE_HASH(local_file), false);
}
filestream_close(file);
sync_state->waiting = false;
sync_state->waiting--;
sync_state->failures = true;
RARCH_WARN(CSPFX "uploading %s failed\n", path);
}
@ -747,7 +754,7 @@ static void task_cloud_sync_delete_cb(void *user_data, const char *path, bool su
RARCH_WARN(CSPFX "deleting %s failed\n", path);
sync_state->failures = true;
slock_lock(tcs_running_lock);
sync_state->waiting = false;
sync_state->waiting--;
slock_unlock(tcs_running_lock);
return;
}
@ -760,7 +767,7 @@ static void task_cloud_sync_delete_cb(void *user_data, const char *path, bool su
task_cloud_sync_add_to_updated_manifest(sync_state, path, NULL, false);
sync_state->need_manifest_uploaded = true;
slock_lock(tcs_running_lock);
sync_state->waiting = false;
sync_state->waiting--;
slock_unlock(tcs_running_lock);
}
@ -777,7 +784,7 @@ static void task_cloud_sync_delete_server_file(task_cloud_sync_state_t *sync_sta
RARCH_LOG(CSPFX "deleting %s\n", key);
sync_state->waiting = true;
sync_state->waiting++;
if (!cloud_sync_free(key, task_cloud_sync_delete_cb, sync_state))
{
/* if the delete fails, resurrect the hash from the last sync */
@ -789,7 +796,7 @@ static void task_cloud_sync_delete_server_file(task_cloud_sync_state_t *sync_sta
}
task_cloud_sync_add_to_updated_manifest(sync_state, key, CS_FILE_HASH(server_file), true);
/* we don't mark need_manifest_uploaded here, nothing has changed */
sync_state->waiting = false;
sync_state->waiting--;
}
}
@ -929,7 +936,7 @@ static void task_cloud_sync_update_manifest_cb(void *user_data, const char *path
RARCH_LOG(CSPFX "uploading updated manifest succeeded\n");
sync_state->phase = CLOUD_SYNC_PHASE_END;
slock_lock(tcs_running_lock);
sync_state->waiting = false;
sync_state->waiting = 0;
slock_unlock(tcs_running_lock);
}
@ -948,6 +955,10 @@ static RFILE *task_cloud_sync_write_updated_manifest(file_list_t *manifest, char
return NULL;
}
/* since we may be transfering files at the same time,
* the newly created manifest might be out of order */
file_list_sort_on_alt(manifest);
rjsonwriter_raw(writer, "[\n", 2);
for (; idx < manifest->size; idx++)
@ -1000,12 +1011,12 @@ static void task_cloud_sync_update_manifests(task_cloud_sync_state_t *sync_state
task_cloud_sync_manifest_filename(manifest_path, sizeof(manifest_path), true);
file = task_cloud_sync_write_updated_manifest(sync_state->updated_server_manifest, manifest_path);
filestream_seek(file, 0, SEEK_SET);
sync_state->waiting = true;
sync_state->waiting = 1;
if (!cloud_sync_update(MANIFEST_FILENAME_SERVER, file, task_cloud_sync_update_manifest_cb, sync_state))
{
RARCH_LOG(CSPFX "uploading updated manifest failed\n");
filestream_close(file);
sync_state->waiting = false;
sync_state->waiting = 0;
sync_state->failures = true;
sync_state->phase = CLOUD_SYNC_PHASE_END;
}
@ -1019,6 +1030,7 @@ static void task_cloud_sync_end_handler(void *user_data, const char *path, bool
{
retro_task_t *task = (retro_task_t *)user_data;
task_cloud_sync_state_t *sync_state = NULL;
retro_time_t end_time = cpu_features_get_time_usec();
if (!task)
return;
@ -1038,7 +1050,11 @@ static void task_cloud_sync_end_handler(void *user_data, const char *path, bool
task_set_title(task, strdup(title));
}
RARCH_LOG(CSPFX "all done!\n");
RARCH_LOG(CSPFX "finished after %lld.%06lld seconds, %d files uploaded, %d files downloaded\n",
(end_time - sync_state->start_time) / 1000 / 1000,
(end_time - sync_state->start_time) % (1000 * 1000),
sync_state->uploads, sync_state->downloads);
task_set_finished(task, true);
}
@ -1054,9 +1070,10 @@ static void task_cloud_sync_task_handler(retro_task_t *task)
goto task_finished;
slock_lock(tcs_running_lock);
if (sync_state->waiting)
/* we can transfer more than one file at a time */
if (sync_state->waiting > ((sync_state->phase == CLOUD_SYNC_PHASE_DIFF) ? 4 : 0))
{
task->when = cpu_features_get_time_usec() + 500 * 1000; /* 500ms */
task->when = cpu_features_get_time_usec() + 17 * 1000; /* 17ms */
slock_unlock(tcs_running_lock);
return;
}
@ -1065,7 +1082,7 @@ static void task_cloud_sync_task_handler(retro_task_t *task)
switch (sync_state->phase)
{
case CLOUD_SYNC_PHASE_BEGIN:
sync_state->waiting = true;
sync_state->waiting = 1;
if (!cloud_sync_begin(task_cloud_sync_begin_handler, task))
{
RARCH_WARN(CSPFX "could not begin\n");
@ -1090,7 +1107,7 @@ static void task_cloud_sync_task_handler(retro_task_t *task)
task_cloud_sync_update_manifests(sync_state);
break;
case CLOUD_SYNC_PHASE_END:
sync_state->waiting = true;
sync_state->waiting = 1;
if (!cloud_sync_end(task_cloud_sync_end_handler, task))
{
RARCH_WARN(CSPFX "could not end?!\n");
@ -1169,6 +1186,7 @@ void task_push_cloud_sync(void)
}
sync_state->phase = CLOUD_SYNC_PHASE_BEGIN;
sync_state->start_time = cpu_features_get_time_usec();
strlcpy(task_title, "Cloud Sync in progress", sizeof(task_title));