Skip to content
This repository was archived by the owner on Jan 28, 2022. It is now read-only.

Commit a608c87

Browse files
committed
[iobench] Added Cassandra backend.
1 parent 51e0b2d commit a608c87

File tree

4 files changed

+382
-4
lines changed

4 files changed

+382
-4
lines changed

CMake/FindCassandra.cmake

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# - Try to find the Cassandra installation.
2+
#
3+
# The following are set after configuration is done:
4+
# CASSANDRA_FOUND - True if Cassandra installation is found and Cassandra not disabled by the user
5+
# CASSANDRA_INCLUDE_DIRS - Cassandra libraries path
6+
# CASSANDRA_LIBRARY_DIRS - Cassandra headers path
7+
# CASSANDRA_LIBRARIES - List of libraries when using Cassandra
8+
9+
# Use 'cmake -DNEUROMAPP_DISABLE_CASSANDRA=TRUE' to disable Cassandra
10+
# or add 'PATH_TO_CASSANDRA_ROOT_DIR' to CMAKE_PREFIX_PATH'
11+
# or set 'CASSANDRA_PATH=PATH_TO_CASSANDRA_ROOT_DIR to help cmake find the Cassandra installation
12+
13+
if (NOT NEUROMAPP_DISABLE_CASSANDRA)
14+
find_path(CASSANDRA_INCLUDE_DIR NAMES cassandra.h
15+
HINTS ${CMAKE_PREFIX_PATH}/include $ENV{CASSANDRA_PATH}/include)
16+
find_library(CASSANDRA_LIBRARY_CASS NAMES cassandra
17+
HINTS ${CMAKE_PREFIX_PATH}/lib $ENV{CASSANDRA_PATH}/lib)
18+
19+
get_filename_component(CASSANDRA_LIBRARY_DIRS "${CASSANDRA_LIBRARY_CASS}" PATH)
20+
21+
find_library(CASSANDRA_LIBRARY_UV NAMES uv
22+
HINTS ${CASSANDRA_LIBRARY_DIRS} ${CMAKE_PREFIX_PATH}/lib $ENV{CASSANDRA_PATH}/lib)
23+
24+
set(CASSANDRA_LIBRARIES ${CASSANDRA_LIBRARY_CASS} ${CASSANDRA_LIBRARY_UV})
25+
set(CASSANDRA_INCLUDE_DIRS ${CASSANDRA_INCLUDE_DIR})
26+
get_filename_component(CASSANDRA_LIBRARY_DIRS "${CASSANDRA_LIBRARY_CASS}" PATH)
27+
28+
include(FindPackageHandleStandardArgs)
29+
# Handle the QUIETLY and REQUIRED arguments and set the CASSANDRA_FOUND to TRUE
30+
# if all listed variables are TRUE
31+
find_package_handle_standard_args(CASSANDRA DEFAULT_MSG CASSANDRA_LIBRARY_CASS CASSANDRA_LIBRARY_UV CASSANDRA_INCLUDE_DIR)
32+
33+
mark_as_advanced(CASSANDRA_INCLUDE_DIR CASSANDRA_LIBRARY_CASS CASSANDRA_LIBRARY_UV)
34+
else()
35+
set(CASSANDRA_FOUND "FALSE")
36+
set(CASSANDRA_INCLUDE_DIRS "")
37+
set(CASSANDRA_LIBRARY_DIRS "")
38+
set(CASSANDRA_LIBRARIES "")
39+
endif()
40+
41+
42+
# Print summary
43+
if(CASSANDRA_FOUND)
44+
message("Cassandra include dir = ${CASSANDRA_INCLUDE_DIR}")
45+
message("Cassandra lib = ${CASSANDRA_LIBRARY_CASS}")
46+
47+
message("CASSANDRA_FOUND: ${CASSANDRA_FOUND}")
48+
message("CASSANDRA_INCLUDE_DIRS: ${CASSANDRA_INCLUDE_DIRS}")
49+
message("CASSANDRA_LIBRARY_DIRS: ${CASSANDRA_LIBRARY_DIRS}")
50+
message("CASSANDRA_LIBRARIES: ${CASSANDRA_LIBRARIES}")
51+
else()
52+
if(NEUROMAPP_DISABLE_CASSANDRA)
53+
message(STATUS "Cassandra disabled by user.")
54+
else(NEUROMAPP_DISABLE_CASSANDRA)
55+
message(STATUS "Couldn't find Cassandra library, ignoring it.")
56+
endif(NEUROMAPP_DISABLE_CASSANDRA)
57+
endif()

neuromapp/iobench/CMakeLists.txt

+15-2
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,30 @@ else()
2525
set(LEVELDB_LIBRARIES "")
2626
endif()
2727

28+
# Cassandra backend
29+
find_package(Cassandra)
30+
if (${CASSANDRA_FOUND})
31+
include_directories(${CASSANDRA_INCLUDE_DIRS})
32+
add_definitions(-DIO_CASS -DCASS_USE_OPENSSL -D_GNU_SOURCE)
33+
34+
set(CASSANDRA_LIBRARIES ${CASSANDRA_LIBRARIES} "-lssl -lcrypto")
35+
36+
install (FILES backends/cassandra.h DESTINATION include)
37+
else()
38+
set(CASSANDRA_LIBRARIES "")
39+
endif()
40+
2841
# OMP executable
2942
add_executable(iobench-omp iobench.cpp benchmark.cpp)
30-
target_link_libraries (iobench-omp ${LEVELDB_LIBRARIES} ${Boost_PROGRAM_OPTIONS_LIBRARIES})
43+
target_link_libraries (iobench-omp ${LEVELDB_LIBRARIES} ${CASSANDRA_LIBRARIES} ${Boost_PROGRAM_OPTIONS_LIBRARIES})
3144

3245
if(MPI_FOUND)
3346
# MPI+OMP executable (add -DIO_MPI to compile flags)
3447
add_executable(MPI_Exec_io iobench.cpp benchmark.cpp)
3548
set_target_properties(MPI_Exec_io PROPERTIES
3649
COMPILE_FLAGS "${MPI_C_COMPILE_FLAGS} ${MPI_CXX_COMPILE_FLAGS} -DIO_MPI")
3750
# Adding MPI_LIBRARIES adds also the -Bdynamic flag, which makes execution crash on BG/Q
38-
target_link_libraries (MPI_Exec_io ${LEVELDB_LIBRARIES} ${MPI_CXX_LIBRARIES} ${Boost_PROGRAM_OPTIONS_LIBRARIES})
51+
target_link_libraries (MPI_Exec_io ${LEVELDB_LIBRARIES} ${CASSANDRA_LIBRARIES} ${MPI_CXX_LIBRARIES} ${Boost_PROGRAM_OPTIONS_LIBRARIES})
3952
endif()
4053

4154

+308
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
/*
2+
* Neuromapp - cassandra.h, Copyright (c), 2015,
3+
* Judit Planas - Swiss Federal Institute of technology in Lausanne,
4+
5+
* All rights reserved.
6+
*
7+
* This program is free software; you can redistribute it and/or
8+
* modify it under the terms of the GNU General Public License
9+
* as published by the Free Software Foundation; either version 2
10+
* of the License, or (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.. See the GNU
16+
* Lesser General Public License for more details.
17+
*
18+
* You should have received a copy of the GNU Lesser General Public
19+
* License along with this library.
20+
*/
21+
22+
/**
23+
* @file neuromapp/iobench/backends/cassandra.h
24+
* iobench Miniapp: Cassandra backend
25+
*/
26+
27+
#ifndef MAPP_IOBENCH_CASS_
28+
#define MAPP_IOBENCH_CASS_
29+
30+
#include <string>
31+
#include <iostream>
32+
#include <stdio.h>
33+
#include <stdlib.h>
34+
35+
// Get OMP header if available
36+
#include "utils/omp/compatibility.h"
37+
#include "iobench/backends/basic.h"
38+
39+
// Cassandra header
40+
#include <cassandra.h>
41+
42+
43+
/** \fn createDB()
44+
\brief Create the appropriate DB backend
45+
\param backend name of the desired DB to create
46+
\param db std::vector returning the initialized DBs
47+
*/
48+
inline void createDB (const std::string & backend, std::vector<BaseKV *> & db);
49+
50+
51+
class KVStatusCass : public KVStatus {
52+
public:
53+
void * _key;
54+
void * _value;
55+
size_t _key_size;
56+
size_t _value_size;
57+
CassFuture * _future;
58+
59+
bool success() { return cass_future_error_code(_future) == CASS_OK; }
60+
61+
~KVStatusCass() { cass_future_free(_future); }
62+
};
63+
64+
65+
class CassKV : public BaseKV {
66+
67+
private:
68+
CassCluster * _cluster;
69+
CassSession * _session;
70+
const char * _wr_query;
71+
const char * _rd_query;
72+
73+
void print_error(CassFuture* future);
74+
CassCluster* create_cluster();
75+
CassError connect_session(CassSession* session, const CassCluster* cluster);
76+
CassError execute_query(CassSession* session, const char* query);
77+
78+
public:
79+
CassKV() : _cluster(NULL), _session(NULL), _wr_query(NULL), _rd_query(NULL) {}
80+
void initDB(iobench::args &a);
81+
void finalizeDB();
82+
inline void putKV(KVStatus * kvs, void * key, size_t key_size, void * value, size_t value_size);
83+
inline size_t getKV (KVStatus * kvs, void * key, size_t key_size, void * value, size_t value_size);
84+
inline void waitKVput(std::vector<KVStatus *> &status, int start, int end);
85+
inline void waitKVget(std::vector<KVStatus *> &status, int start, int end);
86+
void deleteDB();
87+
std::string getDBName() { return "cassandra"; }
88+
89+
void createKVStatus(int n, std::vector<KVStatus *> &status);
90+
};
91+
92+
93+
/** \fn void CassKV::print_error(CassFuture* future)
94+
\brief Function to print Cassandra errors
95+
*/
96+
void CassKV::print_error(CassFuture* future) {
97+
const char* message;
98+
size_t message_length;
99+
cass_future_error_message(future, &message, &message_length);
100+
fprintf(stderr, "Error: %.*s\n", (int)message_length, message);
101+
}
102+
103+
/** \fn CassCluster* CassKV::create_cluster()
104+
\brief Create a Cassandra cluster object
105+
*/
106+
CassCluster* CassKV::create_cluster() {
107+
CassCluster* cluster = cass_cluster_new();
108+
cass_cluster_set_contact_points(cluster, "127.0.0.1");
109+
cass_cluster_set_protocol_version(cluster, 3);
110+
cass_cluster_set_write_bytes_high_water_mark(cluster, 1024*1024*10); // 10 MB
111+
cass_cluster_set_write_bytes_high_water_mark(cluster, 1024 * 4*1024 * 200); // Make sure we have enough buffer space
112+
return cluster;
113+
}
114+
115+
/** \fn CassError CassKV::connect_session(CassSession* session, const CassCluster* cluster)
116+
\brief Connect to a Cassandra cluster
117+
*/
118+
CassError CassKV::connect_session(CassSession* session, const CassCluster* cluster) {
119+
CassError rc = CASS_OK;
120+
CassFuture* future = cass_session_connect(session, cluster);
121+
122+
cass_future_wait(future);
123+
rc = cass_future_error_code(future);
124+
if (rc != CASS_OK) {
125+
print_error(future);
126+
}
127+
cass_future_free(future);
128+
129+
return rc;
130+
}
131+
132+
/** \fn CassError CassKV::execute_query(CassSession* session, const char* query)
133+
\brief Execute a Cassandra query
134+
*/
135+
CassError CassKV::execute_query(CassSession* session, const char* query) {
136+
CassError rc = CASS_OK;
137+
CassFuture* future = NULL;
138+
CassStatement* statement = cass_statement_new(query, 0);
139+
140+
future = cass_session_execute(session, statement);
141+
cass_future_wait(future);
142+
143+
rc = cass_future_error_code(future);
144+
if (rc != CASS_OK) {
145+
print_error(future);
146+
}
147+
148+
cass_future_free(future);
149+
cass_statement_free(statement);
150+
151+
return rc;
152+
}
153+
154+
/** \fn void initDB(iobench::args &a)
155+
\brief Init the needed data for the specific DB
156+
*/
157+
void CassKV::initDB(iobench::args &a)
158+
{
159+
_cluster = create_cluster();
160+
_session = cass_session_new();
161+
162+
if (connect_session(_session, _cluster) != CASS_OK) {
163+
cass_cluster_free(_cluster);
164+
cass_session_free(_session);
165+
return;
166+
}
167+
168+
_wr_query = "INSERT INTO my_kv (key, value) VALUES (?, ?);";
169+
_rd_query = "SELECT value FROM my_kv WHERE key = ?;";
170+
171+
execute_query(_session,
172+
"CREATE KEYSPACE IF NOT EXISTS my_keyspace WITH replication = { \
173+
'class': 'SimpleStrategy', 'replication_factor': '1' };");
174+
175+
execute_query(_session,
176+
"CREATE TABLE IF NOT EXISTS my_keyspace.my_kv (key text, \
177+
value text, PRIMARY KEY (key, value));");
178+
//WITH compression = { 'sstable_compression' : '' };");
179+
180+
execute_query(_session, "USE my_keyspace");
181+
182+
std::cout << "Cassandra connection: OK!" << std::endl;
183+
}
184+
185+
/** \fn void finalizeDB()
186+
\brief Finalize the needed data for the specific DB
187+
*/
188+
void CassKV::finalizeDB()
189+
{
190+
CassFuture * close_future = cass_session_close(_session);
191+
cass_future_wait(close_future);
192+
cass_future_free(close_future);
193+
194+
cass_cluster_free(_cluster);
195+
cass_session_free(_session);
196+
printf("Cassandra disconnection: OK!\n");
197+
}
198+
199+
/** \fn void putKV(KVStatus * kvs, void * key, size_t key_size, void * value, size_t value_size)
200+
\brief Insert the given k/v pair into the DB
201+
*/
202+
inline void CassKV::putKV(KVStatus * kvs, void * key, size_t key_size, void * value, size_t value_size)
203+
{
204+
CassStatement* statement = cass_statement_new(_wr_query, 2);
205+
cass_statement_bind_string(statement, 0, (char *) key);
206+
cass_statement_bind_string(statement, 1, (char *) value);
207+
208+
((KVStatusCass *) kvs)->_future = cass_session_execute(_session, statement);
209+
210+
cass_statement_free(statement);
211+
}
212+
213+
/** \fn size_t putKV(KVStatus * kvs, void * key, size_t key_size, void * value, size_t value_size)
214+
\brief Retrieve from the DB the associated value to the given key. Returns retrieved value size
215+
*/
216+
inline size_t CassKV::getKV (KVStatus * kvs, void * key, size_t key_size, void * value, size_t value_size)
217+
{
218+
CassStatement* statement = cass_statement_new(_rd_query, 1);
219+
cass_statement_bind_string(statement, 0, (char *) key);
220+
221+
((KVStatusCass *) kvs)->_future = cass_session_execute(_session, statement);
222+
((KVStatusCass *) kvs)->_key = key;
223+
((KVStatusCass *) kvs)->_value = value;
224+
((KVStatusCass *) kvs)->_key_size = key_size;
225+
((KVStatusCass *) kvs)->_value_size = value_size;
226+
227+
cass_statement_free(statement);
228+
229+
return value_size;
230+
}
231+
232+
/** \fn void waitKVput(std::vector<KVStatus *> &status, int start, int end)
233+
\brief Wait until all the insertions associated to status are committed to the DB
234+
*/
235+
inline void CassKV::waitKVput(std::vector<KVStatus *> &status, int start, int end)
236+
{
237+
for (int i = start; i < end; i++) {
238+
CassFuture* future = ((KVStatusCass *) status[i])->_future;
239+
cass_future_wait(future);
240+
}
241+
}
242+
243+
/** \fn void waitKVget(std::vector<KVStatus *> &status, int start, int end)
244+
\brief Wait until all the queries associated to status are retrieved from the DB
245+
*/
246+
inline void CassKV::waitKVget(std::vector<KVStatus *> &status, int start, int end)
247+
{
248+
for (int i = start; i < end; i++) {
249+
CassFuture* future = ((KVStatusCass *) status[i])->_future;
250+
cass_future_wait(future);
251+
252+
const CassResult* result = cass_future_get_result(future);
253+
254+
if (cass_result_row_count(result) > 0) {
255+
const CassRow* row = cass_result_first_row(result);
256+
257+
const CassValue* column1 = cass_row_get_column(row, 0);
258+
259+
if (column1 != NULL) {
260+
const char* string_value;
261+
size_t string_value_length;
262+
cass_value_get_string(column1, &string_value, &string_value_length);
263+
264+
std::memcpy(((KVStatusCass *) status[i])->_value, string_value, std::min(((KVStatusCass *) status[i])->_value_size, string_value_length));
265+
266+
//std::cout << "Row count is: " << cass_result_row_count(result) << std::endl
267+
// << "Got value with strlength: " << string_value_length << " ;;; value: " << string_value << std::endl;
268+
} //else {
269+
// std::cout << "Value NULL!" << std::endl;
270+
// break;
271+
//}
272+
}
273+
cass_result_free(result);
274+
}
275+
}
276+
277+
/** \fn void deleteDB()
278+
\brief Clear DB contents
279+
*/
280+
void CassKV::deleteDB()
281+
{
282+
execute_query(_session,
283+
"DROP KEYSPACE my_keyspace");
284+
285+
execute_query(_session,
286+
"CREATE KEYSPACE IF NOT EXISTS my_keyspace WITH replication = { \
287+
'class': 'SimpleStrategy', 'replication_factor': '1' };");
288+
289+
execute_query(_session,
290+
"CREATE TABLE IF NOT EXISTS my_keyspace.my_kv (key text, \
291+
value text, PRIMARY KEY (key, value));");
292+
//WITH compression = { 'sstable_compression' : '' };");
293+
294+
execute_query(_session, "USE my_keyspace");
295+
}
296+
297+
/** \fn void createKVStatus(int n, std::vector<KVStatus *> &status)
298+
\brief Create the needed structures to handle asynchronous insertions. Opaque class from the outside
299+
*/
300+
void CassKV::createKVStatus(int n, std::vector<KVStatus *> &status)
301+
{
302+
for (int i = 0; i < n; i++) {
303+
status.push_back(new KVStatusCass());
304+
}
305+
}
306+
307+
308+
#endif // MAPP_IOBENCH_CASS_

0 commit comments

Comments
 (0)