
Commit c2e867e

Authored by liutang123
[Fix](multicatelog) Fix insert iceberg/hive table when use broker (#51187)
When writing to an Iceberg/Hive table through a broker, the FE should pass the broker list to the BE.

Co-authored-by: liutang123 <[email protected]>
1 parent 61034f4 commit c2e867e

File tree: 11 files changed, +98 −16 lines changed
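Taken together, the change threads a broker address list from the FE planner, through the Thrift sink, to the BE writers. A minimal Java sketch of the FE-side idea follows; SinkBindSketch, TSinkStub, and the nested stand-in types are illustrative, not real Doris classes:

import java.util.List;

public class SinkBindSketch {
    // Stand-in for the Thrift-generated TNetworkAddress.
    static class TNetworkAddress {
        final String hostname;
        final int port;
        TNetworkAddress(String hostname, int port) { this.hostname = hostname; this.port = port; }
    }

    // Stand-in for the Thrift-generated file type enum.
    enum TFileType { FILE_LOCAL, FILE_HDFS, FILE_S3, FILE_BROKER }

    // Stand-in for the generated THiveTableSink/TIcebergTableSink.
    static class TSinkStub {
        List<TNetworkAddress> brokerAddresses; // the new optional field
        void setBrokerAddresses(List<TNetworkAddress> addrs) { this.brokerAddresses = addrs; }
    }

    // Mirrors the pattern HiveTableSink/IcebergTableSink follow after this commit:
    // attach broker endpoints only when the write actually goes through a broker.
    static void bind(TSinkStub sink, TFileType fileType, List<TNetworkAddress> brokers) {
        if (fileType == TFileType.FILE_BROKER) {
            sink.setBrokerAddresses(brokers);
        }
    }

    public static void main(String[] args) {
        TSinkStub sink = new TSinkStub();
        bind(sink, TFileType.FILE_BROKER, List.of(new TNetworkAddress("broker-host", 8000)));
        System.out.println(sink.brokerAddresses.size()); // prints 1
    }
}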

be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp

Lines changed: 3 additions & 0 deletions

@@ -52,6 +52,9 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile* profil

     io::FSPropertiesRef fs_properties(_write_info.file_type);
     fs_properties.properties = &_hadoop_conf;
+    if (!_write_info.broker_addresses.empty()) {
+        fs_properties.broker_addresses = &(_write_info.broker_addresses);
+    }
     io::FileDescription file_description = {
             .path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()),
             .fs_name {}};

be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h

Lines changed: 1 addition & 0 deletions

@@ -50,6 +50,7 @@ class VIcebergPartitionWriter {
         std::string original_write_path;
         std::string target_path;
         TFileType::type file_type;
+        std::vector<TNetworkAddress> broker_addresses;
     };

     VIcebergPartitionWriter(const TDataSink& t_sink, std::vector<std::string> partition_values,

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp

Lines changed: 9 additions & 3 deletions

@@ -365,9 +365,15 @@ std::shared_ptr<VIcebergPartitionWriter> VIcebergTableWriter::_create_partition_
         write_path = output_path;
     }

-    VIcebergPartitionWriter::WriteInfo write_info = {
-            std::move(write_path), std::move(original_write_path), std::move(target_path),
-            iceberg_table_sink.file_type};
+    VIcebergPartitionWriter::WriteInfo write_info = {std::move(write_path),
+                                                     std::move(original_write_path),
+                                                     std::move(target_path),
+                                                     iceberg_table_sink.file_type,
+                                                     {}};
+    if (iceberg_table_sink.__isset.broker_addresses) {
+        write_info.broker_addresses.assign(iceberg_table_sink.broker_addresses.begin(),
+                                           iceberg_table_sink.broker_addresses.end());
+    }

     _write_file_count++;
     std::vector<std::string> column_names;

be/src/vec/sink/writer/vhive_partition_writer.cpp

Lines changed: 3 additions & 0 deletions

@@ -58,6 +58,9 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)

     io::FSPropertiesRef fs_properties(_write_info.file_type);
     fs_properties.properties = &_hadoop_conf;
+    if (!_write_info.broker_addresses.empty()) {
+        fs_properties.broker_addresses = &(_write_info.broker_addresses);
+    }
     io::FileDescription file_description = {
             .path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()),
             .fs_name {}};

be/src/vec/sink/writer/vhive_partition_writer.h

Lines changed: 1 addition & 0 deletions

@@ -46,6 +46,7 @@ class VHivePartitionWriter {
         std::string original_write_path;
         std::string target_path;
         TFileType::type file_type;
+        std::vector<TNetworkAddress> broker_addresses;
     };

     VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name,

be/src/vec/sink/writer/vhive_table_writer.cpp

Lines changed: 34 additions & 12 deletions

@@ -302,30 +302,42 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
        if (existing_table == false) { // new table
            update_mode = TUpdateMode::NEW;
            if (_partition_columns_input_index.empty()) { // new unpartitioned table
-                write_info = {write_location.write_path, write_location.original_write_path,
-                              write_location.target_path, write_location.file_type};
+                write_info = {write_location.write_path,
+                              write_location.original_write_path,
+                              write_location.target_path,
+                              write_location.file_type,
+                              {}};
            } else { // a new partition in a new partitioned table
                auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
                auto original_write_path =
                        fmt::format("{}/{}", write_location.original_write_path, partition_name);
                auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
-                write_info = {std::move(write_path), std::move(original_write_path),
-                              std::move(target_path), write_location.file_type};
+                write_info = {std::move(write_path),
+                              std::move(original_write_path),
+                              std::move(target_path),
+                              write_location.file_type,
+                              {}};
            }
        } else { // a new partition in an existing partitioned table, or an existing unpartitioned table
            if (_partition_columns_input_index.empty()) { // an existing unpartitioned table
                update_mode =
                        !hive_table_sink.overwrite ? TUpdateMode::APPEND : TUpdateMode::OVERWRITE;
-                write_info = {write_location.write_path, write_location.original_write_path,
-                              write_location.target_path, write_location.file_type};
+                write_info = {write_location.write_path,
+                              write_location.original_write_path,
+                              write_location.target_path,
+                              write_location.file_type,
+                              {}};
            } else { // a new partition in an existing partitioned table
                update_mode = TUpdateMode::NEW;
                auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
                auto original_write_path =
                        fmt::format("{}/{}", write_location.original_write_path, partition_name);
                auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
-                write_info = {std::move(write_path), std::move(original_write_path),
-                              std::move(target_path), write_location.file_type};
+                write_info = {std::move(write_path),
+                              std::move(original_write_path),
+                              std::move(target_path),
+                              write_location.file_type,
+                              {}};
            }
            // need to get schema from existing table ?
        }

@@ -338,8 +350,11 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
            auto original_write_path =
                    fmt::format("{}/{}", write_location.original_write_path, partition_name);
            auto target_path = fmt::format("{}", existing_partition->location.target_path);
-            write_info = {std::move(write_path), std::move(original_write_path),
-                          std::move(target_path), existing_partition->location.file_type};
+            write_info = {std::move(write_path),
+                          std::move(original_write_path),
+                          std::move(target_path),
+                          existing_partition->location.file_type,
+                          {}};
            file_format_type = existing_partition->file_format;
            write_compress_type = hive_table_sink.compression_type;
        } else {

@@ -348,13 +363,20 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
            auto original_write_path =
                    fmt::format("{}/{}", write_location.original_write_path, partition_name);
            auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
-            write_info = {std::move(write_path), std::move(original_write_path),
-                          std::move(target_path), write_location.file_type};
+            write_info = {std::move(write_path),
+                          std::move(original_write_path),
+                          std::move(target_path),
+                          write_location.file_type,
+                          {}};
            file_format_type = hive_table_sink.file_format;
            write_compress_type = hive_table_sink.compression_type;
            // need to get schema from existing table ?
        }
    }
+    if (hive_table_sink.__isset.broker_addresses) {
+        write_info.broker_addresses.assign(hive_table_sink.broker_addresses.begin(),
+                                           hive_table_sink.broker_addresses.end());
+    }

    _write_file_count++;
    std::vector<std::string> column_names;

fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java

Lines changed: 15 additions & 0 deletions

@@ -112,6 +112,21 @@ public boolean containsBroker(String brokerName) {
        }
    }

+    public List<FsBroker> getBrokers(String brokerName) {
+        List<FsBroker> result = null;
+        lock.lock();
+        try {
+            List<FsBroker> brokerList = brokerListMap.get(brokerName);
+            if (brokerList == null || brokerList.isEmpty()) {
+                return null;
+            }
+            result = new ArrayList<>(brokerList);
+        } finally {
+            lock.unlock();
+        }
+        return result;
+    }
+
    public FsBroker getAnyBroker(String brokerName) {
        lock.lock();
        try {
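The new getBrokers hands back a copy made while the manager's lock is held, so callers (such as the shuffle in getBrokerAddresses below) never mutate or race the shared list. A self-contained sketch of that copy-under-lock pattern, with Registry and the String payload as made-up stand-ins for BrokerMgr and FsBroker:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Made-up stand-in for BrokerMgr, showing only the copy-under-lock pattern.
public class Registry {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, List<String>> brokerListMap = new HashMap<>();

    // Returns a defensive copy: the caller may shuffle or iterate it freely
    // while other threads add/drop brokers under the same lock.
    public List<String> getBrokers(String brokerName) {
        lock.lock();
        try {
            List<String> brokerList = brokerListMap.get(brokerName);
            if (brokerList == null || brokerList.isEmpty()) {
                return null;
            }
            return new ArrayList<>(brokerList);
        } finally {
            lock.unlock();
        }
    }
}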

fe/fe-core/src/main/java/org/apache/doris/planner/BaseExternalTableDataSink.java

Lines changed: 21 additions & 0 deletions

@@ -20,14 +20,20 @@

 package org.apache.doris.planner;

+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TNetworkAddress;

+import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;

 public abstract class BaseExternalTableDataSink extends DataSink {

@@ -53,6 +59,21 @@ public DataPartition getOutputPartition() {
      */
     protected abstract Set<TFileFormatType> supportedFileFormatTypes();

+    protected List<TNetworkAddress> getBrokerAddresses(String bindBroker) throws AnalysisException {
+        List<FsBroker> brokers;
+        if (bindBroker != null) {
+            brokers = Env.getCurrentEnv().getBrokerMgr().getBrokers(bindBroker);
+        } else {
+            brokers = Env.getCurrentEnv().getBrokerMgr().getAllBrokers();
+        }
+        if (brokers == null || brokers.isEmpty()) {
+            throw new AnalysisException("No alive broker.");
+        }
+        Collections.shuffle(brokers);
+        return brokers.stream().map(broker -> new TNetworkAddress(broker.host, broker.port))
+                .collect(Collectors.toList());
+    }
+
     protected TFileFormatType getTFileFormatType(String format) throws AnalysisException {
         TFileFormatType fileFormatType = TFileFormatType.FORMAT_UNKNOWN;
         String lowerCase = format.toLowerCase();
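Shuffling before the map means each plan sees the brokers in a fresh random order, spreading writer connections across the broker fleet. A runnable sketch of that shuffle-then-map step, where Broker and Addr are stand-ins for FsBroker and TNetworkAddress:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class ShuffleThenMap {
    record Broker(String host, int port) {}      // stand-in for FsBroker
    record Addr(String hostname, int port) {}    // stand-in for TNetworkAddress

    static List<Addr> toAddresses(List<Broker> brokers) {
        List<Broker> copy = new ArrayList<>(brokers); // don't disturb the input list
        Collections.shuffle(copy);                    // randomize order for load balancing
        return copy.stream()
                .map(b -> new Addr(b.host(), b.port()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Broker> brokers = List.of(new Broker("b1", 8000), new Broker("b2", 8000));
        System.out.println(toAddresses(brokers)); // order varies run to run
    }
}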

fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java

Lines changed: 3 additions & 0 deletions

@@ -150,6 +150,9 @@ public void bindDataSink(Optional<InsertCommandContext> insertCtx)
         }
         locationParams.setFileType(fileType);
         tSink.setLocation(locationParams);
+        if (fileType.equals(TFileType.FILE_BROKER)) {
+            tSink.setBrokerAddresses(getBrokerAddresses(targetTable.getCatalog().bindBrokerName()));
+        }

         tSink.setHadoopConfig(targetTable.getHadoopProperties());

fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java

Lines changed: 6 additions & 1 deletion

@@ -27,6 +27,7 @@
 import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TIcebergTableSink;
 import org.apache.doris.thrift.TSortField;

@@ -134,7 +135,11 @@ public void bindDataSink(Optional<InsertCommandContext> insertCtx)
         LocationPath locationPath = new LocationPath(IcebergUtils.dataLocation(icebergTable), catalogProps);
         tSink.setOutputPath(locationPath.toStorageLocation().toString());
         tSink.setOriginalOutputPath(locationPath.getPath().toString());
-        tSink.setFileType(locationPath.getTFileTypeForBE());
+        TFileType fileType = locationPath.getTFileTypeForBE();
+        tSink.setFileType(fileType);
+        if (fileType.equals(TFileType.FILE_BROKER)) {
+            tSink.setBrokerAddresses(getBrokerAddresses(targetTable.getCatalog().bindBrokerName()));
+        }

         if (insertCtx.isPresent()) {
             BaseExternalTableInsertCommandContext context = (BaseExternalTableInsertCommandContext) insertCtx.get();

gensrc/thrift/DataSinks.thrift

Lines changed: 2 additions & 0 deletions

@@ -355,6 +355,7 @@ struct THiveTableSink {
     9: optional map<string, string> hadoop_config
     10: optional bool overwrite
     11: optional THiveSerDeProperties serde_properties
+    12: optional list<Types.TNetworkAddress> broker_addresses;
 }

 enum TUpdateMode {

@@ -415,6 +416,7 @@ struct TIcebergTableSink {
     11: optional Types.TFileType file_type
     12: optional string original_output_path
     13: optional PlanNodes.TFileCompressType compression_type
+    14: optional list<Types.TNetworkAddress> broker_addresses;
 }

 enum TDictLayoutType {
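Because broker_addresses is optional in both structs, an FE that never sets it stays wire-compatible: the BE writers above guard on __isset.broker_addresses before reading the field. A hand-rolled Java imitation of that optional-field convention (the Thrift-generated THiveTableSink exposes the same idea through its set/isSet accessors, but the stub below is not the generated code):

import java.util.List;

public class OptionalFieldSketch {
    // Hand-rolled imitation of a Thrift-generated struct with one optional field.
    static class HiveSinkStub {
        private List<String> brokerAddresses; // null means "field not set"

        void setBrokerAddresses(List<String> addrs) { this.brokerAddresses = addrs; }
        boolean isSetBrokerAddresses() { return brokerAddresses != null; }
    }

    public static void main(String[] args) {
        HiveSinkStub sink = new HiveSinkStub();
        System.out.println(sink.isSetBrokerAddresses()); // false: old FE, no broker list
        sink.setBrokerAddresses(List.of("broker-host:8000"));
        System.out.println(sink.isSetBrokerAddresses()); // true: BE may route I/O via brokers
    }
}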
