Skip to content

Commit

Permalink
Interface hsync_TDictCompress adding support getDictCompressBorder();…
Browse files Browse the repository at this point in the history
… for hsynz;

(This new api for optimize libdeflate dict compress speed to 10x)
  • Loading branch information
housisong committed Jul 5, 2024
1 parent 9fa3f6f commit 914c6db
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 16 deletions.
3 changes: 2 additions & 1 deletion libhsync/sync_make/dict_compress_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ extern "C" {
size_t* out_dictSize);
size_t (*dictCompress)(hsync_dictCompressHandle dictHandle,size_t blockIndex,
hpatch_byte* out_code,hpatch_byte* out_codeEnd,
const hpatch_byte* in_dataBegin,const hpatch_byte* in_dataEnd);
const hpatch_byte* in_dataBegin,size_t in_dataSize,size_t in_borderSize);
const char* (*compressTypeForDisplay)(void);//like compressType but just for display,can NULL
size_t (*getDictCompressBorder)(void);//if 0,can NULL
} hsync_TDictCompress;

#define kDictCompressCancel (~(size_t)0)
Expand Down
2 changes: 1 addition & 1 deletion libhsync/sync_make/sync_info_make.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ namespace sync_private{
hsync_dictCompressHandle compressHandle=compressPlugin->dictCompressOpen(compressPlugin,1,buf.size());
checkv(compressHandle!=0);
size_t compressedSize=compressPlugin->dictCompress(compressHandle,0,cmbuf.data(),cmbuf.data()+cmbuf.size(),
buf.data(),buf.data()+buf.size());
buf.data(),buf.size(),0);
checkv(compressedSize!=kDictCompressError);
if ((compressedSize==kDictCompressCancel)||(compressedSize>=buf.size()))
compressedSize=0; //cancel compress
Expand Down
38 changes: 24 additions & 14 deletions libhsync/sync_make/sync_make.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,19 @@ struct _TCompress{
}
}
inline ~_TCompress(){ if (dictCompressHandle) compressPlugin->dictCompressClose(compressPlugin,dictCompressHandle); }
inline size_t doCompress(uint32_t blockIndex,const TByte* data,const TByte* dataEnd){
inline size_t doCompress(uint32_t blockIndex,const TByte* in_data,size_t in_dataSize,size_t in_borderSize){
if (dictCompressHandle==0)
return 0;//not compress
blockEnd=blockIndex+1;
assert(cmBuf!=0);
hpatch_byte* curBuf=cmBuf+cmBufPos;
size_t result=compressPlugin->dictCompress(dictCompressHandle,blockIndex,curBuf,
curBuf+kMaxCompressedSize,data,dataEnd);
curBuf+kMaxCompressedSize,in_data,in_dataSize,in_borderSize);
checkv(result!=kDictCompressError);
if (result==kDictCompressCancel){
result=0; //cancel compress
size_t dataSize=dataEnd-data;
memcpy(curBuf,data,dataSize);
cmBufPos+=dataSize;
memcpy(curBuf,in_data,in_dataSize);
cmBufPos+=in_dataSize;
}else{
checkv((result<=kMaxCompressedSize)
&&(result<=2*kSyncBlockSize)); //for decompress memroy size ctrl
Expand Down Expand Up @@ -122,6 +121,7 @@ struct _TCompress{
uint32_t blockBegin;
uint32_t blockEnd;
unsigned char* buf;
size_t in_borderSize;
size_t cmSize;
#if (_IS_USED_MULTITHREAD)
struct TWorkBuf* next;
Expand Down Expand Up @@ -231,7 +231,7 @@ static void _saveCompressedData(_TCreateDatas& cd,TWorkBuf* workData,void* _mt=0
if (cd.out_hsynz){
size_t writeSize=workData->cmSize;
if (writeSize>0){
hpatch_byte* cmBuf=workData->buf+dataLens;
hpatch_byte* cmBuf=workData->buf+dataLens+workData->in_borderSize;
writeStream(cd.out_hsynz,cd.curOutPos,cmBuf,writeSize);
}else{
writeSize=dataLens-backZeroLen;
Expand Down Expand Up @@ -264,35 +264,41 @@ static void _create_sync_data_part(_TCreateDatas& cd,TWorkBuf* workData,
TNewDataSyncInfo* out_hsyni=cd.out_hsyni;
const uint32_t kSyncBlockSize=out_hsyni->kSyncBlockSize;
const uint32_t kSyncBlockCount=(workData->blockEnd-workData->blockBegin);
hpatch_StreamPos_t curReadPos=(hpatch_StreamPos_t)workData->blockBegin*kSyncBlockSize;

size_t backZeroLen=0;
const size_t dataLens=(size_t)kSyncBlockSize*kSyncBlockCount;
compress.cmBuf=workData->buf+dataLens;
compress.cmBuf=workData->buf+dataLens+workData->in_borderSize;
compress.cmBufPos=0;
{//read data
const hpatch_StreamPos_t curReadPos=(hpatch_StreamPos_t)workData->blockBegin*kSyncBlockSize;
if (curReadPos+dataLens>cd.newData->streamSize){
backZeroLen=(size_t)(curReadPos+dataLens-cd.newData->streamSize);
assert(backZeroLen<kSyncBlockSize);
}
{
size_t readLen=dataLens-backZeroLen+workData->in_borderSize;
if (curReadPos+readLen>cd.newData->streamSize)
readLen=cd.newData->streamSize-curReadPos;
#if (_IS_USED_MULTITHREAD)
TMt* mt=(TMt*)_mt;
CAutoLocker _autoLocker(mt?mt->readLocker.locker:0);
if (mt)
compress.resertDict(cd.newData,curReadPos,workData->blockBegin);
#endif
checkv(cd.newData->read(cd.newData,curReadPos,workData->buf,workData->buf+dataLens-backZeroLen));
checkv(cd.newData->read(cd.newData,curReadPos,workData->buf,workData->buf+readLen));
}
if (backZeroLen)
memset(workData->buf+dataLens-backZeroLen,0,backZeroLen);
}

hpatch_byte* dataBuf=workData->buf;
for (uint32_t i=workData->blockBegin;i<workData->blockEnd;++i,dataBuf+=kSyncBlockSize) {
for (uint32_t i=workData->blockBegin;i<workData->blockEnd;++i,dataBuf+=kSyncBlockSize,curReadPos+=kSyncBlockSize) {
size_t srcDataLen=(i+1<workData->blockEnd)?kSyncBlockSize:kSyncBlockSize-backZeroLen;
size_t cur_borderSize=workData->in_borderSize;
if (curReadPos+srcDataLen+cur_borderSize>cd.newData->streamSize)
cur_borderSize=cd.newData->streamSize-(curReadPos+srcDataLen);
//compress
size_t compressedSize=compress.doCompress(i,dataBuf,dataBuf+srcDataLen);
size_t compressedSize=compress.doCompress(i,dataBuf,srcDataLen,cur_borderSize);
checkv(compressedSize==(uint32_t)compressedSize);
if (out_hsyni->savedSizes) //save compressedSize
out_hsyni->savedSizes[i]=(uint32_t)compressedSize;
Expand Down Expand Up @@ -387,6 +393,7 @@ void _private_create_sync_data(TNewDataSyncInfo* newSyncInfo,
checkChecksumInit(createDatas.out_hsyni->savedNewDataCheckChecksum,
createDatas.out_hsyni->kStrongChecksumByteSize);

const size_t in_borderSize=(compressPlugin&&compressPlugin->getDictCompressBorder)?compressPlugin->getDictCompressBorder():0;
const hpatch_StreamPos_t kMaxCompressedSize=compressPlugin?compressPlugin->maxCompressedSize(kSyncBlockSize):0;
const size_t _kBestWorkBufSize=2*(1<<20);
uint32_t bestWorkBlockCount=(uint32_t)((_kBestWorkBufSize+kSyncBlockSize+kMaxCompressedSize-1)
Expand All @@ -402,7 +409,7 @@ void _private_create_sync_data(TNewDataSyncInfo* newSyncInfo,
bestWorkBlockCount=(bestWorkBlockCount<=maxWorkBlockCount)?bestWorkBlockCount:maxWorkBlockCount;

const size_t kWorkBufCount=threadNum+(threadNum/4)+1;
const size_t kWorkMemBufSize=(kSyncBlockSize+(size_t)kMaxCompressedSize)*bestWorkBlockCount;
const size_t kWorkMemBufSize=(kSyncBlockSize+(size_t)kMaxCompressedSize)*bestWorkBlockCount+in_borderSize;
TAutoMem membuf((sizeof(TWorkBuf)+kWorkMemBufSize)*kWorkBufCount);

TMt mt(threadNum,createDatas,bestWorkBlockCount,checksumPlugin,compressPlugin,
Expand All @@ -413,6 +420,7 @@ void _private_create_sync_data(TNewDataSyncInfo* newSyncInfo,
memset(workBufList,0,sizeof(TWorkBuf)*kWorkBufCount);
for (size_t i=0;i<kWorkBufCount;++i,workBufList++,workMemBuf+=kWorkMemBufSize){
workBufList->buf=workMemBuf;
workBufList->in_borderSize=in_borderSize;
checkv(mt.read_chan.send(workBufList,true));
}
}
Expand All @@ -424,10 +432,12 @@ void _private_create_sync_data(TNewDataSyncInfo* newSyncInfo,
}else
#endif
{
TAutoMem membuf((kSyncBlockSize+(size_t)kMaxCompressedSize)*bestWorkBlockCount);
TAutoMem membuf((kSyncBlockSize+(size_t)kMaxCompressedSize)*bestWorkBlockCount+in_borderSize);
CChecksum checksumBlockData(checksumPlugin,false);
_TCompress compress(compressPlugin,kBlockCount,kSyncBlockSize,createDatas.out_hsyni);
TWorkBuf workData={0}; workData.buf=membuf.data();
TWorkBuf workData={0};
workData.buf=membuf.data();
workData.in_borderSize=in_borderSize;
for (uint32_t ib=0; ib<kBlockCount; ib+=bestWorkBlockCount){
workData.blockBegin=ib;
workData.blockEnd=(ib+bestWorkBlockCount<=kBlockCount)?ib+bestWorkBlockCount:kBlockCount;
Expand Down

0 comments on commit 914c6db

Please sign in to comment.