| 1 | #include <string.h> |
| 2 | #include <random> |
| 3 | #include <pcg_random.hpp> |
| 4 | #include <common/likely.h> |
| 5 | #include <common/Types.h> |
| 6 | |
| 7 | #include <IO/ReadBuffer.h> |
| 8 | #include <IO/ReadBufferFromFileDescriptor.h> |
| 9 | #include <IO/WriteBufferFromFileDescriptor.h> |
| 10 | #include <IO/BufferWithOwnMemory.h> |
| 11 | #include <Compression/CompressionInfo.h> |
| 12 | #include <IO/WriteHelpers.h> |
| 13 | #include <IO/copyData.h> |
| 14 | #include <Common/PODArray.h> |
| 15 | |
| 16 | /** Quick and dirty implementation of data scrambler. |
| 17 | * |
| 18 | * The task is to replace the data with pseudorandom values. |
| 19 | * But with keeping some probability distributions |
| 20 | * and with maintaining the same compression ratio. |
| 21 | * |
| 22 | * The solution is to operate directly on compressed LZ4 stream. |
| 23 | * The stream consists of independent compressed blocks. |
| 24 | * Each block is a stream of "literals" and "matches". |
| 25 | * A literal is an instruction to output the bytes that follow it verbatim, |
| 26 | * and a match is an instruction to copy some bytes that were already seen before. |
| 27 | * |
| 28 | * We take the literals and apply a scramble operation to them. |
| 29 | * But we keep the literal lengths and the matches unchanged. |
| 30 | * |
| 31 | * That's how we get pseudorandom data but with keeping |
| 32 | * all repetitive patterns and maintaining the same compression ratio. |
| 33 | * |
| 34 | * Actually, if you decompress the scrambled data and compress it again, the compression ratio |
| 35 | * becomes slightly worse, because LZ4 uses a simple match finder based on the value of a hash function, |
| 36 | * and it can find different matches due to collisions in the hash function. |
| 37 | * |
| 38 | * The scramble operation replaces literals with pseudorandom bytes, |
| 39 | * but uses some heuristics to keep some sort of data structure. |
| 40 | * |
| 41 | * It is an open question whether this scrambles the data enough and whether it is safe to publish the scrambled data. |
| 42 | * In general, you should assume that it is not safe. |
| 43 | */ |
| 44 | |
| 45 | |
/// Layout of the one-byte token that starts every LZ4 sequence, as decoded below:
/// the high RUN_BITS bits hold the literal ("run") length, the low ML_BITS bits hold the match length.
/// A field equal to its mask means that additional length bytes follow in the stream.
#define ML_BITS 4
#define ML_MASK ((1U<<ML_BITS)-1)
#define RUN_BITS (8-ML_BITS)
#define RUN_MASK ((1U<<RUN_BITS)-1)

/// MINMATCH is added to every decoded match length.
#define MINMATCH 4
/// Chunk size of LZ4_wildCopy; literal runs ending closer than this to the end of output are copied with plain memcpy.
#define WILDCOPYLENGTH 8
/// A match is not allowed to end within the last LASTLITERALS bytes of the output.
#define LASTLITERALS 5
| 54 | |
| 55 | |
| 56 | static UInt8 rand(pcg64 & generator, UInt8 min, UInt8 max) |
| 57 | { |
| 58 | return min + generator() % (max + 1 - min); |
| 59 | } |
| 60 | |
| 61 | static void mutate(pcg64 & generator, void * src, size_t length) |
| 62 | { |
| 63 | UInt8 * pos = static_cast<UInt8 *>(src); |
| 64 | UInt8 * end = pos + length; |
| 65 | |
| 66 | while (pos < end) |
| 67 | { |
| 68 | if (pos + strlen("https" ) <= end && 0 == memcmp(pos, "https" , strlen("https" ))) |
| 69 | { |
| 70 | pos += strlen("https" ); |
| 71 | continue; |
| 72 | } |
| 73 | |
| 74 | if (pos + strlen("http" ) <= end && 0 == memcmp(pos, "http" , strlen("http" ))) |
| 75 | { |
| 76 | pos += strlen("http" ); |
| 77 | continue; |
| 78 | } |
| 79 | |
| 80 | if (pos + strlen("www" ) <= end && 0 == memcmp(pos, "www" , strlen("www" ))) |
| 81 | { |
| 82 | pos += strlen("www" ); |
| 83 | continue; |
| 84 | } |
| 85 | |
| 86 | if (*pos >= '1' && *pos <= '9') |
| 87 | *pos = rand(generator, '1', '9'); |
| 88 | else if (*pos >= 'a' && *pos <= 'z') |
| 89 | *pos = rand(generator, 'a', 'z'); |
| 90 | else if (*pos >= 'A' && *pos <= 'Z') |
| 91 | *pos = rand(generator, 'A', 'Z'); |
| 92 | else if (*pos >= 0x80 && *pos <= 0xBF) |
| 93 | *pos = rand(generator, *pos & 0xF0U, *pos | 0x0FU); |
| 94 | else if (*pos == '\\') |
| 95 | ++pos; |
| 96 | |
| 97 | ++pos; |
| 98 | } |
| 99 | |
| 100 | pos = static_cast<UInt8 *>(src); |
| 101 | while (pos < end) |
| 102 | { |
| 103 | if (pos + 3 <= end |
| 104 | && isAlphaASCII(pos[0]) |
| 105 | && !isAlphaASCII(pos[1]) && pos[1] != '\\' && pos[1] >= 0x20 |
| 106 | && isAlphaASCII(pos[2])) |
| 107 | { |
| 108 | auto res = rand(generator, 0, 3); |
| 109 | if (res == 2) |
| 110 | { |
| 111 | std::swap(pos[0], pos[1]); |
| 112 | } |
| 113 | else if (res == 3) |
| 114 | std::swap(pos[1], pos[2]); |
| 115 | |
| 116 | pos += 3; |
| 117 | } |
| 118 | else if (pos + 5 <= end |
| 119 | && pos[0] >= 0xC0 && pos[0] <= 0xDF && pos[1] >= 0x80 && pos[1] <= 0xBF |
| 120 | && pos[2] >= 0x20 && pos[2] < 0x80 && !isAlphaASCII(pos[2]) |
| 121 | && pos[3] >= 0xC0 && pos[3] <= 0xDF && pos[4] >= 0x80 && pos[4] <= 0xBF) |
| 122 | { |
| 123 | auto res = rand(generator, 0, 3); |
| 124 | if (res == 2) |
| 125 | { |
| 126 | std::swap(pos[1], pos[2]); |
| 127 | std::swap(pos[0], pos[1]); |
| 128 | } |
| 129 | else if (res == 3) |
| 130 | { |
| 131 | std::swap(pos[3], pos[2]); |
| 132 | std::swap(pos[4], pos[3]); |
| 133 | } |
| 134 | |
| 135 | pos += 5; |
| 136 | } |
| 137 | else |
| 138 | ++pos; |
| 139 | } |
| 140 | } |
| 141 | |
| 142 | |
/// Copies exactly 8 bytes from `src` to `dst` by means of memcpy.
static void LZ4_copy8(void* dst, const void* src)
{
    constexpr size_t chunk_size = 8;
    memcpy(dst, src, chunk_size);
}
| 147 | |
| 148 | /* customized variant of memcpy, which can overwrite up to 8 bytes beyond dstEnd */ |
| 149 | static void LZ4_wildCopy(void* dstPtr, const void* srcPtr, void* dstEnd) |
| 150 | { |
| 151 | UInt8* d = (UInt8*)dstPtr; |
| 152 | const UInt8* s = (const UInt8*)srcPtr; |
| 153 | UInt8* const e = (UInt8*)dstEnd; |
| 154 | |
| 155 | do { LZ4_copy8(d,s); d+=8; s+=8; } while (d<e); |
| 156 | } |
| 157 | |
| 158 | |
| 159 | static UInt16 LZ4_read16(const void* memPtr) |
| 160 | { |
| 161 | UInt16 val; memcpy(&val, memPtr, sizeof(val)); return val; |
| 162 | } |
| 163 | |
| 164 | |
| 165 | static void LZ4_write32(void* memPtr, UInt32 value) |
| 166 | { |
| 167 | memcpy(memPtr, &value, sizeof(value)); |
| 168 | } |
| 169 | |
| 170 | |
/** Decodes one LZ4 block from `source` into `dest`, scrambling every literal run on the way.
  *
  * Each literal run is passed to mutate() while it still sits in the `source` buffer
  * (so `source` is modified in place) and is then copied to the output.
  * Matches are copied from bytes already written to `dest`, so they repeat the scrambled literals.
  *
  * `outputSize` is the exact number of bytes the block must decode to.
  * There is no parameter for the size of the input, and reads from `source` are not bounds-checked.
  *
  * Returns the number of input bytes read on success.
  * On error returns -(number of input bytes read) - 1, which is always negative.
  */
int LZ4_decompress_mutate(
    char* const source,
    char* const dest,
    int outputSize)
{
    /// Default-constructed on every call.
    /// NOTE(review): presumably this gives the same pseudorandom sequence for every block - confirm against pcg_random.hpp.
    pcg64 generator;

    /* Local Variables */
    UInt8* ip = (UInt8*) source;        /* current read position in the input */

    UInt8* op = (UInt8*) dest;          /* current write position in the output */
    UInt8* const oend = op + outputSize;
    UInt8* cpy;                         /* where the current copy (literals or match) has to end in the output */

    /* Adjustments applied to 'match' when offset < 8, i.e. when source and destination of an 8-byte copy overlap */
    const unsigned dec32table[] = {0, 1, 2, 1, 4, 4, 4, 4};
    const int dec64table[] = {0, 0, 0, -1, 0, 1, 2, 3};

    /* Main Loop : decode sequences */
    while (1) {
        size_t length;
        const UInt8* match;
        size_t offset;

        /* get literal length : high bits of the token, extended by extra bytes while they are equal to 255 */
        unsigned const token = *ip++;
        if ((length=(token>>ML_BITS)) == RUN_MASK) {
            unsigned s;
            do {
                s = *ip++;
                length += s;
            } while (s==255);
        }

        /* copy literals */
        cpy = op+length;
        if (cpy>oend-WILDCOPYLENGTH)
        {
            /* Too close to the end of output for wildCopy : this must be the last literal run */
            if (cpy != oend) goto _output_error; /* Error : block decoding must stop exactly there */
            mutate(generator, ip, length);
            memcpy(op, ip, length);
            ip += length;
            op += length;
            break; /* Necessarily EOF, due to parsing restrictions */
        }
        /* Scramble the literals inside the input buffer, then copy them out */
        mutate(generator, ip, cpy - op);
        LZ4_wildCopy(op, ip, cpy);
        ip += length; op = cpy;

        /* get offset : distance back from the current output position to the start of the match */
        offset = LZ4_read16(ip); ip+=2;
        match = op - offset;
        LZ4_write32(op, (UInt32)offset); /* costs ~1%; silence an msan warning when offset==0 */

        /* get matchlength : low bits of the token, extended the same way as the literal length */
        length = token & ML_MASK;
        if (length == ML_MASK) {
            unsigned s;
            do {
                s = *ip++;
                length += s;
            } while (s==255);
        }
        length += MINMATCH;

        /* copy match within block */
        cpy = op + length;
        if (unlikely(offset<8)) {
            /* Overlapping case : first 4 bytes one by one, next 4 with memcpy, 'match' adjusted through the tables */
            const int dec64 = dec64table[offset];
            op[0] = match[0];
            op[1] = match[1];
            op[2] = match[2];
            op[3] = match[3];
            match += dec32table[offset];
            memcpy(op+4, match, 4);
            match -= dec64;
        } else { LZ4_copy8(op, match); match+=8; }
        op += 8;

        if (unlikely(cpy>oend-12)) {
            /* Match ends close to the end of output : wildCopy only up to oCopyLimit, the rest byte by byte */
            UInt8* const oCopyLimit = oend-(WILDCOPYLENGTH-1);
            if (cpy > oend-LASTLITERALS) goto _output_error; /* Error : last LASTLITERALS bytes must be literals (uncompressed) */
            if (op < oCopyLimit) {
                LZ4_wildCopy(op, match, oCopyLimit);
                match += oCopyLimit - op;
                op = oCopyLimit;
            }
            while (op<cpy) *op++ = *match++;
        } else {
            LZ4_copy8(op, match);
            if (length>16) LZ4_wildCopy(op+8, match+8, cpy);
        }
        op=cpy; /* correction : the chunked copies above may have advanced past the real end of the match */
    }

    return (int) (((const char*)ip)-source); /* Nb of input bytes read */

    /* Overflow error detected */
_output_error:
    return (int) (-(((const char*)ip)-source))-1;
}
| 271 | |
| 272 | |
| 273 | namespace DB |
| 274 | { |
| 275 | |
/// Error codes passed to the exceptions thrown in this file.
/// They are only declared here (extern); their values are defined elsewhere.
namespace ErrorCodes
{
    extern const int UNKNOWN_COMPRESSION_METHOD;
    extern const int TOO_LARGE_SIZE_COMPRESSED;
    extern const int CANNOT_DECOMPRESS;
}
| 282 | |
| 283 | class MutatingCompressedReadBufferBase |
| 284 | { |
| 285 | protected: |
| 286 | ReadBuffer * compressed_in; |
| 287 | |
| 288 | /// If 'compressed_in' buffer has whole compressed block - then use it. Otherwise copy parts of data to 'own_compressed_buffer'. |
| 289 | PODArray<char> own_compressed_buffer; |
| 290 | /// Points to memory, holding compressed block. |
| 291 | char * compressed_buffer = nullptr; |
| 292 | |
| 293 | size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum) |
| 294 | { |
| 295 | if (compressed_in->eof()) |
| 296 | return 0; |
| 297 | |
| 298 | CityHash_v1_0_2::uint128 checksum; |
| 299 | compressed_in->readStrict(reinterpret_cast<char *>(&checksum), sizeof(checksum)); |
| 300 | |
| 301 | own_compressed_buffer.resize(COMPRESSED_BLOCK_HEADER_SIZE); |
| 302 | compressed_in->readStrict(&own_compressed_buffer[0], COMPRESSED_BLOCK_HEADER_SIZE); |
| 303 | |
| 304 | UInt8 method = own_compressed_buffer[0]; /// See CompressedWriteBuffer.h |
| 305 | |
| 306 | size_t & size_compressed = size_compressed_without_checksum; |
| 307 | |
| 308 | if (method == static_cast<UInt8>(CompressionMethodByte::LZ4) || |
| 309 | method == static_cast<UInt8>(CompressionMethodByte::ZSTD) || |
| 310 | method == static_cast<UInt8>(CompressionMethodByte::NONE)) |
| 311 | { |
| 312 | size_compressed = unalignedLoad<UInt32>(&own_compressed_buffer[1]); |
| 313 | size_decompressed = unalignedLoad<UInt32>(&own_compressed_buffer[5]); |
| 314 | } |
| 315 | else |
| 316 | throw Exception("Unknown compression method: " + toString(method), ErrorCodes::UNKNOWN_COMPRESSION_METHOD); |
| 317 | |
| 318 | if (size_compressed > DBMS_MAX_COMPRESSED_SIZE) |
| 319 | throw Exception("Too large size_compressed. Most likely corrupted data." , ErrorCodes::TOO_LARGE_SIZE_COMPRESSED); |
| 320 | |
| 321 | /// Is whole compressed block located in 'compressed_in' buffer? |
| 322 | if (compressed_in->offset() >= COMPRESSED_BLOCK_HEADER_SIZE && |
| 323 | compressed_in->position() + size_compressed - COMPRESSED_BLOCK_HEADER_SIZE <= compressed_in->buffer().end()) |
| 324 | { |
| 325 | compressed_in->position() -= COMPRESSED_BLOCK_HEADER_SIZE; |
| 326 | compressed_buffer = compressed_in->position(); |
| 327 | compressed_in->position() += size_compressed; |
| 328 | } |
| 329 | else |
| 330 | { |
| 331 | own_compressed_buffer.resize(size_compressed); |
| 332 | compressed_buffer = &own_compressed_buffer[0]; |
| 333 | compressed_in->readStrict(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, size_compressed - COMPRESSED_BLOCK_HEADER_SIZE); |
| 334 | } |
| 335 | |
| 336 | return size_compressed + sizeof(checksum); |
| 337 | } |
| 338 | |
| 339 | void decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum) |
| 340 | { |
| 341 | UInt8 method = compressed_buffer[0]; /// See CompressedWriteBuffer.h |
| 342 | |
| 343 | if (method == static_cast<UInt8>(CompressionMethodByte::LZ4)) |
| 344 | { |
| 345 | if (LZ4_decompress_mutate(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, to, size_decompressed) < 0) |
| 346 | throw Exception("Cannot LZ4_decompress_fast" , ErrorCodes::CANNOT_DECOMPRESS); |
| 347 | } |
| 348 | else |
| 349 | throw Exception("Unknown compression method: " + toString(method), ErrorCodes::UNKNOWN_COMPRESSION_METHOD); |
| 350 | } |
| 351 | |
| 352 | public: |
| 353 | /// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'. |
| 354 | MutatingCompressedReadBufferBase(ReadBuffer * in = nullptr) |
| 355 | : compressed_in(in), own_compressed_buffer(COMPRESSED_BLOCK_HEADER_SIZE) |
| 356 | { |
| 357 | } |
| 358 | }; |
| 359 | |
| 360 | |
| 361 | class MutatingCompressedReadBuffer : public MutatingCompressedReadBufferBase, public BufferWithOwnMemory<ReadBuffer> |
| 362 | { |
| 363 | private: |
| 364 | size_t size_compressed = 0; |
| 365 | |
| 366 | bool nextImpl() override |
| 367 | { |
| 368 | size_t size_decompressed; |
| 369 | size_t size_compressed_without_checksum; |
| 370 | size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum); |
| 371 | if (!size_compressed) |
| 372 | return false; |
| 373 | |
| 374 | memory.resize(size_decompressed); |
| 375 | working_buffer = Buffer(&memory[0], &memory[size_decompressed]); |
| 376 | |
| 377 | decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum); |
| 378 | |
| 379 | return true; |
| 380 | } |
| 381 | |
| 382 | public: |
| 383 | MutatingCompressedReadBuffer(ReadBuffer & in_) |
| 384 | : MutatingCompressedReadBufferBase(&in_), BufferWithOwnMemory<ReadBuffer>(0) |
| 385 | { |
| 386 | } |
| 387 | }; |
| 388 | |
| 389 | } |
| 390 | |
| 391 | |
| 392 | int main(int, char **) |
| 393 | try |
| 394 | { |
| 395 | DB::ReadBufferFromFileDescriptor in(STDIN_FILENO); |
| 396 | DB::MutatingCompressedReadBuffer mutating_in(in); |
| 397 | DB::WriteBufferFromFileDescriptor out(STDOUT_FILENO); |
| 398 | |
| 399 | DB::copyData(mutating_in, out); |
| 400 | |
| 401 | return 0; |
| 402 | } |
| 403 | catch (...) |
| 404 | { |
| 405 | std::cerr << DB::getCurrentExceptionMessage(true); |
| 406 | return DB::getCurrentExceptionCode(); |
| 407 | } |
| 408 | |