mirror of
https://github.com/RPCS3/rpcs3.git
synced 2025-01-07 03:58:09 +00:00
602 lines
14 KiB
C
602 lines
14 KiB
C
/* MtCoder.c -- Multi-thread Coder
|
|
2018-07-04 : Igor Pavlov : Public domain */
|
|
|
|
#include "Precomp.h"
|
|
|
|
#include "MtCoder.h"
|
|
|
|
#ifndef _7ZIP_ST
|
|
|
|
SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize)
|
|
{
|
|
CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt);
|
|
UInt64 inSize2 = 0;
|
|
UInt64 outSize2 = 0;
|
|
if (inSize != (UInt64)(Int64)-1)
|
|
{
|
|
inSize2 = inSize - thunk->inSize;
|
|
thunk->inSize = inSize;
|
|
}
|
|
if (outSize != (UInt64)(Int64)-1)
|
|
{
|
|
outSize2 = outSize - thunk->outSize;
|
|
thunk->outSize = outSize;
|
|
}
|
|
return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2);
|
|
}
|
|
|
|
|
|
void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
|
|
{
|
|
p->vt.Progress = MtProgressThunk_Progress;
|
|
}
|
|
|
|
|
|
|
|
#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
|
|
|
|
|
|
static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
|
|
{
|
|
if (Event_IsCreated(p))
|
|
return Event_Reset(p);
|
|
return AutoResetEvent_CreateNotSignaled(p);
|
|
}
|
|
|
|
|
|
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);
|
|
|
|
|
|
static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
|
|
{
|
|
WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent);
|
|
if (wres == 0)
|
|
{
|
|
t->stop = False;
|
|
if (!Thread_WasCreated(&t->thread))
|
|
wres = Thread_Create(&t->thread, ThreadFunc, t);
|
|
if (wres == 0)
|
|
wres = Event_Set(&t->startEvent);
|
|
}
|
|
if (wres == 0)
|
|
return SZ_OK;
|
|
return MY_SRes_HRESULT_FROM_WRes(wres);
|
|
}
|
|
|
|
|
|
static void MtCoderThread_Destruct(CMtCoderThread *t)
|
|
{
|
|
if (Thread_WasCreated(&t->thread))
|
|
{
|
|
t->stop = 1;
|
|
Event_Set(&t->startEvent);
|
|
Thread_Wait(&t->thread);
|
|
Thread_Close(&t->thread);
|
|
}
|
|
|
|
Event_Close(&t->startEvent);
|
|
|
|
if (t->inBuf)
|
|
{
|
|
ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);
|
|
t->inBuf = NULL;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
|
|
{
|
|
size_t size = *processedSize;
|
|
*processedSize = 0;
|
|
while (size != 0)
|
|
{
|
|
size_t cur = size;
|
|
SRes res = ISeqInStream_Read(stream, data, &cur);
|
|
*processedSize += cur;
|
|
data += cur;
|
|
size -= cur;
|
|
RINOK(res);
|
|
if (cur == 0)
|
|
return SZ_OK;
|
|
}
|
|
return SZ_OK;
|
|
}
|
|
|
|
|
|
/*
|
|
ThreadFunc2() returns:
|
|
SZ_OK - in all normal cases (even for stream error or memory allocation error)
|
|
SZ_ERROR_THREAD - in case of failure in system synch function
|
|
*/
|
|
|
|
static SRes ThreadFunc2(CMtCoderThread *t)
|
|
{
|
|
CMtCoder *mtc = t->mtCoder;
|
|
|
|
for (;;)
|
|
{
|
|
unsigned bi;
|
|
SRes res;
|
|
SRes res2;
|
|
BoolInt finished;
|
|
unsigned bufIndex;
|
|
size_t size;
|
|
const Byte *inData;
|
|
UInt64 readProcessed = 0;
|
|
|
|
RINOK_THREAD(Event_Wait(&mtc->readEvent))
|
|
|
|
/* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */
|
|
|
|
if (mtc->stopReading)
|
|
{
|
|
return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
|
|
}
|
|
|
|
res = MtProgress_GetError(&mtc->mtProgress);
|
|
|
|
size = 0;
|
|
inData = NULL;
|
|
finished = True;
|
|
|
|
if (res == SZ_OK)
|
|
{
|
|
size = mtc->blockSize;
|
|
if (mtc->inStream)
|
|
{
|
|
if (!t->inBuf)
|
|
{
|
|
t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
|
|
if (!t->inBuf)
|
|
res = SZ_ERROR_MEM;
|
|
}
|
|
if (res == SZ_OK)
|
|
{
|
|
res = FullRead(mtc->inStream, t->inBuf, &size);
|
|
readProcessed = mtc->readProcessed + size;
|
|
mtc->readProcessed = readProcessed;
|
|
}
|
|
if (res != SZ_OK)
|
|
{
|
|
mtc->readRes = res;
|
|
/* after reading error - we can stop encoding of previous blocks */
|
|
MtProgress_SetError(&mtc->mtProgress, res);
|
|
}
|
|
else
|
|
finished = (size != mtc->blockSize);
|
|
}
|
|
else
|
|
{
|
|
size_t rem;
|
|
readProcessed = mtc->readProcessed;
|
|
rem = mtc->inDataSize - (size_t)readProcessed;
|
|
if (size > rem)
|
|
size = rem;
|
|
inData = mtc->inData + (size_t)readProcessed;
|
|
readProcessed += size;
|
|
mtc->readProcessed = readProcessed;
|
|
finished = (mtc->inDataSize == (size_t)readProcessed);
|
|
}
|
|
}
|
|
|
|
/* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
|
|
|
|
res2 = SZ_OK;
|
|
|
|
if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
|
|
{
|
|
res2 = SZ_ERROR_THREAD;
|
|
if (res == SZ_OK)
|
|
{
|
|
res = res2;
|
|
// MtProgress_SetError(&mtc->mtProgress, res);
|
|
}
|
|
}
|
|
|
|
bi = mtc->blockIndex;
|
|
|
|
if (++mtc->blockIndex >= mtc->numBlocksMax)
|
|
mtc->blockIndex = 0;
|
|
|
|
bufIndex = (unsigned)(int)-1;
|
|
|
|
if (res == SZ_OK)
|
|
res = MtProgress_GetError(&mtc->mtProgress);
|
|
|
|
if (res != SZ_OK)
|
|
finished = True;
|
|
|
|
if (!finished)
|
|
{
|
|
if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
|
|
&& mtc->expectedDataSize != readProcessed)
|
|
{
|
|
res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
|
|
if (res == SZ_OK)
|
|
mtc->numStartedThreads++;
|
|
else
|
|
{
|
|
MtProgress_SetError(&mtc->mtProgress, res);
|
|
finished = True;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (finished)
|
|
mtc->stopReading = True;
|
|
|
|
RINOK_THREAD(Event_Set(&mtc->readEvent))
|
|
|
|
if (res2 != SZ_OK)
|
|
return res2;
|
|
|
|
if (res == SZ_OK)
|
|
{
|
|
CriticalSection_Enter(&mtc->cs);
|
|
bufIndex = mtc->freeBlockHead;
|
|
mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
|
|
CriticalSection_Leave(&mtc->cs);
|
|
|
|
res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
|
|
mtc->inStream ? t->inBuf : inData, size, finished);
|
|
|
|
// MtProgress_Reinit(&mtc->mtProgress, t->index);
|
|
|
|
if (res != SZ_OK)
|
|
MtProgress_SetError(&mtc->mtProgress, res);
|
|
}
|
|
|
|
{
|
|
CMtCoderBlock *block = &mtc->blocks[bi];
|
|
block->res = res;
|
|
block->bufIndex = bufIndex;
|
|
block->finished = finished;
|
|
}
|
|
|
|
#ifdef MTCODER__USE_WRITE_THREAD
|
|
RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
|
|
#else
|
|
{
|
|
unsigned wi;
|
|
{
|
|
CriticalSection_Enter(&mtc->cs);
|
|
wi = mtc->writeIndex;
|
|
if (wi == bi)
|
|
mtc->writeIndex = (unsigned)(int)-1;
|
|
else
|
|
mtc->ReadyBlocks[bi] = True;
|
|
CriticalSection_Leave(&mtc->cs);
|
|
}
|
|
|
|
if (wi != bi)
|
|
{
|
|
if (res != SZ_OK || finished)
|
|
return 0;
|
|
continue;
|
|
}
|
|
|
|
if (mtc->writeRes != SZ_OK)
|
|
res = mtc->writeRes;
|
|
|
|
for (;;)
|
|
{
|
|
if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
|
|
{
|
|
res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
|
|
if (res != SZ_OK)
|
|
{
|
|
mtc->writeRes = res;
|
|
MtProgress_SetError(&mtc->mtProgress, res);
|
|
}
|
|
}
|
|
|
|
if (++wi >= mtc->numBlocksMax)
|
|
wi = 0;
|
|
{
|
|
BoolInt isReady;
|
|
|
|
CriticalSection_Enter(&mtc->cs);
|
|
|
|
if (bufIndex != (unsigned)(int)-1)
|
|
{
|
|
mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
|
|
mtc->freeBlockHead = bufIndex;
|
|
}
|
|
|
|
isReady = mtc->ReadyBlocks[wi];
|
|
|
|
if (isReady)
|
|
mtc->ReadyBlocks[wi] = False;
|
|
else
|
|
mtc->writeIndex = wi;
|
|
|
|
CriticalSection_Leave(&mtc->cs);
|
|
|
|
RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))
|
|
|
|
if (!isReady)
|
|
break;
|
|
}
|
|
|
|
{
|
|
CMtCoderBlock *block = &mtc->blocks[wi];
|
|
if (res == SZ_OK && block->res != SZ_OK)
|
|
res = block->res;
|
|
bufIndex = block->bufIndex;
|
|
finished = block->finished;
|
|
}
|
|
}
|
|
}
|
|
#endif
|
|
|
|
if (finished || res != SZ_OK)
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
|
|
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
|
|
{
|
|
CMtCoderThread *t = (CMtCoderThread *)pp;
|
|
for (;;)
|
|
{
|
|
if (Event_Wait(&t->startEvent) != 0)
|
|
return SZ_ERROR_THREAD;
|
|
if (t->stop)
|
|
return 0;
|
|
{
|
|
SRes res = ThreadFunc2(t);
|
|
CMtCoder *mtc = t->mtCoder;
|
|
if (res != SZ_OK)
|
|
{
|
|
MtProgress_SetError(&mtc->mtProgress, res);
|
|
}
|
|
|
|
#ifndef MTCODER__USE_WRITE_THREAD
|
|
{
|
|
unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
|
|
if (numFinished == mtc->numStartedThreads)
|
|
if (Event_Set(&mtc->finishedEvent) != 0)
|
|
return SZ_ERROR_THREAD;
|
|
}
|
|
#endif
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
void MtCoder_Construct(CMtCoder *p)
|
|
{
|
|
unsigned i;
|
|
|
|
p->blockSize = 0;
|
|
p->numThreadsMax = 0;
|
|
p->expectedDataSize = (UInt64)(Int64)-1;
|
|
|
|
p->inStream = NULL;
|
|
p->inData = NULL;
|
|
p->inDataSize = 0;
|
|
|
|
p->progress = NULL;
|
|
p->allocBig = NULL;
|
|
|
|
p->mtCallback = NULL;
|
|
p->mtCallbackObject = NULL;
|
|
|
|
p->allocatedBufsSize = 0;
|
|
|
|
Event_Construct(&p->readEvent);
|
|
Semaphore_Construct(&p->blocksSemaphore);
|
|
|
|
for (i = 0; i < MTCODER__THREADS_MAX; i++)
|
|
{
|
|
CMtCoderThread *t = &p->threads[i];
|
|
t->mtCoder = p;
|
|
t->index = i;
|
|
t->inBuf = NULL;
|
|
t->stop = False;
|
|
Event_Construct(&t->startEvent);
|
|
Thread_Construct(&t->thread);
|
|
}
|
|
|
|
#ifdef MTCODER__USE_WRITE_THREAD
|
|
for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
|
|
Event_Construct(&p->writeEvents[i]);
|
|
#else
|
|
Event_Construct(&p->finishedEvent);
|
|
#endif
|
|
|
|
CriticalSection_Init(&p->cs);
|
|
CriticalSection_Init(&p->mtProgress.cs);
|
|
}
|
|
|
|
|
|
|
|
|
|
static void MtCoder_Free(CMtCoder *p)
|
|
{
|
|
unsigned i;
|
|
|
|
/*
|
|
p->stopReading = True;
|
|
if (Event_IsCreated(&p->readEvent))
|
|
Event_Set(&p->readEvent);
|
|
*/
|
|
|
|
for (i = 0; i < MTCODER__THREADS_MAX; i++)
|
|
MtCoderThread_Destruct(&p->threads[i]);
|
|
|
|
Event_Close(&p->readEvent);
|
|
Semaphore_Close(&p->blocksSemaphore);
|
|
|
|
#ifdef MTCODER__USE_WRITE_THREAD
|
|
for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
|
|
Event_Close(&p->writeEvents[i]);
|
|
#else
|
|
Event_Close(&p->finishedEvent);
|
|
#endif
|
|
}
|
|
|
|
|
|
void MtCoder_Destruct(CMtCoder *p)
|
|
{
|
|
MtCoder_Free(p);
|
|
|
|
CriticalSection_Delete(&p->cs);
|
|
CriticalSection_Delete(&p->mtProgress.cs);
|
|
}
|
|
|
|
|
|
SRes MtCoder_Code(CMtCoder *p)
|
|
{
|
|
unsigned numThreads = p->numThreadsMax;
|
|
unsigned numBlocksMax;
|
|
unsigned i;
|
|
SRes res = SZ_OK;
|
|
|
|
if (numThreads > MTCODER__THREADS_MAX)
|
|
numThreads = MTCODER__THREADS_MAX;
|
|
numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads);
|
|
|
|
if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
|
|
if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
|
|
if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;
|
|
|
|
if (numBlocksMax > MTCODER__BLOCKS_MAX)
|
|
numBlocksMax = MTCODER__BLOCKS_MAX;
|
|
|
|
if (p->blockSize != p->allocatedBufsSize)
|
|
{
|
|
for (i = 0; i < MTCODER__THREADS_MAX; i++)
|
|
{
|
|
CMtCoderThread *t = &p->threads[i];
|
|
if (t->inBuf)
|
|
{
|
|
ISzAlloc_Free(p->allocBig, t->inBuf);
|
|
t->inBuf = NULL;
|
|
}
|
|
}
|
|
p->allocatedBufsSize = p->blockSize;
|
|
}
|
|
|
|
p->readRes = SZ_OK;
|
|
|
|
MtProgress_Init(&p->mtProgress, p->progress);
|
|
|
|
#ifdef MTCODER__USE_WRITE_THREAD
|
|
for (i = 0; i < numBlocksMax; i++)
|
|
{
|
|
RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i]));
|
|
}
|
|
#else
|
|
RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
|
|
#endif
|
|
|
|
{
|
|
RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent));
|
|
|
|
if (Semaphore_IsCreated(&p->blocksSemaphore))
|
|
{
|
|
RINOK_THREAD(Semaphore_Close(&p->blocksSemaphore));
|
|
}
|
|
RINOK_THREAD(Semaphore_Create(&p->blocksSemaphore, numBlocksMax, numBlocksMax));
|
|
}
|
|
|
|
for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++)
|
|
p->freeBlockList[i] = i + 1;
|
|
p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1;
|
|
p->freeBlockHead = 0;
|
|
|
|
p->readProcessed = 0;
|
|
p->blockIndex = 0;
|
|
p->numBlocksMax = numBlocksMax;
|
|
p->stopReading = False;
|
|
|
|
#ifndef MTCODER__USE_WRITE_THREAD
|
|
p->writeIndex = 0;
|
|
p->writeRes = SZ_OK;
|
|
for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
|
|
p->ReadyBlocks[i] = False;
|
|
p->numFinishedThreads = 0;
|
|
#endif
|
|
|
|
p->numStartedThreadsLimit = numThreads;
|
|
p->numStartedThreads = 0;
|
|
|
|
// for (i = 0; i < numThreads; i++)
|
|
{
|
|
CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
|
|
RINOK(MtCoderThread_CreateAndStart(nextThread));
|
|
}
|
|
|
|
RINOK_THREAD(Event_Set(&p->readEvent))
|
|
|
|
#ifdef MTCODER__USE_WRITE_THREAD
|
|
{
|
|
unsigned bi = 0;
|
|
|
|
for (;; bi++)
|
|
{
|
|
if (bi >= numBlocksMax)
|
|
bi = 0;
|
|
|
|
RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))
|
|
|
|
{
|
|
const CMtCoderBlock *block = &p->blocks[bi];
|
|
unsigned bufIndex = block->bufIndex;
|
|
BoolInt finished = block->finished;
|
|
if (res == SZ_OK && block->res != SZ_OK)
|
|
res = block->res;
|
|
|
|
if (bufIndex != (unsigned)(int)-1)
|
|
{
|
|
if (res == SZ_OK)
|
|
{
|
|
res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
|
|
if (res != SZ_OK)
|
|
MtProgress_SetError(&p->mtProgress, res);
|
|
}
|
|
|
|
CriticalSection_Enter(&p->cs);
|
|
{
|
|
p->freeBlockList[bufIndex] = p->freeBlockHead;
|
|
p->freeBlockHead = bufIndex;
|
|
}
|
|
CriticalSection_Leave(&p->cs);
|
|
}
|
|
|
|
RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))
|
|
|
|
if (finished)
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
#else
|
|
{
|
|
WRes wres = Event_Wait(&p->finishedEvent);
|
|
res = MY_SRes_HRESULT_FROM_WRes(wres);
|
|
}
|
|
#endif
|
|
|
|
if (res == SZ_OK)
|
|
res = p->readRes;
|
|
|
|
if (res == SZ_OK)
|
|
res = p->mtProgress.res;
|
|
|
|
#ifndef MTCODER__USE_WRITE_THREAD
|
|
if (res == SZ_OK)
|
|
res = p->writeRes;
|
|
#endif
|
|
|
|
if (res != SZ_OK)
|
|
MtCoder_Free(p);
|
|
return res;
|
|
}
|
|
|
|
#endif
|