Skip to content

Commit

Permalink
定义GRPC双向流接口
Browse files Browse the repository at this point in the history
  • Loading branch information
li-xiao-shuang committed Jun 17, 2024
1 parent dfa7e3f commit 3afafe2
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ public class BrokerServiceManager {

private static final List<BindableService> BINDABLE_SERVICE_LIST = new ArrayList<>();

private BrokerServiceManager() {
}

public static class BrokerServiceManagerHolder {
private static final BrokerServiceManager INSTANCE = new BrokerServiceManager();
}

public static BrokerServiceManager getInstance() {
return BrokerServiceManagerHolder.INSTANCE;
}

public void initialized() {
this.bindService(new BrokerKvStorageService());
}
Expand All @@ -37,7 +48,7 @@ public void bindService(BindableService bindableService) {
BINDABLE_SERVICE_LIST.add(bindableService);
}

public static List<BindableService> getBindableServiceList() {
public List<BindableService> getBindableServiceList() {
return BINDABLE_SERVICE_LIST;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.serviceplus.broker.register;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.serviceplus.broker.model.AdminService;
import org.springframework.stereotype.Component;
Expand All @@ -27,17 +28,18 @@
* @author lixiaoshuang
*/
@Component
@Slf4j
public class MemoryServiceRegisterCenter implements ServiceRegisterCenter {
/**
* 服务存储
* 应用名 -> 应用ip -> 服务列表
*/
public static final Map<String, Map<String, Set<AdminService>>> SERVICE_MAP = new ConcurrentHashMap<>();


@Override
public void registerService(String applicationName, String applicationIp, AdminService service) {
if (StringUtils.isBlank(applicationName) || StringUtils.isBlank(applicationIp) || service == null) {
log.error("register service error, applicationName: {}, applicationIp: {}, service: {}", applicationName, applicationIp, service);
return;
}
SERVICE_MAP.computeIfAbsent(applicationName, k -> new ConcurrentHashMap<>())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@
*
* @author lixiaoshuang
*/
public class BrokerGrpcServerBuilder extends ServerBuilder {
public class BrokerGrpcServerBuilder extends ServerBuilder<BrokerGrpcServerBuilder> {

private static final Logger LOGGER = LoggerFactory.getLogger(BrokerGrpcServerBuilder.class);

private final ServerBuilder<?> serverBuilder;

public BrokerGrpcServerBuilder(ServerBuilder<?> serverBuilder) {
this.serverBuilder = serverBuilder;
BrokerServiceManager brokerServiceManager = new BrokerServiceManager();
brokerServiceManager.initialized();
BrokerServiceManager.getInstance().initialized();
}

/**
Expand All @@ -60,7 +59,7 @@ public static BrokerGrpcServerBuilder forPort(int port) {
* @param bindableServices 服务实现
* @return StoreGrpcServerBuilder
*/
public ServerBuilder addService(List<BindableService> bindableServices) {
public BrokerGrpcServerBuilder addService(List<BindableService> bindableServices) {
for (BindableService bindableService : bindableServices) {
this.serverBuilder.addService(bindableService);
LOGGER.info("binding:" + bindableService.bindService().getServiceDescriptor().getName());
Expand All @@ -70,48 +69,48 @@ public ServerBuilder addService(List<BindableService> bindableServices) {


@Override
public ServerBuilder directExecutor() {
public BrokerGrpcServerBuilder directExecutor() {
return executor(MoreExecutors.directExecutor());
}

@Override
public ServerBuilder executor(@Nullable Executor executor) {
public BrokerGrpcServerBuilder executor(@Nullable Executor executor) {
this.serverBuilder.executor(executor);
return this;
}

@Override
public ServerBuilder addService(ServerServiceDefinition service) {
public BrokerGrpcServerBuilder addService(ServerServiceDefinition service) {
this.serverBuilder.addService(service);
return this;
}

@Override
public ServerBuilder addService(BindableService bindableService) {
public BrokerGrpcServerBuilder addService(BindableService bindableService) {
this.serverBuilder.addService(bindableService);
return this;
}

@Override
public ServerBuilder fallbackHandlerRegistry(@Nullable HandlerRegistry fallbackRegistry) {
public BrokerGrpcServerBuilder fallbackHandlerRegistry(@Nullable HandlerRegistry fallbackRegistry) {
this.serverBuilder.fallbackHandlerRegistry(fallbackRegistry);
return this;
}

@Override
public ServerBuilder useTransportSecurity(File certChain, File privateKey) {
public BrokerGrpcServerBuilder useTransportSecurity(File certChain, File privateKey) {
this.serverBuilder.useTransportSecurity(certChain, privateKey);
return this;
}

@Override
public ServerBuilder decompressorRegistry(@Nullable DecompressorRegistry registry) {
public BrokerGrpcServerBuilder decompressorRegistry(@Nullable DecompressorRegistry registry) {
this.serverBuilder.decompressorRegistry(registry);
return this;
}

@Override
public ServerBuilder compressorRegistry(@Nullable CompressorRegistry registry) {
public BrokerGrpcServerBuilder compressorRegistry(@Nullable CompressorRegistry registry) {
this.serverBuilder.compressorRegistry(registry);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.serviceplus.broker.kv.service.BrokerServiceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
Expand All @@ -34,10 +35,13 @@ public class BrokerGrpcServerStartUp {

private static final Logger LOGGER = LoggerFactory.getLogger(BrokerGrpcServerStartUp.class);

@Value("${serviceplus.broker.grpc.port:8766}")
private int port;

@PostConstruct
public void start() {
BrokerGrpcServerBuilder brokerGrpcServerBuilder = BrokerGrpcServerBuilder.forPort(8766);
Server server = brokerGrpcServerBuilder.addService(BrokerServiceManager.getBindableServiceList()).build();
BrokerGrpcServerBuilder brokerGrpcServerBuilder = BrokerGrpcServerBuilder.forPort(port);
Server server = brokerGrpcServerBuilder.addService(BrokerServiceManager.getInstance().getBindableServiceList()).build();
try {
server.start();
Runtime.getRuntime().addShutdownHook(new Thread(server::shutdown));
Expand Down
1 change: 1 addition & 0 deletions broker/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ server.port=8080
server.servlet.context-path=/serviceplus
spring.application.name=serviceplus

serviceplus.broker.grpc.port=8766
serviceplus.storage.port=8866
serviceplus.storage.host=127.0.0.1
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ public static KvClient createKvClient(Properties properties) {
* @return 注册客户端
*/
public static RegisterClient createRegisterClient(Properties properties) {
return DefaultRegisterClient.getInstance();
return DefaultRegisterClient.getInstance(properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,67 @@
*/
package org.serviceplus.client.register;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.serviceplus.client.kv.DefaultKvClient;
import org.serviceplus.client.model.SpApplication;
import org.serviceplus.register.proto.SpServiceRegisterGrpc;
import org.serviceplus.register.proto.SpServiceRegisterOuterClass;

import java.util.Properties;

/**
* @author lixiaoshuang
*/
public class DefaultRegisterClient implements RegisterClient {

private DefaultRegisterClient() {
private static volatile DefaultRegisterClient instance;

private final ManagedChannel channel;
private final SpServiceRegisterGrpc.SpServiceRegisterStub spServiceRegisterStub;

private DefaultRegisterClient(Properties properties) {
String host = properties.getProperty("host");
int post = Integer.parseInt(properties.getProperty("port"));
this.channel = ManagedChannelBuilder.forAddress(host, post)
.usePlaintext()
.build();
this.spServiceRegisterStub = SpServiceRegisterGrpc.newStub(channel);
Runtime.getRuntime().addShutdownHook(new Thread(channel::shutdown));
}

@Override
public void register(SpApplication spApplication) {
if (spApplication == null) {
throw new IllegalArgumentException("spApplication is null");
}
spServiceRegisterStub.bidirectionalStreamingMethod(new StreamObserver<SpServiceRegisterOuterClass.ServerRegisterRequest>() {
@Override
public void onNext(SpServiceRegisterOuterClass.ServerRegisterRequest serverRegisterRequest) {

}
}

@Override
public void onError(Throwable throwable) {

}

@Override
public void onCompleted() {

private static class DefaultRegisterClientHolder {
private static final DefaultRegisterClient INSTANCE = new DefaultRegisterClient();
}
});
}

public static DefaultRegisterClient getInstance() {
return DefaultRegisterClientHolder.INSTANCE;
public static DefaultRegisterClient getInstance(Properties properties) {
if (instance == null) {
synchronized (DefaultKvClient.class) {
if (instance == null) {
instance = new DefaultRegisterClient(properties);
}
}
}
return instance;
}
}
55 changes: 55 additions & 0 deletions common/src/main/proto/SpServiceRegister.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
syntax = "proto3";
option java_package = "org.serviceplus.register.proto";

service SpServiceRegister {
rpc BidirectionalStreamingMethod(stream ClientRegisterRequest) returns (stream ServerRegisterRequest) {}
}

message ClientRegisterRequest {
oneof ClientPayload {
ServiceRegister serviceRegister = 1; // 服务注册
ServiceInvokeResponse serviceInvokeResponse = 2; // 服务调用响应
}
}

message ServerRegisterRequest {
oneof ServerPayload {
ServiceInvoke serviceInvoke = 1; // 服务调用
ServiceRegisterResponse serviceRegisterResponse = 2; // 服务注册响应
}
}

message ServiceRegister{
string applicationName = 1; // 应用名
string ip = 2; // IP地址
string port = 3; // 端口
repeated ClientService clientService = 4; // 服务列表
}

message ClientService {
string className = 1; // 全类名
string simpleClassName = 2; // 简单类名
string serviceName = 3; // 服务名
string methodName = 4; // 方法名
repeated string paramNames = 5; // 参数名称,使用列表
repeated string paramDesc = 6; // 参数描述,使用列表
repeated string paramTypes = 7; // 参数类型,这里使用字符串列表表示类名
string returnNames = 8; // 返回值名称
string returnTypes = 9; // 返回值类型,使用字符串表示类名
}

message ServiceInvoke{

}

message ServiceRegisterResponse{
string code = 1; // 响应码
string message = 2; // 响应消息
string data = 3; // 响应数据
}

message ServiceInvokeResponse{
string code = 1; // 响应码
string message = 2; // 响应消息
string data = 3; // 响应数据
}

0 comments on commit 3afafe2

Please sign in to comment.