[Fix](multicatelog) Fix insert iceberg/hive table when use broker #51187

Merged · 1 commit · May 26, 2025

The FE Hive and Iceberg sinks now attach the catalog's broker addresses to the thrift sink when the file type is FILE_BROKER, and the BE partition writers forward those addresses to the file-system properties. This fixes INSERT into Hive/Iceberg tables whose storage is accessed through a broker.

3 changes: 3 additions & 0 deletions be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -52,6 +52,9 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile* profil

     io::FSPropertiesRef fs_properties(_write_info.file_type);
     fs_properties.properties = &_hadoop_conf;
+    if (!_write_info.broker_addresses.empty()) {
+        fs_properties.broker_addresses = &(_write_info.broker_addresses);
+    }
     io::FileDescription file_description = {
             .path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()),
             .fs_name {}};

1 change: 1 addition & 0 deletions be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h
@@ -50,6 +50,7 @@ class VIcebergPartitionWriter {
         std::string original_write_path;
         std::string target_path;
         TFileType::type file_type;
+        std::vector<TNetworkAddress> broker_addresses;
     };

     VIcebergPartitionWriter(const TDataSink& t_sink, std::vector<std::string> partition_values,

12 changes: 9 additions & 3 deletions be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -365,9 +365,15 @@ std::shared_ptr<VIcebergPartitionWriter> VIcebergTableWriter::_create_partition_
         write_path = output_path;
     }

-    VIcebergPartitionWriter::WriteInfo write_info = {
-            std::move(write_path), std::move(original_write_path), std::move(target_path),
-            iceberg_table_sink.file_type};
+    VIcebergPartitionWriter::WriteInfo write_info = {std::move(write_path),
+                                                     std::move(original_write_path),
+                                                     std::move(target_path),
+                                                     iceberg_table_sink.file_type,
+                                                     {}};
+    if (iceberg_table_sink.__isset.broker_addresses) {
+        write_info.broker_addresses.assign(iceberg_table_sink.broker_addresses.begin(),
+                                           iceberg_table_sink.broker_addresses.end());
+    }

     _write_file_count++;
     std::vector<std::string> column_names;

3 changes: 3 additions & 0 deletions be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -58,6 +58,9 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)

     io::FSPropertiesRef fs_properties(_write_info.file_type);
     fs_properties.properties = &_hadoop_conf;
+    if (!_write_info.broker_addresses.empty()) {
+        fs_properties.broker_addresses = &(_write_info.broker_addresses);
+    }
     io::FileDescription file_description = {
             .path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()),
             .fs_name {}};

1 change: 1 addition & 0 deletions be/src/vec/sink/writer/vhive_partition_writer.h
@@ -46,6 +46,7 @@ class VHivePartitionWriter {
         std::string original_write_path;
         std::string target_path;
         TFileType::type file_type;
+        std::vector<TNetworkAddress> broker_addresses;
     };

     VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name,

46 changes: 34 additions & 12 deletions be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -302,30 +302,42 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
     if (existing_table == false) { // new table
         update_mode = TUpdateMode::NEW;
         if (_partition_columns_input_index.empty()) { // new unpartitioned table
-            write_info = {write_location.write_path, write_location.original_write_path,
-                          write_location.target_path, write_location.file_type};
+            write_info = {write_location.write_path,
+                          write_location.original_write_path,
+                          write_location.target_path,
+                          write_location.file_type,
+                          {}};
         } else { // a new partition in a new partitioned table
             auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
             auto original_write_path =
                     fmt::format("{}/{}", write_location.original_write_path, partition_name);
             auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
-            write_info = {std::move(write_path), std::move(original_write_path),
-                          std::move(target_path), write_location.file_type};
+            write_info = {std::move(write_path),
+                          std::move(original_write_path),
+                          std::move(target_path),
+                          write_location.file_type,
+                          {}};
         }
     } else { // a new partition in an existing partitioned table, or an existing unpartitioned table
         if (_partition_columns_input_index.empty()) { // an existing unpartitioned table
             update_mode =
                     !hive_table_sink.overwrite ? TUpdateMode::APPEND : TUpdateMode::OVERWRITE;
-            write_info = {write_location.write_path, write_location.original_write_path,
-                          write_location.target_path, write_location.file_type};
+            write_info = {write_location.write_path,
+                          write_location.original_write_path,
+                          write_location.target_path,
+                          write_location.file_type,
+                          {}};
         } else { // a new partition in an existing partitioned table
             update_mode = TUpdateMode::NEW;
             auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
             auto original_write_path =
                     fmt::format("{}/{}", write_location.original_write_path, partition_name);
             auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
-            write_info = {std::move(write_path), std::move(original_write_path),
-                          std::move(target_path), write_location.file_type};
+            write_info = {std::move(write_path),
+                          std::move(original_write_path),
+                          std::move(target_path),
+                          write_location.file_type,
+                          {}};
        }
        // need to get schema from existing table ?
    }
@@ -338,8 +350,11 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
            auto original_write_path =
                    fmt::format("{}/{}", write_location.original_write_path, partition_name);
            auto target_path = fmt::format("{}", existing_partition->location.target_path);
-            write_info = {std::move(write_path), std::move(original_write_path),
-                          std::move(target_path), existing_partition->location.file_type};
+            write_info = {std::move(write_path),
+                          std::move(original_write_path),
+                          std::move(target_path),
+                          existing_partition->location.file_type,
+                          {}};
            file_format_type = existing_partition->file_format;
            write_compress_type = hive_table_sink.compression_type;
        } else {
@@ -348,13 +363,20 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
            auto original_write_path =
                    fmt::format("{}/{}", write_location.original_write_path, partition_name);
            auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
-            write_info = {std::move(write_path), std::move(original_write_path),
-                          std::move(target_path), write_location.file_type};
+            write_info = {std::move(write_path),
+                          std::move(original_write_path),
+                          std::move(target_path),
+                          write_location.file_type,
+                          {}};
            file_format_type = hive_table_sink.file_format;
            write_compress_type = hive_table_sink.compression_type;
            // need to get schema from existing table ?
        }
    }
+    if (hive_table_sink.__isset.broker_addresses) {
+        write_info.broker_addresses.assign(hive_table_sink.broker_addresses.begin(),
+                                           hive_table_sink.broker_addresses.end());
+    }

     _write_file_count++;
     std::vector<std::string> column_names;

15 changes: 15 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java
@@ -112,6 +112,21 @@ public boolean containsBroker(String brokerName) {
         }
     }

+    public List<FsBroker> getBrokers(String brokerName) {
+        List<FsBroker> result = null;
+        lock.lock();
+        try {
+            List<FsBroker> brokerList = brokerListMap.get(brokerName);
+            if (brokerList == null || brokerList.isEmpty()) {
+                return null;
+            }
+            result = new ArrayList<>(brokerList);
+        } finally {
+            lock.unlock();
+        }
+        return result;
+    }
+
     public FsBroker getAnyBroker(String brokerName) {
         lock.lock();
         try {
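
The new getBrokers helper differs from getAnyBroker in that it snapshots the whole list: the copy is taken while holding the lock and handed to the caller, who can then mutate it (BaseExternalTableDataSink below shuffles it) without racing concurrent ALTER SYSTEM ADD/DROP BROKER. A minimal standalone sketch of this copy-under-lock pattern, with simplified stand-in types rather than the actual Doris classes:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantLock;

    class BrokerRegistry {
        private final ReentrantLock lock = new ReentrantLock();
        private final Map<String, List<String>> brokerListMap = new HashMap<>();

        // Copy under the lock, return the copy: the caller may shuffle or
        // filter the snapshot without holding the lock or racing add/drop.
        List<String> getBrokers(String name) {
            lock.lock();
            try {
                List<String> brokers = brokerListMap.get(name);
                return (brokers == null || brokers.isEmpty()) ? null : new ArrayList<>(brokers);
            } finally {
                lock.unlock();
            }
        }
    }
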
fe/fe-core/src/main/java/org/apache/doris/planner/BaseExternalTableDataSink.java
@@ -20,14 +20,20 @@

 package org.apache.doris.planner;

+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TNetworkAddress;

+import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;

 public abstract class BaseExternalTableDataSink extends DataSink {

@@ -53,6 +59,21 @@ public DataPartition getOutputPartition() {
      */
     protected abstract Set<TFileFormatType> supportedFileFormatTypes();

+    protected List<TNetworkAddress> getBrokerAddresses(String bindBroker) throws AnalysisException {
+        List<FsBroker> brokers;
+        if (bindBroker != null) {
+            brokers = Env.getCurrentEnv().getBrokerMgr().getBrokers(bindBroker);
+        } else {
+            brokers = Env.getCurrentEnv().getBrokerMgr().getAllBrokers();
+        }
+        if (brokers == null || brokers.isEmpty()) {
+            throw new AnalysisException("No alive broker.");
+        }
+        Collections.shuffle(brokers);
+        return brokers.stream().map(broker -> new TNetworkAddress(broker.host, broker.port))
+                .collect(Collectors.toList());
+    }
+
     protected TFileFormatType getTFileFormatType(String format) throws AnalysisException {
         TFileFormatType fileFormatType = TFileFormatType.FORMAT_UNKNOWN;
         String lowerCase = format.toLowerCase();
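
Note the interplay with BrokerMgr.getBrokers above: Collections.shuffle mutates its argument in place, which is safe here precisely because getBrokers hands back a fresh copy; the shuffle gives every sink a randomized broker order, spreading writer traffic across brokers. A standalone sketch of that selection step (hypothetical names, host:port strings standing in for TNetworkAddress):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class BrokerPick {
        // Shuffle a private copy so each sink instance sees its own broker
        // order; the caller's snapshot stays untouched.
        static List<String> shuffledAddresses(List<String> snapshot) {
            if (snapshot == null || snapshot.isEmpty()) {
                throw new IllegalStateException("No alive broker.");
            }
            List<String> copy = new ArrayList<>(snapshot);
            Collections.shuffle(copy);
            return copy;
        }

        public static void main(String[] args) {
            List<String> brokers = List.of("10.0.0.1:8000", "10.0.0.2:8000");
            System.out.println(shuffledAddresses(brokers)); // e.g. [10.0.0.2:8000, 10.0.0.1:8000]
        }
    }
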
fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -150,6 +150,9 @@ public void bindDataSink(Optional<InsertCommandContext> insertCtx)
         }
         locationParams.setFileType(fileType);
         tSink.setLocation(locationParams);
+        if (fileType.equals(TFileType.FILE_BROKER)) {
+            tSink.setBrokerAddresses(getBrokerAddresses(targetTable.getCatalog().bindBrokerName()));
+        }

         tSink.setHadoopConfig(targetTable.getHadoopProperties());

fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
@@ -27,6 +27,7 @@
 import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TIcebergTableSink;
 import org.apache.doris.thrift.TSortField;

@@ -134,7 +135,11 @@ public void bindDataSink(Optional<InsertCommandContext> insertCtx)
         LocationPath locationPath = new LocationPath(IcebergUtils.dataLocation(icebergTable), catalogProps);
         tSink.setOutputPath(locationPath.toStorageLocation().toString());
         tSink.setOriginalOutputPath(locationPath.getPath().toString());
-        tSink.setFileType(locationPath.getTFileTypeForBE());
+        TFileType fileType = locationPath.getTFileTypeForBE();
+        tSink.setFileType(fileType);
+        if (fileType.equals(TFileType.FILE_BROKER)) {
+            tSink.setBrokerAddresses(getBrokerAddresses(targetTable.getCatalog().bindBrokerName()));
+        }

         if (insertCtx.isPresent()) {
             BaseExternalTableInsertCommandContext context = (BaseExternalTableInsertCommandContext) insertCtx.get();

2 changes: 2 additions & 0 deletions gensrc/thrift/DataSinks.thrift
@@ -355,6 +355,7 @@ struct THiveTableSink {
     9: optional map<string, string> hadoop_config
     10: optional bool overwrite
     11: optional THiveSerDeProperties serde_properties
+    12: optional list<Types.TNetworkAddress> broker_addresses;
 }

 enum TUpdateMode {
@@ -415,6 +416,7 @@ struct TIcebergTableSink {
     11: optional Types.TFileType file_type
     12: optional string original_output_path
     13: optional PlanNodes.TFileCompressType compression_type
+    14: optional list<Types.TNetworkAddress> broker_addresses;
 }

 enum TDictLayoutType {
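
Both broker_addresses fields are optional, which keeps the change wire-compatible: an older FE simply never sets the field, and the BE writers copy it only when __isset.broker_addresses is true (see the C++ diffs above). A hedged sketch of the FE side, assuming the thrift classes regenerated from this PR; isSetBrokerAddresses is the presence accessor the thrift compiler generates for optional fields, and the address is a placeholder:

    import java.util.List;

    import org.apache.doris.thrift.TFileType;
    import org.apache.doris.thrift.TIcebergTableSink;
    import org.apache.doris.thrift.TNetworkAddress;

    public class BrokerSinkSketch {
        public static void main(String[] args) {
            TIcebergTableSink tSink = new TIcebergTableSink();
            tSink.setFileType(TFileType.FILE_BROKER);
            // In the PR this list comes from BaseExternalTableDataSink.getBrokerAddresses(...).
            tSink.setBrokerAddresses(List.of(new TNetworkAddress("10.0.0.1", 8000)));
            // Thrift tracks optional-field presence; when unset, the BE keeps
            // its previous non-broker behavior.
            System.out.println(tSink.isSetBrokerAddresses()); // true
        }
    }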