Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sofa-pbrpc cookie #81

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
16 changes: 13 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ OPT ?= -O2 # (A) Production use (optimized mode)
include depends.mk

LIB=libsofa-pbrpc.a
LIB_FULL=libsofa-pbrpc-full.a
LIB_SRC=$(wildcard src/sofa/pbrpc/*.cc)
PLUGIN_SRC=$(wildcard src/sofa/pbrpc/plugin/*/*.cc)
LIB_OBJ=$(patsubst %.cc,%.o,$(LIB_SRC))
PLUGIN_OBJ=$(patsubst %.cc,%.o,$(PLUGIN_SRC))
PROTO=$(wildcard src/sofa/pbrpc/*.proto)
PROTO_SRC=$(patsubst %.proto,%.pb.cc,$(PROTO))
PROTO_HEADER=$(patsubst %.proto,%.pb.h,$(PROTO))
Expand All @@ -45,6 +48,7 @@ PUB_INC=src/sofa/pbrpc/pbrpc.h src/sofa/pbrpc/closure_helper.h src/sofa/pbrpc/cl
src/sofa/pbrpc/fast_lock.h src/sofa/pbrpc/rw_lock.h src/sofa/pbrpc/scoped_locker.h \
src/sofa/pbrpc/condition_variable.h src/sofa/pbrpc/wait_event.h src/sofa/pbrpc/http.h \
src/sofa/pbrpc/buffer.h src/sofa/pbrpc/buf_handle.h src/sofa/pbrpc/profiling_linker.h \
src/sofa/pbrpc/rpc_attachment.h \
$(PROTO) $(PROTO_HEADER)

#-----------------------------------------------
Expand Down Expand Up @@ -84,7 +88,7 @@ check_depends:
@if [ ! -f "$(SNAPPY_DIR)/lib/libsnappy.a" ]; then echo "ERROR: need snappy lib"; exit 1; fi

clean:
rm -f $(LIB) $(BIN) $(LIB_OBJ) $(PROTO_OBJ) $(BIN_OBJ) $(PROTO_HEADER) $(PROTO_SRC)
rm -f $(LIB) $(LIB_FULL) $(BIN) $(LIB_OBJ) $(PLUGIN_OBJ) $(PROTO_OBJ) $(BIN_OBJ) $(PROTO_HEADER) $(PROTO_SRC)

rebuild: clean all

Expand All @@ -95,6 +99,9 @@ $(LIB_OBJ): $(PROTO_HEADER)
$(LIB): $(LIB_OBJ) $(PROTO_OBJ)
ar crs $@ $(LIB_OBJ) $(PROTO_OBJ)

$(LIB_FULL): $(PLUGIN_OBJ) $(LIB_OBJ) $(PROTO_OBJ)
ar crs $@ $(PLUGIN_OBJ) $(LIB_OBJ) $(PROTO_OBJ)

$(BIN): $(LIB) $(BIN_OBJ)
$(CXX) $(BIN_OBJ) -o $@ $(LIB) $(LDFLAGS)

Expand All @@ -104,19 +111,22 @@ $(BIN): $(LIB) $(BIN_OBJ)
%.o: %.cc
$(CXX) $(CXXFLAGS) -c $< -o $@

build: $(LIB) $(BIN)
build: $(LIB) $(LIB_FULL) $(BIN)
@echo
@echo 'Build succeed, run "make install" to install sofa-pbrpc to "'$(PREFIX)'".'

install: $(LIB) $(BIN)
install: $(LIB) $(LIB_FULL) $(BIN)
mkdir -p $(PREFIX)/include/sofa/pbrpc
cp -r $(PUB_INC) $(TARGET_DIRECTORY) $(PREFIX)/include/sofa/pbrpc/
mkdir -p $(PREFIX)/include/sofa/pbrpc/smart_ptr
cp src/sofa/pbrpc/smart_ptr/*.hpp $(PREFIX)/include/sofa/pbrpc/smart_ptr
mkdir -p $(PREFIX)/include/sofa/pbrpc/smart_ptr/detail
cp src/sofa/pbrpc/smart_ptr/detail/*.hpp $(PREFIX)/include/sofa/pbrpc/smart_ptr/detail
mkdir -p $(PREFIX)/include/sofa/pbrpc/plugin/cookie
cp src/sofa/pbrpc/plugin/cookie/*.h $(PREFIX)/include/sofa/pbrpc/plugin/cookie
mkdir -p $(PREFIX)/lib
cp $(LIB) $(PREFIX)/lib/
cp $(LIB_FULL) $(PREFIX)/lib/
mkdir -p $(PREFIX)/bin
cp $(BIN) $(PREFIX)/bin/
@echo
Expand Down
6 changes: 3 additions & 3 deletions sample/echo/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ INCPATH=-I. -I$(SOFA_PBRPC)/include -I$(PROTOBUF_DIR)/include \
-I$(SNAPPY_DIR)/include -I$(ZLIB_DIR)/include
CXXFLAGS += $(OPT) -pipe -W -Wall -fPIC -D_GNU_SOURCE -D__STDC_LIMIT_MACROS $(INCPATH)

LIBRARY=$(SOFA_PBRPC)/lib/libsofa-pbrpc.a $(PROTOBUF_DIR)/lib/libprotobuf.a $(SNAPPY_DIR)/lib/libsnappy.a
LIBRARY=$(SOFA_PBRPC)/lib/libsofa-pbrpc-full.a $(PROTOBUF_DIR)/lib/libprotobuf.a $(SNAPPY_DIR)/lib/libsnappy.a
LDFLAGS += -L$(ZLIB_DIR)/lib -lpthread -lz

UNAME_S := $(shell uname -s)
Expand All @@ -57,10 +57,10 @@ check_depends:
@if [ ! -f "$(SNAPPY_DIR)/include/snappy.h" ]; then echo "ERROR: need snappy header"; exit 1; fi
@if [ ! -f "$(SNAPPY_DIR)/lib/libsnappy.a" ]; then echo "ERROR: need snappy lib"; exit 1; fi
@if [ ! -f "$(SOFA_PBRPC)/include/sofa/pbrpc/pbrpc.h" ]; then echo "ERROR: need sofa-pbrpc header"; exit 1; fi
@if [ ! -f "$(SOFA_PBRPC)/lib/libsofa-pbrpc.a" ]; then echo "ERROR: need sofa-pbrpc lib"; exit 1; fi
@if [ ! -f "$(SOFA_PBRPC)/lib/libsofa-pbrpc-full.a" ]; then echo "ERROR: need sofa-pbrpc-full lib"; exit 1; fi

clean:
@rm -f $(BIN) *.o *.pb.*
@rm -f $(BIN) *.o *.pb.*

rebuild: clean all

Expand Down
18 changes: 18 additions & 0 deletions sample/echo/client_async.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@

#include <unistd.h>
#include <sofa/pbrpc/pbrpc.h>
#include <sofa/pbrpc/plugin/cookie/rpc_cookie.h>
#include "echo_service.pb.h"

typedef sofa::pbrpc::shared_ptr<sofa::pbrpc::Cookie> CookiePtr;
sofa::pbrpc::RpcCookieManager cookie_manager;

void EchoCallback(sofa::pbrpc::RpcController* cntl,
sofa::pbrpc::test::EchoRequest* request,
sofa::pbrpc::test::EchoResponse* response,
Expand All @@ -26,6 +30,14 @@ void EchoCallback(sofa::pbrpc::RpcController* cntl,
}
else {
SLOG(NOTICE, "request succeed: %s", response->message().c_str());
CookiePtr cookie(new sofa::pbrpc::Cookie(&cookie_manager));
if (cntl->GetResponseAttachment(cookie.get()))
{
std::string version;
cookie->Get("version", version);
SLOG(NOTICE, "cookie version=%s", version.c_str());
cookie->Store();
}
}

delete cntl;
Expand All @@ -46,9 +58,15 @@ int main()
sofa::pbrpc::RpcChannelOptions channel_options;
sofa::pbrpc::RpcChannel rpc_channel(&rpc_client, "127.0.0.1:12321", channel_options);

CookiePtr cookie(new sofa::pbrpc::Cookie(&cookie_manager));
cookie->Load();
cookie->Set("type", "async");
cookie->Set("logid", "123456");

// Prepare parameters.
sofa::pbrpc::RpcController* cntl = new sofa::pbrpc::RpcController();
cntl->SetTimeout(3000);
cntl->SetRequestAttachment(cookie.get());
sofa::pbrpc::test::EchoRequest* request = new sofa::pbrpc::test::EchoRequest();
request->set_message("Hello from qinzuoyan01");
sofa::pbrpc::test::EchoResponse* response = new sofa::pbrpc::test::EchoResponse();
Expand Down
17 changes: 17 additions & 0 deletions sample/echo/client_sync.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
// Author: [email protected] (Qin Zuoyan)

#include <sofa/pbrpc/pbrpc.h>
#include <sofa/pbrpc/plugin/cookie/rpc_cookie.h>
#include "echo_service.pb.h"

typedef sofa::pbrpc::shared_ptr<sofa::pbrpc::Cookie> CookiePtr;
sofa::pbrpc::RpcCookieManager cookie_manager;

// Using global RpcClient object can help share resources such as threads and buffers.
sofa::pbrpc::RpcClient g_rpc_client;

Expand All @@ -21,6 +25,11 @@ int main()
// Prepare parameters.
sofa::pbrpc::RpcController* cntl = new sofa::pbrpc::RpcController();
cntl->SetTimeout(3000);
CookiePtr cookie(new sofa::pbrpc::Cookie(&cookie_manager));
cookie->Load();
cookie->Set("type", "sync");
cookie->Set("logid", "123456");
cntl->SetRequestAttachment(cookie.get());
sofa::pbrpc::test::EchoRequest* request =
new sofa::pbrpc::test::EchoRequest();
request->set_message("Hello from qinzuoyan01");
Expand Down Expand Up @@ -50,6 +59,14 @@ int main()
else
{
SLOG(NOTICE, "request succeed: %s", response->message().c_str());
cookie.reset(new sofa::pbrpc::Cookie(&cookie_manager));
if (cntl->GetResponseAttachment(cookie.get()))
{
std::string version;
cookie->Get("version", version);
SLOG(NOTICE, "cookie version=%s", version.c_str());
cookie->Store();
}
}

// Destroy objects.
Expand Down
14 changes: 14 additions & 0 deletions sample/echo/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
#include <signal.h>
#include <unistd.h>
#include <sofa/pbrpc/pbrpc.h>
#include <sofa/pbrpc/plugin/cookie/rpc_cookie.h>
#include "echo_service.pb.h"

typedef sofa::pbrpc::shared_ptr<sofa::pbrpc::Cookie> CookiePtr;

bool WebServlet(const sofa::pbrpc::HTTPRequest& request, sofa::pbrpc::HTTPResponse& response)
{
SLOG(INFO, "WebServlet(): request message from %s:%u",
Expand Down Expand Up @@ -53,6 +56,17 @@ class EchoServerImpl : public sofa::pbrpc::test::EchoServer
SLOG(INFO, "Header[\"%s\"]=\"%s\"", it->first.c_str(), it->second.c_str());
}
}
CookiePtr cookie(new sofa::pbrpc::Cookie());
if (cntl->GetRequestAttachment(cookie.get()))
{
std::string type;
std::string logid;
cookie->Get("type", type);
cookie->Get("logid", logid);
SLOG(INFO, "cookie info : type=%s, logid=%s", type.c_str(), logid.c_str());
}
cookie->Set("version", "1.00");
cntl->SetResponseAttachment(cookie.get());
response->set_message("echo message: " + request->message());
done->Run();
}
Expand Down
48 changes: 45 additions & 3 deletions src/sofa/pbrpc/binary_rpc_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,48 @@ void BinaryRpcRequest::ProcessRequest(
bool parse_request_return = false;
if (compress_type == CompressTypeNone)
{
parse_request_return = request->ParseFromZeroCopyStream(_req_body.get());
parse_request_return =
request->ParseFromBoundedZeroCopyStream(_req_body.get(), _req_header.data_size);
}
else
{
ReadBufferPtr read_buffer(new ReadBuffer());
int bytes_read = 0;
while (bytes_read < _req_header.data_size)
{
const char* read_pos = NULL;
int cur_size;
int bytes_remain = _req_header.data_size - bytes_read;
char* handle_data = _req_body->CurrentHandle();
int handle_offset = _req_body->CurrentHandleOffset();
if (!_req_body->Next(reinterpret_cast<const void**>(&read_pos), &cur_size))
{
#if defined( LOG )
LOG(ERROR) << "ProcessRequest(): " << RpcEndpointToString(_remote_endpoint)
<< ": {" << _req_meta.sequence_id() << "}"
<< ": bad request buffer";
#else
SLOG(ERROR, "ProcessRequest(): %s: {%lu}: bad request buffer",
RpcEndpointToString(_remote_endpoint).c_str(),
_req_meta.sequence_id());
#endif
SendFailedResponse(stream, RPC_ERROR_PARSE_REQUEST_MESSAGE, "bad request buffer");
return;
}
if (bytes_remain >= cur_size)
{
read_buffer->Append(BufHandle(handle_data, cur_size, handle_offset));
bytes_read += cur_size;
}
else
{
_req_body->BackUp(cur_size - bytes_remain);
read_buffer->Append(BufHandle(handle_data, bytes_remain, handle_offset));
bytes_read += bytes_remain;
}
}
sofa::pbrpc::scoped_ptr<AbstractCompressedInputStream> is(
get_compressed_input_stream(_req_body.get(), compress_type));
get_compressed_input_stream(read_buffer.get(), compress_type));
parse_request_return = request->ParseFromZeroCopyStream(is.get());
}
if (!parse_request_return)
Expand Down Expand Up @@ -123,6 +159,9 @@ void BinaryRpcRequest::ProcessRequest(
cntl->SetResponseCompressType(_req_meta.has_expected_response_compress_type() ?
_req_meta.expected_response_compress_type() : CompressTypeNone);

cntl->SetRequestSize(_req_header.data_size);
cntl->SetRequestAttachBuffer(_req_body);

CallMethod(method_board, controller, request, response);
}

Expand Down Expand Up @@ -170,7 +209,10 @@ ReadBufferPtr BinaryRpcRequest::AssembleSucceedResponse(
return ReadBufferPtr();
}
header.data_size = write_buffer.ByteCount() - header_pos - header_size - header.meta_size;
header.message_size = header.meta_size + header.data_size;
write_buffer.Append(cntl->GetResponseAttachBuffer()->ToString());
int attach_size = write_buffer.ByteCount() - header_pos - header_size - header.meta_size - header.data_size;
header.message_size = header.meta_size + header.data_size + attach_size;

write_buffer.SetData(header_pos, reinterpret_cast<const char*>(&header), header_size);

ReadBufferPtr read_buffer(new ReadBuffer());
Expand Down
10 changes: 10 additions & 0 deletions src/sofa/pbrpc/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,16 @@ bool ReadBuffer::Next(const void** data, int* size)
}
}

char* ReadBuffer::CurrentHandle()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个接口是否有必要

{
return _cur_it->data;
}

int ReadBuffer::CurrentHandleOffset()
{
return _cur_it->offset + _cur_pos;
}

// BackUp() can only be called after a successful Next().
// "count" should be greater than or equal to 0.
void ReadBuffer::BackUp(int count)
Expand Down
7 changes: 7 additions & 0 deletions src/sofa/pbrpc/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ class ReadBuffer : public google::protobuf::io::ZeroCopyInputStream

// implements ZeroCopyInputStream ----------------------------------
bool Next(const void** data, int* size);

// Get the address of current buffer handle.
char* CurrentHandle();

// Get the offset of current buffer handle.
int CurrentHandleOffset();

void BackUp(int count);
bool Skip(int count);
int64 ByteCount() const;
Expand Down
Loading