Skip to content

Commit 5ea8025

Browse files
authored
[feat](refactor-param) Integrate New Storage System Support for BACKUP/RESTORE/LOAD/TVF (#50849)
Issue Number: #50238

#### Key Changes

- Enhanced core functionalities (BACKUP, RESTORE, LOAD, TVF) to support new storage parameters.
- Unified storage path and parameter parsing logic across modules to enable compatibility with multiple storage backends (S3, OSS, COS, etc.).

#### Details

- Refactored the storage parameter handling logic to automatically detect the schema and route to the corresponding storage system implementation.
- Each operation (BACKUP/RESTORE/LOAD/TVF) now uses a unified file system interface to ensure consistent behavior and extensibility.
- Maintains full backward compatibility with existing storage formats such as HDFS and local file systems.

#### Tests

Added comprehensive unit and integration tests covering:

- Storage parameter parsing across different systems.
- Execution flow for each operation under new storage systems.
- Edge cases including invalid parameters, permission errors, and non-existent paths.
1 parent 29cb291 commit 5ea8025

File tree

82 files changed

+2857
-841
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

82 files changed

+2857
-841
lines changed

fe/fe-core/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ under the License.
102102
<groupId>commons-pool</groupId>
103103
<artifactId>commons-pool</artifactId>
104104
</dependency>
105+
<dependency>
106+
<groupId>com.google.re2j</groupId>
107+
<artifactId>re2j</artifactId>
108+
</dependency>
105109
<dependency>
106110
<groupId>org.apache.commons</groupId>
107111
<artifactId>commons-text</artifactId>

fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,19 @@
2020
import org.apache.doris.analysis.StorageBackend.StorageType;
2121
import org.apache.doris.catalog.Env;
2222
import org.apache.doris.common.FeMetaVersion;
23+
import org.apache.doris.common.UserException;
2324
import org.apache.doris.common.io.Text;
2425
import org.apache.doris.common.io.Writable;
2526
import org.apache.doris.common.util.PrintableMap;
26-
import org.apache.doris.datasource.property.S3ClientBEProperties;
27-
import org.apache.doris.datasource.property.constants.BosProperties;
28-
import org.apache.doris.fs.PersistentFileSystem;
27+
import org.apache.doris.datasource.property.storage.StorageProperties;
28+
import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
2929
import org.apache.doris.persist.gson.GsonUtils;
3030
import org.apache.doris.thrift.TFileType;
3131

3232
import com.google.common.collect.Maps;
3333
import com.google.gson.annotations.SerializedName;
34+
import org.apache.commons.collections.MapUtils;
35+
import org.apache.commons.lang3.StringUtils;
3436
import org.apache.logging.log4j.LogManager;
3537
import org.apache.logging.log4j.Logger;
3638

@@ -53,6 +55,7 @@ public class BrokerDesc extends StorageDesc implements Writable {
5355
// just for multi load
5456
public static final String MULTI_LOAD_BROKER = "__DORIS_MULTI_LOAD_BROKER__";
5557
public static final String MULTI_LOAD_BROKER_BACKEND_KEY = "__DORIS_MULTI_LOAD_BROKER_BACKEND__";
58+
@Deprecated
5659
@SerializedName("cts3")
5760
private boolean convertedToS3 = false;
5861

@@ -75,42 +78,56 @@ public BrokerDesc(String name, Map<String, String> properties) {
7578
if (properties != null) {
7679
this.properties.putAll(properties);
7780
}
81+
// Assume the storage type is BROKER by default
82+
// If it's a multi-load broker, override the storage type to LOCAL
7883
if (isMultiLoadBroker()) {
7984
this.storageType = StorageBackend.StorageType.LOCAL;
8085
} else {
8186
this.storageType = StorageBackend.StorageType.BROKER;
8287
}
83-
this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
84-
this.convertedToS3 = BosProperties.tryConvertBosToS3(this.properties, this.storageType);
85-
if (this.convertedToS3) {
86-
this.storageType = StorageBackend.StorageType.S3;
88+
89+
// Try to determine the actual storage type from properties if available
90+
if (MapUtils.isNotEmpty(this.properties)) {
91+
try {
92+
// Create primary storage properties from the given configuration
93+
this.storageProperties = StorageProperties.createPrimary(this.properties);
94+
// Override the storage type based on property configuration
95+
this.storageType = StorageBackend.StorageType.valueOf(storageProperties.getStorageName());
96+
} catch (StoragePropertiesException e) {
97+
// Currently ignored: these properties might be broker-specific.
98+
// Support for broker properties will be added in the future.
99+
LOG.info("Failed to create storage properties for broker: {}, properties: {}", name, properties, e);
100+
}
101+
}
102+
//only storage type is broker
103+
if (StringUtils.isBlank(this.name) && (this.getStorageType() != StorageType.BROKER)) {
104+
this.name = this.storageType().name();
87105
}
88106
}
89107

90108
public BrokerDesc(String name, StorageBackend.StorageType storageType, Map<String, String> properties) {
91109
this.name = name;
92110
this.properties = Maps.newHashMap();
111+
this.storageType = storageType;
93112
if (properties != null) {
94113
this.properties.putAll(properties);
95114
}
96-
this.storageType = storageType;
97-
this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
98-
this.convertedToS3 = BosProperties.tryConvertBosToS3(this.properties, this.storageType);
99-
if (this.convertedToS3) {
100-
this.storageType = StorageBackend.StorageType.S3;
115+
if (MapUtils.isNotEmpty(this.properties) && StorageType.REFACTOR_STORAGE_TYPES.contains(storageType)) {
116+
this.storageProperties = StorageProperties.createPrimary(properties);
101117
}
118+
102119
}
103120

104-
public String getFileLocation(String location) {
105-
return this.convertedToS3 ? BosProperties.convertPathToS3(location) : location;
121+
public String getFileLocation(String location) throws UserException {
122+
return (null != storageProperties) ? storageProperties.validateAndNormalizeUri(location) : location;
106123
}
107124

108125
public static BrokerDesc createForStreamLoad() {
109126
return new BrokerDesc("", StorageType.STREAM, null);
110127
}
111128

112129
public boolean isMultiLoadBroker() {
113-
return this.name.equalsIgnoreCase(MULTI_LOAD_BROKER);
130+
return StringUtils.isNotBlank(this.name) && this.name.equalsIgnoreCase(MULTI_LOAD_BROKER);
114131
}
115132

116133
public TFileType getFileType() {
@@ -150,16 +167,18 @@ public void readFields(DataInput in) throws IOException {
150167
final String val = Text.readString(in);
151168
properties.put(key, val);
152169
}
153-
StorageBackend.StorageType st = StorageBackend.StorageType.BROKER;
154-
String typeStr = properties.remove(PersistentFileSystem.STORAGE_TYPE);
155-
if (typeStr != null) {
170+
if (MapUtils.isNotEmpty(properties)) {
156171
try {
157-
st = StorageBackend.StorageType.valueOf(typeStr);
158-
} catch (IllegalArgumentException e) {
159-
LOG.warn("set to BROKER, because of exception", e);
172+
this.storageProperties = StorageProperties.createPrimary(properties);
173+
this.storageType = StorageBackend.StorageType.valueOf(storageProperties.getStorageName());
174+
} catch (RuntimeException e) {
175+
// Currently ignored: these properties might be broker-specific.
176+
// Support for broker properties will be added in the future.
177+
LOG.warn("Failed to create storage properties for broker: {}, properties: {}", name, properties, e);
178+
this.storageType = StorageBackend.StorageType.BROKER;
160179
}
180+
161181
}
162-
storageType = st;
163182
}
164183

165184
public static BrokerDesc read(DataInput in) throws IOException {

fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java

Lines changed: 13 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,21 @@
2121
import org.apache.doris.catalog.Env;
2222
import org.apache.doris.catalog.KeysType;
2323
import org.apache.doris.catalog.OlapTable;
24-
import org.apache.doris.cloud.proto.Cloud.ObjectStoreInfoPB;
2524
import org.apache.doris.cloud.security.SecurityChecker;
26-
import org.apache.doris.cloud.storage.RemoteBase;
27-
import org.apache.doris.cloud.storage.RemoteBase.ObjectInfo;
2825
import org.apache.doris.common.AnalysisException;
2926
import org.apache.doris.common.Config;
3027
import org.apache.doris.common.DdlException;
3128
import org.apache.doris.common.InternalErrorCode;
3229
import org.apache.doris.common.UserException;
3330
import org.apache.doris.common.util.PrintableMap;
3431
import org.apache.doris.common.util.TimeUtils;
35-
import org.apache.doris.datasource.property.constants.AzureProperties;
36-
import org.apache.doris.datasource.property.constants.S3Properties;
32+
import org.apache.doris.datasource.property.storage.ObjectStorageProperties;
33+
import org.apache.doris.fsv2.FileSystemFactory;
3734
import org.apache.doris.load.EtlJobType;
3835
import org.apache.doris.load.loadv2.LoadTask;
3936
import org.apache.doris.mysql.privilege.PrivPredicate;
4037
import org.apache.doris.qe.ConnectContext;
38+
import org.apache.doris.thrift.TFileType;
4139

4240
import com.google.common.base.Function;
4341
import com.google.common.base.Joiner;
@@ -102,15 +100,11 @@ public class LoadStmt extends DdlStmt implements NotFallbackInParser {
102100
// deprecated, keeping this property to make LoadStmt#checkProperties() happy
103101
public static final String USE_NEW_LOAD_SCAN_NODE = "use_new_load_scan_node";
104102

105-
// for load data from Baidu Object Store(BOS)
103+
// for load data from Baidu Object Store(BOS) todo wait new property support
106104
public static final String BOS_ENDPOINT = "bos_endpoint";
107105
public static final String BOS_ACCESSKEY = "bos_accesskey";
108106
public static final String BOS_SECRET_ACCESSKEY = "bos_secret_accesskey";
109107

110-
// for S3 load check
111-
public static final List<String> PROVIDERS =
112-
new ArrayList<>(Arrays.asList("cos", "oss", "s3", "obs", "bos", "azure"));
113-
114108
// mini load params
115109
public static final String KEY_IN_PARAM_COLUMNS = "columns";
116110
public static final String KEY_IN_PARAM_SET = "set";
@@ -454,8 +448,6 @@ public void analyze(Analyzer analyzer) throws UserException {
454448
for (int i = 0; i < dataDescription.getFilePaths().size(); i++) {
455449
String location = brokerDesc.getFileLocation(dataDescription.getFilePaths().get(i));
456450
dataDescription.getFilePaths().set(i, location);
457-
StorageBackend.checkPath(dataDescription.getFilePaths().get(i),
458-
brokerDesc.getStorageType(), "DATA INFILE must be specified.");
459451
dataDescription.getFilePaths().set(i, dataDescription.getFilePaths().get(i));
460452
}
461453
}
@@ -522,27 +514,6 @@ public void analyze(Analyzer analyzer) throws UserException {
522514
user = ConnectContext.get().getQualifiedUser();
523515
}
524516

525-
526-
private String getProviderFromEndpoint() {
527-
Map<String, String> properties = brokerDesc.getProperties();
528-
for (Map.Entry<String, String> entry : properties.entrySet()) {
529-
if (entry.getKey().equalsIgnoreCase(S3Properties.PROVIDER)) {
530-
// S3 Provider properties should be case insensitive.
531-
return entry.getValue().toUpperCase();
532-
}
533-
}
534-
return S3Properties.S3_PROVIDER;
535-
}
536-
537-
private String getBucketFromFilePath(String filePath) throws Exception {
538-
String[] parts = filePath.split("\\/\\/");
539-
if (parts.length < 2) {
540-
throw new Exception("filePath is not valid");
541-
}
542-
String buckt = parts[1].split("\\/")[0];
543-
return buckt;
544-
}
545-
546517
public String getComment() {
547518
return comment;
548519
}
@@ -630,21 +601,17 @@ private void checkEndpoint(String endpoint) throws UserException {
630601
}
631602

632603
public void checkS3Param() throws UserException {
633-
Map<String, String> brokerDescProperties = brokerDesc.getProperties();
634-
if (brokerDescProperties.containsKey(S3Properties.Env.ENDPOINT)
635-
&& brokerDescProperties.containsKey(S3Properties.Env.ACCESS_KEY)
636-
&& brokerDescProperties.containsKey(S3Properties.Env.SECRET_KEY)
637-
&& brokerDescProperties.containsKey(S3Properties.Env.REGION)) {
638-
String endpoint = brokerDescProperties.get(S3Properties.Env.ENDPOINT);
639-
endpoint = endpoint.replaceFirst("^http://", "");
640-
endpoint = endpoint.replaceFirst("^https://", "");
641-
brokerDescProperties.put(S3Properties.Env.ENDPOINT, endpoint);
604+
if (brokerDesc.getFileType() != null && brokerDesc.getFileType().equals(TFileType.FILE_S3)) {
605+
606+
ObjectStorageProperties storageProperties = (ObjectStorageProperties) brokerDesc.getStorageProperties();
607+
String endpoint = storageProperties.getEndpoint();
608+
checkEndpoint(endpoint);
642609
checkWhiteList(endpoint);
643-
if (AzureProperties.checkAzureProviderPropertyExist(brokerDescProperties)) {
644-
return;
610+
//should add connectivity test
611+
boolean connectivityTest = FileSystemFactory.get(brokerDesc.getStorageProperties()).connectivityTest();
612+
if (!connectivityTest) {
613+
throw new UserException("Failed to access object storage, message=connectivity test failed");
645614
}
646-
checkEndpoint(endpoint);
647-
checkAkSk();
648615
}
649616
}
650617

@@ -657,47 +624,6 @@ public void checkWhiteList(String endpoint) throws UserException {
657624
}
658625
}
659626

660-
private void checkAkSk() throws UserException {
661-
RemoteBase remote = null;
662-
ObjectInfo objectInfo = null;
663-
String curFile = null;
664-
try {
665-
Map<String, String> brokerDescProperties = brokerDesc.getProperties();
666-
String provider = getProviderFromEndpoint();
667-
for (DataDescription dataDescription : dataDescriptions) {
668-
for (String filePath : dataDescription.getFilePaths()) {
669-
curFile = filePath;
670-
String bucket = getBucketFromFilePath(filePath);
671-
objectInfo = new ObjectInfo(ObjectStoreInfoPB.Provider.valueOf(provider.toUpperCase()),
672-
brokerDescProperties.get(S3Properties.Env.ACCESS_KEY),
673-
brokerDescProperties.get(S3Properties.Env.SECRET_KEY),
674-
bucket, brokerDescProperties.get(S3Properties.Env.ENDPOINT),
675-
brokerDescProperties.get(S3Properties.Env.REGION), "");
676-
remote = RemoteBase.newInstance(objectInfo);
677-
// RemoteBase#headObject does not throw exception if key does not exist.
678-
remote.headObject("1");
679-
remote.listObjects(null);
680-
remote.close();
681-
}
682-
}
683-
} catch (Exception e) {
684-
LOG.warn("Failed to access object storage, file={}, proto={}, err={}", curFile, objectInfo, e.toString());
685-
String msg;
686-
if (e instanceof UserException) {
687-
msg = ((UserException) e).getDetailMessage();
688-
} else {
689-
msg = e.getMessage();
690-
}
691-
throw new UserException(InternalErrorCode.GET_REMOTE_DATA_ERROR,
692-
"Failed to access object storage, message=" + msg, e);
693-
} finally {
694-
if (remote != null) {
695-
remote.close();
696-
}
697-
}
698-
699-
}
700-
701627
@Override
702628
public StmtType stmtType() {
703629
return StmtType.LOAD;

0 commit comments

Comments
 (0)