| 1 | #include "duckdb/execution/operator/persistent/physical_insert.hpp" |
| 2 | #include "duckdb/parallel/thread_context.hpp" |
| 3 | #include "duckdb/catalog/catalog_entry/duck_table_entry.hpp" |
| 4 | #include "duckdb/common/types/column/column_data_collection.hpp" |
| 5 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
| 6 | #include "duckdb/execution/expression_executor.hpp" |
| 7 | #include "duckdb/storage/data_table.hpp" |
| 8 | #include "duckdb/main/client_context.hpp" |
| 9 | #include "duckdb/parser/parsed_data/create_table_info.hpp" |
| 10 | #include "duckdb/planner/expression/bound_constant_expression.hpp" |
| 11 | #include "duckdb/storage/table_io_manager.hpp" |
| 12 | #include "duckdb/transaction/local_storage.hpp" |
| 13 | #include "duckdb/parser/statement/insert_statement.hpp" |
| 14 | #include "duckdb/parser/statement/update_statement.hpp" |
| 15 | #include "duckdb/storage/table/scan_state.hpp" |
| 16 | #include "duckdb/common/types/conflict_manager.hpp" |
| 17 | #include "duckdb/execution/index/art/art.hpp" |
| 18 | #include "duckdb/transaction/duck_transaction.hpp" |
| 19 | #include "duckdb/storage/table/append_state.hpp" |
| 20 | |
| 21 | namespace duckdb { |
| 22 | |
| 23 | PhysicalInsert::PhysicalInsert(vector<LogicalType> types_p, TableCatalogEntry &table, |
| 24 | physical_index_vector_t<idx_t> column_index_map, |
| 25 | vector<unique_ptr<Expression>> bound_defaults, |
| 26 | vector<unique_ptr<Expression>> set_expressions, vector<PhysicalIndex> set_columns, |
| 27 | vector<LogicalType> set_types, idx_t estimated_cardinality, bool return_chunk, |
| 28 | bool parallel, OnConflictAction action_type, |
| 29 | unique_ptr<Expression> on_conflict_condition_p, |
| 30 | unique_ptr<Expression> do_update_condition_p, unordered_set<column_t> conflict_target_p, |
| 31 | vector<column_t> columns_to_fetch_p) |
| 32 | : PhysicalOperator(PhysicalOperatorType::INSERT, std::move(types_p), estimated_cardinality), |
| 33 | column_index_map(std::move(column_index_map)), insert_table(&table), insert_types(table.GetTypes()), |
| 34 | bound_defaults(std::move(bound_defaults)), return_chunk(return_chunk), parallel(parallel), |
| 35 | action_type(action_type), set_expressions(std::move(set_expressions)), set_columns(std::move(set_columns)), |
| 36 | set_types(std::move(set_types)), on_conflict_condition(std::move(on_conflict_condition_p)), |
| 37 | do_update_condition(std::move(do_update_condition_p)), conflict_target(std::move(conflict_target_p)), |
| 38 | columns_to_fetch(std::move(columns_to_fetch_p)) { |
| 39 | |
| 40 | if (action_type == OnConflictAction::THROW) { |
| 41 | return; |
| 42 | } |
| 43 | |
| 44 | D_ASSERT(set_expressions.size() == set_columns.size()); |
| 45 | |
| 46 | // One or more columns are referenced from the existing table, |
| 47 | // we use the 'insert_types' to figure out which types these columns have |
| 48 | types_to_fetch = vector<LogicalType>(columns_to_fetch.size(), LogicalType::SQLNULL); |
| 49 | for (idx_t i = 0; i < columns_to_fetch.size(); i++) { |
| 50 | auto &id = columns_to_fetch[i]; |
| 51 | D_ASSERT(id < insert_types.size()); |
| 52 | types_to_fetch[i] = insert_types[id]; |
| 53 | } |
| 54 | } |
| 55 | |
| 56 | PhysicalInsert::PhysicalInsert(LogicalOperator &op, SchemaCatalogEntry &schema, unique_ptr<BoundCreateTableInfo> info_p, |
| 57 | idx_t estimated_cardinality, bool parallel) |
| 58 | : PhysicalOperator(PhysicalOperatorType::CREATE_TABLE_AS, op.types, estimated_cardinality), insert_table(nullptr), |
| 59 | return_chunk(false), schema(&schema), info(std::move(info_p)), parallel(parallel), |
| 60 | action_type(OnConflictAction::THROW) { |
| 61 | GetInsertInfo(info: *info, insert_types, bound_defaults); |
| 62 | } |
| 63 | |
| 64 | void PhysicalInsert::GetInsertInfo(const BoundCreateTableInfo &info, vector<LogicalType> &insert_types, |
| 65 | vector<unique_ptr<Expression>> &bound_defaults) { |
| 66 | auto &create_info = info.base->Cast<CreateTableInfo>(); |
| 67 | for (auto &col : create_info.columns.Physical()) { |
| 68 | insert_types.push_back(x: col.GetType()); |
| 69 | bound_defaults.push_back(x: make_uniq<BoundConstantExpression>(args: Value(col.GetType()))); |
| 70 | } |
| 71 | } |
| 72 | |
| 73 | //===--------------------------------------------------------------------===// |
| 74 | // Sink |
| 75 | //===--------------------------------------------------------------------===// |
| 76 | class InsertGlobalState : public GlobalSinkState { |
| 77 | public: |
| 78 | explicit InsertGlobalState(ClientContext &context, const vector<LogicalType> &return_types, DuckTableEntry &table) |
| 79 | : table(table), insert_count(0), initialized(false), return_collection(context, return_types) { |
| 80 | } |
| 81 | |
| 82 | mutex lock; |
| 83 | DuckTableEntry &table; |
| 84 | idx_t insert_count; |
| 85 | bool initialized; |
| 86 | LocalAppendState append_state; |
| 87 | ColumnDataCollection return_collection; |
| 88 | }; |
| 89 | |
| 90 | class InsertLocalState : public LocalSinkState { |
| 91 | public: |
| 92 | InsertLocalState(ClientContext &context, const vector<LogicalType> &types, |
| 93 | const vector<unique_ptr<Expression>> &bound_defaults) |
| 94 | : default_executor(context, bound_defaults) { |
| 95 | insert_chunk.Initialize(allocator&: Allocator::Get(context), types); |
| 96 | } |
| 97 | |
| 98 | DataChunk insert_chunk; |
| 99 | ExpressionExecutor default_executor; |
| 100 | TableAppendState local_append_state; |
| 101 | unique_ptr<RowGroupCollection> local_collection; |
| 102 | optional_ptr<OptimisticDataWriter> writer; |
| 103 | // Rows that have been updated by a DO UPDATE conflict |
| 104 | unordered_set<row_t> updated_global_rows; |
| 105 | // Rows in the transaction-local storage that have been updated by a DO UPDATE conflict |
| 106 | unordered_set<row_t> updated_local_rows; |
| 107 | idx_t update_count = 0; |
| 108 | }; |
| 109 | |
| 110 | unique_ptr<GlobalSinkState> PhysicalInsert::GetGlobalSinkState(ClientContext &context) const { |
| 111 | optional_ptr<TableCatalogEntry> table; |
| 112 | if (info) { |
| 113 | // CREATE TABLE AS |
| 114 | D_ASSERT(!insert_table); |
| 115 | auto &catalog = schema->catalog; |
| 116 | table = &catalog.CreateTable(transaction: catalog.GetCatalogTransaction(context), schema&: *schema.get_mutable(), info&: *info) |
| 117 | ->Cast<TableCatalogEntry>(); |
| 118 | } else { |
| 119 | D_ASSERT(insert_table); |
| 120 | D_ASSERT(insert_table->IsDuckTable()); |
| 121 | table = insert_table.get_mutable(); |
| 122 | } |
| 123 | auto result = make_uniq<InsertGlobalState>(args&: context, args: GetTypes(), args&: table->Cast<DuckTableEntry>()); |
| 124 | return std::move(result); |
| 125 | } |
| 126 | |
| 127 | unique_ptr<LocalSinkState> PhysicalInsert::GetLocalSinkState(ExecutionContext &context) const { |
| 128 | return make_uniq<InsertLocalState>(args&: context.client, args: insert_types, args: bound_defaults); |
| 129 | } |
| 130 | |
| 131 | void PhysicalInsert::ResolveDefaults(const TableCatalogEntry &table, DataChunk &chunk, |
| 132 | const physical_index_vector_t<idx_t> &column_index_map, |
| 133 | ExpressionExecutor &default_executor, DataChunk &result) { |
| 134 | chunk.Flatten(); |
| 135 | default_executor.SetChunk(chunk); |
| 136 | |
| 137 | result.Reset(); |
| 138 | result.SetCardinality(chunk); |
| 139 | |
| 140 | if (!column_index_map.empty()) { |
| 141 | // columns specified by the user, use column_index_map |
| 142 | for (auto &col : table.GetColumns().Physical()) { |
| 143 | auto storage_idx = col.StorageOid(); |
| 144 | auto mapped_index = column_index_map[col.Physical()]; |
| 145 | if (mapped_index == DConstants::INVALID_INDEX) { |
| 146 | // insert default value |
| 147 | default_executor.ExecuteExpression(expr_idx: storage_idx, result&: result.data[storage_idx]); |
| 148 | } else { |
| 149 | // get value from child chunk |
| 150 | D_ASSERT((idx_t)mapped_index < chunk.ColumnCount()); |
| 151 | D_ASSERT(result.data[storage_idx].GetType() == chunk.data[mapped_index].GetType()); |
| 152 | result.data[storage_idx].Reference(other&: chunk.data[mapped_index]); |
| 153 | } |
| 154 | } |
| 155 | } else { |
| 156 | // no columns specified, just append directly |
| 157 | for (idx_t i = 0; i < result.ColumnCount(); i++) { |
| 158 | D_ASSERT(result.data[i].GetType() == chunk.data[i].GetType()); |
| 159 | result.data[i].Reference(other&: chunk.data[i]); |
| 160 | } |
| 161 | } |
| 162 | } |
| 163 | |
| 164 | bool AllConflictsMeetCondition(DataChunk &result) { |
| 165 | auto data = FlatVector::GetData<bool>(vector&: result.data[0]); |
| 166 | for (idx_t i = 0; i < result.size(); i++) { |
| 167 | if (!data[i]) { |
| 168 | return false; |
| 169 | } |
| 170 | } |
| 171 | return true; |
| 172 | } |
| 173 | |
| 174 | void CheckOnConflictCondition(ExecutionContext &context, DataChunk &conflicts, const unique_ptr<Expression> &condition, |
| 175 | DataChunk &result) { |
| 176 | ExpressionExecutor executor(context.client, *condition); |
| 177 | result.Initialize(context&: context.client, types: {LogicalType::BOOLEAN}); |
| 178 | executor.Execute(input&: conflicts, result); |
| 179 | result.SetCardinality(conflicts.size()); |
| 180 | } |
| 181 | |
| 182 | static void CombineExistingAndInsertTuples(DataChunk &result, DataChunk &scan_chunk, DataChunk &input_chunk, |
| 183 | ClientContext &client, const PhysicalInsert &op) { |
| 184 | auto &types_to_fetch = op.types_to_fetch; |
| 185 | auto &insert_types = op.insert_types; |
| 186 | |
| 187 | if (types_to_fetch.empty()) { |
| 188 | // We have not scanned the initial table, so we can just duplicate the initial chunk |
| 189 | result.Initialize(context&: client, types: input_chunk.GetTypes()); |
| 190 | result.Reference(chunk&: input_chunk); |
| 191 | result.SetCardinality(input_chunk); |
| 192 | return; |
| 193 | } |
| 194 | vector<LogicalType> combined_types; |
| 195 | combined_types.reserve(n: insert_types.size() + types_to_fetch.size()); |
| 196 | combined_types.insert(position: combined_types.end(), first: insert_types.begin(), last: insert_types.end()); |
| 197 | combined_types.insert(position: combined_types.end(), first: types_to_fetch.begin(), last: types_to_fetch.end()); |
| 198 | |
| 199 | result.Initialize(context&: client, types: combined_types); |
| 200 | result.Reset(); |
| 201 | // Add the VALUES list |
| 202 | for (idx_t i = 0; i < insert_types.size(); i++) { |
| 203 | idx_t col_idx = i; |
| 204 | auto &other_col = input_chunk.data[i]; |
| 205 | auto &this_col = result.data[col_idx]; |
| 206 | D_ASSERT(other_col.GetType() == this_col.GetType()); |
| 207 | this_col.Reference(other&: other_col); |
| 208 | } |
| 209 | // Add the columns from the original conflicting tuples |
| 210 | for (idx_t i = 0; i < types_to_fetch.size(); i++) { |
| 211 | idx_t col_idx = i + insert_types.size(); |
| 212 | auto &other_col = scan_chunk.data[i]; |
| 213 | auto &this_col = result.data[col_idx]; |
| 214 | D_ASSERT(other_col.GetType() == this_col.GetType()); |
| 215 | this_col.Reference(other&: other_col); |
| 216 | } |
| 217 | // This is guaranteed by the requirement of a conflict target to have a condition or set expressions |
| 218 | // Only when we have any sort of condition or SET expression that references the existing table is this possible |
| 219 | // to not be true. |
| 220 | // We can have a SET expression without a conflict target ONLY if there is only 1 Index on the table |
| 221 | // In which case this also can't cause a discrepancy between existing tuple count and insert tuple count |
| 222 | D_ASSERT(input_chunk.size() == scan_chunk.size()); |
| 223 | result.SetCardinality(input_chunk.size()); |
| 224 | } |
| 225 | |
| 226 | static void CreateUpdateChunk(ExecutionContext &context, DataChunk &chunk, TableCatalogEntry &table, Vector &row_ids, |
| 227 | DataChunk &update_chunk, const PhysicalInsert &op) { |
| 228 | |
| 229 | auto &do_update_condition = op.do_update_condition; |
| 230 | auto &set_types = op.set_types; |
| 231 | auto &set_expressions = op.set_expressions; |
| 232 | // Check the optional condition for the DO UPDATE clause, to filter which rows will be updated |
| 233 | if (do_update_condition) { |
| 234 | DataChunk do_update_filter_result; |
| 235 | do_update_filter_result.Initialize(context&: context.client, types: {LogicalType::BOOLEAN}); |
| 236 | ExpressionExecutor where_executor(context.client, *do_update_condition); |
| 237 | where_executor.Execute(input&: chunk, result&: do_update_filter_result); |
| 238 | do_update_filter_result.SetCardinality(chunk.size()); |
| 239 | |
| 240 | ManagedSelection selection(chunk.size()); |
| 241 | |
| 242 | auto where_data = FlatVector::GetData<bool>(vector&: do_update_filter_result.data[0]); |
| 243 | for (idx_t i = 0; i < chunk.size(); i++) { |
| 244 | if (where_data[i]) { |
| 245 | selection.Append(idx: i); |
| 246 | } |
| 247 | } |
| 248 | if (selection.Count() != selection.Size()) { |
| 249 | // Not all conflicts met the condition, need to filter out the ones that don't |
| 250 | chunk.Slice(sel_vector: selection.Selection(), count: selection.Count()); |
| 251 | chunk.SetCardinality(selection.Count()); |
| 252 | // Also apply this Slice to the to-update row_ids |
| 253 | row_ids.Slice(sel: selection.Selection(), count: selection.Count()); |
| 254 | } |
| 255 | } |
| 256 | |
| 257 | // Execute the SET expressions |
| 258 | update_chunk.Initialize(context&: context.client, types: set_types); |
| 259 | ExpressionExecutor executor(context.client, set_expressions); |
| 260 | executor.Execute(input&: chunk, result&: update_chunk); |
| 261 | update_chunk.SetCardinality(chunk); |
| 262 | } |
| 263 | |
| 264 | template <bool GLOBAL> |
| 265 | static idx_t PerformOnConflictAction(ExecutionContext &context, DataChunk &chunk, TableCatalogEntry &table, |
| 266 | Vector &row_ids, const PhysicalInsert &op) { |
| 267 | |
| 268 | if (op.action_type == OnConflictAction::NOTHING) { |
| 269 | return 0; |
| 270 | } |
| 271 | auto &set_columns = op.set_columns; |
| 272 | |
| 273 | DataChunk update_chunk; |
| 274 | CreateUpdateChunk(context, chunk, table, row_ids, update_chunk, op); |
| 275 | |
| 276 | auto &data_table = table.GetStorage(); |
| 277 | // Perform the update, using the results of the SET expressions |
| 278 | if (GLOBAL) { |
| 279 | data_table.Update(table, context&: context.client, row_ids, column_ids: set_columns, data&: update_chunk); |
| 280 | } else { |
| 281 | auto &local_storage = LocalStorage::Get(context&: context.client, db&: data_table.db); |
| 282 | // Perform the update, using the results of the SET expressions |
| 283 | local_storage.Update(table&: data_table, row_ids, column_ids: set_columns, data&: update_chunk); |
| 284 | } |
| 285 | return update_chunk.size(); |
| 286 | } |
| 287 | |
| 288 | // TODO: should we use a hash table to keep track of this instead? |
| 289 | template <bool GLOBAL> |
| 290 | static void RegisterUpdatedRows(InsertLocalState &lstate, const Vector &row_ids, idx_t count) { |
| 291 | // Insert all rows, if any of the rows has already been updated before, we throw an error |
| 292 | auto data = FlatVector::GetData<row_t>(vector: row_ids); |
| 293 | |
| 294 | // The rowids in the transaction-local ART aren't final yet so we have to separately keep track of the two sets of |
| 295 | // rowids |
| 296 | unordered_set<row_t> &updated_rows = GLOBAL ? lstate.updated_global_rows : lstate.updated_local_rows; |
| 297 | for (idx_t i = 0; i < count; i++) { |
| 298 | auto result = updated_rows.insert(x: data[i]); |
| 299 | if (result.second == false) { |
| 300 | throw InvalidInputException( |
| 301 | "ON CONFLICT DO UPDATE can not update the same row twice in the same command, Ensure that no rows " |
| 302 | "proposed for insertion within the same command have duplicate constrained values" ); |
| 303 | } |
| 304 | } |
| 305 | } |
| 306 | |
| 307 | template <bool GLOBAL> |
| 308 | static idx_t HandleInsertConflicts(TableCatalogEntry &table, ExecutionContext &context, InsertLocalState &lstate, |
| 309 | DataTable &data_table, const PhysicalInsert &op) { |
| 310 | auto &types_to_fetch = op.types_to_fetch; |
| 311 | auto &on_conflict_condition = op.on_conflict_condition; |
| 312 | auto &conflict_target = op.conflict_target; |
| 313 | auto &columns_to_fetch = op.columns_to_fetch; |
| 314 | |
| 315 | auto &local_storage = LocalStorage::Get(context&: context.client, db&: data_table.db); |
| 316 | |
| 317 | // We either want to do nothing, or perform an update when conflicts arise |
| 318 | ConflictInfo conflict_info(conflict_target); |
| 319 | ConflictManager conflict_manager(VerifyExistenceType::APPEND, lstate.insert_chunk.size(), &conflict_info); |
| 320 | if (GLOBAL) { |
| 321 | data_table.VerifyAppendConstraints(table, context&: context.client, chunk&: lstate.insert_chunk, conflict_manager: &conflict_manager); |
| 322 | } else { |
| 323 | DataTable::VerifyUniqueIndexes(indexes&: local_storage.GetIndexes(table&: data_table), context&: context.client, chunk&: lstate.insert_chunk, |
| 324 | conflict_manager: &conflict_manager); |
| 325 | } |
| 326 | conflict_manager.Finalize(); |
| 327 | if (conflict_manager.ConflictCount() == 0) { |
| 328 | // No conflicts found, 0 updates performed |
| 329 | return 0; |
| 330 | } |
| 331 | auto &conflicts = conflict_manager.Conflicts(); |
| 332 | auto &row_ids = conflict_manager.RowIds(); |
| 333 | |
| 334 | DataChunk conflict_chunk; // contains only the conflicting values |
| 335 | DataChunk scan_chunk; // contains the original values, that caused the conflict |
| 336 | DataChunk combined_chunk; // contains conflict_chunk + scan_chunk (wide) |
| 337 | |
| 338 | // Filter out everything but the conflicting rows |
| 339 | conflict_chunk.Initialize(context&: context.client, types: lstate.insert_chunk.GetTypes()); |
| 340 | conflict_chunk.Reference(chunk&: lstate.insert_chunk); |
| 341 | conflict_chunk.Slice(sel_vector: conflicts.Selection(), count: conflicts.Count()); |
| 342 | conflict_chunk.SetCardinality(conflicts.Count()); |
| 343 | |
| 344 | // Holds the pins for the fetched rows |
| 345 | unique_ptr<ColumnFetchState> fetch_state; |
| 346 | if (!types_to_fetch.empty()) { |
| 347 | D_ASSERT(scan_chunk.size() == 0); |
| 348 | // When these values are required for the conditions or the SET expressions, |
| 349 | // then we scan the existing table for the conflicting tuples, using the rowids |
| 350 | scan_chunk.Initialize(context&: context.client, types: types_to_fetch); |
| 351 | fetch_state = make_uniq<ColumnFetchState>(); |
| 352 | if (GLOBAL) { |
| 353 | auto &transaction = DuckTransaction::Get(context&: context.client, catalog&: table.catalog); |
| 354 | data_table.Fetch(transaction, result&: scan_chunk, column_ids: columns_to_fetch, row_ids, fetch_count: conflicts.Count(), state&: *fetch_state); |
| 355 | } else { |
| 356 | local_storage.FetchChunk(table&: data_table, row_ids, count: conflicts.Count(), col_ids: columns_to_fetch, chunk&: scan_chunk, |
| 357 | fetch_state&: *fetch_state); |
| 358 | } |
| 359 | } |
| 360 | |
| 361 | // Splice the Input chunk and the fetched chunk together |
| 362 | CombineExistingAndInsertTuples(result&: combined_chunk, scan_chunk, input_chunk&: conflict_chunk, client&: context.client, op); |
| 363 | |
| 364 | if (on_conflict_condition) { |
| 365 | DataChunk conflict_condition_result; |
| 366 | CheckOnConflictCondition(context, conflicts&: combined_chunk, condition: on_conflict_condition, result&: conflict_condition_result); |
| 367 | bool conditions_met = AllConflictsMeetCondition(result&: conflict_condition_result); |
| 368 | if (!conditions_met) { |
| 369 | // Filter out the tuples that did pass the filter, then run the verify again |
| 370 | ManagedSelection sel(combined_chunk.size()); |
| 371 | auto data = FlatVector::GetData<bool>(vector&: conflict_condition_result.data[0]); |
| 372 | for (idx_t i = 0; i < combined_chunk.size(); i++) { |
| 373 | if (!data[i]) { |
| 374 | // Only populate the selection vector with the tuples that did not meet the condition |
| 375 | sel.Append(idx: i); |
| 376 | } |
| 377 | } |
| 378 | combined_chunk.Slice(sel_vector: sel.Selection(), count: sel.Count()); |
| 379 | row_ids.Slice(sel: sel.Selection(), count: sel.Count()); |
| 380 | if (GLOBAL) { |
| 381 | data_table.VerifyAppendConstraints(table, context&: context.client, chunk&: combined_chunk, conflict_manager: nullptr); |
| 382 | } else { |
| 383 | DataTable::VerifyUniqueIndexes(indexes&: local_storage.GetIndexes(table&: data_table), context&: context.client, |
| 384 | chunk&: lstate.insert_chunk, conflict_manager: nullptr); |
| 385 | } |
| 386 | throw InternalException("The previous operation was expected to throw but didn't" ); |
| 387 | } |
| 388 | } |
| 389 | |
| 390 | RegisterUpdatedRows<GLOBAL>(lstate, row_ids, combined_chunk.size()); |
| 391 | |
| 392 | idx_t updated_tuples = PerformOnConflictAction<GLOBAL>(context, combined_chunk, table, row_ids, op); |
| 393 | |
| 394 | // Remove the conflicting tuples from the insert chunk |
| 395 | SelectionVector sel_vec(lstate.insert_chunk.size()); |
| 396 | idx_t new_size = |
| 397 | SelectionVector::Inverted(src: conflicts.Selection(), dst&: sel_vec, source_size: conflicts.Count(), count: lstate.insert_chunk.size()); |
| 398 | lstate.insert_chunk.Slice(sel_vector: sel_vec, count: new_size); |
| 399 | lstate.insert_chunk.SetCardinality(new_size); |
| 400 | return updated_tuples; |
| 401 | } |
| 402 | |
| 403 | idx_t PhysicalInsert::OnConflictHandling(TableCatalogEntry &table, ExecutionContext &context, |
| 404 | InsertLocalState &lstate) const { |
| 405 | auto &data_table = table.GetStorage(); |
| 406 | if (action_type == OnConflictAction::THROW) { |
| 407 | data_table.VerifyAppendConstraints(table, context&: context.client, chunk&: lstate.insert_chunk, conflict_manager: nullptr); |
| 408 | return 0; |
| 409 | } |
| 410 | // Check whether any conflicts arise, and if they all meet the conflict_target + condition |
| 411 | // If that's not the case - We throw the first error |
| 412 | idx_t updated_tuples = 0; |
| 413 | updated_tuples += HandleInsertConflicts<true>(table, context, lstate, data_table, op: *this); |
| 414 | // Also check the transaction-local storage+ART so we can detect conflicts within this transaction |
| 415 | updated_tuples += HandleInsertConflicts<false>(table, context, lstate, data_table, op: *this); |
| 416 | |
| 417 | return updated_tuples; |
| 418 | } |
| 419 | |
| 420 | SinkResultType PhysicalInsert::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const { |
| 421 | auto &gstate = input.global_state.Cast<InsertGlobalState>(); |
| 422 | auto &lstate = input.local_state.Cast<InsertLocalState>(); |
| 423 | |
| 424 | auto &table = gstate.table; |
| 425 | auto &storage = table.GetStorage(); |
| 426 | PhysicalInsert::ResolveDefaults(table, chunk, column_index_map, default_executor&: lstate.default_executor, result&: lstate.insert_chunk); |
| 427 | |
| 428 | if (!parallel) { |
| 429 | if (!gstate.initialized) { |
| 430 | storage.InitializeLocalAppend(state&: gstate.append_state, context&: context.client); |
| 431 | gstate.initialized = true; |
| 432 | } |
| 433 | |
| 434 | idx_t updated_tuples = OnConflictHandling(table, context, lstate); |
| 435 | gstate.insert_count += lstate.insert_chunk.size(); |
| 436 | gstate.insert_count += updated_tuples; |
| 437 | storage.LocalAppend(state&: gstate.append_state, table, context&: context.client, chunk&: lstate.insert_chunk, unsafe: true); |
| 438 | |
| 439 | if (return_chunk) { |
| 440 | gstate.return_collection.Append(new_chunk&: lstate.insert_chunk); |
| 441 | } |
| 442 | } else { |
| 443 | D_ASSERT(!return_chunk); |
| 444 | // parallel append |
| 445 | if (!lstate.local_collection) { |
| 446 | lock_guard<mutex> l(gstate.lock); |
| 447 | auto &table_info = storage.info; |
| 448 | auto &block_manager = TableIOManager::Get(table&: storage).GetBlockManagerForRowData(); |
| 449 | lstate.local_collection = |
| 450 | make_uniq<RowGroupCollection>(args&: table_info, args&: block_manager, args: insert_types, args: MAX_ROW_ID); |
| 451 | lstate.local_collection->InitializeEmpty(); |
| 452 | lstate.local_collection->InitializeAppend(state&: lstate.local_append_state); |
| 453 | lstate.writer = &gstate.table.GetStorage().CreateOptimisticWriter(context&: context.client); |
| 454 | } |
| 455 | OnConflictHandling(table, context, lstate); |
| 456 | |
| 457 | auto new_row_group = lstate.local_collection->Append(chunk&: lstate.insert_chunk, state&: lstate.local_append_state); |
| 458 | if (new_row_group) { |
| 459 | lstate.writer->WriteNewRowGroup(row_groups&: *lstate.local_collection); |
| 460 | } |
| 461 | } |
| 462 | |
| 463 | return SinkResultType::NEED_MORE_INPUT; |
| 464 | } |
| 465 | |
| 466 | void PhysicalInsert::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, LocalSinkState &lstate_p) const { |
| 467 | auto &gstate = gstate_p.Cast<InsertGlobalState>(); |
| 468 | auto &lstate = lstate_p.Cast<InsertLocalState>(); |
| 469 | auto &client_profiler = QueryProfiler::Get(context&: context.client); |
| 470 | context.thread.profiler.Flush(phys_op: *this, expression_executor&: lstate.default_executor, name: "default_executor" , id: 1); |
| 471 | client_profiler.Flush(profiler&: context.thread.profiler); |
| 472 | |
| 473 | if (!parallel) { |
| 474 | return; |
| 475 | } |
| 476 | if (!lstate.local_collection) { |
| 477 | return; |
| 478 | } |
| 479 | // parallel append: finalize the append |
| 480 | TransactionData tdata(0, 0); |
| 481 | lstate.local_collection->FinalizeAppend(transaction: tdata, state&: lstate.local_append_state); |
| 482 | |
| 483 | auto append_count = lstate.local_collection->GetTotalRows(); |
| 484 | |
| 485 | lock_guard<mutex> lock(gstate.lock); |
| 486 | gstate.insert_count += append_count; |
| 487 | if (append_count < RowGroup::ROW_GROUP_SIZE) { |
| 488 | // we have few rows - append to the local storage directly |
| 489 | auto &table = gstate.table; |
| 490 | auto &storage = table.GetStorage(); |
| 491 | storage.InitializeLocalAppend(state&: gstate.append_state, context&: context.client); |
| 492 | auto &transaction = DuckTransaction::Get(context&: context.client, catalog&: table.catalog); |
| 493 | lstate.local_collection->Scan(transaction, fun: [&](DataChunk &insert_chunk) { |
| 494 | storage.LocalAppend(state&: gstate.append_state, table, context&: context.client, chunk&: insert_chunk); |
| 495 | return true; |
| 496 | }); |
| 497 | storage.FinalizeLocalAppend(state&: gstate.append_state); |
| 498 | } else { |
| 499 | // we have written rows to disk optimistically - merge directly into the transaction-local storage |
| 500 | gstate.table.GetStorage().FinalizeOptimisticWriter(context&: context.client, writer&: *lstate.writer); |
| 501 | gstate.table.GetStorage().LocalMerge(context&: context.client, collection&: *lstate.local_collection); |
| 502 | } |
| 503 | } |
| 504 | |
| 505 | SinkFinalizeType PhysicalInsert::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, |
| 506 | GlobalSinkState &state) const { |
| 507 | auto &gstate = state.Cast<InsertGlobalState>(); |
| 508 | if (!parallel && gstate.initialized) { |
| 509 | auto &table = gstate.table; |
| 510 | auto &storage = table.GetStorage(); |
| 511 | storage.FinalizeLocalAppend(state&: gstate.append_state); |
| 512 | } |
| 513 | return SinkFinalizeType::READY; |
| 514 | } |
| 515 | |
| 516 | //===--------------------------------------------------------------------===// |
| 517 | // Source |
| 518 | //===--------------------------------------------------------------------===// |
| 519 | class InsertSourceState : public GlobalSourceState { |
| 520 | public: |
| 521 | explicit InsertSourceState(const PhysicalInsert &op) { |
| 522 | if (op.return_chunk) { |
| 523 | D_ASSERT(op.sink_state); |
| 524 | auto &g = op.sink_state->Cast<InsertGlobalState>(); |
| 525 | g.return_collection.InitializeScan(state&: scan_state); |
| 526 | } |
| 527 | } |
| 528 | |
| 529 | ColumnDataScanState scan_state; |
| 530 | }; |
| 531 | |
| 532 | unique_ptr<GlobalSourceState> PhysicalInsert::GetGlobalSourceState(ClientContext &context) const { |
| 533 | return make_uniq<InsertSourceState>(args: *this); |
| 534 | } |
| 535 | |
| 536 | SourceResultType PhysicalInsert::GetData(ExecutionContext &context, DataChunk &chunk, |
| 537 | OperatorSourceInput &input) const { |
| 538 | auto &state = input.global_state.Cast<InsertSourceState>(); |
| 539 | auto &insert_gstate = sink_state->Cast<InsertGlobalState>(); |
| 540 | if (!return_chunk) { |
| 541 | chunk.SetCardinality(1); |
| 542 | chunk.SetValue(col_idx: 0, index: 0, val: Value::BIGINT(value: insert_gstate.insert_count)); |
| 543 | return SourceResultType::FINISHED; |
| 544 | } |
| 545 | |
| 546 | insert_gstate.return_collection.Scan(state&: state.scan_state, result&: chunk); |
| 547 | return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT; |
| 548 | } |
| 549 | |
| 550 | } // namespace duckdb |
| 551 | |