Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mesh Registry support custom group for sofa registry. #1454

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.config.RegistryConfig;
Expand Down Expand Up @@ -184,6 +185,10 @@ protected PublishServiceRequest buildPublishServiceRequest(String serviceName, S
providerMetaInfo.setVersion(VERSION);
providerMetaInfo.setProperties(providerInfo.getStaticAttrs());
publishServiceRequest.setProviderMetaInfo(providerMetaInfo);
String group = providerInfo.getStaticAttrs().get(SofaRegistryConstants.SOFA_GROUP_KEY);
if (StringUtils.isNotBlank(group)) {
publishServiceRequest.setGroup(group);
}
return publishServiceRequest;
}

Expand Down Expand Up @@ -233,6 +238,10 @@ protected UnPublishServiceRequest buildUnPublishServiceRequest(String serviceNam
UnPublishServiceRequest unPublishServiceRequest = new UnPublishServiceRequest();
unPublishServiceRequest.setServiceName(serviceName);
unPublishServiceRequest.setProtocolType(providerInfo.getProtocolType());
String group = providerInfo.getStaticAttr(SofaRegistryConstants.SOFA_GROUP_KEY);
if (StringUtils.isNotBlank(group)) {
unPublishServiceRequest.setGroup(group);
}
return unPublishServiceRequest;
}

Expand Down Expand Up @@ -303,6 +312,11 @@ protected SubscribeServiceRequest buildSubscribeServiceRequest(ConsumerConfig co
SubscribeServiceRequest subscribeRequest = new SubscribeServiceRequest();
subscribeRequest.setServiceName(key);
subscribeRequest.setProtocolType(consumerConfig.getProtocol());
subscribeRequest.setProperties(consumerConfig.getParameters());
String group = consumerConfig.getParameter(SofaRegistryConstants.SOFA_GROUP_KEY);
if (StringUtils.isNotBlank(group)) {
subscribeRequest.setGroup(group);
}
return subscribeRequest;
}

Expand Down Expand Up @@ -368,6 +382,10 @@ protected UnSubscribeServiceRequest buildUnSubscribeServiceRequest(ConsumerConfi
String key = MeshRegistryHelper.buildMeshKey(config, config.getProtocol());
unsubscribeRequest.setServiceName(key);
unsubscribeRequest.setProtocolType(config.getProtocol());
String group = config.getParameter(SofaRegistryConstants.SOFA_GROUP_KEY);
if (StringUtils.isNotBlank(group)) {
unsubscribeRequest.setGroup(group);
}
return unsubscribeRequest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class PublishServiceRequest {

private boolean onlyPublishInCloud;

private String group;

public String getServiceName() {
return serviceName;
}
Expand Down Expand Up @@ -64,13 +66,22 @@ public void setOnlyPublishInCloud(boolean onlyPublishInCloud) {
this.onlyPublishInCloud = onlyPublishInCloud;
}

public String getGroup() {
return group;
}

public void setGroup(String group) {
this.group = group;
}

@Override
public String toString() {
final StringBuffer sb = new StringBuffer("PublishServiceRequest{");
sb.append("serviceName='").append(serviceName).append('\'');
sb.append(", protocolType='").append(protocolType).append('\'');
sb.append(", providerMetaInfo=").append(providerMetaInfo);
sb.append(", onlyPublishInCloud=").append(onlyPublishInCloud);
sb.append(", group='").append(group).append('\'');
sb.append('}');
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,31 @@
*/
package com.alipay.sofa.rpc.registry.mesh.model;

import java.util.Map;

/**
* @author bystander
* @version $Id: PublishServiceRequest.java, v 0.1 2018年04月03日 11:27 AM bystander Exp $
*/
public class SubscribeServiceRequest {

private String serviceName;
private String serviceName;

//这个值是类似DEFAULT/XFIRE这种,也有可能是tr
private String protocolType;
private String protocolType;

//this should be xxx-pool.alipay.com or xxx.alipay.com,can be null
private String targetAppAddress;
private String targetAppAddress;

private boolean vipEnforce;

private boolean vipOnly;

private boolean vipEnforce;
private boolean localCloudFirst;

private boolean vipOnly;
private String group;

private boolean localCloudFirst;
private Map<String, String> properties;

public String getServiceName() {
return serviceName;
Expand Down Expand Up @@ -84,6 +90,22 @@ public void setLocalCloudFirst(boolean localCloudFirst) {
this.localCloudFirst = localCloudFirst;
}

public String getGroup() {
return group;
}

public void setGroup(String group) {
this.group = group;
}

public Map<String, String> getProperties() {
return properties;
}

public void setProperties(Map<String, String> properties) {
this.properties = properties;
}

@Override
public String toString() {
final StringBuffer sb = new StringBuffer("SubscribeServiceRequest{");
Expand All @@ -93,6 +115,7 @@ public String toString() {
sb.append(", vipEnforce=").append(vipEnforce);
sb.append(", vipOnly=").append(vipOnly);
sb.append(", localCloudFirst=").append(localCloudFirst);
sb.append(", group='").append(group).append('\'');
sb.append('}');
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class UnPublishServiceRequest {
//这个值是类似DEFAULT/XFIRE这种,也有可能是tr
private String protocolType;

private String group;

public String getServiceName() {
return serviceName;
}
Expand All @@ -43,6 +45,14 @@ public void setProtocolType(String protocolType) {
this.protocolType = protocolType;
}

public String getGroup() {
return group;
}

public void setGroup(String group) {
this.group = group;
}

@Override
public String toString() {
final StringBuffer sb = new StringBuffer("UnPublishServiceRequest{");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class UnSubscribeServiceRequest {
//这个值是类似DEFAULT/XFIRE这种,也有可能是tr
private String protocolType;

private String group;

public String getServiceName() {
return serviceName;
}
Expand All @@ -54,12 +56,21 @@ public void setProtocolType(String protocolType) {
this.protocolType = protocolType;
}

public String getGroup() {
return group;
}

public void setGroup(String group) {
this.group = group;
}

@Override
public String toString() {
final StringBuffer sb = new StringBuffer("UnSubscribeServiceRequest{");
sb.append("serviceName='").append(serviceName).append('\'');
sb.append(", targetAppAddress='").append(targetAppAddress).append('\'');
sb.append(", protocolType='").append(protocolType).append('\'');
sb.append(", group='").append(group).append('\'');
sb.append('}');
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ public void testOnlyPublish() throws InterruptedException {
public void testAll() throws Exception {

int timeoutPerSub = 1000;
Map<String, String> parameter = new HashMap<>();
parameter.put(SofaRegistryConstants.SOFA_GROUP_KEY, "SOFA_TEST");

ServerConfig serverConfig = new ServerConfig()
.setProtocol("bolt")
Expand All @@ -183,7 +185,8 @@ public void testAll() throws Exception {
.setSerialization("hessian2")
.setServer(serverConfig)
.setWeight(222)
.setTimeout(3000);
.setTimeout(3000)
.setParameters(parameter);

// 注册
registry.register(provider);
Expand All @@ -196,7 +199,8 @@ public void testAll() throws Exception {
.setSubscribe(true)
.setSerialization("java")
.setInvokeType("sync")
.setTimeout(4444);
.setTimeout(4444)
.setParameters(parameter);

String tag0 = MeshRegistryHelper.buildMeshKey(provider, serverConfig.getProtocol());
String tag1 = MeshRegistryHelper.buildMeshKey(consumer, consumer.getProtocol());
Expand All @@ -205,6 +209,7 @@ public void testAll() throws Exception {
PublishServiceRequest publishServiceRequest = registry.buildPublishServiceRequest(tag0,
serverConfig.getProtocol(), providerInfo, "test-server");
Assert.assertEquals(serverConfig.getProtocol(), publishServiceRequest.getProtocolType());
Assert.assertEquals("SOFA_TEST", publishServiceRequest.getGroup());

// 订阅
MeshRegistryTest.MockProviderInfoListener providerInfoListener = new MeshRegistryTest.MockProviderInfoListener();
Expand All @@ -216,6 +221,8 @@ public void testAll() throws Exception {
Assert.assertTrue(ps.toString(), ps.size() == 1);
SubscribeServiceRequest subscribeServiceRequest = registry.buildSubscribeServiceRequest(consumer);
Assert.assertEquals(consumer.getProtocol(), subscribeServiceRequest.getProtocolType());
Assert.assertEquals("SOFA_TEST", subscribeServiceRequest.getGroup());
Assert.assertNotNull(subscribeServiceRequest.getProperties());

// 反注册
CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -226,6 +233,7 @@ public void testAll() throws Exception {
Assert.assertTrue(ps.size() == 1);
UnPublishServiceRequest unPublishServiceRequest = registry.buildUnPublishServiceRequest(tag0, providerInfo);
Assert.assertEquals(serverConfig.getProtocol(), unPublishServiceRequest.getProtocolType());
Assert.assertEquals("SOFA_TEST", unPublishServiceRequest.getGroup());

// 一次发2个端口的再次注册
latch = new CountDownLatch(1);
Expand All @@ -246,7 +254,8 @@ public void testAll() throws Exception {
.setSubscribe(true)
.setSerialization("java")
.setInvokeType("sync")
.setTimeout(4444);
.setTimeout(4444)
.setParameters(parameter);
CountDownLatch latch2 = new CountDownLatch(1);
MeshRegistryTest.MockProviderInfoListener providerInfoListener2 = new MeshRegistryTest.MockProviderInfoListener();
providerInfoListener2.setCountDownLatch(latch2);
Expand All @@ -261,6 +270,7 @@ public void testAll() throws Exception {
registry.unSubscribe(consumer);
UnSubscribeServiceRequest unSubscribeServiceRequest = registry.buildUnSubscribeServiceRequest(consumer);
Assert.assertEquals(consumer.getProtocol(), unSubscribeServiceRequest.getProtocolType());
Assert.assertEquals("SOFA_TEST", unSubscribeServiceRequest.getGroup());

// 批量反注册,判断订阅者2的数据
latch = new CountDownLatch(1);
Expand Down
Loading