Skip to content

U/jgates/loader #426

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 49 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
c71d1e5
Modified FileServer and added lader to build.
jgates108 Feb 2, 2018
f54924a
Modified FileServerConnection.
jgates108 Feb 7, 2018
2e2fb26
Modified FileServer and added lader to build.
jgates108 Feb 2, 2018
4c70f07
Modified FileServerConnection.
jgates108 Feb 7, 2018
4e2dcce
Added udp test code.
jgates108 Mar 7, 2018
068eace
Test binaries.
jgates108 Mar 7, 2018
04426b4
Changed to echo udp server and client.
jgates108 Mar 8, 2018
154a591
Some progress.
jgates108 Mar 8, 2018
cc53009
Added SConscript file.
jgates108 Mar 9, 2018
88127c0
Working UDP async echo server and sync client.
jgates108 Mar 9, 2018
192e51b
Implemented basic master and worker.
jgates108 Mar 16, 2018
bc23d69
Fixed errors from rebase.
jgates108 Aug 27, 2018
e940a6e
Changes to handleTest2
jgates108 Sep 7, 2018
2c05f59
TCP server working test.
jgates108 Sep 13, 2018
58a8d02
Passing basic tests, again.
jgates108 Sep 28, 2018
4b33b44
Added code so worker recognizes new neighbor.
jgates108 Oct 3, 2018
1830287
Added files.
jgates108 Oct 3, 2018
36390ab
Added range setting logic.
jgates108 Oct 11, 2018
243acfa
Range setting changes.
jgates108 Oct 15, 2018
432a258
Shift to right almost working.
jgates108 Oct 18, 2018
776ee94
Shift to right appears to work.
jgates108 Oct 19, 2018
849e6e3
Fixed some bugs. Added ability to send a key insert to a neighbor.
jgates108 Oct 24, 2018
17c4a91
Shifting appears to work.
jgates108 Oct 25, 2018
3e059f5
Fixed communication problem.
jgates108 Oct 26, 2018
3224bb0
Added code so the master will send ranges to all its clients.
jgates108 Oct 26, 2018
5025164
Cleaned up code.
jgates108 Oct 29, 2018
b6cdcf0
More code cleanup.
jgates108 Oct 30, 2018
cf808fb
More code cleanup.
jgates108 Oct 30, 2018
b034e43
Removed unused files.
jgates108 Nov 1, 2018
ef8f0c8
Made some review changes.
jgates108 Nov 9, 2018
dd7fd05
Made review changes.
jgates108 Nov 14, 2018
70eb7e1
Made review changes and cleaned up some todo items.
jgates108 Nov 16, 2018
70ff449
Moved MsgElement classes into their own header file.
jgates108 Nov 20, 2018
68e945d
Put DoListItem in its own header file.
jgates108 Nov 21, 2018
eb9f137
Fragile initialization order of Central moved to initialize function.
jgates108 Nov 29, 2018
f79e650
Fixed a race condition and some minor changes.
jgates108 Dec 3, 2018
64b88d1
Fixed a race condition and some minor changes.
jgates108 Dec 3, 2018
2e80568
Created WorkerListItemBase class.
jgates108 Dec 4, 2018
3bdcc10
Replace boost:bind calls with lambdas.
jgates108 Dec 6, 2018
02e4f5e
Removed a comment.
jgates108 Dec 7, 2018
cb5292e
Add configuration file classes.
jgates108 Dec 13, 2018
4481c78
Fixed a bug with TCP communications and added configuration file elem…
jgates108 Dec 18, 2018
7d25fc0
Cleaned up code.
jgates108 Dec 19, 2018
256cc4a
Added reasonable type checking to configuration file values.
jgates108 Jan 9, 2019
f491273
Implemented max number of concurrent client insert and lookup requests.
jgates108 Dec 20, 2018
eb3168f
Started converting to using CompositeKey instead of string keys.
jgates108 Dec 21, 2018
bb8902e
Changed to use CompositeKey instead of string for keys.
jgates108 Jan 10, 2019
0bb1406
Created separate programs for master, client, and server.
jgates108 Jan 18, 2019
6f0ba51
Added timers to client program.
jgates108 Jan 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/modules/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,11 @@ shlibs["qmetaLib"] = dict(mods="qmeta:python",
libs="qserv_qmeta log",
SHLIBPREFIX='',
instDir="$python_prefix/lsst/qserv/qmeta")

# library of tools for building binary applications of the loader subsystem
shlibs["loader"] = dict(mods="""loader""",
libs="""qserv_common util protobuf boost_filesystem boost_system
log log4cxx""")

# get list of all modules
all_modules = sorted(str(d) for d in Glob('*', source=True) if os.path.isdir(d.srcnode().abspath))
Expand Down
2 changes: 1 addition & 1 deletion core/modules/ccontrol/UserQuerySelect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ std::string UserQuerySelect::getError() const {
return _qSession->getError() + div + _errorExtra;
}

/// Attempt to kill in progress.
/// Attempt to kill query in progress.
void UserQuerySelect::kill() {
LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect kill");
std::lock_guard<std::mutex> lock(_killMutex);
Expand Down
200 changes: 200 additions & 0 deletions core/modules/loader/BufferUdp.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// -*- LSST-C++ -*-
/*
* LSST Data Management System
* Copyright 2018 AURA/LSST.
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/

// Class header
#include "BufferUdp.h"

// qserv headers
#include "loader/LoaderMsg.h"

// LSST headers
#include "lsst/log/Log.h"

namespace {
LOG_LOGGER _log = LOG_GET("lsst.qserv.loader.BufferUdp");
}


namespace lsst {
namespace qserv {
namespace loader {


MsgElement::Ptr BufferUdp::readFromSocket(boost::asio::ip::tcp::socket& socket, std::string const& note) {
// Repeatedly read a socket until a valid MsgElement is read, eof, or an error occurs.
for (;;) {
LOGS(_log, LOG_LVL_DEBUG, note << " readFromSocket");
boost::system::error_code error;

// If there's something in the buffer already, get it and return.
// This can happen when the previous read of socket read multiple elements.
MsgElement::Ptr msgElem = _safeRetrieve();
if (msgElem != nullptr) {
return msgElem;
}

size_t len = socket.read_some(boost::asio::buffer(_wCursor, getAvailableWriteLength()), error);
_wCursor += len; /// must advance the cursor.

// EOF is only a problem if no MsgElement was retrieved.
// ??? This is definitely the case in UDP, EOF as nothing more will show up.
// ??? But TCP is another issue as EOF is returned when the connection is still live but
// ??? there's no data (len=0). Why does read_some set error to eof before the tcp connection is closed?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah; according to the docs eof is because the peer closed the connection. You're sure the peer didn't close it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure I've seen the warning below show up in the log, but it has been a little while since I went looking for it.

if (error == boost::asio::error::eof) {
LOGS(_log, LOG_LVL_WARN, "readFromSocket eof");
break; // Connection closed cleanly by peer.
} else if (error && error != boost::asio::error::eof) {
throw LoaderMsgErr(ERR_LOC, "BufferUdp::readFromSocket note=" + note + " asio error=" + error.message());
}

/// Try to retrieve an element (there's no guarantee that an entire element got read in a single read.
// Store original cursor positions so they can be restored if the read fails.
msgElem = _safeRetrieve();
if (msgElem != nullptr) {
return msgElem;
}
}
return nullptr;
}


bool BufferUdp::releaseOwnership() {
if (_ourBuffer) {
_ourBuffer = false;
return true;
}
return false;
}


bool BufferUdp::append(const void* in, size_t len) {
if (isAppendSafe(len)) {
memcpy(_wCursor, in, len);
_wCursor += len;
return true;
}
return false;
}


void BufferUdp::advanceWriteCursor(size_t len) {
_wCursor += len;
if (not isAppendSafe(0)) {
// The buffer has overflowed.
throw std::overflow_error("BufferUdp advanceWriteCursor beyond buffer len=" +
std::to_string(len));
}
}


void BufferUdp::advanceReadCursor(size_t len) {
_rCursor += len;
if (_rCursor > _end) {
// Something read data outside of the buffer range.
throw std::overflow_error("BufferUdp advanceReadCursor beyond buffer len=" +
std::to_string(len));
}
}


std::shared_ptr<MsgElement> BufferUdp::_safeRetrieve() {
auto wCursorOriginal = _wCursor;
auto rCursorOriginal = _rCursor;
MsgElement::Ptr msgElem = MsgElement::retrieve(*this);
if (msgElem != nullptr) {
return msgElem;
} else {
_wCursor = wCursorOriginal;
_rCursor = rCursorOriginal;
}
return nullptr;
}


bool BufferUdp::isRetrieveSafe(size_t len) const {
auto newLen = (_rCursor + len);
return (newLen <= _end && newLen <= _wCursor);
}


bool BufferUdp::retrieve(void* out, size_t len) {
if (isRetrieveSafe(len)) {
memcpy(out, _rCursor, len);
_rCursor += len;
return true;
}
return false;
}


bool BufferUdp::retrieveString(std::string& out, size_t len) {
if (isRetrieveSafe(len)) {
const char* strEnd = _rCursor + len;
std::string str(_rCursor, strEnd);
_rCursor = strEnd;
out = str;
return true;
}
return false;
}


std::string BufferUdp::dumpStr(bool hexDump, bool charDump) const {
std::stringstream os;
dump(os, hexDump, charDump);
return os.str();
}


std::ostream& BufferUdp::dump(std::ostream &os, bool hexDump, bool charDump) const {
os << "maxLength=" << _length;
os << " buffer=" << (void*)_buffer;
os << " wCurLen=" << getAvailableWriteLength();
os << " wCursor=" << (void*)_wCursor;
os << " rCurLen=" << getBytesLeftToRead();
os << " rCursor=" << (void*)_rCursor;
os << " end=" << (void*)_end;

// hex dump
if (hexDump) {
os << "(";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LSSt Style Guide requires indentation in 4 white spaces.

for (const char* j=_buffer; j < _wCursor; ++j) {
os << std::hex << (int)*j << " ";
}
os << ")";
}

// character dump
if (charDump) {
os << "(" << std::string(_buffer, _wCursor) << ")";
}

return os;
}


std::ostream& operator<<(std::ostream& os, BufferUdp const& buf) {
return buf.dump(os, false, false);
}

}}} // namespace lsst:qserv:loader
171 changes: 171 additions & 0 deletions core/modules/loader/BufferUdp.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// -*- LSST-C++ -*-
/*
* LSST Data Management System
* Copyright 2018 LSST Corporation.
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*
*/
#ifndef LSST_QSERV_LOADER_BUFFERUDP_H
#define LSST_QSERV_LOADER_BUFFERUDP_H

// system headers
#include <arpa/inet.h>
#include <cstring>
#include <stdexcept>
#include <memory>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This header needs to be moved one line up

#include <sstream>
#include <string>

// third party headers
#include <boost/asio.hpp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverse the order of the headers.
Also, put a comment line before including these headers:

/// Third party headers

#include <boost/bind.hpp>


namespace lsst {
namespace qserv {
namespace loader {

class MsgElement;


/// A buffer for reading and writing. Nothing can be read from the buffer until
/// something has written to it.
class BufferUdp {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there unit tests for this class? it seems like it would be valuable to have some.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are test for UdpBuffer at the beginning of udpTest.cc. I think there's some room for discussion for the best way to setup unit tests for this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mentioned in that file that it's an integration test. There is a lot of code here without testing at the 'unit' granularity, which I think is an issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a separate ticket for creating unit tests DM-16649

public:
using Ptr = std::shared_ptr<BufferUdp>;

/// The absolute largest UDP message we would send.
/// Usually, they should be much smaller.
static int const MAX_MSG_SIZE = 6000;

/// Create the object with a new _buffer with 'length' bytes.
explicit BufferUdp(size_t length = MAX_MSG_SIZE) : _length(length) {
_buffer = new char[length];
_ourBuffer = true;
_setupBuffer();
}


/// Create a BufferUdp object that uses 'buf' as its buffer, with 'length'
/// indicating the number of bytes in the buffer. If 'buf' already
/// contains valid data, 'validBytes' must be set to how many bytes of the buffer
/// are valid.
/// If BufferUdp should take ownership of 'buf', i.e. delete 'buf' when it is done,
/// call makeOwnerOfBuffer().
BufferUdp(char* buf, size_t length, size_t validBytes) : _buffer(buf), _length(length) {
_setupBuffer();
advanceWriteCursor(validBytes);
}

BufferUdp(BufferUdp const&) = delete;
BufferUdp& operator=(BufferUdp const&) = delete;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the extra line separating members is only required in the CC files, not in the header ones.

~BufferUdp() { if (_ourBuffer) { delete[] _buffer; } }

/// Resets the cursors in the buffer so it is effectively empty.
void reset() { _setupBuffer(); }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra line?

/// Return true only if this object owns the buffer.
bool releaseOwnership();

/// Make this object is responsible for deleting _buffer.
void makeOwnerOfBuffer() { _ourBuffer = true; }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra line

/// Return true if there's at least 'len' room left in the buffer.
bool isAppendSafe(size_t len) const { return (_wCursor + len) <= _end; }

/// Append 'len' bytes at 'in' to the end of _buffer, but only if it is safe to do so.
bool append(const void* in, size_t len);

/// Advance the write cursor. This is usually needed after some other object has been
/// allowed to write directly to the buffer. (boost::asio)
void advanceWriteCursor(size_t len);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra line

/// Advance the read cursor, which usually needs to be done after another object
/// has been allowed to read directly from the buffer. (boost::asio)
void advanceReadCursor(size_t len);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra line

/// Repeatedly read a socket until a valid MsgElement is read, eof, or an error occurs.
/// Errors throw LoaderMsgErr
std::shared_ptr<MsgElement> readFromSocket(boost::asio::ip::tcp::socket& socket, std::string const& note);

/// Return the total length of _buffer.
size_t getMaxLength() const { return _length; }

/// Returns the number of bytes left to be read from the buffer.
int getBytesLeftToRead() const { return _wCursor - _rCursor; }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra lines

/// Returns the amount of room left in the buffer after the write cursor.
size_t getAvailableWriteLength() const { return _end - _wCursor; }

/// Returns a char* pointing to data to be read from the buffer.
const char* getReadCursor() const { return _rCursor; }

/// Returns a char* pointing to where data should be written to the buffer.
char* getWriteCursor() const { return _wCursor; }

/// Returns true if retrieving 'len' bytes from the buffer will not violate the buffer rules.
bool isRetrieveSafe(size_t len) const;

/// Returns true if 'len' bytes could be copied to out without violating _buffer rules.
bool retrieve(void* out, size_t len);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why void* instead of char*?


/// Returns true if 'len' bytes could be copied to 'out' without violating _buffer rules.
bool retrieveString(std::string& out, size_t len);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this method needs to be called retrieveString? The overloaded version of retrieve would also work.


/// Dumps basic data to a string. If 'hexDump' is true, include a complete dump of
/// _buffer in hex.
std::string dumpStr(bool hexDump=true) const { return dumpStr(hexDump, false); }

/// Dumps basic data to a string. If 'hexDump' is true, include a complete dump of
/// _buffer in hex. If 'charDump' is true, include a complete dump of the buffer
/// in ascii.
std::string dumpStr(bool hexDump, bool charDump) const;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please explain the meaning of both parameters!


std::ostream& dump(std::ostream &os, bool hexDump, bool charDump) const;

private:
void _setupBuffer() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the body of the method into the CC file.

_end = _buffer + _length;
_wCursor = _buffer;
_rCursor = _buffer;
}

/// Parse the valid portion of _buffer (starting at _rCursor) and see if one
/// MsgElement is available. If so, return the element and advance _rCursor.
/// Otherwise return nullptr.
/// If a message is not recovered, the buffer is left effectively unchanged.
std::shared_ptr<MsgElement> _safeRetrieve();


char* _buffer;
size_t _length; ///< Number of bytes in the array (total capacity of array).
char* _end; ///< Immediately after the last element in the array.
char* _wCursor; ///< Where new elements will be appended to the array.
const char* _rCursor; ///< Where data is read from the buffer.

bool _ourBuffer{false}; ///< true if this class object is responsible for deleting the buffer.
};

/// Print basic buffer information. Use BufferUdp::dump() directly if the buffer contents are needed.
std::ostream& operator<<(std::ostream& os, BufferUdp const& buf);

}}} // namespace lsst:qserv:loader

#endif // LSST_QSERV_LOADER_BUFFERUDP_H
Loading