| 1 | #include <Processors/Formats/Impl/TemplateBlockOutputFormat.h> |
| 2 | #include <Formats/FormatFactory.h> |
| 3 | #include <IO/WriteHelpers.h> |
| 4 | #include <DataTypes/DataTypesNumber.h> |
| 5 | #include <Interpreters/Context.h> |
| 6 | |
| 7 | |
| 8 | namespace DB |
| 9 | { |
| 10 | |
| 11 | namespace ErrorCodes |
| 12 | { |
| 13 | extern const int SYNTAX_ERROR; |
| 14 | } |
| 15 | |
| 16 | TemplateBlockOutputFormat::TemplateBlockOutputFormat(const Block & , WriteBuffer & out_, const FormatSettings & settings_, |
| 17 | ParsedTemplateFormatString format_, ParsedTemplateFormatString row_format_, |
| 18 | std::string row_between_delimiter_) |
| 19 | : IOutputFormat(header_, out_), settings(settings_), format(std::move(format_)) |
| 20 | , row_format(std::move(row_format_)), row_between_delimiter(std::move(row_between_delimiter_)) |
| 21 | { |
| 22 | auto & sample = getPort(PortKind::Main).getHeader(); |
| 23 | size_t columns = sample.columns(); |
| 24 | types.resize(columns); |
| 25 | for (size_t i = 0; i < columns; ++i) |
| 26 | types[i] = sample.safeGetByPosition(i).type; |
| 27 | |
| 28 | /// Validate format string for whole output |
| 29 | size_t data_idx = format.format_idx_to_column_idx.size() + 1; |
| 30 | for (size_t i = 0; i < format.format_idx_to_column_idx.size(); ++i) |
| 31 | { |
| 32 | if (!format.format_idx_to_column_idx[i]) |
| 33 | format.throwInvalidFormat("Output part name cannot be empty." , i); |
| 34 | switch (*format.format_idx_to_column_idx[i]) |
| 35 | { |
| 36 | case static_cast<size_t>(ResultsetPart::Data): |
| 37 | data_idx = i; |
| 38 | [[fallthrough]]; |
| 39 | case static_cast<size_t>(ResultsetPart::Totals): |
| 40 | case static_cast<size_t>(ResultsetPart::ExtremesMin): |
| 41 | case static_cast<size_t>(ResultsetPart::ExtremesMax): |
| 42 | if (format.formats[i] != ColumnFormat::None) |
| 43 | format.throwInvalidFormat("Serialization type for data, totals, min and max must be empty or None" , i); |
| 44 | break; |
| 45 | case static_cast<size_t>(ResultsetPart::Rows): |
| 46 | case static_cast<size_t>(ResultsetPart::RowsBeforeLimit): |
| 47 | case static_cast<size_t>(ResultsetPart::TimeElapsed): |
| 48 | case static_cast<size_t>(ResultsetPart::RowsRead): |
| 49 | case static_cast<size_t>(ResultsetPart::BytesRead): |
| 50 | if (format.formats[i] == ColumnFormat::None) |
| 51 | format.throwInvalidFormat("Serialization type for output part rows, rows_before_limit, time, " |
| 52 | "rows_read or bytes_read is not specified" , i); |
| 53 | break; |
| 54 | default: |
| 55 | format.throwInvalidFormat("Invalid output part" , i); |
| 56 | } |
| 57 | } |
| 58 | if (data_idx != 0) |
| 59 | format.throwInvalidFormat("${data} must be the first output part" , 0); |
| 60 | |
| 61 | /// Validate format string for rows |
| 62 | if (row_format.delimiters.size() == 1) |
| 63 | row_format.throwInvalidFormat("No columns specified" , 0); |
| 64 | for (size_t i = 0; i < row_format.columnsCount(); ++i) |
| 65 | { |
| 66 | if (!row_format.format_idx_to_column_idx[i]) |
| 67 | row_format.throwInvalidFormat("Cannot skip format field for output, it's a bug." , i); |
| 68 | if (header_.columns() <= *row_format.format_idx_to_column_idx[i]) |
| 69 | row_format.throwInvalidFormat("Column index " + std::to_string(*row_format.format_idx_to_column_idx[i]) + |
| 70 | " must be less then number of columns (" + std::to_string(header_.columns()) + ")" , i); |
| 71 | if (row_format.formats[i] == ColumnFormat::None) |
| 72 | row_format.throwInvalidFormat("Serialization type for file column is not specified" , i); |
| 73 | } |
| 74 | } |
| 75 | |
| 76 | TemplateBlockOutputFormat::ResultsetPart TemplateBlockOutputFormat::stringToResultsetPart(const String & part) |
| 77 | { |
| 78 | if (part == "data" ) |
| 79 | return ResultsetPart::Data; |
| 80 | else if (part == "totals" ) |
| 81 | return ResultsetPart::Totals; |
| 82 | else if (part == "min" ) |
| 83 | return ResultsetPart::ExtremesMin; |
| 84 | else if (part == "max" ) |
| 85 | return ResultsetPart::ExtremesMax; |
| 86 | else if (part == "rows" ) |
| 87 | return ResultsetPart::Rows; |
| 88 | else if (part == "rows_before_limit" ) |
| 89 | return ResultsetPart::RowsBeforeLimit; |
| 90 | else if (part == "time" ) |
| 91 | return ResultsetPart::TimeElapsed; |
| 92 | else if (part == "rows_read" ) |
| 93 | return ResultsetPart::RowsRead; |
| 94 | else if (part == "bytes_read" ) |
| 95 | return ResultsetPart::BytesRead; |
| 96 | else |
| 97 | throw Exception("Unknown output part " + part, ErrorCodes::SYNTAX_ERROR); |
| 98 | } |
| 99 | |
| 100 | void TemplateBlockOutputFormat::writeRow(const Chunk & chunk, size_t row_num) |
| 101 | { |
| 102 | size_t columns = row_format.format_idx_to_column_idx.size(); |
| 103 | for (size_t j = 0; j < columns; ++j) |
| 104 | { |
| 105 | writeString(row_format.delimiters[j], out); |
| 106 | |
| 107 | size_t col_idx = *row_format.format_idx_to_column_idx[j]; |
| 108 | serializeField(*chunk.getColumns()[col_idx], *types[col_idx], row_num, row_format.formats[j]); |
| 109 | } |
| 110 | writeString(row_format.delimiters[columns], out); |
| 111 | } |
| 112 | |
| 113 | void TemplateBlockOutputFormat::serializeField(const IColumn & column, const IDataType & type, size_t row_num, ColumnFormat col_format) |
| 114 | { |
| 115 | switch (col_format) |
| 116 | { |
| 117 | case ColumnFormat::Escaped: |
| 118 | type.serializeAsTextEscaped(column, row_num, out, settings); |
| 119 | break; |
| 120 | case ColumnFormat::Quoted: |
| 121 | type.serializeAsTextQuoted(column, row_num, out, settings); |
| 122 | break; |
| 123 | case ColumnFormat::Csv: |
| 124 | type.serializeAsTextCSV(column, row_num, out, settings); |
| 125 | break; |
| 126 | case ColumnFormat::Json: |
| 127 | type.serializeAsTextJSON(column, row_num, out, settings); |
| 128 | break; |
| 129 | case ColumnFormat::Xml: |
| 130 | type.serializeAsTextXML(column, row_num, out, settings); |
| 131 | break; |
| 132 | case ColumnFormat::Raw: |
| 133 | type.serializeAsText(column, row_num, out, settings); |
| 134 | break; |
| 135 | default: |
| 136 | __builtin_unreachable(); |
| 137 | } |
| 138 | } |
| 139 | |
| 140 | template <typename U, typename V> void TemplateBlockOutputFormat::writeValue(U value, ColumnFormat col_format) |
| 141 | { |
| 142 | auto type = std::make_unique<V>(); |
| 143 | auto col = type->createColumn(); |
| 144 | col->insert(value); |
| 145 | serializeField(*col, *type, 0, col_format); |
| 146 | } |
| 147 | |
| 148 | void TemplateBlockOutputFormat::consume(Chunk chunk) |
| 149 | { |
| 150 | doWritePrefix(); |
| 151 | |
| 152 | size_t rows = chunk.getNumRows(); |
| 153 | |
| 154 | for (size_t i = 0; i < rows; ++i) |
| 155 | { |
| 156 | if (row_count) |
| 157 | writeString(row_between_delimiter, out); |
| 158 | |
| 159 | writeRow(chunk, i); |
| 160 | ++row_count; |
| 161 | } |
| 162 | } |
| 163 | |
| 164 | void TemplateBlockOutputFormat::doWritePrefix() |
| 165 | { |
| 166 | if (need_write_prefix) |
| 167 | { |
| 168 | writeString(format.delimiters.front(), out); |
| 169 | need_write_prefix = false; |
| 170 | } |
| 171 | } |
| 172 | |
| 173 | void TemplateBlockOutputFormat::finalize() |
| 174 | { |
| 175 | if (finalized) |
| 176 | return; |
| 177 | |
| 178 | doWritePrefix(); |
| 179 | |
| 180 | size_t parts = format.format_idx_to_column_idx.size(); |
| 181 | |
| 182 | for (size_t i = 0; i < parts; ++i) |
| 183 | { |
| 184 | auto type = std::make_shared<DataTypeUInt64>(); |
| 185 | ColumnWithTypeAndName col(type->createColumnConst(1, row_count), type, String("tmp" )); |
| 186 | switch (static_cast<ResultsetPart>(*format.format_idx_to_column_idx[i])) |
| 187 | { |
| 188 | case ResultsetPart::Totals: |
| 189 | if (!totals) |
| 190 | format.throwInvalidFormat("Cannot print totals for this request" , i); |
| 191 | writeRow(totals, 0); |
| 192 | break; |
| 193 | case ResultsetPart::ExtremesMin: |
| 194 | if (!extremes) |
| 195 | format.throwInvalidFormat("Cannot print extremes for this request" , i); |
| 196 | writeRow(extremes, 0); |
| 197 | break; |
| 198 | case ResultsetPart::ExtremesMax: |
| 199 | if (!extremes) |
| 200 | format.throwInvalidFormat("Cannot print extremes for this request" , i); |
| 201 | writeRow(extremes, 1); |
| 202 | break; |
| 203 | case ResultsetPart::Rows: |
| 204 | writeValue<size_t, DataTypeUInt64>(row_count, format.formats[i]); |
| 205 | break; |
| 206 | case ResultsetPart::RowsBeforeLimit: |
| 207 | if (!rows_before_limit_set) |
| 208 | format.throwInvalidFormat("Cannot print rows_before_limit for this request" , i); |
| 209 | writeValue<size_t, DataTypeUInt64>(rows_before_limit, format.formats[i]); |
| 210 | break; |
| 211 | case ResultsetPart::TimeElapsed: |
| 212 | writeValue<double, DataTypeFloat64>(watch.elapsedSeconds(), format.formats[i]); |
| 213 | break; |
| 214 | case ResultsetPart::RowsRead: |
| 215 | writeValue<size_t, DataTypeUInt64>(progress.read_rows.load(), format.formats[i]); |
| 216 | break; |
| 217 | case ResultsetPart::BytesRead: |
| 218 | writeValue<size_t, DataTypeUInt64>(progress.read_bytes.load(), format.formats[i]); |
| 219 | break; |
| 220 | default: |
| 221 | break; |
| 222 | } |
| 223 | writeString(format.delimiters[i + 1], out); |
| 224 | } |
| 225 | |
| 226 | finalized = true; |
| 227 | } |
| 228 | |
| 229 | |
| 230 | void registerOutputFormatProcessorTemplate(FormatFactory & factory) |
| 231 | { |
| 232 | factory.registerOutputFormatProcessor("Template" , []( |
| 233 | WriteBuffer & buf, |
| 234 | const Block & sample, |
| 235 | FormatFactory::WriteCallback, |
| 236 | const FormatSettings & settings) |
| 237 | { |
| 238 | ParsedTemplateFormatString resultset_format; |
| 239 | if (settings.template_settings.resultset_format.empty()) |
| 240 | { |
| 241 | /// Default format string: "${data}" |
| 242 | resultset_format.delimiters.resize(2); |
| 243 | resultset_format.formats.emplace_back(ParsedTemplateFormatString::ColumnFormat::None); |
| 244 | resultset_format.format_idx_to_column_idx.emplace_back(0); |
| 245 | resultset_format.column_names.emplace_back("data" ); |
| 246 | } |
| 247 | else |
| 248 | { |
| 249 | /// Read format string from file |
| 250 | resultset_format = ParsedTemplateFormatString( |
| 251 | FormatSchemaInfo(settings.template_settings.resultset_format, "Template" , false, |
| 252 | settings.schema.is_server, settings.schema.format_schema_path), |
| 253 | [&](const String & partName) |
| 254 | { |
| 255 | return static_cast<size_t>(TemplateBlockOutputFormat::stringToResultsetPart(partName)); |
| 256 | }); |
| 257 | } |
| 258 | |
| 259 | ParsedTemplateFormatString row_format = ParsedTemplateFormatString( |
| 260 | FormatSchemaInfo(settings.template_settings.row_format, "Template" , false, |
| 261 | settings.schema.is_server, settings.schema.format_schema_path), |
| 262 | [&](const String & colName) |
| 263 | { |
| 264 | return sample.getPositionByName(colName); |
| 265 | }); |
| 266 | |
| 267 | return std::make_shared<TemplateBlockOutputFormat>(sample, buf, settings, resultset_format, row_format, settings.template_settings.row_between_delimiter); |
| 268 | }); |
| 269 | |
| 270 | factory.registerOutputFormatProcessor("CustomSeparated" , []( |
| 271 | WriteBuffer & buf, |
| 272 | const Block & sample, |
| 273 | FormatFactory::WriteCallback, |
| 274 | const FormatSettings & settings) |
| 275 | { |
| 276 | ParsedTemplateFormatString resultset_format = ParsedTemplateFormatString::setupCustomSeparatedResultsetFormat(settings.custom); |
| 277 | ParsedTemplateFormatString row_format = ParsedTemplateFormatString::setupCustomSeparatedRowFormat(settings.custom, sample); |
| 278 | |
| 279 | return std::make_shared<TemplateBlockOutputFormat>(sample, buf, settings, resultset_format, row_format, settings.custom.row_between_delimiter); |
| 280 | }); |
| 281 | } |
| 282 | } |
| 283 | |