Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32763 Prevent crash when reporting a disk full error writing to a compressed file #19179

Merged
merged 1 commit into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions system/jlib/jfcmp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ class jlib_decl CFcmpCompressor : public CSimpleInterfaceOf<ICompressor>

virtual void close() override
{
//Protect against close() being called more than once on a compressor
if (!inbuf)
{
if (isDebugBuild())
throwUnexpectedX("CFCmpCompressor::close() called more than once");
return;
}
if (inlenblk!=COMMITTED)
{
inlen = inlenblk; // transaction failed
Expand Down
3 changes: 3 additions & 0 deletions system/jlib/jlzw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2773,6 +2773,9 @@ class CAESCompressor : implements ICompressor, public CInterface

virtual void close() override
{
if (outmax == 0)
return;

comp->close();
// now encrypt
MemoryBuffer buf;
Expand Down
230 changes: 155 additions & 75 deletions testing/unittests/jlibtests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3463,92 +3463,44 @@ class JlibIOTest : public CppUnit::TestFixture
CPPUNIT_TEST_SUITE_REGISTRATION(JlibIOTest);
CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(JlibIOTest, "JlibIOTest");

//--------------------------------------------------------------------------------------------------------------------

class JlibCompressionTestsStress : public CppUnit::TestFixture
class JlibCompressionTestBase : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE(JlibCompressionTestsStress);
CPPUNIT_TEST(test);
CPPUNIT_TEST_SUITE_END();

protected:
static constexpr size32_t sz = 100*0x100000; // 100MB
static constexpr const char *aesKey = "012345678901234567890123";
enum CompressOpt { RowCompress, AllRowCompress, BlockCompress, CompressToBuffer };
public:
void test()
void initCompressionBuffer(MemoryBuffer & src, size32_t & rowSz)
{
try
{
MemoryBuffer src;
src.ensureCapacity(sz);
const char *aesKey = "012345678901234567890123";
Owned<ICompressHandlerIterator> iter = getCompressHandlerIterator();
src.ensureCapacity(sz);

StringBuffer tmp;
unsigned card = 0;
size32_t rowSz = 0;
while (true)
StringBuffer tmp;
unsigned card = 0;
while (true)
{
size32_t cLen = src.length();
if (cLen > sz)
break;
src.append(cLen);
tmp.clear().appendf("%10u", cLen);
src.append(tmp.length(), tmp.str());
src.append(++card % 52);
src.append(crc32((const char *)&cLen, sizeof(cLen), 0));
unsigned ccrc = crc32((const char *)&card, sizeof(card), 0);
tmp.clear().appendf("%10u", ccrc);
src.append(tmp.length(), tmp.str());
tmp.clear().appendf("%20u", (++card % 10));
src.append(tmp.length(), tmp.str());
if (0 == rowSz)
rowSz = src.length();
else
{
size32_t cLen = src.length();
if (cLen > sz)
break;
src.append(cLen);
tmp.clear().appendf("%10u", cLen);
src.append(tmp.length(), tmp.str());
src.append(++card % 52);
src.append(crc32((const char *)&cLen, sizeof(cLen), 0));
unsigned ccrc = crc32((const char *)&card, sizeof(card), 0);
tmp.clear().appendf("%10u", ccrc);
src.append(tmp.length(), tmp.str());
tmp.clear().appendf("%20u", (++card % 10));
src.append(tmp.length(), tmp.str());
if (0 == rowSz)
rowSz = src.length();
else
{
dbgassertex(0 == (src.length() % rowSz));
}
dbgassertex(0 == (src.length() % rowSz));
}

DBGLOG("Algorithm(options) || Comp(ms) || Deco(ms) || 200MB/s (w,r) || 1GB/s (w,r) || 5GB/s (w,r) || Ratio [cLen]");
DBGLOG(" || || || 2Gb/s || 10Gb/s || 50Gb/s ||");

unsigned time200MBs = transferTimeMs(sz, 200000000);
unsigned time1GBs = transferTimeMs(sz, 1000000000);
unsigned time5GBs = transferTimeMs(sz, 5000000000);
DBGLOG("%19s || %8u || %8u || %4u(%4u,%4u) || %4u(%4u,%4u) || %4u(%4u,%4u) || %5.2f [%u]", "uncompressed", 0, 0,
time200MBs, time200MBs, time200MBs, time1GBs, time1GBs, time1GBs, time5GBs, time5GBs, time5GBs, 1.0, sz);
ForEach(*iter)
{
ICompressHandler &handler = iter->query();
const char * type = handler.queryType();
//Ignore unusual compressors with no expanders...
if (strieq(type, "randrow"))
continue;
const char * options = streq("AES", handler.queryType()) ? aesKey: "";
if (streq(type, "LZ4HC"))
{
testCompressor(handler, "hclevel=3", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=4", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=5", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=6", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=8", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=10", rowSz, src.length(), src.bytes(), RowCompress);
}
testCompressor(handler, options, rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, options, rowSz, src.length(), src.bytes(), CompressToBuffer);
if (streq(type, "LZ4"))
{
testCompressor(handler, "allrow", rowSz, src.length(), src.bytes(), AllRowCompress); // block doesn't affect the compressor, just tracing
testCompressor(handler, "block", rowSz, src.length(), src.bytes(), BlockCompress); // block doesn't affect the compressor, just tracing
}
}
}
catch (IException *e)
{
EXCLOG(e, nullptr);
throw;
}
}

unsigned transferTimeMs(__int64 size, __int64 bytesPerSecond)
{
return (unsigned)((size * 1000) / bytesPerSecond);
Expand All @@ -3575,6 +3527,18 @@ class JlibCompressionTestsStress : public CppUnit::TestFixture
}
compressor->commitblock();
compressor->close();
try
{
//Test that calling close again doesn't cause any problems
compressor->close();
}
catch (IException * e)
{
//In debug mode it is ok to throw an unexpected error, in release it should be ignored.
if (!isDebugBuild() || e->errorCode() != 9999)
CPPUNIT_FAIL("Unexpected exception from second call to compressor->close()");
e->Release();
}
break;
}
case AllRowCompress:
Expand Down Expand Up @@ -3649,9 +3613,125 @@ class JlibCompressionTestsStress : public CppUnit::TestFixture
}
};


class JlibCompressionStandardTest : public JlibCompressionTestBase
{
CPPUNIT_TEST_SUITE(JlibCompressionStandardTest);
CPPUNIT_TEST(test);
CPPUNIT_TEST_SUITE_END();

public:
void test()
{
try
{
MemoryBuffer src;
size32_t rowSz = 0;
initCompressionBuffer(src, rowSz);


DBGLOG("Algorithm(options) || Comp(ms) || Deco(ms) || 200MB/s (w,r) || 1GB/s (w,r) || 5GB/s (w,r) || Ratio [cLen]");
DBGLOG(" || || || 2Gb/s || 10Gb/s || 50Gb/s ||");

unsigned time200MBs = transferTimeMs(sz, 200000000);
unsigned time1GBs = transferTimeMs(sz, 1000000000);
unsigned time5GBs = transferTimeMs(sz, 5000000000);
DBGLOG("%19s || %8u || %8u || %4u(%4u,%4u) || %4u(%4u,%4u) || %4u(%4u,%4u) || %5.2f [%u]", "uncompressed", 0, 0,
time200MBs, time200MBs, time200MBs, time1GBs, time1GBs, time1GBs, time5GBs, time5GBs, time5GBs, 1.0, sz);

Owned<ICompressHandlerIterator> iter = getCompressHandlerIterator();
ForEach(*iter)
{
ICompressHandler &handler = iter->query();
const char * type = handler.queryType();
//Ignore unusual compressors with no expanders...
if (strieq(type, "randrow"))
continue;
const char * options = streq("AES", handler.queryType()) ? aesKey: "";
testCompressor(handler, options, rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, options, rowSz, src.length(), src.bytes(), CompressToBuffer);
}
}
catch (IException *e)
{
EXCLOG(e, nullptr);
throw;
}
}

};

CPPUNIT_TEST_SUITE_REGISTRATION( JlibCompressionStandardTest );
CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JlibCompressionStandardTest, "JlibCompressionStandardTest" );


class JlibCompressionTestsStress : public JlibCompressionTestBase
{
CPPUNIT_TEST_SUITE(JlibCompressionTestsStress);
CPPUNIT_TEST(test);
CPPUNIT_TEST_SUITE_END();

public:
void test()
{
try
{
MemoryBuffer src;
size32_t rowSz = 0;
initCompressionBuffer(src, rowSz);

DBGLOG("Algorithm(options) || Comp(ms) || Deco(ms) || 200MB/s (w,r) || 1GB/s (w,r) || 5GB/s (w,r) || Ratio [cLen]");
DBGLOG(" || || || 2Gb/s || 10Gb/s || 50Gb/s ||");

unsigned time200MBs = transferTimeMs(sz, 200000000);
unsigned time1GBs = transferTimeMs(sz, 1000000000);
unsigned time5GBs = transferTimeMs(sz, 5000000000);
DBGLOG("%19s || %8u || %8u || %4u(%4u,%4u) || %4u(%4u,%4u) || %4u(%4u,%4u) || %5.2f [%u]", "uncompressed", 0, 0,
time200MBs, time200MBs, time200MBs, time1GBs, time1GBs, time1GBs, time5GBs, time5GBs, time5GBs, 1.0, sz);

Owned<ICompressHandlerIterator> iter = getCompressHandlerIterator();
ForEach(*iter)
{
ICompressHandler &handler = iter->query();
const char * type = handler.queryType();
//Ignore unusual compressors with no expanders...
if (strieq(type, "randrow"))
continue;
const char * options = streq("AES", handler.queryType()) ? aesKey: "";
if (streq(type, "LZ4HC"))
{
testCompressor(handler, "hclevel=3", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=4", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=5", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=6", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=8", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=10", rowSz, src.length(), src.bytes(), RowCompress);
}
testCompressor(handler, options, rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, options, rowSz, src.length(), src.bytes(), CompressToBuffer);
if (streq(type, "LZ4"))
{
testCompressor(handler, "allrow", rowSz, src.length(), src.bytes(), AllRowCompress); // block doesn't affect the compressor, just tracing
testCompressor(handler, "block", rowSz, src.length(), src.bytes(), BlockCompress); // block doesn't affect the compressor, just tracing
}
}
}
catch (IException *e)
{
EXCLOG(e, nullptr);
throw;
}
}

};

CPPUNIT_TEST_SUITE_REGISTRATION( JlibCompressionTestsStress );
CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JlibCompressionTestsStress, "JlibCompressionTestsStress" );


//--------------------------------------------------------------------------------------------------------------------


class JlibFriendlySizeTest : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE(JlibFriendlySizeTest);
Expand Down
Loading