Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions camerad/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ add_library(camera STATIC

find_library(CCFITS_LIB CCfits NAMES libCCfits PATHS /usr/local/lib)
find_library(CFITS_LIB cfitsio NAMES libcfitsio PATHS /usr/local/lib)
find_library(ZMQ_LIB zmq NAMES libzmq PATH /usr/local/lib)

find_package(Threads)

Expand All @@ -151,6 +152,7 @@ target_link_libraries(camerad
${CMAKE_THREAD_LIBS_INIT}
${CCFITS_LIB}
${CFITS_LIB}
${ZMQ_LIB}
)

target_link_libraries(camerad ${CARC_BASE})
Expand Down
170 changes: 169 additions & 1 deletion camerad/archon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ namespace Archon {
this->is_longexposure_set = false;
this->is_window = false;
this->is_autofetch = false;
this->is_zmq = false;
this->win_hstart = 0;
this->win_hstop = 2047;
this->win_vstart = 0;
Expand Down Expand Up @@ -4527,6 +4528,160 @@ namespace Archon {
}
/**************** Archon::Interface::autofetch *******************************/

/**************** Archon::Interface::zmq ******************************/
/**
* @fn zmq
* @brief turn zmq server on/off
* @param state_in, string "TRUE, FALSE, 0, or 1"
* @return ERROR or NO_ERROR
*
* NOTE: this assumes LVDS is module 10
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a misplaced comment, as there's no relationship between ZMQ and LVDS?

* This function does the following:
* 1) turns zmq server on
*
*/
long Interface::zmq(std::string state_in, std::string &state_out) {
std::string function = "Archon::Interface::zmq";
std::stringstream message;
long error = NO_ERROR;
const std::string& endpoint = "tcp://*:5555";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make endpoint configurable rather than hard-coded?


if ( !state_in.empty() ) {
try {
std::transform( state_in.begin(), state_in.end(), state_in.begin(), ::toupper ); // make uppercase

if ( state_in == "FALSE" || state_in == "0" ) {
// Shut down ZMQ server
if (!this->is_zmq) {
return (error);
}

this->is_zmq = false;

if (this->serverThread_.joinable()) {
this->serverThread_.join();
}
this->publisher_.reset();
this->context_.reset();

if (error != NO_ERROR) {
message << "shutting down ZMQ server: ";
} else {
message << "shutting down ZMQ server: ";
}

logwrite(function, message.str());
} else if ( state_in == "TRUE" || state_in == "1" ) {
// Turn on ZMQ server
if (this->is_zmq) {
return (error);
}

std::lock_guard<std::mutex> lock(this->serverMutex_);

try {
context_ = std::make_unique<zmq::context_t>(1);
this->publisher_ = std::make_unique<zmq::socket_t>(*context_, zmq::socket_type::xpub);
this->publisher_->bind(endpoint);
std::cout << "ZMQ server started in XPUB mode on " << endpoint << std::endl;
this->is_zmq = true;

serverThread_ = std::thread([this]() {
while (this->is_zmq) {
try {
zmq::message_t message;
publisher_->recv(message, zmq::recv_flags::dontwait); // Non-blocking receive
if (message.size() > 0) {
std::string received_message(static_cast<char*>(message.data()), message.size());
// Process subscription messages
std::cout << "Received ZMQ message: " << received_message << std::endl;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens here? Maybe this puts data in a dequeue?

}
} catch (const zmq::error_t& e) {
if (e.num() != EWOULDBLOCK) { // Ignore would block errors.
std::cerr << "ZMQ receive error: " << e.what() << std::endl;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // Prevent busy waiting
}

if(this->publisher_){
this->publisher_->close();
}

if(this->context_){
this->context_->shutdown();
}

std::cout << "ZMQ server stopped." << std::endl;
});
} catch (const zmq::error_t& e) {
std::cerr << "Error starting ZMQ server: " << e.what() << std::endl;
this->is_zmq = false; // Ensure the flag is reset in case of error.
}

if (error != NO_ERROR) {
message << "turning on ZMQ server: ";
} else {
message << "turning on ZMQ server: ";
}

logwrite(function, message.str());
} else {
message.str(""); message << "ZMQ server " << state_in << " is invalid. Expecting {true,false,0,1}";
this->camera.log_error( function, message.str() );
return ERROR;
}

} catch (...) {
message.str(""); message << "unknown exception converting zmq state " << state_in << " to uppercase";
this->camera.log_error( function, message.str() );
return ERROR;
}
}

state_out = ( this->is_zmq ? "true" : "false" );

if (error != NO_ERROR) {
message.str(""); message << "setting autofetch state to " << state_in;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

autofetch state?

this->camera.log_error( function, message.str() );
return ERROR;
}

return (error);
}
/**************** Archon::Interface::zmq *******************************/

/**************** Archon::Interface::write_to_zmq **************************/
/**
* @fn write_to_zmq
* @brief write data on zmq socket
* @param string
* @return ERROR or NO_ERROR
*
*/
long Interface::write_to_zmq(const std::string& message) {
std::string function = "Archon::Interface::write_to_zmq";
std::lock_guard<std::mutex> lock(this->serverMutex_); // Protect access
long error = NO_ERROR;

if (this->is_zmq && this->publisher_) {
try {
zmq::message_t zmq_message(message.size());
memcpy(zmq_message.data(), message.data(), message.size());
publisher_->send(zmq_message, zmq::send_flags::none);
error = NO_ERROR;
} catch (const zmq::error_t& e) {
std::cerr << "Error publishing message: " << e.what() << std::endl;
}
} else {
error = ERROR;
std::cerr << "ZMQ server is not running or publisher is not available." << std::endl;
}

return (error);
}
/**************** Archon::Interface::get_parameter **************************/

/**************** Archon::Interface::hexpose ******************************/
/**
* @fn hexpose
Expand Down Expand Up @@ -4723,6 +4878,19 @@ namespace Archon {
return error;
}

std::cout << "Image bitpix: " << std::to_string(this->camera_info.bitpix) << ", data type: " << std::to_string(this->camera_info.datatype) << std::endl;
int16_t *cbuf16s;
cbuf16s = (int16_t *)this->image_data; // cast to 16b signed int
std::stringstream ss;
for (size_t i=0; i < this->camera_info.section_size; i++) {
ss << cbuf16s[i];
if (i < this->camera_info.section_size - 1) {
ss << ",";
}
}
// std::cout << "Image data: " << ss.str() << std::endl;
this->write_to_zmq(ss.str());

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if write_to_zmq( accepted *buf, bufsz ) and then did something like,

zmq::message_t msg(cbuf16s, sizeof(int) * section_size);
write_to_zmq(msg)

// ASYNC status message on completion of each readout
nread++;
message.str(""); message << "READOUT COMPLETE (" << nread << " of " << nseq << " read)";
Expand Down Expand Up @@ -5437,7 +5605,7 @@ namespace Archon {
message << "waiting for new frame: current frame=" << this->lastframe << " current buffer=" << this->frame.index+1;
logwrite(function, message.str());

usleep( 700 ); // tune for size of window
// usleep( 700 ); // tune for size of window

this->frame.index += 1;

Expand Down
12 changes: 12 additions & 0 deletions camerad/archon.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
#pragma once

#include <zmq.hpp>
#include <CCfits/CCfits> //!< needed here for types in set_axes()
#include <atomic>
#include <chrono>
Expand Down Expand Up @@ -86,6 +87,12 @@ namespace Archon {
unsigned long int start_timer, finish_timer; //!< Archon internal timer, start and end of exposure
int n_hdrshift; //!< number of right-shift bits for Archon buffer in HDR mode

// For ZMQ
std::unique_ptr<zmq::context_t> context_;
std::unique_ptr<zmq::socket_t> publisher_;
std::thread serverThread_;
std::mutex serverMutex_;

public:
Interface();

Expand Down Expand Up @@ -116,6 +123,7 @@ namespace Archon {
bool is_longexposure_set; //!< true for long exposure mode (exptime in sec), false for exptime in msec
bool is_window; //!< true if in window mode for h2rg, false if not
bool is_autofetch;
bool is_zmq;
int win_hstart;
int win_hstop;
int win_vstart;
Expand Down Expand Up @@ -243,6 +251,10 @@ namespace Archon {

long autofetch(std::string state_in, std::string &state_out);

long zmq(std::string state_in, std::string &state_out);

long int write_to_zmq(const std::string& message);

long video();

long wait_for_exposure();
Expand Down
9 changes: 8 additions & 1 deletion camerad/camerad.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,14 @@ void doit(Network::TcpSocket sock) {
ret = server.autofetch(args, retstring);
if (!retstring.empty()) {
sock.Write(retstring);
sock.Write( " ");
sock.Write(" ");
}
}
else if (cmd=="zmq") {
ret = server.zmq(args, retstring);
if (!retstring.empty()) {
sock.Write(retstring);
sock.Write(" ");
}
}
else if ( cmd == "fetchlog" ) {
Expand Down
Loading