Skip to content

Commit 12eb189

Browse files
Merge pull request #21 from Haox-liu/streamLoader_http_timeout
Fix stream loader http hang
2 parents 206dc62 + 84d8f7d commit 12eb189

File tree

3 files changed

+28
-0
lines changed

3 files changed

+28
-0
lines changed

src/main/assembly/plugin.conf

+6
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,11 @@ secret_key=
5454
# Whether to generate sql digest for all queries
5555
enable_compute_all_query_digest=false
5656

57+
# Http connectTimeout ms
58+
connect_timeout=1000
59+
60+
# Http readTimeout ms
61+
read_timeout=1000
62+
5763
# Filter conditions when importing audit information
5864
filter=

src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java

+11
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,8 @@ public static class AuditLoaderConf {
308308
public static final String QE_SLOW_LOG_MS = "qe_slow_log_ms";
309309
public static final String MAX_QUEUE_SIZE = "max_queue_size";
310310
public static final String ENABLE_COMPUTE_ALL_QUERY_DIGEST = "enable_compute_all_query_digest";
311+
public static final String CONNECT_TIMEOUT = "connect_timeout";
312+
public static final String READ_TIMEOUT = "read_timeout";
311313

312314
public static final String STREAM_LOAD_FILTER = "filter";
313315

@@ -325,6 +327,9 @@ public static class AuditLoaderConf {
325327
public int maxQueueSize = 1000;
326328

327329
public boolean enableComputeAllQueryDigest = false;
330+
331+
public int connectTimeout = 1000;
332+
public int readTimeout = 1000;
328333
public String streamLoadFilter = "";
329334

330335
public static final String PROP_SECRET_KEY = "secret_key";
@@ -374,6 +379,12 @@ public void init(Map<String, String> properties) throws PluginException {
374379
if (properties.containsKey(STREAM_LOAD_FILTER)) {
375380
streamLoadFilter = properties.get(STREAM_LOAD_FILTER);
376381
}
382+
if (properties.containsKey(CONNECT_TIMEOUT)) {
383+
connectTimeout = Integer.parseInt(properties.get(CONNECT_TIMEOUT));
384+
}
385+
if (properties.containsKey(READ_TIMEOUT)) {
386+
readTimeout = Integer.parseInt(properties.get(READ_TIMEOUT));
387+
}
377388
} catch (Exception e) {
378389
throw new PluginException(e.getMessage());
379390
}

src/main/java/com/starrocks/plugin/audit/StarrocksStreamLoader.java

+11
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ public class StarrocksStreamLoader {
4444
private String loadUrlStr;
4545
private String authEncoding;
4646
private String feIdentity;
47+
48+
private int connectTimeout;
49+
private int readTimeout;
50+
4751
private String streamLoadFilter;
4852

4953
public StarrocksStreamLoader(AuditLoaderPlugin.AuditLoaderConf conf) {
@@ -65,6 +69,10 @@ public StarrocksStreamLoader(AuditLoaderPlugin.AuditLoaderConf conf) {
6569
this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
6670
// currently, FE identity is FE's IP, so we replace the "." in IP to make it suitable for label
6771
this.feIdentity = conf.feIdentity.replaceAll("\\.", "_");
72+
73+
this.connectTimeout = conf.connectTimeout;
74+
this.readTimeout = conf.readTimeout;
75+
6876
this.streamLoadFilter = conf.streamLoadFilter;
6977
}
7078

@@ -89,6 +97,9 @@ private HttpURLConnection getConnection(String urlStr, String label) throws IOEx
8997
conn.setDoOutput(true);
9098
conn.setDoInput(true);
9199

100+
conn.setConnectTimeout(connectTimeout);
101+
conn.setReadTimeout(readTimeout);
102+
92103
return conn;
93104
}
94105

0 commit comments

Comments
 (0)