#include "duckdb/execution/operator/persistent/physical_batch_insert.hpp"

#include "duckdb/parallel/thread_context.hpp"
#include "duckdb/storage/data_table.hpp"
#include "duckdb/storage/table/row_group_collection.hpp"
#include "duckdb/storage/table_io_manager.hpp"
#include "duckdb/transaction/local_storage.hpp"
#include "duckdb/catalog/catalog_entry/duck_table_entry.hpp"
#include "duckdb/storage/table/append_state.hpp"
#include "duckdb/storage/table/scan_state.hpp"

namespace duckdb {

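// Constructor for a batch INSERT INTO an existing table: stores the target table, the
// mapping from input columns to physical table columns, and the expressions used to
// fill in DEFAULT values.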
PhysicalBatchInsert::PhysicalBatchInsert(vector<LogicalType> types, TableCatalogEntry &table,
                                         physical_index_vector_t<idx_t> column_index_map,
                                         vector<unique_ptr<Expression>> bound_defaults, idx_t estimated_cardinality)
    : PhysicalOperator(PhysicalOperatorType::BATCH_INSERT, std::move(types), estimated_cardinality),
      column_index_map(std::move(column_index_map)), insert_table(&table), insert_types(table.GetTypes()),
      bound_defaults(std::move(bound_defaults)) {
}

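// Constructor for batch CREATE TABLE AS: the table does not exist yet, so only the
// schema and the bound CREATE TABLE info are stored here; the table itself is created
// later, in GetGlobalSinkState.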
PhysicalBatchInsert::PhysicalBatchInsert(LogicalOperator &op, SchemaCatalogEntry &schema,
                                         unique_ptr<BoundCreateTableInfo> info_p, idx_t estimated_cardinality)
    : PhysicalOperator(PhysicalOperatorType::BATCH_CREATE_TABLE_AS, op.types, estimated_cardinality),
      insert_table(nullptr), schema(&schema), info(std::move(info_p)) {
	PhysicalInsert::GetInsertInfo(*info, insert_types, bound_defaults);
}

//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//

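// CollectionMerger gathers row group collections and merges them into a single, larger
// collection on Flush(), writing out completed row groups through the optimistic data
// writer along the way. A minimal usage sketch (illustrative only; `collection_a` and
// `collection_b` are hypothetical):
//
//   CollectionMerger merger(context);
//   merger.AddCollection(std::move(collection_a));
//   merger.AddCollection(std::move(collection_b));
//   auto merged = merger.Flush(writer); // returns nullptr if the merger was empty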
class CollectionMerger {
public:
	explicit CollectionMerger(ClientContext &context) : context(context) {
	}

	ClientContext &context;
	vector<unique_ptr<RowGroupCollection>> current_collections;

public:
	void AddCollection(unique_ptr<RowGroupCollection> collection) {
		current_collections.push_back(std::move(collection));
	}

	bool Empty() {
		return current_collections.empty();
	}

	unique_ptr<RowGroupCollection> Flush(OptimisticDataWriter &writer) {
		if (Empty()) {
			return nullptr;
		}
		unique_ptr<RowGroupCollection> new_collection = std::move(current_collections[0]);
		if (current_collections.size() > 1) {
			// we have gathered multiple collections: create one big collection and merge that
			auto &types = new_collection->GetTypes();
			TableAppendState append_state;
			new_collection->InitializeAppend(append_state);

			DataChunk scan_chunk;
			scan_chunk.Initialize(context, types);

			vector<column_t> column_ids;
			for (idx_t i = 0; i < types.size(); i++) {
				column_ids.push_back(i);
			}
			for (auto &collection : current_collections) {
				if (!collection) {
					continue;
				}
				TableScanState scan_state;
				scan_state.Initialize(column_ids);
				collection->InitializeScan(scan_state.local_state, column_ids, nullptr);

				while (true) {
					scan_chunk.Reset();
					scan_state.local_state.ScanCommitted(scan_chunk, TableScanType::TABLE_SCAN_COMMITTED_ROWS);
					if (scan_chunk.size() == 0) {
						break;
					}
					auto new_row_group = new_collection->Append(scan_chunk, append_state);
					if (new_row_group) {
						writer.WriteNewRowGroup(*new_collection);
					}
				}
			}
			new_collection->FinalizeAppend(TransactionData(0, 0), append_state);
			writer.WriteLastRowGroup(*new_collection);
		}
		current_collections.clear();
		return new_collection;
	}
};

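// A batch entry tracks one row group collection, keyed by its batch index, along with
// whether its row groups have already been written to disk (FLUSHED) or still live
// purely in memory (NOT_FLUSHED).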
enum class RowGroupBatchType : uint8_t { FLUSHED, NOT_FLUSHED };
struct RowGroupBatchEntry {
	RowGroupBatchEntry(idx_t batch_idx, unique_ptr<RowGroupCollection> collection_p, RowGroupBatchType type)
	    : batch_idx(batch_idx), total_rows(collection_p->GetTotalRows()), collection(std::move(collection_p)),
	      type(type) {
	}

	idx_t batch_idx;
	idx_t total_rows;
	unique_ptr<RowGroupCollection> collection;
	RowGroupBatchType type;
};

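// Global sink state: owns the ordered list of per-batch collections and merges runs of
// small, adjacent, unflushed batches once they together exceed BATCH_FLUSH_THRESHOLD rows.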
class BatchInsertGlobalState : public GlobalSinkState {
public:
	static constexpr const idx_t BATCH_FLUSH_THRESHOLD = LocalStorage::MERGE_THRESHOLD * 3;

public:
	explicit BatchInsertGlobalState(DuckTableEntry &table) : table(table), insert_count(0) {
	}

	mutex lock;
	DuckTableEntry &table;
	idx_t insert_count;
	vector<RowGroupBatchEntry> collections;
	idx_t next_start = 0;

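	// Scans the entries that lie entirely before min_batch_index and gathers a run of
	// adjacent, unflushed collections whose combined row count reaches BATCH_FLUSH_THRESHOLD.
	// The gathered collections are handed back in `result` for merging; the first entry of
	// the run remains as a FLUSHED placeholder and the remaining entries are erased.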
	void FindMergeCollections(idx_t min_batch_index, optional_idx &merged_batch_index,
	                          vector<unique_ptr<RowGroupCollection>> &result) {
		bool merge = false;
		idx_t start_index = next_start;
		idx_t current_idx;
		idx_t total_count = 0;
		for (current_idx = start_index; current_idx < collections.size(); current_idx++) {
			auto &entry = collections[current_idx];
			if (entry.batch_idx >= min_batch_index) {
				// this entry is AFTER the min_batch_index
				// we might still find new entries!
				break;
			}
			if (entry.type == RowGroupBatchType::FLUSHED) {
				// already flushed: cannot flush anything here
				if (total_count > 0) {
					merge = true;
					break;
				}
				start_index = current_idx + 1;
				if (start_index > next_start) {
					// avoid checking this segment again in the future
					next_start = start_index;
				}
				total_count = 0;
				continue;
			}
			// not flushed - add to set of indexes to flush
			total_count += entry.total_rows;
			if (total_count >= BATCH_FLUSH_THRESHOLD) {
				merge = true;
				break;
			}
		}
		if (merge && total_count > 0) {
			D_ASSERT(current_idx > start_index);
			merged_batch_index = collections[start_index].batch_idx;
			for (idx_t idx = start_index; idx < current_idx; idx++) {
				auto &entry = collections[idx];
				if (!entry.collection || entry.type == RowGroupBatchType::FLUSHED) {
					throw InternalException("Adding a row group collection that should not be flushed");
				}
				result.push_back(std::move(entry.collection));
				entry.total_rows = total_count;
				entry.type = RowGroupBatchType::FLUSHED;
			}
			if (start_index + 1 < current_idx) {
				// erase all entries except the first one
				collections.erase(collections.begin() + start_index + 1, collections.begin() + current_idx);
			}
		}
	}

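	// Merges the gathered collections into a single collection using a CollectionMerger,
	// flushing completed row groups to disk through the optimistic writer.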
	unique_ptr<RowGroupCollection> MergeCollections(ClientContext &context,
	                                                vector<unique_ptr<RowGroupCollection>> merge_collections,
	                                                OptimisticDataWriter &writer) {
		CollectionMerger merger(context);
		for (auto &collection : merge_collections) {
			merger.AddCollection(std::move(collection));
		}
		return merger.Flush(writer);
	}

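	// Registers a completed collection under its batch index. Collections that already
	// span a full row group are written out immediately; afterwards, if a writer is
	// provided, any mergeable run of smaller batches below min_batch_index is merged
	// and re-inserted under the batch index of its first member.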
	void AddCollection(ClientContext &context, idx_t batch_index, idx_t min_batch_index,
	                   unique_ptr<RowGroupCollection> current_collection,
	                   optional_ptr<OptimisticDataWriter> writer = nullptr,
	                   optional_ptr<bool> written_to_disk = nullptr) {
		if (batch_index < min_batch_index) {
			throw InternalException(
			    "Batch index of the added collection (%llu) is smaller than the min batch index (%llu)", batch_index,
			    min_batch_index);
		}
		auto new_count = current_collection->GetTotalRows();
		auto batch_type =
		    new_count < RowGroup::ROW_GROUP_SIZE ? RowGroupBatchType::NOT_FLUSHED : RowGroupBatchType::FLUSHED;
		if (batch_type == RowGroupBatchType::FLUSHED && writer) {
			writer->WriteLastRowGroup(*current_collection);
		}
		optional_idx merged_batch_index;
		vector<unique_ptr<RowGroupCollection>> merge_collections;
		{
			lock_guard<mutex> l(lock);
			insert_count += new_count;

			// add the collection to the batch index
			RowGroupBatchEntry new_entry(batch_index, std::move(current_collection), batch_type);

			auto it = std::lower_bound(
			    collections.begin(), collections.end(), new_entry,
			    [&](const RowGroupBatchEntry &a, const RowGroupBatchEntry &b) { return a.batch_idx < b.batch_idx; });
			if (it != collections.end() && it->batch_idx == new_entry.batch_idx) {
				throw InternalException(
				    "PhysicalBatchInsert::AddCollection error: batch index %d is present in multiple "
				    "collections. This occurs when "
				    "batch indexes are not uniquely distributed over threads",
				    batch_index);
			}
			collections.insert(it, std::move(new_entry));
			if (writer) {
				FindMergeCollections(min_batch_index, merged_batch_index, merge_collections);
			}
		}
		if (!merge_collections.empty()) {
			// merge together the collections
			D_ASSERT(writer);
			auto final_collection = MergeCollections(context, std::move(merge_collections), *writer);
			if (written_to_disk) {
				*written_to_disk = true;
			}
			// add the merged-together collection to the set of batch indexes
			{
				lock_guard<mutex> l(lock);
				RowGroupBatchEntry new_entry(merged_batch_index.GetIndex(), std::move(final_collection),
				                             RowGroupBatchType::FLUSHED);
				auto it = std::lower_bound(collections.begin(), collections.end(), new_entry,
				                           [&](const RowGroupBatchEntry &a, const RowGroupBatchEntry &b) {
					                           return a.batch_idx < b.batch_idx;
				                           });
				if (it->batch_idx != merged_batch_index.GetIndex()) {
					throw InternalException("Merged batch index was no longer present in collection");
				}
				it->collection = std::move(new_entry.collection);
			}
		}
	}
};

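// Local sink state: each thread appends into its own in-memory row group collection,
// which is handed off to the global state whenever its batch index changes.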
class BatchInsertLocalState : public LocalSinkState {
public:
	BatchInsertLocalState(ClientContext &context, const vector<LogicalType> &types,
	                      const vector<unique_ptr<Expression>> &bound_defaults)
	    : default_executor(context, bound_defaults), written_to_disk(false) {
		insert_chunk.Initialize(Allocator::Get(context), types);
	}

	DataChunk insert_chunk;
	ExpressionExecutor default_executor;
	idx_t current_index;
	TableAppendState current_append_state;
	unique_ptr<RowGroupCollection> current_collection;
	optional_ptr<OptimisticDataWriter> writer;
	bool written_to_disk;

	void CreateNewCollection(DuckTableEntry &table, const vector<LogicalType> &insert_types) {
		auto &table_info = table.GetStorage().info;
		auto &block_manager = TableIOManager::Get(table.GetStorage()).GetBlockManagerForRowData();
		current_collection = make_uniq<RowGroupCollection>(table_info, block_manager, insert_types, MAX_ROW_ID);
		current_collection->InitializeEmpty();
		current_collection->InitializeAppend(current_append_state);
		written_to_disk = false;
	}
};

unique_ptr<GlobalSinkState> PhysicalBatchInsert::GetGlobalSinkState(ClientContext &context) const {
	optional_ptr<TableCatalogEntry> table;
	if (info) {
		// CREATE TABLE AS
		D_ASSERT(!insert_table);
		auto &catalog = schema->catalog;
		auto created_table = catalog.CreateTable(catalog.GetCatalogTransaction(context), *schema.get_mutable(), *info);
		table = &created_table->Cast<TableCatalogEntry>();
	} else {
		D_ASSERT(insert_table);
		D_ASSERT(insert_table->IsDuckTable());
		table = insert_table.get_mutable();
	}
	auto result = make_uniq<BatchInsertGlobalState>(table->Cast<DuckTableEntry>());
	return std::move(result);
}

unique_ptr<LocalSinkState> PhysicalBatchInsert::GetLocalSinkState(ExecutionContext &context) const {
	return make_uniq<BatchInsertLocalState>(context.client, insert_types, bound_defaults);
}

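// NextBatch is called when the batch index of a thread changes: the collection built for
// the previous batch is finalized and moved into the global state, and a fresh collection
// is started for the new batch.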
void PhysicalBatchInsert::NextBatch(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate_p) const {
	auto &gstate = state.Cast<BatchInsertGlobalState>();
	auto &lstate = lstate_p.Cast<BatchInsertLocalState>();

	auto &table = gstate.table;
	auto batch_index = lstate.partition_info.batch_index.GetIndex();
	if (lstate.current_collection) {
		if (lstate.current_index == batch_index) {
			throw InternalException("NextBatch called with the same batch index?");
		}
		// batch index has changed: move the old collection to the global state and create a new collection
		TransactionData tdata(0, 0);
		lstate.current_collection->FinalizeAppend(tdata, lstate.current_append_state);
		gstate.AddCollection(context.client, lstate.current_index, lstate.partition_info.min_batch_index.GetIndex(),
		                     std::move(lstate.current_collection), lstate.writer, &lstate.written_to_disk);
		lstate.CreateNewCollection(table, insert_types);
	}
	lstate.current_index = batch_index;
}

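// Sink resolves DEFAULT values for the incoming chunk, verifies the table's append
// constraints, and appends the rows to the thread-local collection; whenever a row group
// fills up it is eagerly written to disk through the optimistic writer.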
SinkResultType PhysicalBatchInsert::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
	auto &gstate = input.global_state.Cast<BatchInsertGlobalState>();
	auto &lstate = input.local_state.Cast<BatchInsertLocalState>();

	auto &table = gstate.table;
	PhysicalInsert::ResolveDefaults(table, chunk, column_index_map, lstate.default_executor, lstate.insert_chunk);

	auto batch_index = lstate.partition_info.batch_index.GetIndex();
	if (!lstate.current_collection) {
		lock_guard<mutex> l(gstate.lock);
		// no collection yet: create a new one
		lstate.CreateNewCollection(table, insert_types);
		lstate.writer = &table.GetStorage().CreateOptimisticWriter(context.client);
	} else if (lstate.current_index != batch_index) {
		throw InternalException("Current batch differs from batch - but NextBatch was not called!?");
	}
	lstate.current_index = batch_index;

	table.GetStorage().VerifyAppendConstraints(table, context.client, lstate.insert_chunk);

	auto new_row_group = lstate.current_collection->Append(lstate.insert_chunk, lstate.current_append_state);
	if (new_row_group) {
		// a new row group was created during the append: the previous one is full and can be written to disk
		lstate.writer->WriteNewRowGroup(*lstate.current_collection);
		lstate.written_to_disk = true;
	}
	return SinkResultType::NEED_MORE_INPUT;
}

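// Combine flushes profiler information, then hands any remaining thread-local rows and
// the thread's optimistic writer over to the global state.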
void PhysicalBatchInsert::Combine(ExecutionContext &context, GlobalSinkState &gstate_p,
                                  LocalSinkState &lstate_p) const {
	auto &gstate = gstate_p.Cast<BatchInsertGlobalState>();
	auto &lstate = lstate_p.Cast<BatchInsertLocalState>();
	auto &client_profiler = QueryProfiler::Get(context.client);
	context.thread.profiler.Flush(*this, lstate.default_executor, "default_executor", 1);
	client_profiler.Flush(context.thread.profiler);

	if (!lstate.current_collection) {
		return;
	}

	if (lstate.current_collection->GetTotalRows() > 0) {
		TransactionData tdata(0, 0);
		lstate.current_collection->FinalizeAppend(tdata, lstate.current_append_state);
		gstate.AddCollection(context.client, lstate.current_index, lstate.partition_info.min_batch_index.GetIndex(),
		                     std::move(lstate.current_collection));
	}
	{
		lock_guard<mutex> l(gstate.lock);
		gstate.table.GetStorage().FinalizeOptimisticWriter(context.client, *lstate.writer);
	}
}

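// Finalize performs a last pass over all collections: runs of adjacent unflushed batches
// are grouped into shared mergers, already-flushed batches each get a merger of their
// own, and the resulting collections are merged into the table's local storage.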
SinkFinalizeType PhysicalBatchInsert::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
                                               GlobalSinkState &gstate_p) const {
	auto &gstate = gstate_p.Cast<BatchInsertGlobalState>();

	// in the finalize, do a final pass over all of the collections we created and try to merge smaller collections
	// together
	vector<unique_ptr<CollectionMerger>> mergers;
	unique_ptr<CollectionMerger> current_merger;

	auto &storage = gstate.table.GetStorage();
	for (auto &entry : gstate.collections) {
		if (entry.type == RowGroupBatchType::NOT_FLUSHED) {
			// this collection has not been flushed: add it to the merge set
			if (!current_merger) {
				current_merger = make_uniq<CollectionMerger>(context);
			}
			current_merger->AddCollection(std::move(entry.collection));
		} else {
			// this collection has been flushed: it does not need to be merged
			// create a separate collection merger only for this entry
			if (current_merger) {
				// we have small collections remaining: flush them
				mergers.push_back(std::move(current_merger));
				current_merger.reset();
			}
			auto larger_merger = make_uniq<CollectionMerger>(context);
			larger_merger->AddCollection(std::move(entry.collection));
			mergers.push_back(std::move(larger_merger));
		}
	}
	if (current_merger) {
		mergers.push_back(std::move(current_merger));
	}

	// now that we have created all of the mergers, perform the actual merging
	vector<unique_ptr<RowGroupCollection>> final_collections;
	final_collections.reserve(mergers.size());
	auto &writer = storage.CreateOptimisticWriter(context);
	for (auto &merger : mergers) {
		final_collections.push_back(merger->Flush(writer));
	}
	storage.FinalizeOptimisticWriter(context, writer);

	// finally, merge the row groups into the local storage
	for (auto &collection : final_collections) {
		storage.LocalMerge(context, *collection);
	}
	return SinkFinalizeType::READY;
}

//===--------------------------------------------------------------------===//
// Source
//===--------------------------------------------------------------------===//

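// As a source, the batch insert emits a single row containing the total number of
// inserted rows.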
SourceResultType PhysicalBatchInsert::GetData(ExecutionContext &context, DataChunk &chunk,
                                              OperatorSourceInput &input) const {
	auto &insert_gstate = sink_state->Cast<BatchInsertGlobalState>();

	chunk.SetCardinality(1);
	chunk.SetValue(0, 0, Value::BIGINT(insert_gstate.insert_count));

	return SourceResultType::FINISHED;
}

} // namespace duckdb