| 1 | #pragma once |
| 2 | |
| 3 | #include <memory> |
| 4 | #include <shared_mutex> |
| 5 | |
| 6 | #include <Common/LRUCache.h> |
| 7 | #include <Common/filesystemHelpers.h> |
| 8 | #include <Core/Block.h> |
| 9 | #include <Core/SortDescription.h> |
| 10 | #include <Interpreters/IJoin.h> |
| 11 | #include <DataStreams/SizeLimits.h> |
| 12 | |
| 13 | namespace DB |
| 14 | { |
| 15 | |
| 16 | class AnalyzedJoin; |
| 17 | class MergeJoinCursor; |
| 18 | struct MergeJoinEqualRange; |
| 19 | |
| 20 | struct MiniLSM |
| 21 | { |
| 22 | using SortedFiles = std::vector<std::unique_ptr<TemporaryFile>>; |
| 23 | |
| 24 | const String & path; |
| 25 | const Block & sample_block; |
| 26 | const SortDescription & sort_description; |
| 27 | const size_t rows_in_block; |
| 28 | const size_t max_size; |
| 29 | std::vector<SortedFiles> sorted_files; |
| 30 | |
| 31 | MiniLSM(const String & path_, const Block & sample_block_, const SortDescription & description, |
| 32 | size_t rows_in_block_, size_t max_size_ = 16) |
| 33 | : path(path_) |
| 34 | , sample_block(sample_block_) |
| 35 | , sort_description(description) |
| 36 | , rows_in_block(rows_in_block_) |
| 37 | , max_size(max_size_) |
| 38 | {} |
| 39 | |
| 40 | void insert(const BlocksList & blocks); |
| 41 | void merge(std::function<void(const Block &)> callback = [](const Block &){}); |
| 42 | }; |
| 43 | |
| 44 | |
| 45 | class MergeJoin : public IJoin |
| 46 | { |
| 47 | public: |
| 48 | MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block); |
| 49 | |
| 50 | bool addJoinedBlock(const Block & block) override; |
| 51 | void joinBlock(Block &) override; |
| 52 | void joinTotals(Block &) const override; |
| 53 | void setTotals(const Block &) override; |
| 54 | bool hasTotals() const override { return totals; } |
| 55 | size_t getTotalRowCount() const override { return right_blocks_row_count; } |
| 56 | |
| 57 | private: |
| 58 | /// There're two size limits for right-hand table: max_rows_in_join, max_bytes_in_join. |
| 59 | /// max_bytes is prefered. If it isn't set we aproximate it as (max_rows * bytes/row). |
| 60 | struct BlockByteWeight |
| 61 | { |
| 62 | size_t operator()(const Block & block) const { return block.bytes(); } |
| 63 | }; |
| 64 | |
| 65 | using Cache = LRUCache<size_t, Block, std::hash<size_t>, BlockByteWeight>; |
| 66 | |
| 67 | mutable std::shared_mutex rwlock; |
| 68 | std::shared_ptr<AnalyzedJoin> table_join; |
| 69 | SizeLimits size_limits; |
| 70 | SortDescription left_sort_description; |
| 71 | SortDescription right_sort_description; |
| 72 | SortDescription left_merge_description; |
| 73 | SortDescription right_merge_description; |
| 74 | Block right_sample_block; |
| 75 | Block right_table_keys; |
| 76 | Block right_columns_to_add; |
| 77 | BlocksList right_blocks; |
| 78 | Blocks min_max_right_blocks; |
| 79 | std::unique_ptr<Cache> cached_right_blocks; |
| 80 | std::vector<std::shared_ptr<Block>> loaded_right_blocks; |
| 81 | std::unique_ptr<MiniLSM> lsm; |
| 82 | MiniLSM::SortedFiles flushed_right_blocks; |
| 83 | Block totals; |
| 84 | size_t right_blocks_row_count = 0; |
| 85 | size_t right_blocks_bytes = 0; |
| 86 | bool is_in_memory = true; |
| 87 | const bool nullable_right_side; |
| 88 | const bool is_all; |
| 89 | const bool is_inner; |
| 90 | const bool is_left; |
| 91 | const bool skip_not_intersected; |
| 92 | const size_t max_rows_in_right_block; |
| 93 | |
| 94 | void changeLeftColumns(Block & block, MutableColumns && columns); |
| 95 | void addRightColumns(Block & block, MutableColumns && columns); |
| 96 | |
| 97 | void mergeRightBlocks(); |
| 98 | |
| 99 | template <bool in_memory> |
| 100 | size_t rightBlocksCount(); |
| 101 | template <bool in_memory> |
| 102 | void joinSortedBlock(Block & block); |
| 103 | template <bool in_memory> |
| 104 | std::shared_ptr<Block> loadRightBlock(size_t pos); |
| 105 | |
| 106 | void leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block, |
| 107 | MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail); |
| 108 | void innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block, |
| 109 | MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail); |
| 110 | |
| 111 | bool saveRightBlock(Block && block); |
| 112 | void flushRightBlocks(); |
| 113 | |
| 114 | void mergeInMemoryRightBlocks(); |
| 115 | void mergeFlushedRightBlocks(); |
| 116 | |
| 117 | void clearRightBlocksList() |
| 118 | { |
| 119 | right_blocks.clear(); |
| 120 | right_blocks_row_count = 0; |
| 121 | right_blocks_bytes = 0; |
| 122 | } |
| 123 | |
| 124 | void countBlockSize(const Block & block) |
| 125 | { |
| 126 | right_blocks_row_count += block.rows(); |
| 127 | right_blocks_bytes += block.bytes(); |
| 128 | } |
| 129 | }; |
| 130 | |
| 131 | } |
| 132 | |