Skip to content

Commit

Permalink
实现双向流客户端、服务端骨架
Browse files Browse the repository at this point in the history
  • Loading branch information
li-xiao-shuang committed Jun 18, 2024
1 parent 3afafe2 commit e688fd5
Show file tree
Hide file tree
Showing 13 changed files with 157 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* @author lixiaoshuang
*/
@Data
public class AdminApplication {
public class BrokerApplication {
/**
* 应用名
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* @author lixiaoshuang
*/
@Data
public class AdminService {
public class BrokerService {
/**
* 全类名
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2024 service plus open source organization.
*
* Licensed under the Apache License,Version2.0(the"License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.serviceplus.broker.register.service;

import io.grpc.stub.StreamObserver;
import org.serviceplus.register.proto.SpServiceRegisterGrpc;
import org.serviceplus.register.proto.SpServiceRegisterOuterClass;

/**
* @author lixiaoshuang
*/
public class BrokerServiceRegisterService extends SpServiceRegisterGrpc.SpServiceRegisterImplBase {
@Override
public StreamObserver<SpServiceRegisterOuterClass.ClientRegisterRequest> bidirectionalStreamingMethod(StreamObserver<SpServiceRegisterOuterClass.ServerRegisterRequest> responseObserver) {
StreamObserver<SpServiceRegisterOuterClass.ClientRegisterRequest> streamObserver = new StreamObserver<SpServiceRegisterOuterClass.ClientRegisterRequest>() {
@Override
public void onNext(SpServiceRegisterOuterClass.ClientRegisterRequest clientRegisterRequest) {
boolean b = clientRegisterRequest.hasServiceRegister();
if (b) {
System.out.println("收到注册服务请求:" + clientRegisterRequest.getServiceRegister().getApplicationName());
}
}

@Override
public void onError(Throwable throwable) {
System.out.println("收到注册服务请求异常");
}

@Override
public void onCompleted() {
System.out.println("收到注册服务请求完成");
}
};
// 向客户端返回调用结果
SpServiceRegisterOuterClass.ServiceRegisterResponse registerResponse = SpServiceRegisterOuterClass.ServiceRegisterResponse.newBuilder().setMessage("注册服务成功").setCode("200").build();
responseObserver.onNext(SpServiceRegisterOuterClass.ServerRegisterRequest.newBuilder().setServiceRegisterResponse(registerResponse).build());
return streamObserver;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.serviceplus.broker.register;
package org.serviceplus.broker.register.storage;

import org.serviceplus.broker.model.AdminApplication;
import org.serviceplus.broker.model.BrokerApplication;

import java.util.List;

Expand All @@ -29,7 +29,7 @@ public interface ApplicationRegisterCenter {
* @param applicationName 应用名
* @param application 应用
*/
void registerApplication(String applicationName, AdminApplication application);
void registerApplication(String applicationName, BrokerApplication application);

/**
* 获取应用名列表
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.serviceplus.broker.register;
package org.serviceplus.broker.register.storage;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.serviceplus.broker.model.AdminApplication;
import org.serviceplus.broker.model.BrokerApplication;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
Expand All @@ -37,15 +37,15 @@ public class MemoryApplicationRegisterCenter implements ApplicationRegisterCente
* 应用信息存储
* 应用名 -> 应用列表
*/
private static final Map<String, Set<AdminApplication>> APPLICATION_MAP = new ConcurrentHashMap<>();
private static final Map<String, Set<BrokerApplication>> APPLICATION_MAP = new ConcurrentHashMap<>();
/**
* 应用ip存储
* 应用名 -> 应用ip
*/
public static final Map<String, Set<String>> APPLICATION_IP_MAP = new ConcurrentHashMap<>();

@Override
public void registerApplication(String applicationName, AdminApplication application) {
public void registerApplication(String applicationName, BrokerApplication application) {
if (StringUtils.isBlank(applicationName) || application == null) {
log.error("register application error, applicationName: {}, application: {}", applicationName, application);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.serviceplus.broker.register;
package org.serviceplus.broker.register.storage;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.serviceplus.broker.model.AdminService;
import org.serviceplus.broker.model.BrokerService;
import org.springframework.stereotype.Component;

import java.util.Map;
Expand All @@ -34,10 +34,10 @@ public class MemoryServiceRegisterCenter implements ServiceRegisterCenter {
* 服务存储
* 应用名 -> 应用ip -> 服务列表
*/
public static final Map<String, Map<String, Set<AdminService>>> SERVICE_MAP = new ConcurrentHashMap<>();
public static final Map<String, Map<String, Set<BrokerService>>> SERVICE_MAP = new ConcurrentHashMap<>();

@Override
public void registerService(String applicationName, String applicationIp, AdminService service) {
public void registerService(String applicationName, String applicationIp, BrokerService service) {
if (StringUtils.isBlank(applicationName) || StringUtils.isBlank(applicationIp) || service == null) {
log.error("register service error, applicationName: {}, applicationIp: {}, service: {}", applicationName, applicationIp, service);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.serviceplus.broker.register;
package org.serviceplus.broker.register.storage;

import org.serviceplus.broker.model.AdminService;
import org.serviceplus.broker.model.BrokerService;

/**
* @author lixiaoshuang
Expand All @@ -28,5 +28,5 @@ public interface ServiceRegisterCenter {
* @param applicationIp ip
* @param service 服务
*/
void registerService(String applicationName, String applicationIp, AdminService service);
void registerService(String applicationName, String applicationIp, BrokerService service);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.*;
import org.serviceplus.broker.kv.service.BrokerServiceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.serviceplus.broker.server;

import io.grpc.Server;
import org.serviceplus.broker.kv.service.BrokerServiceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -45,7 +44,7 @@ public void start() {
try {
server.start();
Runtime.getRuntime().addShutdownHook(new Thread(server::shutdown));
LOGGER.info("The grpc server at the broker tier is successfully started.");
LOGGER.info("The grpc server at the broker tier is successfully started. port: {}", port);
} catch (IOException e) {
LOGGER.error("The grpc server at the broker tier fails to be started.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.serviceplus.broker.kv.service;
package org.serviceplus.broker.server;

import io.grpc.BindableService;
import org.serviceplus.broker.kv.service.BrokerKvStorageService;
import org.serviceplus.broker.register.service.BrokerServiceRegisterService;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -42,6 +44,7 @@ public static BrokerServiceManager getInstance() {

public void initialized() {
this.bindService(new BrokerKvStorageService());
this.bindService(new BrokerServiceRegisterService());
}

public void bindService(BindableService bindableService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import io.grpc.stub.StreamObserver;
import org.serviceplus.client.kv.DefaultKvClient;
import org.serviceplus.client.model.SpApplication;
import org.serviceplus.client.model.SpService;
import org.serviceplus.register.proto.SpServiceRegisterGrpc;
import org.serviceplus.register.proto.SpServiceRegisterOuterClass;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
Expand Down Expand Up @@ -50,24 +53,91 @@ public void register(SpApplication spApplication) {
if (spApplication == null) {
throw new IllegalArgumentException("spApplication is null");
}
spServiceRegisterStub.bidirectionalStreamingMethod(new StreamObserver<SpServiceRegisterOuterClass.ServerRegisterRequest>() {
StreamObserver<SpServiceRegisterOuterClass.ClientRegisterRequest> streamObserver = spServiceRegisterStub.bidirectionalStreamingMethod(new StreamObserver<SpServiceRegisterOuterClass.ServerRegisterRequest>() {
@Override
public void onNext(SpServiceRegisterOuterClass.ServerRegisterRequest serverRegisterRequest) {

boolean hassed = serverRegisterRequest.hasServiceRegisterResponse();
if (hassed) {
SpServiceRegisterOuterClass.ServiceRegisterResponse serviceRegisterResponse = serverRegisterRequest.getServiceRegisterResponse();
System.out.println("接收到服务注册响应:");
System.out.println(serviceRegisterResponse.getMessage());
System.out.println(serviceRegisterResponse.getCode());
}
}

@Override
public void onError(Throwable throwable) {

System.out.println("服务注册失败:" + throwable.getMessage());
}

@Override
public void onCompleted() {

System.out.println("服务注册完成");
}
});
// 构建客户端请求
SpServiceRegisterOuterClass.ServiceRegister serviceRegister = SpServiceRegisterOuterClass.ServiceRegister.newBuilder()
.setApplicationName(spApplication.getApplicationName())
.setIp(spApplication.getIp())
.setPort(spApplication.getPort())
.addAllClientService(this.convertSpServices(spApplication.getSpServices()))
.build();

SpServiceRegisterOuterClass.ClientRegisterRequest clientRegisterRequest = SpServiceRegisterOuterClass.ClientRegisterRequest.newBuilder()
.setServiceRegister(serviceRegister)
.build();
streamObserver.onNext(clientRegisterRequest);
streamObserver.onCompleted();
}


/**
* 将SpService转换为Protobuf消息
*
* @param spServices SpService列表
* @return Protobuf消息列表
*/
public List<SpServiceRegisterOuterClass.ClientService> convertSpServices(List<SpService> spServices) {
List<SpServiceRegisterOuterClass.ClientService> clientServices = new ArrayList<>();
for (SpService spService : spServices) {
SpServiceRegisterOuterClass.ClientService.Builder clientServiceBuilder =
SpServiceRegisterOuterClass.ClientService.newBuilder();
clientServiceBuilder.setClassName(spService.getClassName());
clientServiceBuilder.setSimpleClassName(spService.getSimpleClassName());
clientServiceBuilder.setServiceName(spService.getServiceName());
clientServiceBuilder.setMethodName(spService.getMethodName());
if (spService.getParamNames() != null) {
clientServiceBuilder.addAllParamNames(spService.getParamNames());
}
if (spService.getParamDesc() != null) {
clientServiceBuilder.addAllParamDesc(spService.getParamDesc());
}
// 将Class<?>类型的参数转化为字符串(即类的全限定名)
if (spService.getParamTypes() != null) {
List<String> paramTypeNames = new ArrayList<>();
for (Class<?> paramType : spService.getParamTypes()) {
paramTypeNames.add(paramType.getName());
}
clientServiceBuilder.addAllParamTypes(paramTypeNames);
}

// 设置返回值名称和类型(需要将Class<?>类型的返回值转换为字符串)
clientServiceBuilder.setReturnNames(spService.getReturnNames());
if (spService.getReturnTypes() != null) {
clientServiceBuilder.setReturnTypes(spService.getReturnTypes().getName());
}
clientServices.add(clientServiceBuilder.build());
}
return clientServices;
}


/**
* 单例模式
*
* @param properties 配置
* @return DefaultRegisterClient
*/
public static DefaultRegisterClient getInstance(Properties properties) {
if (instance == null) {
synchronized (DefaultKvClient.class) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ private void scanner() {
spApplication.setIp(NetworkUtils.getLocalHostExactAddress());
spApplication.setPort(this.getPort());
spApplication.setSpServices(spServices);
RegisterClient registerClient = ServicePlusFactory.createRegisterClient(new Properties());
//todo 修改为读配置文件
Properties properties = new Properties();
properties.setProperty("host", "127.0.0.1");
properties.setProperty("port", "8766");
RegisterClient registerClient = ServicePlusFactory.createRegisterClient(properties);
registerClient.register(spApplication);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class StorageStartUp {

private static final Logger LOGGER = LoggerFactory.getLogger(StorageStartUp.class);

public static void main(String[] args) {
public static void main(String[] args) throws InterruptedException {
StorageGrpcServerBuilder storageGrpcServerBuilder = StorageGrpcServerBuilder.forPort(8866);
Server server = storageGrpcServerBuilder.addService(StorageServiceManager.getBindableServiceList()).build();
try {
Expand All @@ -41,8 +41,10 @@ public static void main(String[] args) {
LOGGER.info("The grpc server at the storage tier is successfully started.");
} catch (IOException e) {
LOGGER.error("The grpc server at the storage tier fails to be started.", e);
}
while (true) {
}finally {
if (server!=null){
server.awaitTermination();
}
}
}
}

0 comments on commit e688fd5

Please sign in to comment.