Skip to content

Commit ffc3b93

Browse files
EvenLjjliujianjun.ljj
and
liujianjun.ljj
authored
Mesh Registry support custom group for sofa registry. (#1454)
Co-authored-by: liujianjun.ljj <[email protected]>
1 parent 60433bc commit ffc3b93

File tree

6 files changed

+92
-9
lines changed

6 files changed

+92
-9
lines changed

registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/MeshRegistry.java

+18
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.alipay.sofa.rpc.client.ProviderInfo;
2121
import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
2222
import com.alipay.sofa.rpc.common.utils.CommonUtils;
23+
import com.alipay.sofa.rpc.common.utils.StringUtils;
2324
import com.alipay.sofa.rpc.config.ConsumerConfig;
2425
import com.alipay.sofa.rpc.config.ProviderConfig;
2526
import com.alipay.sofa.rpc.config.RegistryConfig;
@@ -184,6 +185,10 @@ protected PublishServiceRequest buildPublishServiceRequest(String serviceName, S
184185
providerMetaInfo.setVersion(VERSION);
185186
providerMetaInfo.setProperties(providerInfo.getStaticAttrs());
186187
publishServiceRequest.setProviderMetaInfo(providerMetaInfo);
188+
String group = providerInfo.getStaticAttrs().get(SofaRegistryConstants.SOFA_GROUP_KEY);
189+
if (StringUtils.isNotBlank(group)) {
190+
publishServiceRequest.setGroup(group);
191+
}
187192
return publishServiceRequest;
188193
}
189194

@@ -233,6 +238,10 @@ protected UnPublishServiceRequest buildUnPublishServiceRequest(String serviceNam
233238
UnPublishServiceRequest unPublishServiceRequest = new UnPublishServiceRequest();
234239
unPublishServiceRequest.setServiceName(serviceName);
235240
unPublishServiceRequest.setProtocolType(providerInfo.getProtocolType());
241+
String group = providerInfo.getStaticAttr(SofaRegistryConstants.SOFA_GROUP_KEY);
242+
if (StringUtils.isNotBlank(group)) {
243+
unPublishServiceRequest.setGroup(group);
244+
}
236245
return unPublishServiceRequest;
237246
}
238247

@@ -303,6 +312,11 @@ protected SubscribeServiceRequest buildSubscribeServiceRequest(ConsumerConfig co
303312
SubscribeServiceRequest subscribeRequest = new SubscribeServiceRequest();
304313
subscribeRequest.setServiceName(key);
305314
subscribeRequest.setProtocolType(consumerConfig.getProtocol());
315+
subscribeRequest.setProperties(consumerConfig.getParameters());
316+
String group = consumerConfig.getParameter(SofaRegistryConstants.SOFA_GROUP_KEY);
317+
if (StringUtils.isNotBlank(group)) {
318+
subscribeRequest.setGroup(group);
319+
}
306320
return subscribeRequest;
307321
}
308322

@@ -368,6 +382,10 @@ protected UnSubscribeServiceRequest buildUnSubscribeServiceRequest(ConsumerConfi
368382
String key = MeshRegistryHelper.buildMeshKey(config, config.getProtocol());
369383
unsubscribeRequest.setServiceName(key);
370384
unsubscribeRequest.setProtocolType(config.getProtocol());
385+
String group = config.getParameter(SofaRegistryConstants.SOFA_GROUP_KEY);
386+
if (StringUtils.isNotBlank(group)) {
387+
unsubscribeRequest.setGroup(group);
388+
}
371389
return unsubscribeRequest;
372390
}
373391

registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/PublishServiceRequest.java

+11
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public class PublishServiceRequest {
3232

3333
private boolean onlyPublishInCloud;
3434

35+
private String group;
36+
3537
public String getServiceName() {
3638
return serviceName;
3739
}
@@ -64,13 +66,22 @@ public void setOnlyPublishInCloud(boolean onlyPublishInCloud) {
6466
this.onlyPublishInCloud = onlyPublishInCloud;
6567
}
6668

69+
public String getGroup() {
70+
return group;
71+
}
72+
73+
public void setGroup(String group) {
74+
this.group = group;
75+
}
76+
6777
@Override
6878
public String toString() {
6979
final StringBuffer sb = new StringBuffer("PublishServiceRequest{");
7080
sb.append("serviceName='").append(serviceName).append('\'');
7181
sb.append(", protocolType='").append(protocolType).append('\'');
7282
sb.append(", providerMetaInfo=").append(providerMetaInfo);
7383
sb.append(", onlyPublishInCloud=").append(onlyPublishInCloud);
84+
sb.append(", group='").append(group).append('\'');
7485
sb.append('}');
7586
return sb.toString();
7687
}

registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/SubscribeServiceRequest.java

+29-6
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,31 @@
1616
*/
1717
package com.alipay.sofa.rpc.registry.mesh.model;
1818

19+
import java.util.Map;
20+
1921
/**
2022
* @author bystander
2123
* @version $Id: PublishServiceRequest.java, v 0.1 2018年04月03日 11:27 AM bystander Exp $
2224
*/
2325
public class SubscribeServiceRequest {
2426

25-
private String serviceName;
27+
private String serviceName;
2628

2729
//这个值是类似DEFAULT/XFIRE这种,也有可能是tr
28-
private String protocolType;
30+
private String protocolType;
2931

3032
//this should be xxx-pool.alipay.com or xxx.alipay.com,can be null
31-
private String targetAppAddress;
33+
private String targetAppAddress;
34+
35+
private boolean vipEnforce;
36+
37+
private boolean vipOnly;
3238

33-
private boolean vipEnforce;
39+
private boolean localCloudFirst;
3440

35-
private boolean vipOnly;
41+
private String group;
3642

37-
private boolean localCloudFirst;
43+
private Map<String, String> properties;
3844

3945
public String getServiceName() {
4046
return serviceName;
@@ -84,6 +90,22 @@ public void setLocalCloudFirst(boolean localCloudFirst) {
8490
this.localCloudFirst = localCloudFirst;
8591
}
8692

93+
public String getGroup() {
94+
return group;
95+
}
96+
97+
public void setGroup(String group) {
98+
this.group = group;
99+
}
100+
101+
public Map<String, String> getProperties() {
102+
return properties;
103+
}
104+
105+
public void setProperties(Map<String, String> properties) {
106+
this.properties = properties;
107+
}
108+
87109
@Override
88110
public String toString() {
89111
final StringBuffer sb = new StringBuffer("SubscribeServiceRequest{");
@@ -93,6 +115,7 @@ public String toString() {
93115
sb.append(", vipEnforce=").append(vipEnforce);
94116
sb.append(", vipOnly=").append(vipOnly);
95117
sb.append(", localCloudFirst=").append(localCloudFirst);
118+
sb.append(", group='").append(group).append('\'');
96119
sb.append('}');
97120
return sb.toString();
98121
}

registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/UnPublishServiceRequest.java

+10
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ public class UnPublishServiceRequest {
2727
//这个值是类似DEFAULT/XFIRE这种,也有可能是tr
2828
private String protocolType;
2929

30+
private String group;
31+
3032
public String getServiceName() {
3133
return serviceName;
3234
}
@@ -43,6 +45,14 @@ public void setProtocolType(String protocolType) {
4345
this.protocolType = protocolType;
4446
}
4547

48+
public String getGroup() {
49+
return group;
50+
}
51+
52+
public void setGroup(String group) {
53+
this.group = group;
54+
}
55+
4656
@Override
4757
public String toString() {
4858
final StringBuffer sb = new StringBuffer("UnPublishServiceRequest{");

registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/UnSubscribeServiceRequest.java

+11
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public class UnSubscribeServiceRequest {
3030
//这个值是类似DEFAULT/XFIRE这种,也有可能是tr
3131
private String protocolType;
3232

33+
private String group;
34+
3335
public String getServiceName() {
3436
return serviceName;
3537
}
@@ -54,12 +56,21 @@ public void setProtocolType(String protocolType) {
5456
this.protocolType = protocolType;
5557
}
5658

59+
public String getGroup() {
60+
return group;
61+
}
62+
63+
public void setGroup(String group) {
64+
this.group = group;
65+
}
66+
5767
@Override
5868
public String toString() {
5969
final StringBuffer sb = new StringBuffer("UnSubscribeServiceRequest{");
6070
sb.append("serviceName='").append(serviceName).append('\'');
6171
sb.append(", targetAppAddress='").append(targetAppAddress).append('\'');
6272
sb.append(", protocolType='").append(protocolType).append('\'');
73+
sb.append(", group='").append(group).append('\'');
6374
sb.append('}');
6475
return sb.toString();
6576
}

registry/registry-mesh/src/test/java/com/alipay/sofa/rpc/registry/mesh/MeshRegistryTest.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ public void testOnlyPublish() throws InterruptedException {
167167
public void testAll() throws Exception {
168168

169169
int timeoutPerSub = 1000;
170+
Map<String, String> parameter = new HashMap<>();
171+
parameter.put(SofaRegistryConstants.SOFA_GROUP_KEY, "SOFA_TEST");
170172

171173
ServerConfig serverConfig = new ServerConfig()
172174
.setProtocol("bolt")
@@ -183,7 +185,8 @@ public void testAll() throws Exception {
183185
.setSerialization("hessian2")
184186
.setServer(serverConfig)
185187
.setWeight(222)
186-
.setTimeout(3000);
188+
.setTimeout(3000)
189+
.setParameters(parameter);
187190

188191
// 注册
189192
registry.register(provider);
@@ -196,7 +199,8 @@ public void testAll() throws Exception {
196199
.setSubscribe(true)
197200
.setSerialization("java")
198201
.setInvokeType("sync")
199-
.setTimeout(4444);
202+
.setTimeout(4444)
203+
.setParameters(parameter);
200204

201205
String tag0 = MeshRegistryHelper.buildMeshKey(provider, serverConfig.getProtocol());
202206
String tag1 = MeshRegistryHelper.buildMeshKey(consumer, consumer.getProtocol());
@@ -205,6 +209,7 @@ public void testAll() throws Exception {
205209
PublishServiceRequest publishServiceRequest = registry.buildPublishServiceRequest(tag0,
206210
serverConfig.getProtocol(), providerInfo, "test-server");
207211
Assert.assertEquals(serverConfig.getProtocol(), publishServiceRequest.getProtocolType());
212+
Assert.assertEquals("SOFA_TEST", publishServiceRequest.getGroup());
208213

209214
// 订阅
210215
MeshRegistryTest.MockProviderInfoListener providerInfoListener = new MeshRegistryTest.MockProviderInfoListener();
@@ -216,6 +221,8 @@ public void testAll() throws Exception {
216221
Assert.assertTrue(ps.toString(), ps.size() == 1);
217222
SubscribeServiceRequest subscribeServiceRequest = registry.buildSubscribeServiceRequest(consumer);
218223
Assert.assertEquals(consumer.getProtocol(), subscribeServiceRequest.getProtocolType());
224+
Assert.assertEquals("SOFA_TEST", subscribeServiceRequest.getGroup());
225+
Assert.assertNotNull(subscribeServiceRequest.getProperties());
219226

220227
// 反注册
221228
CountDownLatch latch = new CountDownLatch(1);
@@ -226,6 +233,7 @@ public void testAll() throws Exception {
226233
Assert.assertTrue(ps.size() == 1);
227234
UnPublishServiceRequest unPublishServiceRequest = registry.buildUnPublishServiceRequest(tag0, providerInfo);
228235
Assert.assertEquals(serverConfig.getProtocol(), unPublishServiceRequest.getProtocolType());
236+
Assert.assertEquals("SOFA_TEST", unPublishServiceRequest.getGroup());
229237

230238
// 一次发2个端口的再次注册
231239
latch = new CountDownLatch(1);
@@ -246,7 +254,8 @@ public void testAll() throws Exception {
246254
.setSubscribe(true)
247255
.setSerialization("java")
248256
.setInvokeType("sync")
249-
.setTimeout(4444);
257+
.setTimeout(4444)
258+
.setParameters(parameter);
250259
CountDownLatch latch2 = new CountDownLatch(1);
251260
MeshRegistryTest.MockProviderInfoListener providerInfoListener2 = new MeshRegistryTest.MockProviderInfoListener();
252261
providerInfoListener2.setCountDownLatch(latch2);
@@ -261,6 +270,7 @@ public void testAll() throws Exception {
261270
registry.unSubscribe(consumer);
262271
UnSubscribeServiceRequest unSubscribeServiceRequest = registry.buildUnSubscribeServiceRequest(consumer);
263272
Assert.assertEquals(consumer.getProtocol(), unSubscribeServiceRequest.getProtocolType());
273+
Assert.assertEquals("SOFA_TEST", unSubscribeServiceRequest.getGroup());
264274

265275
// 批量反注册,判断订阅者2的数据
266276
latch = new CountDownLatch(1);

0 commit comments

Comments
 (0)