diff --git a/build/build.sh b/build/build.sh old mode 100644 new mode 100755 index 98a277d..71bd36d --- a/build/build.sh +++ b/build/build.sh @@ -23,5 +23,3 @@ echo "--- prepare dependencies ---" echo "--- building arion-agent ---" cmake . && make - -fi diff --git a/build/machine-init.sh b/build/machine-init.sh old mode 100644 new mode 100755 index 1192d3f..1c2228e --- a/build/machine-init.sh +++ b/build/machine-init.sh @@ -129,7 +129,10 @@ echo "5--- installing ebpf dependencies ---" && \ cd /var/local/git && \ git clone https://github.com/futurewei-cloud/zeta && \ cd zeta && \ - ./build.sh && \ + git submodule update --init --recursive && \ + cd src/extern/libbpf/src && \ + mkdir build root && \ + BUILD_STATIC_ONLY=y OBJDIR=build DESTDIR=root make install && \ cd ~ echo "6--- installing sqlite3 database ---" && \ @@ -149,7 +152,16 @@ echo "7--- installing sqlite orm lib dependencies ---" && \ sudo cmake --build build --target install && \ cd ~ -echo "8--- installing double conversion for folly ---" && \ +echo "8--- installing glog for folly ---" && \ + cd /var/local/git && \ + git clone https://github.com/google/glog.git && \ + cd glog && \ + sudo cmake -S . 
-B build -G "Unix Makefiles" && \ + sudo cmake --build build && \ + sudo cmake --build build --target install && \ + cd ~ + +echo "9--- installing double conversion for folly ---" && \ cd /var/local/git && \ git clone https://github.com/google/double-conversion.git && \ cd double-conversion && \ @@ -158,7 +170,7 @@ echo "8--- installing double conversion for folly ---" && \ sudo make install && \ cd ~ -echo "9--- installing folly lib for concurrent hashmap ---" && \ +echo "10--- installing folly lib for concurrent hashmap ---" && \ cd /var/local/git && \ git clone https://github.com/facebook/folly.git && \ cd folly && \ @@ -168,3 +180,13 @@ echo "9--- installing folly lib for concurrent hashmap ---" && \ make && \ sudo make install && \ cd ~ + +echo "11--- downloading xdp-project files" + cd /var/local/git && \ + git clone https://github.com/xdp-project/xdp-tutorial&& \ +# cd xdp-tutorial && \ +# git submodule update --init --recursive && \ +# cd src/extern/libbpf/src && \ +# mkdir build root && \ +# BUILD_STATIC_ONLY=y OBJDIR=build DESTDIR=root make install && \ + cd ~ \ No newline at end of file diff --git a/include/af_xdp_user_multi_thread.h b/include/af_xdp_user_multi_thread.h new file mode 100644 index 0000000..c9116c1 --- /dev/null +++ b/include/af_xdp_user_multi_thread.h @@ -0,0 +1,33 @@ +// +// Created by ubuntu on 10/4/22. 
+// + +#ifndef ARIONAGENT_AF_XDP_USER_MULTI_THREADED_H +#define ARIONAGENT_AF_XDP_USER_MULTI_THREADED_H + +#include "logger.h" +#include +#include +#include +#include +#ifdef __cplusplus +extern "C" +{ +#include "common_params.h" +#include "common_user_bpf_xdp.h" +#include "common_libbpf.h" +} +#endif +static const char *__d__ = "AF_XDP kernel bypass example multi threaded\n"; + +class af_xdp_user_multi_thread { +public: + af_xdp_user_multi_thread() { + printf("%s", "Start of multithread af_xdp userspace program."); + } + static void* run_af_xdp_multi_threaded(void* args/*std::string table_name_neighbor_ebpf_map*/); +private: + +}; + +#endif //ARIONAGENT_AF_XDP_USER_H diff --git a/include/db_client.h b/include/db_client.h index b797d4b..7ed221d 100644 --- a/include/db_client.h +++ b/include/db_client.h @@ -15,10 +15,14 @@ #include #include #include +#include #include "dispatch_queue.h" +#include "xdp/trn_datamodel.h" +#include "util.h" using namespace sqlite_orm; + struct Neighbor { int vni; std::string vpc_ip; @@ -28,36 +32,230 @@ struct Neighbor { int version; }; // local db table 1 - neighbor info table that stores the latest neighbors (if there are version updates per neighbor) received from ArionMaster -struct ProgrammingState { +struct NeighborProgrammingState { int version; }; // local db table 2 - neighbor ebpf programmed version -std::string g_local_db_path = "/var/local/arion/arion_wing.db"; - -// Schema definition (create DB if not exists) or retrieved handle (get DB if exists already) of local db -auto local_db = make_storage(g_local_db_path, - make_table("neighbor", - make_column("vni", &Neighbor::vni), - make_column("vpc_ip", &Neighbor::vpc_ip), - make_column("host_ip", &Neighbor::host_ip), - make_column("vpc_mac", &Neighbor::vpc_mac), - make_column("host_mac", &Neighbor::host_mac), - make_column("version", &Neighbor::version), - primary_key(&Neighbor::vni, &Neighbor::vpc_ip) - ), - make_table("journal", - make_column("version", 
&ProgrammingState::version), - primary_key(&ProgrammingState::version) - ) -); - -// Create local db writer single thread execution queue -dispatch_queue local_db_writer_queue("Local db background write queue", 1); - -static int FindLKGVersion() { - int lkg_ver = 0; - - /* original sql is +struct SecurityGroupPortBinding { + std::string port_id; // vni-vpc_ip + std::string security_group_id; + int version; +}; // local db table 3, stores the mapping between port and security group, 1 group can have multiple rules. + +struct SecurityGroupRule { + std::string id; //UUID, should be key in DB + std::string security_group_id; + std::string remote_group_id; + std::string direction; + std::string remote_ip_prefix; + std::string protocol; + int port_range_max; + int port_range_min; + std::string ether_type; + int vni; + int version; +}; // local db table 3, security group rule table that stores the latest security group rules (if there are version updates per neighbor) received from ArionMaster + +struct SecurityGroupPortBindingProgrammingState { + int version; +}; // local db table 2 - security rule ebpf programmed version + +// copied from arp_hash in ACA +struct EndpointHash { + size_t operator()(const endpoint_key_t &e) const{ + return std::hash<__u32>()(e.vni) ^ (std::hash<__u32>()(e.ip) << 1); + } +}; + +struct EndpointEqual { + bool operator() (const endpoint_key_t &e, const endpoint_key_t &f) const { + return (e.vni == f.vni) && (e.ip == f.ip); + } +}; + +struct SecurityGroupRuleHash { + size_t operator()(const sg_cidr_key_t &e) const{ + return std::hash<__u32>()(e.prefixlen) ^ std::hash<__u32>()(e.vni) ^ std::hash<__u16>()(e.port) ^ + std::hash<__u8>()(e.direction) ^ std::hash<__u8>()(e.protocol) ^ std::hash<__u32>()(e.local_ip) ^ + std::hash<__u32>()(e.remote_ip); + } +}; + +struct SecurityGroupRuleEqual { + bool operator() (const sg_cidr_key_t &e, const sg_cidr_key_t &f) const { + return (e.remote_ip == f.remote_ip) && (e.local_ip == f.local_ip) && (e.protocol 
== f.protocol) && + (e.direction == f.direction) && (e.port == f.port) && (e.vni == f.vni) && (e.prefixlen == f.prefixlen); + } +}; + +static std::string g_local_db_path = "/var/local/arion/arion_wing.db"; + +inline auto make_storage_query () { + auto storage = make_storage(g_local_db_path, + make_table("neighbor", + make_column("vni", &Neighbor::vni), + make_column("vpc_ip", &Neighbor::vpc_ip), + make_column("host_ip", &Neighbor::host_ip), + make_column("vpc_mac", &Neighbor::vpc_mac), + make_column("host_mac", &Neighbor::host_mac), + make_column("version", &Neighbor::version), + primary_key(&Neighbor::vni, &Neighbor::vpc_ip) + ), + make_table("journal_neighbor", + make_column("version", &NeighborProgrammingState::version), + primary_key(&NeighborProgrammingState::version) + ), + make_table("security_group_rule", + make_column("id", &SecurityGroupRule::id), + make_column("security_group_id", &SecurityGroupRule::security_group_id), + make_column("remote_group_id", &SecurityGroupRule::remote_group_id), + make_column("direction", &SecurityGroupRule::direction), + make_column("remote_ip_prefix", &SecurityGroupRule::remote_ip_prefix), + make_column("protocol", &SecurityGroupRule::protocol), + make_column("port_range_max", &SecurityGroupRule::port_range_max), + make_column("port_range_min", &SecurityGroupRule::port_range_min), + make_column("ether_type", &SecurityGroupRule::ether_type), + make_column("vni", &SecurityGroupRule::vni), + make_column("version", &SecurityGroupRule::version), + primary_key(&SecurityGroupRule::id) + ), + make_table("security_group_port_binding", + make_column("port_id", &SecurityGroupPortBinding::port_id), + make_column("security_group_id", &SecurityGroupPortBinding::security_group_id), + make_column("version", &SecurityGroupPortBinding::version), + primary_key(&SecurityGroupPortBinding::port_id, &SecurityGroupPortBinding::security_group_id, &SecurityGroupPortBinding::version) + ), + // 1 version is written when all related SecurityGroupRules 
of a SecurityGroupPortBinding + // is programmed into the eBPF map and written into the DB. + make_table("journal_security_group_rules", + make_column("version", &SecurityGroupPortBindingProgrammingState::version), + primary_key(&SecurityGroupPortBindingProgrammingState::version) + ) + ); + storage.sync_schema(); + return storage; +}; + +using Storage = decltype(make_storage_query()); + +class db_client { +public: + static db_client &get_instance() { + static db_client instance; + return instance; + }; + + Storage local_db = make_storage_query(); + + using NeighborPrepareStatement = decltype(local_db.prepare(select(columns(&Neighbor::host_ip, &Neighbor::vpc_mac, &Neighbor::host_mac), + where(is_equal((&Neighbor::vni), 0) and is_equal((&Neighbor::vpc_ip), "127.0.0.1"))))); + NeighborPrepareStatement query_neighbor_statement = local_db.prepare( + select( + columns(&Neighbor::host_ip, &Neighbor::vpc_mac, &Neighbor::host_mac), + where( + is_equal((&Neighbor::vni), 0) + and + is_equal((&Neighbor::vpc_ip), "127.0.0.1") + ) + ) + ); + // Create local db writer single thread execution queue + dispatch_queue local_db_writer_queue = dispatch_queue("Local db background write queue", 1); + + std::unordered_map endpoint_cache; + + std::unordered_map sg_rule_cache; + + + // function that will be called at the beginning of the program, reads rows from the neighbor table + // and fills the in-memory endpoint cache, which is used for fast lookup. 
+ void FillEndpointCacheFromDB() { + std::string table_name_sg_ebpf_map = "/sys/fs/bpf/security_group_map"; + int fd_security_group_ebpf_map = bpf_obj_get(table_name_sg_ebpf_map.c_str()); + printf("DB Client: sg map fd: %ld\n", fd_security_group_ebpf_map); + + std::string table_name_sg_cidr_map = "/sys/fs/bpf/sg_cidr_map"; + int fd_sg_cidr_ebpf_map = bpf_obj_get(table_name_sg_cidr_map.c_str()); + printf("DB Client: sg cidr map fd: %ld\n", fd_security_group_ebpf_map); + + // Get all neighbors from SQLite Database + auto get_all_neighbors_statement = local_db.prepare( + select( + columns(&Neighbor::vni, &Neighbor::vpc_ip, &Neighbor::host_mac, &Neighbor::vpc_mac, &Neighbor::host_ip) + ) + ); + auto rows = local_db.execute(get_all_neighbors_statement); + printf("Retrieved %ld neighbors from local DB\n", rows.size()); + for (auto & row : rows) { + int vni = get<0>(row); + auto vpc_ip = get<1>(row).c_str(); + auto host_ip = get<4>(row).c_str(); + auto vpc_mac = get<3>(row).c_str(); + auto host_mac = get<2>(row).c_str(); +// printf("Retrieved this endpoint from local DB: VNI: %ld, vpc_ip: %s, host_mac: %s, vpc_mac: %s, host_ip: %s\n", +//// get<0>(row), get<1>(row).c_str(), get<2>(row).c_str(), get<3>(row).c_str(), get<4>(row).c_str() +// vni, vpc_ip, host_mac, vpc_mac, host_ip +// ); + endpoint_key_t key; + key.vni = vni; //(get<0>(row)); + struct sockaddr_in endpoint_vpc_ip_socket; + inet_pton(AF_INET, vpc_ip, &(endpoint_vpc_ip_socket.sin_addr)); + key.ip = endpoint_vpc_ip_socket.sin_addr.s_addr; + endpoint_t value; + std::sscanf(vpc_mac, "%02x:%02x:%02x:%02x:%02x:%02x", + &value.mac[0], &value.mac[1], &value.mac[2], + &value.mac[3], &value.mac[4], &value.mac[5]); + + std::sscanf(host_mac, "%02x:%02x:%02x:%02x:%02x:%02x", + &value.hmac[0], &value.hmac[1], &value.hmac[2], + &value.hmac[3], &value.hmac[4], &value.hmac[5]); + struct sockaddr_in endpoint_host_ip_socket; + inet_pton(AF_INET, host_ip, &(endpoint_host_ip_socket.sin_addr)); + value.hip = 
endpoint_host_ip_socket.sin_addr.s_addr; + endpoint_cache[key] = value; +// endpoint_cache.insert(key, value); +// printf("Inserted this endpoint into cache: VNI: %ld, vpc_ip: %s, ", key.vni, inet_ntoa(endpoint_vpc_ip_socket.sin_addr)); +// printf("host_mac: %x:%x:%x:%x:%x:%x, vpc_mac: %x:%x:%x:%x:%x:%x, host_ip: %s\n", +// value.hmac[0],value.hmac[1],value.hmac[2],value.hmac[3],value.hmac[4],value.hmac[5], +// value.mac[0],value.mac[1],value.mac[2],value.mac[3],value.mac[4],value.mac[5], +// inet_ntoa(endpoint_host_ip_socket.sin_addr) +// ); +// printf("Finished one endpoint\n"); + /* + security_group_key_t sg_key; + sg_key.vni = vni; + sg_key.ip = endpoint_vpc_ip_socket.sin_addr.s_addr; + sg_key.direction = 0; + security_group_t sg_value; + sg_value.sg_id = 12345; + sg_value.action = 1; +// int sg_map_insert_rc = bpf_map_update_elem(fd_security_group_ebpf_map, &sg_key, &sg_value, BPF_ANY); +// printf("Sg map insert rc: %ld\n", sg_map_insert_rc); + sg_cidr_key_t sg_cidr_key; + // add the number of bits for all fields, except prefexlen and dst_ip, then add the cidr range, in this case it is /24 + sg_cidr_key.prefixlen = (32 + 16 + 8 + 8 + 32 + 24); +// inet_pton(AF_INET, vpc_ip, sg_cidr_key.lpm_key.data); + sg_cidr_key.local_ip = endpoint_vpc_ip_socket.sin_addr.s_addr; + sg_cidr_key.remote_ip = endpoint_vpc_ip_socket.sin_addr.s_addr; + sg_cidr_key.vni = vni; + sg_cidr_key.direction = 1; + sg_cidr_key.protocol = IPPROTO_TCP; + sg_cidr_key.port = 888; + int sg_map_insert_rc = bpf_map_update_elem(fd_sg_cidr_ebpf_map, &sg_cidr_key, &sg_value, BPF_ANY); + if (sg_map_insert_rc != 0) { + printf("Error for inserting into lpm map: %s", std::strerror(errno)); + } + */ +// printf("Sg map insert rc: %ld\n", sg_map_insert_rc); + + + } + printf("Finished retrieving from local DB, not endpoint cache has %ld endpoints\n", endpoint_cache.size()); + } + int FindLKGVersion() { + int lkg_ver = 0; + + /* original sql is SELECT MIN(mo.version) + 1 FROM journal AS mo WHERE NOT EXISTS @@ 
-68,24 +266,63 @@ static int FindLKGVersion() { ); */ - using als_mo = alias_a; - using als_mi = alias_b; - auto ver_gaps = local_db.select(alias_column(&ProgrammingState::version), - from(), - where(not exists( - select(0 - c(alias_column(&ProgrammingState::version)), - from(), - where(is_equal(c(alias_column(&ProgrammingState::version)) + 1, alias_column(&ProgrammingState::version))) - )))); - - // lkg version: - // case 1 - if no ver gap, the query above will return the max version (since this version is already programmed, so return max + 1) - // case 2 - if there's ver gap, then always locate the min ver gap (as above, return minVerGap + 1) - // case 3 - if the table is empty like new launched instance, then always sync/watch from server with version 1 - // (since server syncs including the version agent provides, so sync/watch from version 1 means sync everything - if (ver_gaps.size() > 0) { - lkg_ver = *std::min_element(ver_gaps.begin(), ver_gaps.end()); + using als_mo = alias_a; + using als_mi = alias_b; + auto ver_gaps = local_db.select(alias_column(&NeighborProgrammingState::version), + from(), + where(not exists( + select(0 - c(alias_column(&NeighborProgrammingState::version)), + from(), + where(is_equal(c(alias_column(&NeighborProgrammingState::version)) + 1, alias_column(&NeighborProgrammingState::version))) + )))); + + // lkg version: + // case 1 - if no ver gap, the query above will return the max version (since this version is already programmed, so return max + 1) + // case 2 - if there's ver gap, then always locate the min ver gap (as above, return minVerGap + 1) + // case 3 - if the table is empty like new launched instance, then always sync/watch from server with version 1 + // (since server syncs including the version agent provides, so sync/watch from version 1 means sync everything + if (ver_gaps.size() > 0) { + lkg_ver = *std::min_element(ver_gaps.begin(), ver_gaps.end()); + } + + return lkg_ver + 1; + }; + + endpoint_t GetNeighbor(int vni, 
std::string vpc_ip) { + endpoint_t found_neighbor; + found_neighbor.hip = 0; +// printf("GetNeighbor with VNI: [%d], vpc_ip: [%s]\n", vni, vpc_ip.c_str()); + get<0>(query_neighbor_statement) = vni; + get<1>(query_neighbor_statement) = vpc_ip.c_str(); +// printf("Statement: %s\n", query_neighbor_statement.sql().c_str()); + auto rows = local_db.execute(query_neighbor_statement); +// printf("Found %ld rows\n", rows.size()); + for (auto& row : rows) { + struct sockaddr_in ep_hip; + inet_pton(AF_INET, get<0>(row).c_str(), &(ep_hip.sin_addr)); + found_neighbor.hip = ep_hip.sin_addr.s_addr; + + std::sscanf(get<1>(row).c_str(), "%02x:%02x:%02x:%02x:%02x:%02x", + &found_neighbor.mac[0], &found_neighbor.mac[1], &found_neighbor.mac[2], + &found_neighbor.mac[3], &found_neighbor.mac[4], &found_neighbor.mac[5]); + + std::sscanf(get<2>(row).c_str(), "%02x:%02x:%02x:%02x:%02x:%02x", + &found_neighbor.hmac[0], &found_neighbor.hmac[1], &found_neighbor.hmac[2], + &found_neighbor.hmac[3], &found_neighbor.hmac[4], &found_neighbor.hmac[5]); + +// printf("host_ip: %s, vpc_mac: %s, host_mac: %s\n", get<0>(row).c_str(), get<1>(row).c_str(), get<2>(row).c_str()); + } + return found_neighbor; } - return lkg_ver + 1; -} + endpoint_t GetNeighborInMemory(endpoint_key_t key) { + auto iterator = endpoint_cache.find(key); + if (iterator == endpoint_cache.end()) { + return { + .hip = 0, + }; + } + auto endpoint_value = iterator->second;//endpoint_cache[*key]; + return endpoint_value; + } +}; \ No newline at end of file diff --git a/include/grpc_client.h b/include/grpc_client.h index 5b59d6d..2413360 100644 --- a/include/grpc_client.h +++ b/include/grpc_client.h @@ -38,14 +38,18 @@ class ArionMasterWatcherImpl final : public Watch::Service { explicit ArionMasterWatcherImpl() {} - void RequestNeighborRules(ArionWingRequest *request, grpc::CompletionQueue *cq); + void RequestArionMaster(std::vector *request_vector, grpc::CompletionQueue *cq); void ConnectToArionMaster(); - void RunClient(std::string 
ip, std::string port, std::string group, std::string table); + void RunClient(std::string ip, std::string port, std::string group, std::string neighbor_table, std::string security_group_rules_table); bool a = chan_ == nullptr; + int fd_neighbor_ebpf_map = -1; + + int fd_security_group_ebpf_map = -1; + private: std::string server_address; @@ -55,18 +59,20 @@ class ArionMasterWatcherImpl final : public Watch::Service { std::string table_name_neighbor_ebpf_map; - int fd_neighbor_ebpf_map = -1; + std::string table_name_sg_ebpf_map; // key std::string is '-', value is inserted version of this neighbor folly::ConcurrentHashMap neighbor_task_map; + // key std::string is 'securitygroupid', value is inserted version of this security group rule + folly::ConcurrentHashMap security_group_rule_task_map; // segment lock for neighbor key version control SegmentLock segment_lock; }; struct AsyncClientCall { - arion::schema::NeighborRule reply; + arion::schema::ArionWingResponse reply; grpc::ClientContext context; grpc::Status status; - std::unique_ptr > stream; + std::unique_ptr > stream; }; diff --git a/include/util.h b/include/util.h index f68053f..9c72c90 100644 --- a/include/util.h +++ b/include/util.h @@ -17,6 +17,8 @@ #include #include +#include +using namespace std; // the number of characters needed to store the HEX form of IP address #define HEX_IP_BUFFER_SIZE 12 @@ -37,7 +39,7 @@ static inline long ip4tol(const string ip) { struct sockaddr_in sa; if (inet_pton(AF_INET, ip.c_str(), &(sa.sin_addr)) != 1) { - throw std::invalid_argument("Virtual ipv4 address is not in the expected format"); +// throw std::invalid_argument("Virtual ipv4 address is not in the expected format"); } return sa.sin_addr.s_addr; } @@ -49,4 +51,129 @@ static inline std::uint8_t getNum(char hexChar) { return (hexChar - 'A' + 10); } + +static inline __sum16 csum16_add(__sum16 csum, __be16 addend) +{ + uint16_t res = (uint16_t)csum; + + res += (__u16)addend; + return (__sum16)(res + (res < 
(__u16)addend)); +} + +static inline __sum16 csum16_sub(__sum16 csum, __be16 addend) +{ + return csum16_add(csum, ~addend); +} + +static inline void csum_replace2(__sum16 *sum, __be16 old, __be16 present) +{ + *sum = ~csum16_add(csum16_sub(~(*sum), old), present); +} + +static inline void trn_set_mac(void *dst, unsigned char *mac) +{ + unsigned short *d = static_cast(dst); + unsigned short *s = (unsigned short *)mac; + + d[0] = s[0]; + d[1] = s[1]; + d[2] = s[2]; +} + +static inline void trn_set_dst_mac(void *data, unsigned char *dst_mac) +{ + trn_set_mac(data, dst_mac); +} + +static inline void trn_set_src_mac(void *data, unsigned char *src_mac) +{ + uint8_t *tmp = static_cast(data); + trn_set_mac((void*)(tmp + 6), src_mac); +} + +static __be32 trn_get_vni(const __u8 *vni) +{ + /* Big endian! */ + return (vni[0] << 16) | (vni[1] << 8) | vni[2]; +} + +static inline void trn_set_src_ip(void *data, void *data_end, __u32 saddr) +{ + int off = offsetof(struct iphdr, saddr); + uint8_t *tmp = static_cast(data); + + __u32 *addr = (__u32*)(tmp + off); + if ((void *)addr > data_end) + return; + + *addr = saddr; +} + +static inline void trn_set_dst_ip(void *data, void *data_end, __u32 daddr) +{ + int off = offsetof(struct iphdr, daddr); + uint8_t *tmp = static_cast(data); + + __u32 *addr = (__u32 *)(tmp + off); + if ((void *)addr > data_end) + return; + + *addr = daddr; +} + +static inline __u16 trn_csum_fold_helper(__u64 csum) +{ + int i; +#pragma unroll + for (i = 0; i < 4; i++) { + if (csum >> 16) + csum = (csum & 0xffff) + (csum >> 16); + } + return ~csum; +} + +static inline void trn_ipv4_csum_inline(void *iph, __u64 *csum) +{ + __u16 *next_iph_u16 = (__u16 *)iph; +#pragma clang loop unroll(full) + for (int i = 0; i> 1; i++) { + *csum += *next_iph_u16++; + } + *csum = trn_csum_fold_helper(*csum); +} + +static inline void trn_set_src_dst_ip_csum(struct iphdr *ip, + __u32 saddr, __u32 daddr, void *data_end) +{ + /* Since the packet destination is being rewritten we also + 
decrement the TTL */ + ip->ttl--; + + __u64 csum = 0; + trn_set_src_ip(ip, data_end, saddr); + trn_set_dst_ip(ip, data_end, daddr); + csum = 0; + ip->check = 0; + trn_ipv4_csum_inline(ip, &csum); + ip->check = csum; + + // printf("Modified IP Address, src: 0x%x, dst: 0x%x, csum: 0x%x\n", + // ip->saddr, ip->daddr, ip->check); +} + +static inline void trn_swap_src_dst_mac(void *data) +{ + unsigned short *p = static_cast(data); + unsigned short tmp[3]; + + tmp[0] = p[0]; + tmp[1] = p[1]; + tmp[2] = p[2]; + p[0] = p[3]; + p[1] = p[4]; + p[2] = p[5]; + p[3] = tmp[0]; + p[4] = tmp[1]; + p[5] = tmp[2]; +} #endif diff --git a/include/xdp/trn_datamodel.h b/include/xdp/trn_datamodel.h index 7ab0321..1048e0d 100644 --- a/include/xdp/trn_datamodel.h +++ b/include/xdp/trn_datamodel.h @@ -29,6 +29,7 @@ #include #include #include +#include "bpf.h" #define __ALIGNED_64__ __attribute__((aligned(64))) #define __ALWAYS_INLINE__ __attribute__((__always_inline__)) @@ -147,6 +148,12 @@ typedef struct { unsigned char hmac[6]; } __attribute__((packed, aligned(4))) endpoint_t; +typedef struct { + bool action; // 0 or 1 + __u16 port_range[2]; // assume it supports only 1 range, such as [9000,9016] + __u16 remote_group; // remote group ID +} __attribute__((packed, aligned(4))) security_group_rule_t; + typedef struct { __u32 ip; // IP used for ZGC access __u16 announced; // non-zero indicates the MAC has been announced locally @@ -215,3 +222,81 @@ typedef struct { dp_encap_opdata_t encap; } opdata; } __attribute__((packed, aligned(8))) flow_ctx_t; + +// #if connTrack +struct ipv4_tuple_t { + __u32 saddr; + __u32 daddr; + + /* ports */ + __u16 sport; + __u16 dport; + + /* Addresses */ + __u8 protocol; + + /*TODO: include TCP flags, no use case for the moment! 
*/ + +} __attribute__((packed)); + + +typedef struct { + __u32 vni; + struct ipv4_tuple_t tuple; +} __attribute__((packed)) contrack_key_t; + + +typedef struct { + __u32 hip; + unsigned char mac[6]; + unsigned char hmac[6]; +} __attribute__ ((packed, aligned(4))) contrack_t; + +// #endif + +// #if sgSupport + +typedef struct { + __u32 prefixlen; /* up to 32 for AF_INET, 128 for AF_INET6*/ + __u32 vni; + __u16 port; + __u8 direction; + __u8 protocol; + __u32 local_ip; + __u32 remote_ip; +} __attribute__((packed, aligned(4))) sg_cidr_key_t; + + +typedef struct { + __u32 sg_id; + __u8 action; +} __attribute__((packed, aligned(4))) sg_cidr_t; + + +typedef struct { + __u32 vni; + __u32 ip; + __u8 direction; +} __attribute__((packed, aligned(4))) security_group_key_t; + + +typedef struct { + __u32 sg_id; + __u8 action; +} __attribute__((packed, aligned(4))) security_group_t; + + +typedef struct { + __u32 vni; + __u32 ip; + __u8 direction; +} __attribute__((packed, aligned(4))) port_range_key_t; + +typedef struct { + __u16 port_min1; + __u16 port_max1; + __u16 port_min2; + __u16 port_max2; +} __attribute__((packed, aligned(4))) port_range_t; + +// #endif \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c7a69c3..70b5ba8 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,14 +4,16 @@ set(SOURCES ./util/dispatch_queue.cpp ./util/segment_lock.cpp ./comm/grpc_client.cpp - ) + # db/db_client.cpp + comm/af_xdp_user_multi_thread.cpp ) +#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -idirafter /usr/src/linux-headers-5.10.4/include/") #FIND_LIBRARY(LIBUUID_LIBRARIES uuid) #link_libraries(/usr/lib/x86_64-linux-gnu/libuuid.so) link_libraries(/usr/lib/x86_64-linux-gnu/libevent_pthreads.so) link_libraries(/usr/lib/x86_64-linux-gnu/libpthread.so) link_libraries(/var/local/git/marl/marl/build/libmarl.a) #this was built by machine-init.sh -link_libraries(/var/local/git/zeta/src/extern/libbpf/src/libbpf.a) #this was built by machine-init.sh 
+link_libraries(/var/local/git/zeta/src/extern/libbpf/src/build/libbpf.a) #this was built by machine-init.sh link_libraries(/usr/lib/x86_64-linux-gnu/libelf.a) link_libraries(/usr/local/lib/libfolly.a) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include) @@ -20,7 +22,43 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}/grpc) include_directories(/var/local/git/marl/marl/include) include_directories(/usr/local/include/folly) include_directories(/var/local/git/zeta/src/extern/libbpf/src) #libbpf.h +include_directories(/var/local/git/xdp-tutorial/common) include_directories(/usr/local/include/sqlite_orm) #sqlite_orm.h +include_directories(/usr/include/glog) +#include_directories(SYSTEM $ENV{SDKTARGETSYSROOT} /usr/src/linux-headers-5.10.4/include/) +#include_directories(SYSTEM $ENV{SDKTARGETSYSROOT} /usr/src/linux-headers-5.10.4/arch/x86/include) +#include_directories(SYSTEM $ENV{SDKTARGETSYSROOT} /usr/src/linux-headers-5.10.4/include/linux) # try to include vxlan.h in order to parse vxlan header + + +# Try to find the installed headers - Start +# Find the kernel release +execute_process( + COMMAND uname -r + OUTPUT_VARIABLE KERNEL_RELEASE + OUTPUT_STRIP_TRAILING_WHITESPACE +) + +# Find the headers +find_path(KERNELHEADERS_DIR + include/linux/user.h + PATHS /usr/src/linux-headers-${KERNEL_RELEASE} + ) + +message(STATUS "Kernel release: ${KERNEL_RELEASE}") +message(STATUS "Kernel headers: ${KERNELHEADERS_DIR}") + +if (KERNELHEADERS_DIR) + set(KERNELHEADERS_INCLUDE_DIRS + ${KERNELHEADERS_DIR}/include + ${KERNELHEADERS_DIR}/arch/x86/include + CACHE PATH "Kernel headers include dirs" + ) +# set(KERNELHEADERS_FOUND 1 CACHE STRING "Set to 1 if kernel headers were found") +#else (KERNELHEADERS_DIR) +# set(KERNELHEADERS_FOUND 0 CACHE STRING "Set to 1 if kernel headers were found") +endif (KERNELHEADERS_DIR) +# Try to find the installed headers - End + # Find Protobuf installation # Looks for protobuf-config.cmake file installed by Protobuf's cmake installation. 
@@ -46,6 +84,8 @@ find_package(GLog REQUIRED) find_package(fmt REQUIRED) add_library(ArionAgentLib STATIC ${SOURCES}) +target_include_directories(ArionAgentLib PUBLIC /var/local/git/xdp-tutorial/common) +target_link_directories(ArionAgentLib PUBLIC /var/local/git/xdp-tutorial/common) #target_link_libraries(ArionAgentLib event) target_link_libraries(ArionAgentLib ssl) target_link_libraries(ArionAgentLib crypto) diff --git a/src/comm/af_xdp_user_multi_thread.cpp b/src/comm/af_xdp_user_multi_thread.cpp new file mode 100644 index 0000000..6118d4f --- /dev/null +++ b/src/comm/af_xdp_user_multi_thread.cpp @@ -0,0 +1,1589 @@ +// +// Created by user on 12/16/22. +// + +// SPDX-License-Identifier: GPL-2.0 +/* Copyright(c) 2020 - 2022 Intel Corporation. */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +//#include +#include +#include + +//#include +#include +//#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "util.h" +#include "xdp/trn_datamodel.h" +#include +#include + + +#define VXL_DSTPORT 0xb512 // UDP dport 4789(0x12b5) for VxLAN overlay +#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0])) + +typedef __u64 u64; +typedef __u32 u32; +typedef __u16 u16; +typedef __u8 u8; + +struct arp_message { + uint16_t hrd; + uint16_t pro; + uint8_t hln; + uint8_t pln; + uint16_t op; + uint8_t sha[6]; + uint32_t spa; + uint8_t tha[6]; + uint32_t tpa; +} __attribute__((__packed__)); + +struct vxlanhdr_internal { + /* Big endian! */ + __u8 rsvd1 : 3; + __u8 i_flag : 1; + __u8 rsvd2 : 4; + __u8 rsvd3[3]; + __u8 vni[3]; + __u8 rsvd4; +}; + +/* This program illustrates the packet forwarding between multiple AF_XDP + * sockets in multi-threaded environment. All threads are sharing a common + * buffer pool, with each socket having its own private buffer cache. 
+ * + * Example 1: Single thread handling two sockets. The packets received by socket + * A (interface IFA, queue QA) are forwarded to socket B (interface IFB, queue + * QB), while the packets received by socket B are forwarded to socket A. The + * thread is running on CPU core X: + * + * ./xsk_fwd -i IFA -q QA -i IFB -q QB -c X + * + * Example 2: Two threads, each handling two sockets. The thread running on CPU + * core X forwards all the packets received by socket A to socket B, and all the + * packets received by socket B to socket A. The thread running on CPU core Y is + * performing the same packet forwarding between sockets C and D: + * + * ./xsk_fwd -i IFA -q QA -i IFB -q QB -i IFC -q QC -i IFD -q QD + * -c CX -c CY + */ + +/* + * Buffer pool and buffer cache + * + * For packet forwarding, the packet buffers are typically allocated from the + * pool for packet reception and freed back to the pool for further reuse once + * the packet transmission is completed. + * + * The buffer pool is shared between multiple threads. In order to minimize the + * access latency to the shared buffer pool, each thread creates one (or + * several) buffer caches, which, unlike the buffer pool, are private to the + * thread that creates them and therefore cannot be shared with other threads. + * The access to the shared pool is only needed either (A) when the cache gets + * empty due to repeated buffer allocations and it needs to be replenished from + * the pool, or (B) when the cache gets full due to repeated buffer free and it + * needs to be flushed back to the pull. + * + * In a packet forwarding system, a packet received on any input port can + * potentially be transmitted on any output port, depending on the forwarding + * configuration. For AF_XDP sockets, for this to work with zero-copy of the + * packet buffers when, it is required that the buffer pool memory fits into the + * UMEM area shared by all the sockets. 
+ */ + +struct bpool_params { + u32 n_buffers; + u32 buffer_size; + int mmap_flags; + + u32 n_users_max; + u32 n_buffers_per_slab; +}; + +/* This buffer pool implementation organizes the buffers into equally sized + * slabs of *n_buffers_per_slab*. Initially, there are *n_slabs* slabs in the + * pool that are completely filled with buffer pointers (full slabs). + * + * Each buffer cache has a slab for buffer allocation and a slab for buffer + * free, with both of these slabs initially empty. When the cache's allocation + * slab goes empty, it is swapped with one of the available full slabs from the + * pool, if any is available. When the cache's free slab goes full, it is + * swapped for one of the empty slabs from the pool, which is guaranteed to + * succeed. + * + * Partially filled slabs never get traded between the cache and the pool + * (except when the cache itself is destroyed), which enables fast operation + * through pointer swapping. + */ +struct bpool { + struct bpool_params params; + pthread_mutex_t lock; + void *addr; + + u64 **slabs; + u64 **slabs_reserved; + u64 *buffers; + u64 *buffers_reserved; + + u64 n_slabs; + u64 n_slabs_reserved; + u64 n_buffers; + + u64 n_slabs_available; + u64 n_slabs_reserved_available; + + struct xsk_umem_config umem_cfg; + struct xsk_ring_prod umem_fq; + struct xsk_ring_cons umem_cq; + struct xsk_umem *umem; +}; + +static bool xsk_page_aligned(void *buffer) +{ + unsigned long addr = (unsigned long)buffer; + + return !(addr & (getpagesize() - 1)); +} + +static struct bpool * +bpool_init(struct bpool_params *params, + struct xsk_umem_config *umem_cfg) +{ + struct rlimit r = {RLIM_INFINITY, RLIM_INFINITY}; + u64 n_slabs, n_slabs_reserved, n_buffers, n_buffers_reserved; + u64 slabs_size, slabs_reserved_size; + u64 buffers_size, buffers_reserved_size; + u64 total_size, i; + struct bpool *bp; + u8 *p; + int status; + + /* mmap prep. */ + if (setrlimit(RLIMIT_MEMLOCK, &r)) + return NULL; + + /* bpool internals dimensioning. 
*/ + n_slabs = (params->n_buffers + params->n_buffers_per_slab - 1) / + params->n_buffers_per_slab; + printf("bpool_init: n_slabs = %ld\n", n_slabs); + n_slabs_reserved = params->n_users_max * 2; + n_buffers = n_slabs * params->n_buffers_per_slab; + n_buffers_reserved = n_slabs_reserved * params->n_buffers_per_slab; + + slabs_size = n_slabs * sizeof(u64 *); + slabs_reserved_size = n_slabs_reserved * sizeof(u64 *); + buffers_size = n_buffers * sizeof(u64); + buffers_reserved_size = n_buffers_reserved * sizeof(u64); + + total_size = sizeof(struct bpool) + + slabs_size + slabs_reserved_size + + buffers_size + buffers_reserved_size; + + /* bpool memory allocation. */ + p = static_cast<u8 *>(calloc(total_size, sizeof(u8))); + if (!p) + return NULL; + + /* bpool memory initialization. */ + bp = (struct bpool *)p; + memcpy(&bp->params, params, sizeof(*params)); + bp->params.n_buffers = n_buffers; + + bp->slabs = (u64 **)&p[sizeof(struct bpool)]; + bp->slabs_reserved = (u64 **)&p[sizeof(struct bpool) + + slabs_size]; + bp->buffers = (u64 *)&p[sizeof(struct bpool) + + slabs_size + slabs_reserved_size]; + bp->buffers_reserved = (u64 *)&p[sizeof(struct bpool) + + slabs_size + slabs_reserved_size + buffers_size]; + + bp->n_slabs = n_slabs; + bp->n_slabs_reserved = n_slabs_reserved; + bp->n_buffers = n_buffers; + + for (i = 0; i < n_slabs; i++) + bp->slabs[i] = &bp->buffers[i * params->n_buffers_per_slab]; + bp->n_slabs_available = n_slabs; + + for (i = 0; i < n_slabs_reserved; i++) + bp->slabs_reserved[i] = &bp->buffers_reserved[i * + params->n_buffers_per_slab]; + bp->n_slabs_reserved_available = n_slabs_reserved; + + for (i = 0; i < n_buffers; i++) + bp->buffers[i] = i * params->buffer_size; + + /* lock. */ + status = pthread_mutex_init(&bp->lock, NULL); + if (status) { + free(p); + return NULL; + } + + /* mmap.
*/ + bp->addr = mmap(NULL, + n_buffers * params->buffer_size, + PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS | params->mmap_flags, + -1, + 0); + if (bp->addr == MAP_FAILED) { + pthread_mutex_destroy(&bp->lock); + free(p); + return NULL; + } + + printf("xsk_umem__create: size: %ld, xsk_page_aligned: %b\n", + bp->params.n_buffers * bp->params.buffer_size, xsk_page_aligned(bp->addr)); + /* umem. */ + status = xsk_umem__create(&bp->umem, + bp->addr, + bp->params.n_buffers * bp->params.buffer_size, + &bp->umem_fq, + &bp->umem_cq, + umem_cfg); + if (status) { + printf("xsk_umem__create failed with status: %d\n", status); + munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size); + pthread_mutex_destroy(&bp->lock); + free(p); + return NULL; + } + memcpy(&bp->umem_cfg, umem_cfg, sizeof(*umem_cfg)); + + return bp; +} + +static void +bpool_free(struct bpool *bp) +{ + if (!bp) + return; + + xsk_umem__delete(bp->umem); + munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size); + pthread_mutex_destroy(&bp->lock); + free(bp); +} + +struct bcache { + struct bpool *bp; + + u64 *slab_cons; + u64 *slab_prod; + + u64 n_buffers_cons; + u64 n_buffers_prod; +}; + +static u32 +bcache_slab_size(struct bcache *bc) +{ + struct bpool *bp = bc->bp; + + return bp->params.n_buffers_per_slab; +} + +static struct bcache * +bcache_init(struct bpool *bp) +{ + struct bcache *bc; + + bc = static_cast<struct bcache *>(calloc(1, sizeof(struct bcache))); + if (!bc) + return NULL; + + bc->bp = bp; + bc->n_buffers_cons = 0; + bc->n_buffers_prod = 0; + + pthread_mutex_lock(&bp->lock); + if (bp->n_slabs_reserved_available == 0) { + pthread_mutex_unlock(&bp->lock); + free(bc); + return NULL; + } + + bc->slab_cons = bp->slabs_reserved[bp->n_slabs_reserved_available - 1]; + bc->slab_prod = bp->slabs_reserved[bp->n_slabs_reserved_available - 2]; + bp->n_slabs_reserved_available -= 2; + pthread_mutex_unlock(&bp->lock); + + return bc; +} + +static void +bcache_free(struct bcache *bc) +{ + struct bpool
*bp; + + if (!bc) + return; + + /* In order to keep this example simple, the case of freeing any + * existing buffers from the cache back to the pool is ignored. + */ + + bp = bc->bp; + pthread_mutex_lock(&bp->lock); + bp->slabs_reserved[bp->n_slabs_reserved_available] = bc->slab_prod; + bp->slabs_reserved[bp->n_slabs_reserved_available + 1] = bc->slab_cons; + bp->n_slabs_reserved_available += 2; + pthread_mutex_unlock(&bp->lock); + + free(bc); +} + +/* To work correctly, the implementation requires that the *n_buffers* input + * argument is never greater than the buffer pool's *n_buffers_per_slab*. This + * is typically the case, with one exception taking place when large number of + * buffers are allocated at init time (e.g. for the UMEM fill queue setup). + */ +static inline u32 +bcache_cons_check(struct bcache *bc, u32 n_buffers) +{ + struct bpool *bp = bc->bp; +// printf("n_buffers: %ld\nbp->params.n_buffers_per_slab: %ld\n", n_buffers, bp->params.n_buffers_per_slab); + u64 n_buffers_per_slab = bp->params.n_buffers_per_slab; + u64 n_buffers_cons = bc->n_buffers_cons; + u64 n_slabs_available; + u64 *slab_full; + + /* + * Consumer slab is not empty: Use what's available locally. Do not + * look for more buffers from the pool when the ask can only be + * partially satisfied. + */ + if (n_buffers_cons) + return (n_buffers_cons < n_buffers) ? + n_buffers_cons : + n_buffers; + + /* + * Consumer slab is empty: look to trade the current consumer slab + * (full) for a full slab from the pool, if any is available. 
+ */ + pthread_mutex_lock(&bp->lock); +// printf("Locking bp\n"); + n_slabs_available = bp->n_slabs_available; + if (!n_slabs_available) { + printf("Unlocking bp because !n_slabs_available)\n"); + pthread_mutex_unlock(&bp->lock); + return 0; + } + + n_slabs_available--; + slab_full = bp->slabs[n_slabs_available]; + bp->slabs[n_slabs_available] = bc->slab_cons; + bp->n_slabs_available = n_slabs_available; +// printf("Unlocking bp because traded a slab from bpool\n"); + pthread_mutex_unlock(&bp->lock); + + bc->slab_cons = slab_full; + bc->n_buffers_cons = n_buffers_per_slab; +// printf("bc->n_buffers_cons = %ld\n", bc->n_buffers_cons); + return n_buffers; +} + +static inline u64 +bcache_cons(struct bcache *bc) +{ + u64 n_buffers_cons = bc->n_buffers_cons - 1; + u64 buffer; + + buffer = bc->slab_cons[n_buffers_cons]; + bc->n_buffers_cons = n_buffers_cons; + return buffer; +} + +static inline void +bcache_prod(struct bcache *bc, u64 buffer) +{ + struct bpool *bp = bc->bp; + u64 n_buffers_per_slab = bp->params.n_buffers_per_slab; + u64 n_buffers_prod = bc->n_buffers_prod; + u64 n_slabs_available; + u64 *slab_empty; + + /* + * Producer slab is not yet full: store the current buffer to it. + */ + if (n_buffers_prod < n_buffers_per_slab) { + bc->slab_prod[n_buffers_prod] = buffer; +// printf("bcache_prod: n_buffers_prod: %ld\nn_buffers_per_slab: %ld\n", n_buffers_prod, n_buffers_per_slab); + bc->n_buffers_prod = n_buffers_prod + 1; + return; + } + + /* + * Producer slab is full: trade the cache's current producer slab + * (full) for an empty slab from the pool, then store the current + * buffer to the new producer slab. As one full slab exists in the + * cache, it is guaranteed that there is at least one empty slab + * available in the pool. 
+ */ + pthread_mutex_lock(&bp->lock); + n_slabs_available = bp->n_slabs_available; + slab_empty = bp->slabs[n_slabs_available]; + bp->slabs[n_slabs_available] = bc->slab_prod; +// printf("bcache_prod: bp->n_slabs_available = n_slabs_available + 1;\n"); + bp->n_slabs_available = n_slabs_available + 1; + pthread_mutex_unlock(&bp->lock); + + slab_empty[0] = buffer; + bc->slab_prod = slab_empty; + bc->n_buffers_prod = 1; +} + +/* + * Port + * + * Each of the forwarding ports sits on top of an AF_XDP socket. In order for + * packet forwarding to happen with no packet buffer copy, all the sockets need + * to share the same UMEM area, which is used as the buffer pool memory. + */ +#ifndef MAX_BURST_RX +#define MAX_BURST_RX 64 +#endif + +#ifndef MAX_BURST_TX +#define MAX_BURST_TX 64 +#endif + +struct burst_rx { + u64 addr[MAX_BURST_RX]; + u32 len[MAX_BURST_RX]; +}; + +struct burst_tx { + u64 addr[MAX_BURST_TX]; + u32 len[MAX_BURST_TX]; + u32 n_pkts; +}; + +struct port_params { + struct xsk_socket_config xsk_cfg; + struct bpool *bp; + const char *iface; + u32 iface_queue; +}; + +struct port { + struct port_params params; + + struct bcache *bc; + + struct xsk_ring_cons rxq; + struct xsk_ring_prod txq; + struct xsk_ring_prod umem_fq; + struct xsk_ring_cons umem_cq; + struct xsk_socket *xsk; + int umem_fq_initialized; + + u64 n_pkts_rx; + u64 n_pkts_tx; +}; + +static void +port_free(struct port *p) +{ + if (!p) + return; + + /* To keep this example simple, the code to free the buffers from the + * socket's receive and transmit queues, as well as from the UMEM fill + * and completion queues, is not included. + */ + + if (p->xsk) + xsk_socket__delete(p->xsk); + + bcache_free(p->bc); + + free(p); +} + +static struct port * +port_init(struct port_params *params) +{ + struct port *p; + u32 umem_fq_size, pos = 0; + int status, i; + + /* Memory allocation and initialization. 
*/ + p = static_cast<struct port *>(calloc(sizeof(struct port), 1)); + if (!p) { + printf("port_init failed because memory allocation failed.\n"); + return NULL; + } + + memcpy(&p->params, params, sizeof(p->params)); + umem_fq_size = params->bp->umem_cfg.fill_size; + printf("port_init: umem_fq_size: %ld\n", umem_fq_size); + + /* bcache. */ + p->bc = bcache_init(params->bp); + if (!p->bc || + (bcache_slab_size(p->bc) < umem_fq_size) || + (bcache_cons_check(p->bc, umem_fq_size) < umem_fq_size)) { + port_free(p); + printf("port_init failed because bcache failed.\n(bcache_slab_size(p->bc) < umem_fq_size) : %s\n" + "(bcache_cons_check(p->bc, umem_fq_size) < umem_fq_size) : %s", + ((bcache_slab_size(p->bc) < umem_fq_size) ? "true" : "false"), + ((bcache_cons_check(p->bc, umem_fq_size) < umem_fq_size) ? "true" : "false") + ); + return NULL; + } + + /* xsk socket. */ + status = xsk_socket__create_shared(&p->xsk, + params->iface, + params->iface_queue, + params->bp->umem, + &p->rxq, + &p->txq, + &p->umem_fq, + &p->umem_cq, + &params->xsk_cfg); + if (status) { + port_free(p); + printf("port_init failed because xsk_socket__create_shared failed. Status: %ld\n", status); + return NULL; + } + + /* umem fq. */ + xsk_ring_prod__reserve(&p->umem_fq, umem_fq_size, &pos); + + for (i = 0; i < umem_fq_size; i++) + *xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) = + bcache_cons(p->bc); + + xsk_ring_prod__submit(&p->umem_fq, umem_fq_size); + p->umem_fq_initialized = 1; + printf("port init: queue: %d, n_buffers_cons: %ld, n_buffers_prod: %ld\n", + p->params.iface_queue, p->bc->n_buffers_cons, p->bc->n_buffers_prod); + return p; +} + +static inline u32 +port_rx_burst(struct port *p, struct burst_rx *b) +{ + u32 n_pkts, pos, i; + + /* Free buffers for FQ replenish.
*/ + n_pkts = ARRAY_SIZE(b->addr); +// if (p->bc->n_buffers_cons == 0) { +// printf("port_rx_burst: p->bc->n_buffers_cons == 0, need to trade slab from pool\n"); +// } + n_pkts = bcache_cons_check(p->bc, n_pkts); +// printf("Queue: %ld ons_check got %ld packets\n", p->params.iface_queue,n_pkts); + + if (!n_pkts) + return 0; + + /* RXQ. */ + n_pkts = xsk_ring_cons__peek(&p->rxq, n_pkts, &pos); +// printf("Queue: %ld RXQ got %ld packets\n", p->params.iface_queue,n_pkts); + + if (!n_pkts) { + if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) { + struct pollfd pollfd = { + .fd = xsk_socket__fd(p->xsk), + .events = POLLIN, + }; + + poll(&pollfd, 1, 0); + } + return 0; + } + + for (i = 0; i < n_pkts; i++) { + b->addr[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->addr; + b->len[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->len; + } + + xsk_ring_cons__release(&p->rxq, n_pkts); + p->n_pkts_rx += n_pkts; + + /* UMEM FQ. */ +// u64 counter = 0; + for ( ; ; ) { +// counter ++; + int status; + + status = xsk_ring_prod__reserve(&p->umem_fq, n_pkts, &pos); + if (status == n_pkts) { +// printf("Queue: %ld Fill Queue got %ld packets, counter = %ld, breaking\n", counter, p->params.iface_queue,n_pkts); + break; + } + + if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) { + struct pollfd pollfd = { + .fd = xsk_socket__fd(p->xsk), + .events = POLLIN, + }; + + poll(&pollfd, 1, 0); +// printf("Queue: %ld Fill Queue poll for %ld packets, counter = %ld\n", p->params.iface_queue,n_pkts, counter); + } +// printf("Queue: %ld Fill Queue busy spinning, counter = %ld\n", p->params.iface_queue,n_pkts, counter); + } + + for (i = 0; i < n_pkts; i++) + *xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) = + bcache_cons(p->bc); + + xsk_ring_prod__submit(&p->umem_fq, n_pkts); +// printf("Queue: %ld rx burst got %ld packets\n", p->params.iface_queue,n_pkts); + return n_pkts; +} + +static inline void +port_tx_burst(struct port *p, struct burst_tx *b, struct port * p2) +{ + u32 n_pkts, pos, i; + int status; + + /* 
UMEM CQ. */ + n_pkts = p->params.bp->umem_cfg.comp_size; + + n_pkts = xsk_ring_cons__peek(&p->umem_cq, n_pkts, &pos); + + for (i = 0; i < n_pkts; i++) { + u64 addr = *xsk_ring_cons__comp_addr(&p->umem_cq, pos + i); + + bcache_prod(p->bc, addr); + } + + xsk_ring_cons__release(&p->umem_cq, n_pkts); + + /* TXQ. */ + n_pkts = b->n_pkts; + +// u64 counter = 0; + for ( ; ; ) { +// counter ++; + status = xsk_ring_prod__reserve(&p->txq, n_pkts, &pos); + if (status == n_pkts) { +// printf("Queue: %ld TX Queue got %ld packets, counter = %ld, breaking\n", counter, p->params.iface_queue,n_pkts); + break; + } + + if (xsk_ring_prod__needs_wakeup(&p->txq)) { + sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT, + NULL, 0); +// printf("Queue: %ld TX sendto %ld packets, counter = %ld\n", counter, p->params.iface_queue,n_pkts); + } +// printf("Queue: %ld TX busy spinning, counter = %ld\n", counter, p->params.iface_queue,n_pkts); + } + + for (i = 0; i < n_pkts; i++) { + xsk_ring_prod__tx_desc(&p->txq, pos + i)->addr = b->addr[i]; + xsk_ring_prod__tx_desc(&p->txq, pos + i)->len = b->len[i]; + } + + xsk_ring_prod__submit(&p->txq, n_pkts); + if (xsk_ring_prod__needs_wakeup(&p->txq)) + sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0); + p->n_pkts_tx += n_pkts; + if (p2->params.iface_queue != p->params.iface_queue) { + printf("TX Queue: %ld, RX Queue: %ld tx burst sent %ld packets\n", p2->params.iface_queue, p->params.iface_queue, n_pkts); + } +} + +/* + * Thread + * + * Packet forwarding threads. 
+ */ +#ifndef MAX_PORTS_PER_THREAD +#define MAX_PORTS_PER_THREAD 16 +#endif + +struct thread_data { + struct port *ports_rx[MAX_PORTS_PER_THREAD]; + struct port *ports_tx[MAX_PORTS_PER_THREAD]; + u32 n_ports_rx; + struct burst_rx burst_rx; + struct burst_tx burst_tx[MAX_PORTS_PER_THREAD]; + u32 cpu_core_id; + int quit; +}; + +static void swap_mac_addresses(void *data) +{ + struct ether_header *eth = (struct ether_header *)data; + struct ether_addr *src_addr = (struct ether_addr *)&eth->ether_shost; + struct ether_addr *dst_addr = (struct ether_addr *)&eth->ether_dhost; + struct ether_addr tmp; + + tmp = *src_addr; + *src_addr = *dst_addr; + *dst_addr = tmp; +} + +static bool process_packet(void *pkt, uint32_t len/*,struct xsk_socket_info *xsk,*/ + /*uint64_t addr, , int* fd*/ + ) +{ +// printf(">>>>>>>>>> Begin processing packet >>>>>>>>>>\n"); + bpf_lpm_trie_key k; + if (true) { +// printf("Process packets: inside if (true)\n"); + /* + * TODO: Parse packet here, get VNI, IP, MAC, lookup locally in DB, and replace neigbor host IP if found; + * if NOT found, drop packet and remotely GET from Arion Master.
+ * */ + int ret; + uint32_t tx_idx = 0; + uint8_t tmp_mac[ETH_ALEN]; + // parse outer eth header + struct ethhdr *eth = (struct ethhdr *) pkt; + + if (ntohs(eth->h_proto) != ETH_P_IP) { +// printf("Process packets: returning false for this packet as it is NOT IP %u\n", ntohs(eth->h_proto)); + return false; + } +// printf("Packet length: %ld\n", len); +// printf("Outer eth src: %x:%x:%x:%x:%x:%x, dest: %x:%x:%x:%x:%x:%x; next proto: 0x%x\n" +// "eth size: %d\n", +// eth->h_source[0],eth->h_source[1],eth->h_source[2],eth->h_source[3],eth->h_source[4],eth->h_source[5], +// eth->h_dest[0],eth->h_dest[1],eth->h_dest[2],eth->h_dest[3],eth->h_dest[4],eth->h_dest[5], +// bpf_ntohs(eth->h_proto), +// sizeof(*eth)); + + // parse outer IP header + struct iphdr *ip = (struct iphdr *) (eth + 1/*sizeof(*eth)*/); + struct in_addr outer_ip_src; + outer_ip_src.s_addr = ip->saddr; + struct in_addr outer_ip_dest; + outer_ip_dest.s_addr = ip->daddr; +// printf("Outer ip src: %s,",inet_ntoa(outer_ip_src)); +// printf("ip dest: %s\n" +// "Outer ip ihl: %d, version: %d\n", +// inet_ntoa(outer_ip_dest), +// ip->ihl, ip->version); + + // parse UDP header + struct udphdr *udp = (struct udphdr *) (ip + 1/*sizeof(*ip)*/); +// printf("UDP dest: %d, UDP src: %d, == VXL_DSTPORT? %s\n", +// udp->dest, udp->source, (udp->dest==VXL_DSTPORT? 
"true" : "false")); + + // parse VXLAN header + struct vxlanhdr_internal* vxlan = (struct vxlanhdr_internal *)(udp + 1/*sizeof(*udp)*/); +// printf("VNI: %ld, \n",trn_get_vni(vxlan->vni)); + + // parse inner eth header + struct ethhdr *inner_eth = (struct ethhdr *)(vxlan + 1/*sizeof(*vxlan)*/); +// printf("inner eth src: %x:%x:%x:%x:%x:%x, dest: %x:%x:%x:%x:%x:%x; next proto: 0x%x\n", +// inner_eth->h_source[0],inner_eth->h_source[1],inner_eth->h_source[2],inner_eth->h_source[3],inner_eth->h_source[4],inner_eth->h_source[5], +// inner_eth->h_dest[0],inner_eth->h_dest[1],inner_eth->h_dest[2],inner_eth->h_dest[3],inner_eth->h_dest[4],inner_eth->h_dest[5], +// inner_eth->h_proto); + + if (ntohs(inner_eth->h_proto) == ETH_P_ARP) { + // parse inner arp header + arp_message *arp_msg = (struct arp_message *)(inner_eth + 1); +// struct in_addr arp_src_ip; +// arp_src_ip.s_addr = arp_msg->spa; + struct in_addr arp_dest_ip; + arp_dest_ip.s_addr = arp_msg->tpa; +// printf("arp op: %d\n", +// bpf_htons(arp_msg->op)); +// printf("arp source ip: %s, \n", +// inet_ntoa(arp_src_ip/*inner_arp_dest_ip*/) +// ); +// printf("arp dest ip: %s, \n", +// inet_ntoa(arp_dest_ip/*inner_arp_dest_ip*/) +// ); + endpoint_key_t epkey; + epkey.vni = trn_get_vni(vxlan->vni); + struct sockaddr_in ep_ip; + inet_pton(AF_INET, inet_ntoa(arp_dest_ip/*inner_arp_dest_ip*/), &(ep_ip.sin_addr)); + epkey.ip = ep_ip.sin_addr.s_addr; + auto ep_value = db_client::get_instance().GetNeighborInMemory(epkey); + // endpoint_t ep_value; + // ep_value = db_client::get_instance().GetNeighbor(trn_get_vni(vxlan->vni), inet_ntoa(arp_dest_ip)); + if (ep_value.hip != 0) { + // we now have key and value, can modify the packet and update the map now. 
+ // int ebpf_rc = bpf_map_update_elem((*fd), &epkey, &ep_value, BPF_ANY); + // printf("AF_XDP: Inserted this neighbor into map: vip: %s, vni: %d, ebpf_rc: %d\n", + // inet_ntoa(arp_src_ip), trn_get_vni(vxlan->vni), 0); + + /* Modify pkt for inner ARP response */ + struct in_addr ep_ip_addr/*, ep_host_ip_addr*/; + ep_ip_addr.s_addr = epkey.ip; +// ep_host_ip_addr.s_addr = ep_value.hip; +// printf("Retrived this endpoint: HIP: %s ", inet_ntoa(ep_host_ip_addr)); +// printf("IP: %s, host_mac: %x:%x:%x:%x:%x:%x, mac: %x:%x:%x:%x:%x:%x\n", +// inet_ntoa(ep_ip_addr), +// ep_value.hmac[0],ep_value.hmac[1],ep_value.hmac[2],ep_value.hmac[3],ep_value.hmac[4],ep_value.hmac[5], +// ep_value.mac[0],ep_value.mac[1],ep_value.mac[2],ep_value.mac[3],ep_value.mac[4],ep_value.mac[5] +// ); + arp_msg->op = bpf_htons(ARPOP_REPLY); + trn_set_mac(arp_msg->tha, arp_msg->sha); + trn_set_mac(arp_msg->sha, ep_value.mac); + + __u32 tmp_ip = arp_msg->spa;//*sip; + arp_msg->spa = arp_msg->tpa;//*tip; + arp_msg->tpa = tmp_ip; + + /* Modify inner EitherHdr, pretend it's from target */ + trn_set_dst_mac(inner_eth, inner_eth->h_source); + trn_set_src_mac(inner_eth, ep_value.mac); + + /* Keep overlay header, swap outer IP header */ + trn_set_src_dst_ip_csum(ip, ip->daddr, ip->saddr, (eth + len)); + trn_swap_src_dst_mac(pkt); + + /* + * Packet modification finished, read packet content again, in order to verify the mod + * */ + +// struct ethhdr *eth = (struct ethhdr *) pkt; +// +// if (ntohs(eth->h_proto) != ETH_P_IP) { +//// printf("%s\n", "AFTER MOD: returning false for this packet as it is NOT IP"); +// return false; +// } +// printf("AFTER MOD: Packet length: %ld\n", len); +// printf("AFTER MOD: Outer eth src: %x:%x:%x:%x:%x:%x, dest: %x:%x:%x:%x:%x:%x; next proto: 0x%x\n" +// "eth size: %d\n", +// eth->h_source[0],eth->h_source[1],eth->h_source[2],eth->h_source[3],eth->h_source[4],eth->h_source[5], +// eth->h_dest[0],eth->h_dest[1],eth->h_dest[2],eth->h_dest[3],eth->h_dest[4],eth->h_dest[5], 
+// bpf_ntohs(eth->h_proto), +// sizeof(*eth)); +// +// // parse outer IP header +// struct iphdr *ip = (struct iphdr *) (eth + 1/*sizeof(*eth)*/); +// struct in_addr outer_ip_src; +// outer_ip_src.s_addr = ip->saddr; +// struct in_addr outer_ip_dest; +// outer_ip_dest.s_addr = ip->daddr; +// printf("AFTER MOD: Outer ip src: %s,", inet_ntoa(outer_ip_src)); +// printf("ip dest: %s\n" +// "AFTER MOD: Outer ip ihl: %d, version: %d\n", +// inet_ntoa(outer_ip_dest), +// ip->ihl, ip->version); +// +// // parse UDP header +// struct udphdr *udp = (struct udphdr *) (ip + 1/*sizeof(*ip)*/); +// printf("AFTER MOD: UDP dest: %d, UDP src: %d, == VXL_DSTPORT? %s\n", +// udp->dest, udp->source, (udp->dest==VXL_DSTPORT? "true" : "false")); +// +// // parse VXLAN header +// struct vxlanhdr_internal* vxlan = (struct vxlanhdr_internal *)(udp + 1/*sizeof(*udp)*/); +// printf("AFTER MOD: VNI: %ld, \n",trn_get_vni(vxlan->vni)); +// +// // parse inner eth header +// struct ethhdr *inner_eth = (struct ethhdr *)(vxlan + 1/*sizeof(*vxlan)*/); +// printf("AFTER MOD: inner eth src: %x:%x:%x:%x:%x:%x, dest: %x:%x:%x:%x:%x:%x; next proto: 0x%x\n", +// inner_eth->h_source[0],inner_eth->h_source[1],inner_eth->h_source[2],inner_eth->h_source[3],inner_eth->h_source[4],inner_eth->h_source[5], +// inner_eth->h_dest[0],inner_eth->h_dest[1],inner_eth->h_dest[2],inner_eth->h_dest[3],inner_eth->h_dest[4],inner_eth->h_dest[5], +// inner_eth->h_proto); +// +// // parse inner arp header +// arp_message *arp_msg = (struct arp_message *)(inner_eth + 1); +// struct in_addr arp_src_ip; +// arp_src_ip.s_addr = arp_msg->spa; +// struct in_addr arp_dest_ip; +// arp_dest_ip.s_addr = arp_msg->tpa; +// printf("AFTER MOD: arp op: %d\n", +// bpf_htons(arp_msg->op)); +// printf("AFTER MOD: arp source ip: %s, \n", +// inet_ntoa(arp_src_ip/*inner_arp_dest_ip*/) +// ); +// printf("AFTER MOD: arp dest ip: %s, \n", +// inet_ntoa(arp_dest_ip/*inner_arp_dest_ip*/) +// ); + /* Here we sent the packet out of the receive port. 
Note that + * we allocate one entry and schedule it. Your design would be + * faster if you do batch processing/transmission */ + +// printf("<<<<<<<<<< Finished processing packet <<<<<<<<<<\n"); + + return true; + } else { + printf("Can't find endpoint!\n"); + return false; + } + } + else if (ntohs(inner_eth->h_proto) == ETH_P_IP) { + // parse inner IP header + struct iphdr *inner_ip = (struct iphdr *)(inner_eth + 1 /*sizeof(*inner_eth)*/); + + + struct in_addr inner_ip_src; + inner_ip_src.s_addr = inner_ip->saddr; + struct in_addr inner_ip_dest; + inner_ip_dest.s_addr = inner_ip->daddr; +// printf("Inner IP src: %s\n", inet_ntoa(inner_ip_src)); +// printf("Inner IP dest: %s\n", inet_ntoa(inner_ip_dest)); + endpoint_key_t epkey; + epkey.vni = trn_get_vni(vxlan->vni); + struct sockaddr_in ep_ip, src_ip; + inet_pton(AF_INET, inet_ntoa(inner_ip_dest/*inner_arp_dest_ip*/), &(ep_ip.sin_addr)); + inet_pton(AF_INET, inet_ntoa(inner_ip_src), &(src_ip.sin_addr)); + epkey.ip = ep_ip.sin_addr.s_addr; + + /* + * TODO: implement SG Logic + * */ + sg_cidr_key_t sg_key; + sg_key.protocol = inner_ip->protocol; + sg_key.local_ip = ep_ip.sin_addr.s_addr; + sg_key.remote_ip = src_ip.sin_addr.s_addr; + sg_key.vni = epkey.vni; + sg_key.direction = 1; // how to express goingout/coming in? + if (sg_key.protocol == IPPROTO_TCP) { + struct tcphdr *inner_tcp = (struct tcphdr *)(inner_ip + 1); + sg_key.port = bpf_htons(inner_tcp->dest); + sg_key.prefixlen = 136; + // how about lpm_key.data? + } else if (sg_key.protocol == IPPROTO_UDP) { + struct udphdr *inner_udp = (struct udphdr *)(inner_ip + 1); + sg_key.port = bpf_htons(inner_udp->dest); + sg_key.prefixlen = 136; + // how about lpm_key.data? 
+ } + + + + auto ep_value = db_client::get_instance().GetNeighborInMemory(epkey); + // endpoint_t ep_value; + // ep_value = db_client::get_instance().GetNeighbor(trn_get_vni(vxlan->vni), inet_ntoa(inner_ip_dest)); + + if (ep_value.hip != 0) { + // epkey.vni = trn_get_vni(vxlan->vni); + // struct sockaddr_in ep_ip; + // inet_pton(AF_INET, inet_ntoa(inner_ip_dest/*inner_arp_dest_ip*/), &(ep_ip.sin_addr)); + // epkey.ip = ep_ip.sin_addr.s_addr; + // we now have key and value, can modify the packet and update the map now. + // int ebpf_rc = bpf_map_update_elem((*fd), &epkey, &ep_value, BPF_ANY); + // printf("AF_XDP: Inserted this neighbor into map: vip: %s, vni: %d, ebpf_rc: %d\n", + // inet_ntoa(inner_ip_dest), trn_get_vni(vxlan->vni), 0); + +// struct in_addr ep_ip_addr, ep_host_ip_addr; +// ep_ip_addr.s_addr = epkey.ip; +// ep_host_ip_addr.s_addr = ep_value.hip; +// printf("Retrived this endpoint: HIP: %s,", inet_ntoa(ep_host_ip_addr)); +// printf("IP: %s, host_mac: %x:%x:%x:%x:%x:%x, mac: %x:%x:%x:%x:%x:%x\n", +// inet_ntoa(ep_ip_addr), +// ep_value.hmac[0],ep_value.hmac[1],ep_value.hmac[2],ep_value.hmac[3],ep_value.hmac[4],ep_value.hmac[5], +// ep_value.mac[0],ep_value.mac[1],ep_value.mac[2],ep_value.mac[3],ep_value.mac[4],ep_value.mac[5] +// ); + + /* Modify inner EitherHdr, pretend it's from target */ + trn_set_dst_mac(inner_eth, ep_value.mac); + + /* Keep overlay header, update outer header destinations */ + trn_set_src_dst_ip_csum(ip, ip->daddr, ep_value.hip, (eth + len)); + trn_set_src_mac(eth, eth->h_dest); + trn_set_dst_mac(eth, ep_value.hmac); + + /* + * Packet modification finished, read packet content again, in order to verify the mod + * */ + +// struct ethhdr *eth = (struct ethhdr *) pkt; +// +// if (ntohs(eth->h_proto) != ETH_P_IP) { +//// printf("%s\n", "AFTER MOD: returning false for this packet as it is NOT IP"); +// return false; +// } +// printf("AFTER MOD: Packet length: %ld\n", len); +// printf("AFTER MOD: Outer eth src: %x:%x:%x:%x:%x:%x, 
dest: %x:%x:%x:%x:%x:%x; next proto: 0x%x\n" +// "eth size: %d\n", +// eth->h_source[0],eth->h_source[1],eth->h_source[2],eth->h_source[3],eth->h_source[4],eth->h_source[5], +// eth->h_dest[0],eth->h_dest[1],eth->h_dest[2],eth->h_dest[3],eth->h_dest[4],eth->h_dest[5], +// bpf_ntohs(eth->h_proto), +// sizeof(*eth)); +// +//// parse outer IP header +// struct iphdr *ip = (struct iphdr *) (eth + 1/*sizeof(*eth)*/); +// struct in_addr outer_ip_src; +// outer_ip_src.s_addr = ip->saddr; +// struct in_addr outer_ip_dest; +// outer_ip_dest.s_addr = ip->daddr; +// printf("AFTER MOD: Outer ip src: %s", inet_ntoa(outer_ip_src)); +// printf("ip dest: %s\n" +// "AFTER MOD: Outer ip ihl: %d, version: %d\n", +// inet_ntoa(outer_ip_dest), +// ip->ihl, ip->version); +// +// // parse UDP header +// struct udphdr *udp = (struct udphdr *) (ip + 1/*sizeof(*ip)*/); +// printf("AFTER MOD: UDP dest: %d, UDP src: %d, == VXL_DSTPORT? %s\n", +// udp->dest, udp->source, (udp->dest==VXL_DSTPORT? "true" : "false")); +// +// // parse VXLAN header +// struct vxlanhdr_internal* vxlan = (struct vxlanhdr_internal *)(udp + 1/*sizeof(*udp)*/); +// printf("AFTER MOD: VNI: %ld, \n",trn_get_vni(vxlan->vni)); +// +// // parse inner eth header +// struct ethhdr *inner_eth = (struct ethhdr *)(vxlan + 1/*sizeof(*vxlan)*/); +// printf("AFTER MOD: inner eth src: %x:%x:%x:%x:%x:%x, dest: %x:%x:%x:%x:%x:%x; next proto: 0x%x\n", +// inner_eth->h_source[0],inner_eth->h_source[1],inner_eth->h_source[2],inner_eth->h_source[3],inner_eth->h_source[4],inner_eth->h_source[5], +// inner_eth->h_dest[0],inner_eth->h_dest[1],inner_eth->h_dest[2],inner_eth->h_dest[3],inner_eth->h_dest[4],inner_eth->h_dest[5], +// inner_eth->h_proto); +// +// // parse inner IP header +// struct iphdr *inner_ip = (struct iphdr *)(inner_eth + 1 /*sizeof(*inner_eth)*/); +// struct in_addr inner_ip_src, inner_ip_dest; +// inner_ip_src.s_addr = inner_ip->saddr; +// inner_ip_dest.s_addr = inner_ip->daddr; +// printf("AFTER MOD: Inner IP src: %s\n", 
inet_ntoa(inner_ip_src)); +// printf("AFTER MOD: Inner IP dest: %s\n", inet_ntoa(inner_ip_dest)); + /* Here we sent the packet out of the receive port. Note that + * we allocate one entry and schedule it. Your design would be + * faster if you do batch processing/transmission */ + +// printf("<<<<<<<<<< Finished processing packet <<<<<<<<<<\n"); + + return true; + } else { + printf("Can't find endpoint!\n"); + return false; + } + } + + printf("Neither ARP or IP, returning false.\n"); + return false; + } + printf("process packet: how is this false?\n"); + return false; +} + + + +static void * +run_af_xdp_socket(void *arg) +{ + struct thread_data *t = static_cast(arg); + cpu_set_t cpu_cores; + u32 i; + CPU_ZERO(&cpu_cores); + CPU_SET(t->cpu_core_id, &cpu_cores); + pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_cores); + for (int j = 0 ; j < t->n_ports_rx ; j ++) { + struct port * port_rx = t->ports_rx[j]; + printf("port: %ld, tx queue needs wake up: %ld, fill queue needs wake up :%ld\n", + port_rx->params.iface_queue, xsk_ring_prod__needs_wakeup(&port_rx->txq) , xsk_ring_prod__needs_wakeup(&port_rx->umem_fq)); + if (port_rx->bc->n_buffers_cons == 0) { + port_rx->bc->n_buffers_cons = 4096; + printf("Manually setting port %d n_buffer_cons to 4096\n", port_rx->params.iface_queue); + } + } + + + for (i = 0; !t->quit; i = (i + 1) & (t->n_ports_rx - 1)) { + struct port *port_rx = t->ports_rx[i]; + struct port *port_tx = t->ports_tx[i]; +// printf("Thread: %ld, port rx: %ld, port tx: %ld\n", +// t->cpu_core_id, port_rx->params.iface_queue, port_tx->params.iface_queue); + struct burst_rx *brx = &t->burst_rx; + struct burst_tx *btx = &t->burst_tx[i]; + u32 n_pkts, j; + + /* RX. */ + n_pkts = port_rx_burst(port_rx, brx); + if (!n_pkts) { +// printf("thead %ld got no packets in rx_burst, continue\n", t->cpu_core_id ); + continue; + } + + /* Process & TX. 
*/ + for (j = 0; j < n_pkts; j++) { +// printf("Queue %ld getting the %ld th packet\n", port_rx->params.iface_queue, j); + u64 addr = xsk_umem__add_offset_to_addr(brx->addr[j]); + u8 *pkt = static_cast(xsk_umem__get_data(port_rx->params.bp->addr, addr)); +// printf("Queue %ld processing the %ld th packet\n", port_rx->params.iface_queue, j); + + process_packet(pkt, brx->len[j]); +// swap_mac_addresses(pkt); + + btx->addr[btx->n_pkts] = brx->addr[j]; + btx->len[btx->n_pkts] = brx->len[j]; + btx->n_pkts++; + + } + if (btx->n_pkts > 0/*== MAX_BURST_TX*/) { + port_tx_burst(port_tx, btx, port_rx); + btx->n_pkts = 0; + } + } + + return NULL; +} + +/* + * Process + */ +static const struct bpool_params bpool_params_default = { + .n_buffers = 192 /* this number should be set to 64 * (number_of_cores / 8)*/ * 1024, + .buffer_size = XSK_UMEM__DEFAULT_FRAME_SIZE, + .mmap_flags = 0, + + .n_users_max = 16/*24*/, + .n_buffers_per_slab = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2, +}; + +static const struct xsk_umem_config umem_cfg_default = { + .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS, //* 2, + .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS, + .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE, + .frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM, + .flags = XDP_RING_NEED_WAKEUP, +}; + +static const struct port_params port_params_default = { + .xsk_cfg = { + .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS, + .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS, + .libbpf_flags = 0, //.libxdp_flags + .xdp_flags = XDP_FLAGS_DRV_MODE, + .bind_flags = XDP_USE_NEED_WAKEUP | XDP_ZEROCOPY, + }, + + .bp = NULL, + .iface = NULL, + .iface_queue = 0, +}; + +#ifndef MAX_PORTS +#define MAX_PORTS 64 +#endif + +#ifndef MAX_THREADS +#define MAX_THREADS 64 +#endif + +static struct bpool_params bpool_params; +static struct xsk_umem_config umem_cfg; +static struct bpool *bp; + +static struct port_params port_params[MAX_PORTS]; +static struct port *ports[MAX_PORTS]; +static u64 n_pkts_rx[MAX_PORTS]; +static u64 
n_pkts_tx[MAX_PORTS]; +static int n_ports; + +static pthread_t threads[MAX_THREADS]; +static struct thread_data thread_data[MAX_THREADS]; +static int n_threads; + +static void +print_usage(char *prog_name) +{ + const char *usage = + "Usage:\n" + "\t%s [ -b SIZE ] -c CORE -i INTERFACE [ -q QUEUE ]\n" + "\n" + "-c CORE CPU core to run a packet forwarding thread\n" + " on. May be invoked multiple times.\n" + "\n" + "-b SIZE Number of buffers in the buffer pool shared\n" + " by all the forwarding threads. Default: %u.\n" + "\n" + "-i INTERFACE Network interface. Each (INTERFACE, QUEUE)\n" + " pair specifies one forwarding port. May be\n" + " invoked multiple times.\n" + "\n" + "-q QUEUE Network interface queue for RX and TX. Each\n" + " (INTERFACE, QUEUE) pair specified one\n" + " forwarding port. Default: %u. May be invoked\n" + " multiple times.\n" + "\n"; + printf(usage, + prog_name, + bpool_params_default.n_buffers, + port_params_default.iface_queue); +} + +static int +parse_args(int argc, char **argv) +{ + struct option lgopts[] = { + { NULL, 0, 0, 0 } + }; + int opt, option_index; + + /* Parse the input arguments. 
*/ + for ( ; ;) { + opt = getopt_long(argc, argv, "c:i:q:", lgopts, &option_index); + if (opt == EOF) + break; + + switch (opt) { + case 'b': + bpool_params.n_buffers = atoi(optarg); + break; + + case 'c': + if (n_threads == MAX_THREADS) { + printf("Max number of threads (%d) reached.\n", + MAX_THREADS); + return -1; + } + + thread_data[n_threads].cpu_core_id = atoi(optarg); + n_threads++; + break; + + case 'i': + if (n_ports == MAX_PORTS) { + printf("Max number of ports (%d) reached.\n", + MAX_PORTS); + return -1; + } + + port_params[n_ports].iface = optarg; + port_params[n_ports].iface_queue = 0; + n_ports++; + break; + + case 'q': + if (n_ports == 0) { + printf("No port specified for queue.\n"); + return -1; + } + port_params[n_ports - 1].iface_queue = atoi(optarg); + break; + + default: + printf("Illegal argument.\n"); + return -1; + } + } + + optind = 1; /* reset getopt lib */ + + /* Check the input arguments. */ + if (!n_ports) { + printf("No ports specified.\n"); + return -1; + } + + if (!n_threads) { + printf("No threads specified.\n"); + return -1; + } + + if (n_ports % n_threads) { + printf("Ports cannot be evenly distributed to threads.\n"); + return -1; + } + + return 0; +} + +static void +print_port(u32 port_id) +{ + struct port *port = ports[port_id]; + int option; + socklen_t option_length = sizeof(option); + getsockopt(xsk_socket__fd(port->xsk), SOL_XDP, XDP_OPTIONS, &option, (&option_length)); + printf("Port %u: interface = %s, queue = %u, zero_copy_enabled: %s\n", + port_id, port->params.iface, port->params.iface_queue, + ((option == XDP_OPTIONS_ZEROCOPY ? 
"true" : "false"))); +} + +static void +print_thread(u32 thread_id) +{ + struct thread_data *t = &thread_data[thread_id]; + u32 i; + + printf("Thread %u (CPU core %u): ", + thread_id, t->cpu_core_id); + + for (i = 0; i < t->n_ports_rx; i++) { + struct port *port_rx = t->ports_rx[i]; + struct port *port_tx = t->ports_tx[i]; + + printf("(%s, %u) -> (%s, %u), ", + port_rx->params.iface, + port_rx->params.iface_queue, + port_tx->params.iface, + port_tx->params.iface_queue); + } + + printf("\n"); +} + +static void +print_port_stats_separator(void) +{ + printf("+-%4s-+-%12s-+-%13s-+-%12s-+-%13s-+\n", + "----", + "------------", + "-------------", + "------------", + "-------------"); +} + +static void +print_port_stats_header(void) +{ + print_port_stats_separator(); + printf("| %4s | %12s | %13s | %12s | %13s |\n", + "Port", + "RX packets", + "RX rate (pps)", + "TX packets", + "TX_rate (pps)"); + print_port_stats_separator(); +} + +static void +print_port_stats_trailer(void) +{ + print_port_stats_separator(); + printf("\n"); +} + +static void +print_port_stats(int port_id, u64 ns_diff) +{ + struct port *p = ports[port_id]; + double rx_pps, tx_pps; + + rx_pps = (p->n_pkts_rx - n_pkts_rx[port_id]) * 1000000000. / ns_diff; + tx_pps = (p->n_pkts_tx - n_pkts_tx[port_id]) * 1000000000. 
/ ns_diff; + + printf("| %4d | %12llu | %13.0f | %12llu | %13.0f |\n", + port_id, + p->n_pkts_rx, + rx_pps, + p->n_pkts_tx, + tx_pps); + + n_pkts_rx[port_id] = p->n_pkts_rx; + n_pkts_tx[port_id] = p->n_pkts_tx; +} + +static void +print_port_stats_all(u64 ns_diff) +{ + int i; + + print_port_stats_header(); + for (i = 0; i < n_ports; i++) + print_port_stats(i, ns_diff); + print_port_stats_trailer(); +} + +static int quit; + +static void +signal_handler(int sig) +{ + quit = 1; +} + +//static void remove_xdp_program(void) +//{ +// struct xdp_multiprog *mp; +// int i, err; +// +// for (i = 0 ; i < n_ports; i++) { +// mp = xdp_multiprog__get_from_ifindex(if_nametoindex(port_params[i].iface)); +// if (IS_ERR_OR_NULL(mp)) { +// printf("No XDP program loaded on %s\n", port_params[i].iface); +// continue; +// } +// +// err = xdp_multiprog__detach(mp); +// if (err) +// printf("Unable to detach XDP program: %s\n", strerror(-err)); +// } +//} + +void* af_xdp_user_multi_thread::run_af_xdp_multi_threaded(void* args/*int argc, char **argv*/) +{ + struct timespec time; + u64 ns0; + int i; + + std::string table_name_neighbor_ebpf_map = "/sys/fs/bpf/endpoints_map"; + int fd_neighbor_ebpf_map = bpf_obj_get(table_name_neighbor_ebpf_map.c_str()); + + std::string table_name_sg_ebpf_map = "/sys/fs/bpf/security_group_map"; + int fd_security_group_ebpf_map = bpf_obj_get(table_name_sg_ebpf_map.c_str()); + + printf("endpoints map fd: %ld, sg map fd: %ld\n", fd_neighbor_ebpf_map, fd_security_group_ebpf_map); + + + if (fd_neighbor_ebpf_map <= 0 || fd_security_group_ebpf_map <= 0 ) { + printf("fd_neighbor_ebpf_map: %ld, fd_security_group_ebpf_map: %ld, exiting\n" + , fd_neighbor_ebpf_map, fd_security_group_ebpf_map); +// exit(-1); + } + + /* Parse args. 
*/ + memcpy(&bpool_params, &bpool_params_default, + sizeof(struct bpool_params)); + memcpy(&umem_cfg, &umem_cfg_default, + sizeof(struct xsk_umem_config)); +// umem_cfg.flags |= (XDP_RING_NEED_WAKEUP/*XDP_USE_NEED_WAKEUP*/ ); + for (i = 0; i < MAX_PORTS; i++) + memcpy(&port_params[i], &port_params_default, + sizeof(struct port_params)); + +// if (parse_args(argc, argv)) { +// print_usage(argv[0]); +// return -1; +// } + auto number_of_cores = std::thread::hardware_concurrency(); + printf("This machine has %ld cores\n", number_of_cores); + // leave 8 cores for the rest of the system. + n_ports = number_of_cores > 8 ? (number_of_cores - 8) : 0; + + if (n_ports == 0) { + printf("This machine has too little number of cores(%ld), not good for AF_XDP. Exiting\n", number_of_cores); + exit(-1); + } + + printf("After leaving 8 cores for other applications, we are now setting the interface to have %ld AF_XDP sockets.\n", n_ports); + + /* + * TODO: Make NIC name configurable. + * */ + string set_nic_queue_command_template = "ethtool -L enp4s0f1 combined %ld"; + char set_nic_queue_command[100]; + sprintf(set_nic_queue_command, "ethtool -L enp4s0f1 combined %ld", n_ports); + printf("Executing system command: %s\n", set_nic_queue_command); + int set_nic_queue_command_rc = system(set_nic_queue_command); + + if (set_nic_queue_command_rc!=EXIT_SUCCESS) { + printf("set nic queue command failed(%ld)! Exiting\n", set_nic_queue_command_rc); + exit(-1); + } + + // using 1 thread per iface + iface_queue + n_threads = n_ports; // get number of cores of this machine. + + for ( int i = 0 ; i < n_ports ; i ++) { + port_params[i].iface = "enp4s0f1"; + port_params[i].iface_queue = i; + thread_data[i].cpu_core_id = i; + } + + + /* Buffer pool initialization. */ + bp = bpool_init(&bpool_params, &umem_cfg); + if (!bp) { + printf("Buffer pool initialization failed.\n"); + return args; + } + printf("Buffer pool created successfully.\n"); + + /* Ports initialization. 
*/ + for (i = 0; i < MAX_PORTS; i++) + port_params[i].bp = bp; + + for (i = 0; i < n_ports; i++) { + ports[i] = port_init(&port_params[i]); + if (!ports[i]) { + printf("Port %d initialization failed.\n", i); + return args; + } + print_port(i); + } + printf("All ports created successfully.\n"); + + /* Threads. */ + for (i = 0; i < n_threads; i++) { + struct thread_data *t = &thread_data[i]; + u32 n_ports_per_thread = n_ports / n_threads, j; + + for (j = 0; j < n_ports_per_thread; j++) { + t->ports_rx[j] = ports[i * n_ports_per_thread + j]; + t->ports_tx[j] = ports[i * n_ports_per_thread + + (j + 1) % n_ports_per_thread]; +// printf("Thread: %ld has rx port: %ld, tx port: %ld\n", +// i, t->ports_rx[j]->params.iface_queue, t->ports_tx[j]->params.iface_queue); + } + + t->n_ports_rx = n_ports_per_thread; + + print_thread(i); + } + + for (i = 0; i < n_threads; i++) { + int status; + + status = pthread_create(&threads[i], + NULL, + run_af_xdp_socket, + &thread_data[i]); + if (status) { + printf("Thread %d creation failed.\n", i); + return args; + } + } + printf("All threads created successfully.\n"); + + /* Print statistics. */ + signal(SIGINT, signal_handler); + signal(SIGTERM, signal_handler); + signal(SIGABRT, signal_handler); + + clock_gettime(CLOCK_MONOTONIC, &time); + ns0 = time.tv_sec * 1000000000UL + time.tv_nsec; + for ( ; !quit; ) { + u64 ns1, ns_diff; + + sleep(10); + clock_gettime(CLOCK_MONOTONIC, &time); + ns1 = time.tv_sec * 1000000000UL + time.tv_nsec; + ns_diff = ns1 - ns0; + ns0 = ns1; + + print_port_stats_all(ns_diff); + } + + /* Threads completion. 
*/ + printf("Quit.\n"); + for (i = 0; i < n_threads; i++) + thread_data[i].quit = 1; + + for (i = 0; i < n_threads; i++) + pthread_join(threads[i], NULL); + + for (i = 0; i < n_ports; i++) + port_free(ports[i]); + + bpool_free(bp); + +// remove_xdp_program(); + + return args; +} diff --git a/src/comm/grpc_client.cpp b/src/comm/grpc_client.cpp index 79c94a4..c1e6beb 100644 --- a/src/comm/grpc_client.cpp +++ b/src/comm/grpc_client.cpp @@ -37,18 +37,27 @@ #include "arionmaster.grpc.pb.h" #include "db_client.h" #include "grpc_client.h" -#include "xdp/trn_datamodel.h" +//#include "xdp/trn_datamodel.h" using namespace arion::schema; -void ArionMasterWatcherImpl::RequestNeighborRules(ArionWingRequest *request, +void ArionMasterWatcherImpl::RequestArionMaster(vector *request_vector, grpc::CompletionQueue *cq) { - grpc::ClientContext ctx; - arion::schema::NeighborRule reply; +// grpc::ClientContext ctx; +// arion::schema::NeighborRule reply; // prepared statements for better performance of db writing in completion queue - auto add_or_update_neighbor_db_stmt = local_db.prepare(replace(Neighbor{ 0, "", "", "", "", 0 })); - auto add_programmed_version_db_stmt = local_db.prepare(insert(ProgrammingState{ 0 })); + + // neighbor state, has ebpf map and local db table + auto add_or_update_neighbor_db_stmt = db_client::get_instance().local_db.prepare(replace(Neighbor{ 0, "", "", "", "", 0 })); + auto add_programmed_neighbor_version_db_stmt = db_client::get_instance().local_db.prepare(insert(NeighborProgrammingState{ 0 })); + + // security group rules, has local db, but NOT ebpf map + auto add_or_update_security_group_rule_db_stmt = db_client::get_instance().local_db.prepare(replace(::SecurityGroupRule{ "", "", "", "", "", 0, 0, "", 0, 0 })); + + // security group port binding, has local db, needs to query security group rules to insert into eBPF map. 
+ auto add_or_update_security_group_port_binding_stmt = db_client::get_instance().local_db.prepare(replace(::SecurityGroupPortBinding{"", ""})); + auto add_programmed_security_group_port_binding_version_db_stmt = db_client::get_instance().local_db.prepare(insert(SecurityGroupPortBindingProgrammingState{ 0 })); // check current grpc channel state, try to connect if needed grpc_connectivity_state current_state = chan_->GetState(true); @@ -66,6 +75,7 @@ void ArionMasterWatcherImpl::RequestNeighborRules(ArionWingRequest *request, int tag_watch = 1; printf("Completion queue: initial task, async watch\n"); +// stub_->AsyncWatch(&call->context, cq, (void*) tag_watch); call->stream = stub_->AsyncWatch(&call->context, cq, (void*)tag_watch); // start time @@ -73,153 +83,673 @@ void ArionMasterWatcherImpl::RequestNeighborRules(ArionWingRequest *request, std::atomic i(tag_watch + 1); bool write_done = false; + int current_write_request_index = 0; while (cq->Next(&got_tag, &ok)) { + printf("Read one from grpc stream\n"); if (ok) { + /* + * TODO: Associate call with the tag_watch, so that the write_done can be got rid of. 
+ * + * */ if (!write_done) { printf("Completion queue: initial task response received\n"); printf("Completion queue: write async watch ArionWingRequest of [group, revision] to stream\n"); - call->stream->Write(*request, (void*)tag_watch); + call->stream->Write(*(request_vector->at(current_write_request_index)), (void*)tag_watch); + printf("Just wrote request with rev: [%ld], map: [%s] and group id: [%s]\n", + (request_vector->at(current_write_request_index))->rev(), + (request_vector->at(current_write_request_index))->map().c_str(), + (request_vector->at(current_write_request_index)->group().c_str()) + ); + current_write_request_index ++; + if (current_write_request_index == request_vector->size()) { + write_done = true; + } + /* + for (auto &request : *request_vector) { + call->stream->Write(*request, (void*)tag_watch); + printf("Just wrote request with rev: [%ld], map: [%s] and group id: [%s]\n", + request->rev(), request->map().c_str(), request->group().c_str() + ); + } write_done = true; + */ } else { call->stream->Read(&call->reply, got_tag); - auto vni = call->reply.tunnel_id(); - auto vpc_ip = call->reply.ip(); - auto vpc_mac = call->reply.mac(); - auto host_ip = call->reply.hostip(); - auto host_mac = call->reply.hostmac(); - auto ver = call->reply.version(); - int fd = fd_neighbor_ebpf_map; - - // non-empty rule - if ("" != vpc_ip) { - marl::schedule([this, &i, vni, vpc_ip, vpc_mac, host_ip, host_mac, ver, fd, - &add_or_update_neighbor_db_stmt, &add_programmed_version_db_stmt] { - // step #1 - check and store as in concurrent hash map - std::string neighbor_key = std::to_string(vni) + "-" + vpc_ip; - - // ebpf programming metadata - endpoint_key_t epkey; - epkey.vni = vni; - struct sockaddr_in ep_ip; - inet_pton(AF_INET, vpc_ip.c_str(), &(ep_ip.sin_addr)); - epkey.ip = ep_ip.sin_addr.s_addr; - - endpoint_t ep; - struct sockaddr_in ep_hip; - inet_pton(AF_INET, host_ip.c_str(), &(ep_hip.sin_addr)); - ep.hip = ep_hip.sin_addr.s_addr; - - 
std::sscanf(vpc_mac.c_str(), "%02x:%02x:%02x:%02x:%02x:%02x", - &ep.mac[0], &ep.mac[1], &ep.mac[2], - &ep.mac[3], &ep.mac[4], &ep.mac[5]); - - std::sscanf(host_mac.c_str(), "%02x:%02x:%02x:%02x:%02x:%02x", - &ep.hmac[0], &ep.hmac[1], &ep.hmac[2], - &ep.hmac[3], &ep.hmac[4], &ep.hmac[5]); - - bool ebpf_ignored = false; - bool map_updated = false; - int update_ct = 0, max_update_ct = 5; - int ebpf_rc = -1; - - while (!map_updated && (update_ct < max_update_ct)) { - // lock transaction section - // segment lock allows some level of concurrent manipulations of concurrent version map - // as long as the multi-threading version updates' keys are not hashed to the same slot in segment array - segment_lock.lock(neighbor_key); - - auto neighbor_pos = neighbor_task_map.find(neighbor_key); - if (neighbor_pos == neighbor_task_map.end()) { - // key not found, try insert. The function returns successful only when key not exists when inserting - auto res_insert = - neighbor_task_map.insert(neighbor_key, ver); - if (res_insert.second) { - // means successfully inserted, done with update - map_updated = true; - - // step #2 - sync syscall ebpf map programming with return code - ebpf_rc = bpf_map_update_elem(fd, &epkey, &ep, BPF_ANY); - if (ebpf_rc < 0) { - // safely rollback - // rollback version, for insertion case let's revert it to 0 - neighbor_task_map.assign(neighbor_key, 0); - - // rollback map status - map_updated = false; - } - } // 'else' means another thread already inserted before me, then it's not an insert case and next time in the loop will go to case of update - } else { - // key found, means multi neighbor versions might update at the same time - int cur_ver = neighbor_pos->second; - - if (ver > cur_ver) { - // only update neighbor version - // 1. when received (from ArionMaster) neighbor version is greater than current version in map - // 2. 
and only if the element to update is the original element (version in 'find') - if (neighbor_task_map.assign(neighbor_key, ver)) { + if (call->reply.has_neighbor_rule()) { + auto vni = call->reply.neighbor_rule().tunnel_id(); + auto vpc_ip = call->reply.neighbor_rule().ip(); + auto vpc_mac = call->reply.neighbor_rule().mac(); + auto host_ip = call->reply.neighbor_rule().hostip(); + auto host_mac = call->reply.neighbor_rule().hostmac(); + auto ver = call->reply.neighbor_rule().version(); + int fd = fd_neighbor_ebpf_map; + + // non-empty rule + if ("" != vpc_ip) { + marl::schedule([this, &i, vni, vpc_ip, vpc_mac, host_ip, host_mac, ver, fd, + &add_or_update_neighbor_db_stmt, &add_programmed_neighbor_version_db_stmt] { + // step #1 - check and store as in concurrent hash map + std::string neighbor_key = std::to_string(vni) + "-" + vpc_ip; + + endpoint_key_t epkey; + epkey.vni = vni; + struct sockaddr_in ep_ip; + inet_pton(AF_INET, vpc_ip.c_str(), &(ep_ip.sin_addr)); + epkey.ip = ep_ip.sin_addr.s_addr; + printf("Filled in ep.ip\n"); + endpoint_t ep; + struct sockaddr_in ep_hip; + inet_pton(AF_INET, host_ip.c_str(), &(ep_hip.sin_addr)); + ep.hip = ep_hip.sin_addr.s_addr; + printf("Filled in ep.hip\n"); + + std::sscanf(vpc_mac.c_str(), "%02x:%02x:%02x:%02x:%02x:%02x", + &ep.mac[0], &ep.mac[1], &ep.mac[2], + &ep.mac[3], &ep.mac[4], &ep.mac[5]); + printf("Filled in ep.mac\n"); + + std::sscanf(host_mac.c_str(), "%02x:%02x:%02x:%02x:%02x:%02x", + &ep.hmac[0], &ep.hmac[1], &ep.hmac[2], + &ep.hmac[3], &ep.hmac[4], &ep.hmac[5]); + printf("Filled in ep.hmac\n"); + + printf("vpc_ip is NOT empty: [%s]\n", vpc_ip.c_str()); + bool ebpf_ignored = false; + bool map_updated = false; + int update_ct = 0, max_update_ct = 5; + int ebpf_rc = -1; + + while (!map_updated && (update_ct < max_update_ct)) { + // lock transaction section + // segment lock allows some level of concurrent manipulations of concurrent version map + // as long as the multi-threading version updates' keys are not 
hashed to the same slot in segment array + segment_lock.lock(neighbor_key); + printf("Inside while loop, map_updated = [%b], update_ct = [%ld], max_update_ct = [%ld]\n", + map_updated, update_ct, max_update_ct); + auto neighbor_pos = neighbor_task_map.find(neighbor_key); + if (neighbor_pos == neighbor_task_map.end()) { + // key not found, try insert. The function returns successful only when key not exists when inserting + auto res_insert = + neighbor_task_map.insert(neighbor_key, ver); + if (res_insert.second) { + // means successfully inserted, done with update map_updated = true; + printf("Found neighbor key in neighbor_task_map\n"); // step #2 - sync syscall ebpf map programming with return code ebpf_rc = bpf_map_update_elem(fd, &epkey, &ep, BPF_ANY); if (ebpf_rc < 0) { // safely rollback - // rollback version - neighbor_task_map.assign(neighbor_key, cur_ver); + // rollback version, for insertion case let's revert it to 0 + neighbor_task_map.assign(neighbor_key, 0); + + // rollback map status + map_updated = false; + }else { + // if ebpf map programming succeeded, also put in local in memory cache + db_client::get_instance().endpoint_cache[epkey] = ep; + } + + } // 'else' means another thread already inserted before me, then it's not an insert case and next time in the loop will go to case of update + } else { + printf("Didn't find neighbor key in neighbor_task_map\n"); + // key found, means multi neighbor versions might update at the same time + int cur_ver = neighbor_pos->second; + + if (ver > cur_ver) { + // only update neighbor version + // 1. when received (from ArionMaster) neighbor version is greater than current version in map + // 2. 
and only if the element to update is the original element (version in 'find') + if (neighbor_task_map.assign(neighbor_key, ver)) { + map_updated = true; + + // step #2 - sync syscall ebpf map programming with return code + ebpf_rc = bpf_map_update_elem(fd, &epkey, &ep, BPF_ANY); + if (ebpf_rc < 0) { + // safely rollback + // rollback version, for insertion case let's revert it to 0 + neighbor_task_map.assign(neighbor_key, cur_ver); + + // rollback map status + map_updated = false; + }else { + // if ebpf map programming succeeded, also put in local in memory cache + db_client::get_instance().endpoint_cache[epkey] = ep; + } + } + } else { + // otherwise + // ignore: + // 1. update concurrent hash map + // 2. update ebpf map to not overwrite new data with out dated data + // 3. update local db table 1 (table 1 is for local lookup) since it is an old version + // update: journal table (since this skipped version is treated as programming succeeded) + ebpf_ignored = true; + map_updated = true; + } + } + + update_ct++; + // exit transaction section + segment_lock.unlock(neighbor_key); + } + + if (map_updated) { + if (!ebpf_ignored) { + printf("GPPC: Inserted this neighbor into map: vip: %s, vni: %d\n", vpc_ip.c_str(), vni); + // step #3 - async call to write/update to local db table 1 + db_client::get_instance().local_db_writer_queue.dispatch([vni, vpc_ip, host_ip, vpc_mac, host_mac, ver, &add_or_update_neighbor_db_stmt] { + get<0>(add_or_update_neighbor_db_stmt) = { vni, vpc_ip, host_ip, vpc_mac, host_mac, ver }; + db_client::get_instance().local_db.execute(add_or_update_neighbor_db_stmt); + }); + printf("Dispatched local db neighbor insert\n"); + // step #4 (case 1) - when ebpf programming not ignored, write to table 2 (programming journal) when programming succeeded + if (0 == ebpf_rc) { + db_client::get_instance().local_db_writer_queue.dispatch([ver, &add_programmed_neighbor_version_db_stmt] { + get<0>(add_programmed_neighbor_version_db_stmt) = { ver }; + 
db_client::get_instance().local_db.execute( + add_programmed_neighbor_version_db_stmt); + }); + } + printf("Dispatched local db neighbor journal insert\n"); + } else { + printf("ebpf_ignored = true\n"); + // step #4 (case 2) - always write to local db table 2 (programming journal) when version intended ignored (no need to program older version) + db_client::get_instance().local_db_writer_queue.dispatch([ver, &add_programmed_neighbor_version_db_stmt] { + get<0>(add_programmed_neighbor_version_db_stmt) = { ver }; + db_client::get_instance().local_db.execute( + add_programmed_neighbor_version_db_stmt); + }); + } + } else { + printf("Failed to update neighbor %d %s in map, skipping it\n", vni, vpc_ip.c_str()); + } + + i++; + }); + } else { + printf("vpc_ip is empty\n"); + } + } else if (call->reply.has_securitygrouprule()) { + // only write security group rule to local DB, the actually ebpf map insert will + // happen when a port binding message is sent down. We use the port_id (vni-vpc_ip) + // for the SG rule's local IP, and use the securitygroupid to lookup the rest of info + // for sg_cidr_key_t and sg_cidr_t + auto security_group_id = call->reply.securitygrouprule().securitygroupid(); + auto remote_group_id = call->reply.securitygrouprule().remotegroupid(); + auto direction = call->reply.securitygrouprule().direction(); + auto remote_ip_prefix = call->reply.securitygrouprule().remoteipprefix(); + auto protocol = call->reply.securitygrouprule().protocol(); + auto port_range_max = call->reply.securitygrouprule().portrangemax(); + auto port_range_min = call->reply.securitygrouprule().portrangemin(); + auto ether_type = call->reply.securitygrouprule().ethertype(); + auto vni = call->reply.securitygrouprule().vni(); + auto version = call->reply.securitygrouprule().version(); + int fd = fd_security_group_ebpf_map; + + // non-empty rule + if ("" != security_group_id) { + marl::schedule([this, &i, security_group_id, version, fd, vni, remote_ip_prefix, direction, protocol, 
port_range_min, + remote_group_id, port_range_max, ether_type, &add_or_update_security_group_rule_db_stmt, &add_programmed_security_group_port_binding_version_db_stmt] { + // step #1 - check and store as in concurrent hash map + + bool ebpf_ignored = false; + bool map_updated = false; + int update_ct = 0, max_update_ct = 5; + + while (!map_updated && (update_ct < max_update_ct)) { + // lock transaction section + // segment lock allows some level of concurrent manipulations of concurrent version map + // as long as the multi-threading version updates' keys are not hashed to the same slot in segment array + segment_lock.lock(security_group_id); + printf("Inside while loop, map_updated = [%b], update_ct = [%ld], max_update_ct = [%ld]\n", + map_updated, update_ct, max_update_ct); + auto sg_pos = security_group_rule_task_map.find(security_group_id); + if (sg_pos == security_group_rule_task_map.end()) { + // key not found, try insert. The function returns successful only when key not exists when inserting + auto res_insert = + security_group_rule_task_map.insert(security_group_id, version); + if (res_insert.second) { + // means successfully inserted, done with update + map_updated = true; + + // step 1.5 get all related security group rules. 
+ auto rows = db_client::get_instance().local_db.get_all<::SecurityGroupPortBinding>( + where( + c(&::SecurityGroupPortBinding::security_group_id) == security_group_id.c_str() + ) + ); + // printf("Retrieved %ld rows of security group rules with security group id == [%s]\n", rows.size(), security_group_id.c_str()); + int ebpf_rc = 0; + printf("Found %ld sg rules related to this ID: %s\n", rows.size(), security_group_id.c_str()); + for (auto &binding : rows) { + // step #2 - sync syscall ebpf map programming with return code + std::string delimiter = "-"; //because port_id is in the format of "vni-vpc_id" + std::string vpc_ip = binding.port_id.substr(binding.port_id.find(delimiter) + 1); + string remote_ip; + int prefixlen = 0 ; + remote_ip = remote_ip_prefix.substr(0, remote_ip_prefix.find("/")); + prefixlen = atoi((remote_ip_prefix.substr(remote_ip_prefix.find("/") + 1).c_str())); + + struct sockaddr_in local_ip_sock, remote_ip_sock; + inet_pton(AF_INET, vpc_ip.c_str(), &(local_ip_sock.sin_addr)); + inet_pton(AF_INET, remote_ip.c_str(), &(remote_ip_sock.sin_addr)); + sg_cidr_key_t sg_key; + sg_key.vni = vni; + sg_key.prefixlen = prefixlen + 96; // 96 = ( __u32 vni; + __u16 port; + __u8 direction; + __u8 protocol; + __u32 local_ip; ) + sg_key.remote_ip = remote_ip_sock.sin_addr.s_addr; + sg_key.local_ip = local_ip_sock.sin_addr.s_addr; + sg_key.direction = direction == "egress" ? 
0 : 1; // going out is 0 and coming in is 1 + + if (protocol == "TCP") { + sg_key.protocol = IPPROTO_TCP; + } else if (protocol == "UDP") { + sg_key.protocol = IPPROTO_UDP; + } else { + sg_key.protocol = IPPROTO_NONE; + } + + sg_key.port = port_range_min; //TODO: see if we should use this or other fields + + sg_cidr_t sg_value; + sg_value.sg_id = 1; + sg_value.action = 1; // 1 for allow and other values for drop + printf("Inserting sg rule with prefixlen: %ld VNI: %s, port: %ld, direction: %s, protocol: %s, local_ip: %s, remote_ip: %s\n", + prefixlen, vni, port_range_min, direction.c_str(), protocol.c_str(), vpc_ip.c_str(), remote_ip.c_str()); + int single_ebpf_rc = bpf_map_update_elem(fd, &sg_key, &sg_value, BPF_ANY); + if (single_ebpf_rc != 0) { + ebpf_rc = single_ebpf_rc; + printf("Tried to insert into sg rule ebpf map, but got RC: [%ld], errno: [%s]\n", single_ebpf_rc, std::strerror(errno)); + } else { + printf("Insert into sg eBPF map returned %ld\n", single_ebpf_rc); + // on success, also put in local in memory cache + db_client::get_instance().sg_rule_cache[sg_key] = sg_value; + } + + printf("GPPC: Inserted this sg rule into map: vip: %s, vni: %d\n", vpc_ip.c_str(), vni); + + } + if (ebpf_rc < 0) { + // safely rollback + // rollback version, for insertion case let's revert it to 0 + security_group_rule_task_map.assign(security_group_id, 0); // rollback map status map_updated = false; } + printf("Found neighbor key in security_group_rule_task_map\n"); + } // 'else' means another thread already inserted before me, then it's not an insert case and next time in the loop will go to case of update + } else { + printf("Didn't find neighbor key in security_group_rule_task_map\n"); + // key found, means multi neighbor versions might update at the same time + int cur_ver = sg_pos->second; + + if (version > cur_ver) { + // only update sg rule version + // 1. when received (from ArionMaster) sg rule version is greater than current version in map + // 2. 
and only if the element to update is the original element (version in 'find') + if (security_group_rule_task_map.assign(security_group_id, version)) { + map_updated = true; + + // step 1.5 get all related security group rules. + auto rows = db_client::get_instance().local_db.get_all<::SecurityGroupPortBinding>( + where( + c(&::SecurityGroupPortBinding::security_group_id) == security_group_id.c_str() + ) + ); + // printf("Retrieved %ld rows of security group rules with security group id == [%s]\n", rows.size(), security_group_id.c_str()); + int ebpf_rc = 0; + printf("Found %ld sg rules related to this ID: %s\n", rows.size(), security_group_id.c_str()); + for (auto &binding : rows) { + // step #2 - sync syscall ebpf map programming with return code + std::string delimiter = "-"; //because port_id is in the format of "vni-vpc_id" + std::string vpc_ip = binding.port_id.substr(binding.port_id.find(delimiter) + 1); + string remote_ip; + int prefixlen = 0 ; + remote_ip = remote_ip_prefix.substr(0, remote_ip_prefix.find("/")); + prefixlen = atoi((remote_ip_prefix.substr(remote_ip_prefix.find("/") + 1).c_str())); + + struct sockaddr_in local_ip_sock, remote_ip_sock; + inet_pton(AF_INET, vpc_ip.c_str(), &(local_ip_sock.sin_addr)); + inet_pton(AF_INET, remote_ip.c_str(), &(remote_ip_sock.sin_addr)); + sg_cidr_key_t sg_key; + sg_key.vni = vni; + sg_key.prefixlen = prefixlen + 96; // 96 = ( __u32 vni; + __u16 port; + __u8 direction; + __u8 protocol; + __u32 local_ip; ) + sg_key.remote_ip = remote_ip_sock.sin_addr.s_addr; + sg_key.local_ip = local_ip_sock.sin_addr.s_addr; + sg_key.direction = direction == "egress" ? 
0 : 1; // going out is 0 and coming in is 1 + + if (protocol == "TCP") { + sg_key.protocol = IPPROTO_TCP; + } else if (protocol == "UDP") { + sg_key.protocol = IPPROTO_UDP; + } else { + sg_key.protocol = IPPROTO_NONE; + } + + sg_key.port = port_range_min; //TODO: see if we should use this or other fields + + sg_cidr_t sg_value; + sg_value.sg_id = 1; + sg_value.action = 1; // 1 for allow and other values for drop + printf("Inserting sg rule with prefixlen: %ld VNI: %s, port: %ld, direction: %s, protocol: %s, local_ip: %s, remote_ip: %s\n", + prefixlen, vni, port_range_min, direction.c_str(), protocol.c_str(), vpc_ip.c_str(), remote_ip.c_str()); + int single_ebpf_rc = bpf_map_update_elem(fd, &sg_key, &sg_value, BPF_ANY); + if (single_ebpf_rc != 0) { + ebpf_rc = single_ebpf_rc; + printf("Tried to insert into sg rule ebpf map, but got RC: [%ld], errno: [%s]\n", single_ebpf_rc, std::strerror(errno)); + } else { + printf("Insert into sg eBPF map returned %ld\n", single_ebpf_rc); + // on success, also put in local in memory cache + db_client::get_instance().sg_rule_cache[sg_key] = sg_value; + } + + printf("GPPC: Inserted this sg rule into map: vip: %s, vni: %d\n", vpc_ip.c_str(), vni); + if (single_ebpf_rc == 0) { + // also update the security group rule version journal + db_client::get_instance().local_db_writer_queue.dispatch([version, &add_programmed_security_group_port_binding_version_db_stmt] { + get<0>(add_programmed_security_group_port_binding_version_db_stmt) = { version }; + db_client::get_instance().local_db.execute( + add_programmed_security_group_port_binding_version_db_stmt); + }); + } + + } + if (ebpf_rc < 0) { + // safely rollback + // rollback version, for insertion case let's revert it to 0 + security_group_rule_task_map.assign(security_group_id, 0); + + // rollback map status + map_updated = false; + } + } + } else { + // otherwise + // ignore: + // 1. update concurrent hash map + // 2. 
update ebpf map to not overwrite new data with out dated data + // 3. update local db table 1 (table 1 is for local lookup) since it is an old version + // update: journal table (since this skipped version is treated as programming succeeded) + ebpf_ignored = true; + map_updated = true; } + } + + update_ct++; + // exit transaction section + segment_lock.unlock(security_group_id); + } + + if (map_updated) { + if (!ebpf_ignored) { + printf("ebpf_ignored = false\n"); + + // step #3 - async call to write/update to local db table + db_client::get_instance().local_db_writer_queue.dispatch( + [security_group_id, remote_group_id, direction, remote_ip_prefix, protocol, port_range_max, port_range_min, &add_or_update_security_group_rule_db_stmt, ether_type, vni, version] { + get<0>(add_or_update_security_group_rule_db_stmt) = + { security_group_id, remote_group_id, direction, remote_ip_prefix, protocol, port_range_max, port_range_min, ether_type, vni, version }; + db_client::get_instance().local_db.execute(add_or_update_security_group_rule_db_stmt); + }); + printf("Dispatched local db security group port binding insert\n"); + + // no journal to write in this case. 
+ } else { + printf("ebpf_ignored = true\n"); + // step #4 (case 2) - always write to local db table 5 (programming journal) when version intended ignored (no need to program older version) + // also update the security group rule version journal + db_client::get_instance().local_db_writer_queue.dispatch([version, &add_programmed_security_group_port_binding_version_db_stmt] { + get<0>(add_programmed_security_group_port_binding_version_db_stmt) = { version }; + db_client::get_instance().local_db.execute( + add_programmed_security_group_port_binding_version_db_stmt); + }); + } + } else { + printf("Failed to update security group rule with vni: %d and id: %s in map, skipping it\n", vni, security_group_id.c_str()); + } + + i++; + }); + } else { + printf("security group id is empty\n"); + } + } else if (call->reply.has_securitygroupportbinding()) { + auto port_id = call->reply.securitygroupportbinding().portid(); + auto security_group_id = call->reply.securitygroupportbinding().securitygroupid(); + auto version = call->reply.securitygroupportbinding().version(); + int fd = fd_security_group_ebpf_map; + + // non-empty rule + if ("" != port_id && "" != security_group_id) { + marl::schedule([this, &i, port_id, security_group_id, version, fd, + &add_or_update_security_group_port_binding_stmt, &add_programmed_security_group_port_binding_version_db_stmt] { + // step #0 - split the port id into vni and vpc_id, then get the security group rules based on the security group id + std::string delimiter = "-"; //because port_id is in the format of "vni-vpc_id" + std::string vni = port_id.substr(0, port_id.find(delimiter)); + std::string vpc_ip = port_id.substr(port_id.find(delimiter) + 1); + // step #1 - check and store as in concurrent hash map + std::string security_group_port_binding_id = port_id + "-" + security_group_id; + printf("vpc_ip is NOT empty: [%s]\n", vpc_ip.c_str()); + bool ebpf_ignored = false; + bool map_updated = false; + int update_ct = 0, max_update_ct = 5; + + 
while (!map_updated && (update_ct < max_update_ct)) { + // lock transaction section + // segment lock allows some level of concurrent manipulations of concurrent version map + // as long as the multi-threading version updates' keys are not hashed to the same slot in segment array + segment_lock.lock(security_group_id); + printf("Inside while loop, map_updated = [%b], update_ct = [%ld], max_update_ct = [%ld]\n", + map_updated, update_ct, max_update_ct); + auto sg_pos = security_group_rule_task_map.find(security_group_port_binding_id); + if (sg_pos == security_group_rule_task_map.end()) { + // key not found, try insert. The function returns successful only when key not exists when inserting + auto res_insert = + security_group_rule_task_map.insert(security_group_port_binding_id, version); + if (res_insert.second) { + // means successfully inserted, done with update + map_updated = true; + printf("Found neighbor key in security_group_rule_task_map\n"); + // step 1.5 get all related security group rules. 
+ auto rows = db_client::get_instance().local_db.get_all<::SecurityGroupRule>( + where( + c(&::SecurityGroupRule::security_group_id) == security_group_id.c_str() + ) + ); + // printf("Retrieved %ld rows of security group rules with security group id == [%s]\n", rows.size(), security_group_id.c_str()); + int ebpf_rc = 0; + printf("Found %ld sg rules related to this ID: %s\n", rows.size(), security_group_id.c_str()); + for (auto &rule : rows) { + // step #2 - sync syscall ebpf map programming with return code + string remote_ip; + int prefixlen = 0 ; + remote_ip = rule.remote_ip_prefix.substr(0, rule.remote_ip_prefix.find("/")); + prefixlen = atoi((rule.remote_ip_prefix.substr(rule.remote_ip_prefix.find("/") + 1).c_str())); + + struct sockaddr_in local_ip_sock, remote_ip_sock; + inet_pton(AF_INET, vpc_ip.c_str(), &(local_ip_sock.sin_addr)); + inet_pton(AF_INET, remote_ip.c_str(), &(remote_ip_sock.sin_addr)); + sg_cidr_key_t sg_key; + sg_key.vni = atoi(vni.c_str()); + sg_key.prefixlen = prefixlen + 96; // 96 = ( __u32 vni; + __u16 port; + __u8 direction; + __u8 protocol; + __u32 local_ip; ) + sg_key.remote_ip = remote_ip_sock.sin_addr.s_addr; + sg_key.local_ip = local_ip_sock.sin_addr.s_addr; + sg_key.direction = rule.direction == "egress" ? 
0 : 1; // going out is 0 and coming in is 1 + + if (rule.protocol == "tcp") { + sg_key.protocol = IPPROTO_TCP; + } else if (rule.protocol == "udp") { + sg_key.protocol = IPPROTO_UDP; + } else { + sg_key.protocol = IPPROTO_NONE; + } + + sg_key.port = rule.port_range_min; //TODO: see if we should use this or other fields + + sg_cidr_t sg_value; + sg_value.sg_id = 1; + sg_value.action = 1; // 1 for allow and other values for drop + printf("Inserting sg rule with prefixlen: %ld VNI: %s, port: %ld, direction: %s, protocol: %s, local_ip: %s, remote_ip: %s\n", + prefixlen, vni.c_str(), rule.port_range_min, rule.direction.c_str(), rule.protocol.c_str(), vpc_ip.c_str(), remote_ip.c_str()); + int single_ebpf_rc = bpf_map_update_elem(fd, &sg_key, &sg_value, BPF_ANY); + if (single_ebpf_rc != 0) { + ebpf_rc = single_ebpf_rc; + printf("Tried to insert into sg rule ebpf map, but got RC: [%ld], errno: [%s]\n", single_ebpf_rc, std::strerror(errno)); + } else { + printf("Insert into sg eBPF map returned %ld\n", single_ebpf_rc); + } + // also put in local in memory cache + db_client::get_instance().sg_rule_cache[sg_key] = sg_value;//.insert(epkey, ep); + printf("GPPC: Inserted this sg rule into map: vip: %s, vni: %s\n", vpc_ip.c_str(), vni.c_str()); + + } + // step #4 (case 1) - when ebpf programming not ignored, write to table 2 (programming journal) when programming succeeded + if (0 == ebpf_rc) { + db_client::get_instance().local_db_writer_queue.dispatch([version, &add_programmed_security_group_port_binding_version_db_stmt] { + get<0>(add_programmed_security_group_port_binding_version_db_stmt) = { version }; + db_client::get_instance().local_db.execute( + add_programmed_security_group_port_binding_version_db_stmt); + }); + } else { + printf("ebpf_rc = [%ld], this version isn't finished, NOT updating the local DB.\n", ebpf_rc); + } + printf("Dispatched local db sg journal insert\n"); + // step #3 - async call to write/update to local db table + 
db_client::get_instance().local_db_writer_queue.dispatch([security_group_id, port_id, version, &add_or_update_security_group_port_binding_stmt] { + get<0>(add_or_update_security_group_port_binding_stmt) = { port_id, security_group_id }; + db_client::get_instance().local_db.execute(add_or_update_security_group_port_binding_stmt); + }); + printf("Dispatched local db security group port binding insert\n"); + + } // 'else' means another thread already inserted before me, then it's not an insert case and next time in the loop will go to case of update } else { - // otherwise - // ignore: - // 1. update concurrent hash map - // 2. update ebpf map to not overwrite new data with out dated data - // 3. update local db table 1 (table 1 is for local lookup) since it is an old version - // update: journal table (since this skipped version is treated as programming succeeded) - ebpf_ignored = true; - map_updated = true; + printf("Didn't find neighbor key in security_group_rule_task_map\n"); + // key found, means multi neighbor versions might update at the same time + int cur_ver = sg_pos->second; + + if (version > cur_ver) { + // only update neighbor version + // 1. when received (from ArionMaster) neighbor version is greater than current version in map + // 2. and only if the element to update is the original element (version in 'find') + if (security_group_rule_task_map.assign(security_group_port_binding_id, version)) { + map_updated = true; + // step 1.5 get all related security group rules. 
+ auto rows = db_client::get_instance().local_db.get_all<::SecurityGroupRule>( + where( + c(&::SecurityGroupRule::security_group_id) == security_group_id.c_str() + ) + ); + // printf("Retrieved %ld rows of security group rules with security group id == [%s]\n", rows.size(), security_group_id.c_str()); + int ebpf_rc = 0; + printf("Found %ld sg rules related to this ID: %s\n", rows.size(), security_group_id.c_str()); + for (auto &rule : rows) { + // step #2 - sync syscall ebpf map programming with return code + string remote_ip; + int prefixlen = 0 ; + remote_ip = rule.remote_ip_prefix.substr(0, rule.remote_ip_prefix.find("/")); + prefixlen = atoi((rule.remote_ip_prefix.substr(rule.remote_ip_prefix.find("/") + 1).c_str())); + + struct sockaddr_in local_ip_sock, remote_ip_sock; + inet_pton(AF_INET, vpc_ip.c_str(), &(local_ip_sock.sin_addr)); + inet_pton(AF_INET, remote_ip.c_str(), &(remote_ip_sock.sin_addr)); + sg_cidr_key_t sg_key; + sg_key.vni = atoi(vni.c_str()); + sg_key.prefixlen = prefixlen + 96; // 96 = ( __u32 vni; + __u16 port; + __u8 direction; + __u8 protocol; + __u32 local_ip; ) + sg_key.remote_ip = remote_ip_sock.sin_addr.s_addr; + sg_key.local_ip = local_ip_sock.sin_addr.s_addr; + sg_key.direction = rule.direction == "egress" ? 
0 : 1; // going out is 0 and coming in is 1 + + if (rule.protocol == "tcp") { + sg_key.protocol = IPPROTO_TCP; + } else if (rule.protocol == "udp") { + sg_key.protocol = IPPROTO_UDP; + } else { + sg_key.protocol = IPPROTO_NONE; + } + + sg_key.port = rule.port_range_min; //TODO: see if we should use this or other fields + + sg_cidr_t sg_value; + sg_value.sg_id = 1; + sg_value.action = 1; // 1 for allow and other values for drop + printf("Inserting sg rule with prefixlen: %ld VNI: %s, port: %ld, direction: %s, protocol: %s, local_ip: %s, remote_ip: %s\n", + prefixlen, vni.c_str(), rule.port_range_min, rule.direction.c_str(), rule.protocol.c_str(), vpc_ip.c_str(), remote_ip.c_str()); + int single_ebpf_rc = bpf_map_update_elem(fd, &sg_key, &sg_value, BPF_ANY); + if (single_ebpf_rc != 0) { + ebpf_rc = single_ebpf_rc; + printf("Tried to insert into sg rule ebpf map, but got RC: [%ld], errno: [%s]\n", single_ebpf_rc, std::strerror(errno)); + } else { + printf("Insert into sg eBPF map returned %ld\n", single_ebpf_rc); + } + // also put in local in memory cache + db_client::get_instance().sg_rule_cache[sg_key] = sg_value;//.insert(epkey, ep); + printf("GPPC: Inserted this sg rule into map: vip: %s, vni: %s\n", vpc_ip.c_str(), vni.c_str()); + + } + // step #4 (case 1) - when ebpf programming not ignored, write to table 2 (programming journal) when programming succeeded + if (0 == ebpf_rc) { + db_client::get_instance().local_db_writer_queue.dispatch([version, &add_programmed_security_group_port_binding_version_db_stmt] { + get<0>(add_programmed_security_group_port_binding_version_db_stmt) = { version }; + db_client::get_instance().local_db.execute( + add_programmed_security_group_port_binding_version_db_stmt); + }); + } else { + printf("ebpf_rc = [%ld], this version isn't finished, NOT updating the local DB.\n", ebpf_rc); + } + printf("Dispatched local db sg journal insert\n"); + // step #3 - async call to write/update to local db table + 
db_client::get_instance().local_db_writer_queue.dispatch([security_group_id, port_id, version, &add_or_update_security_group_port_binding_stmt] { + get<0>(add_or_update_security_group_port_binding_stmt) = { port_id, security_group_id }; + db_client::get_instance().local_db.execute(add_or_update_security_group_port_binding_stmt); + }); + printf("Dispatched local db security group port binding insert\n"); + } + } else { + // otherwise + // ignore: + // 1. update concurrent hash map + // 2. update ebpf map to not overwrite new data with out dated data + // 3. update local db table 1 (table 1 is for local lookup) since it is an old version + // update: journal table (since this skipped version is treated as programming succeeded) + ebpf_ignored = true; + map_updated = true; + } } + + update_ct++; + segment_lock.unlock(security_group_id); } - update_ct++; - - // exit transaction section - segment_lock.unlock(neighbor_key); - } - - if (map_updated) { - if (!ebpf_ignored) { - // step #3 - async call to write/update to local db table 1 - local_db_writer_queue.dispatch([vni, vpc_ip, host_ip, vpc_mac, host_mac, ver, &add_or_update_neighbor_db_stmt] { - get<0>(add_or_update_neighbor_db_stmt) = { vni, vpc_ip, host_ip, vpc_mac, host_mac, ver }; - local_db.execute(add_or_update_neighbor_db_stmt); - }); - - // step #4 (case 1) - when ebpf programming not ignored, write to table 2 (programming journal) when programming succeeded - if (0 == ebpf_rc) { - local_db_writer_queue.dispatch([ver, &add_programmed_version_db_stmt] { - get<0>(add_programmed_version_db_stmt) = { ver }; - local_db.execute(add_programmed_version_db_stmt); + if (map_updated) { + if (!ebpf_ignored) { + printf("ebpf_ignored = false\n"); + } else { + printf("ebpf_ignored = true\n"); + // step #4 (case 2) - always write to local db table 2 (programming journal) when version intended ignored (no need to program older version) + db_client::get_instance().local_db_writer_queue.dispatch([version, 
&add_programmed_security_group_port_binding_version_db_stmt] { + get<0>(add_programmed_security_group_port_binding_version_db_stmt) = { version }; + db_client::get_instance().local_db.execute( + add_programmed_security_group_port_binding_version_db_stmt); }); } } else { - // step #4 (case 2) - always write to local db table 2 (programming journal) when version intended ignored (no need to program older version) - local_db_writer_queue.dispatch([ver, &add_programmed_version_db_stmt] { - get<0>(add_programmed_version_db_stmt) = { ver }; - local_db.execute(add_programmed_version_db_stmt); - }); + printf("Failed to update neighbor %d %s in map, skipping it\n", vni.c_str(), vpc_ip.c_str()); } - } else { - printf("Failed to update neighbor %d %s in map, skipping it\n", vni, vpc_ip.c_str()); - } - i++; - }); + i++; + }); + } else { + printf("port_id [%s] or security_group_id: [%s] is empty\n", port_id.c_str(), security_group_id.c_str()); + } } else { + printf("This reply doesn't have a neighbor rule, a security group rule, or a security group port binding\n"); } + } + } else { + printf("NOT okay\n"); } } } @@ -240,34 +770,79 @@ void ArionMasterWatcherImpl::ConnectToArionMaster() { printf("After initiating a new sub to connect to the Arion Master: %s\n", (server_address + ":" + server_port).c_str()); } -void ArionMasterWatcherImpl::RunClient(std::string ip, std::string port, std::string group, std::string table) { +void ArionMasterWatcherImpl::RunClient(std::string ip, std::string port, std::string group, std::string endpoints_table, std::string security_group_rules_table) { printf("Running a grpc client in a separate thread id: %ld\n", std::this_thread::get_id()); server_address = ip; server_port = port; group_id = group; - table_name_neighbor_ebpf_map = table; + table_name_neighbor_ebpf_map = endpoints_table; + table_name_sg_ebpf_map = security_group_rules_table; // Retrieve neighbor's ebpf map fd (handle) fd_neighbor_ebpf_map = 
bpf_obj_get(table_name_neighbor_ebpf_map.c_str()); - if (fd_neighbor_ebpf_map < 0) { - printf("Failed to get xdp neighbor endpoint map fd, exiting\n"); - return; - } else { - printf("Got xdp neighbor endpoint map fd %d\n", fd_neighbor_ebpf_map); - } +// if (fd_neighbor_ebpf_map < 0) { +// printf("Failed to get xdp neighbor endpoint map fd, exiting\n"); +// return; +// } else { +// printf("Got xdp neighbor endpoint map fd %d\n", fd_neighbor_ebpf_map); +// } + + // check if security group ebpf map exists, and create it if it doesn't + + fd_security_group_ebpf_map = bpf_obj_get(table_name_sg_ebpf_map.c_str()); + + if (fd_security_group_ebpf_map < 0) { + printf("Creating security_group_ebpf_map manually\n"); + + size_t key_size_security_group; + key_size_security_group = sizeof(sg_cidr_key_t); + + printf("Key size: %ld, value size: %ld\n", key_size_security_group, sizeof(sg_cidr_t)); + fd_security_group_ebpf_map = bpf_create_map(BPF_MAP_TYPE_LPM_TRIE, + key_size_security_group, + sizeof(sg_cidr_t), + 999, // need to change it to a bigger number later. 
+ BPF_F_NO_PREALLOC); + + if (fd_security_group_ebpf_map <= 0) { + printf("Tried to manually create security group map, but failed with fd: %ld, and error no: %s, returning\n", + fd_security_group_ebpf_map, std::strerror(errno)); + exit(-1); + } + printf("Manually created security group map with fd: %ld, returning\n", fd_security_group_ebpf_map); - // Create (if db not exists) or connect (if db exists already) to local db - local_db.sync_schema(); + } // Find lkg version to reconcile/sync from server - int rev_lkg = FindLKGVersion(); + int rev_lkg = db_client::get_instance().FindLKGVersion(); printf("Found last known good version: %d from local db to sync from server\n", rev_lkg); - + db_client::get_instance().FillEndpointCacheFromDB(); this->ConnectToArionMaster(); grpc::CompletionQueue cq; - ArionWingRequest watch_req; - watch_req.set_group(group_id); - watch_req.set_rev(rev_lkg); - this->RequestNeighborRules(&watch_req, &cq); + // This vector includes the Arion Requests that will be sent to Arion Master + vector arion_request_vector; + ArionWingRequest neighbor_watch_req; + neighbor_watch_req.set_map("NeighborRule"); + neighbor_watch_req.set_group(""/*group_id*/); + neighbor_watch_req.set_rev(0/*rev_lkg*/); + + ArionWingRequest security_group_rule_watch_req; + security_group_rule_watch_req.set_map("SecurityGroupRule"); + // set version 0 for now. + /* TODO: Change the version and group name to valid ones, + Same group of neighbor and security group should have the same version + */ + security_group_rule_watch_req.set_rev(0); + // set empty group rule for now. 
+ security_group_rule_watch_req.set_group(""); + + ArionWingRequest security_group_port_binding_watch_req; + security_group_port_binding_watch_req.set_map("SecurityGroupPortBinding"); + security_group_port_binding_watch_req.set_rev(0); + security_group_port_binding_watch_req.set_group(""); + arion_request_vector.emplace_back(&neighbor_watch_req); + arion_request_vector.emplace_back(&security_group_rule_watch_req); + arion_request_vector.emplace_back(&security_group_port_binding_watch_req); + this->RequestArionMaster(&arion_request_vector, &cq); } diff --git a/src/main.cpp b/src/main.cpp index a0cd18a..eee37e0 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -12,7 +12,7 @@ // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, // WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -#include "grpc_client.h" +//#include "grpc_client.h" #include #include @@ -25,6 +25,8 @@ #include "marl/event.h" #include "marl/scheduler.h" #include "marl/waitgroup.h" +#include "af_xdp_user_multi_thread.h" +#include "grpc_client.h" using namespace std; using std::string; @@ -40,6 +42,7 @@ ArionMasterWatcherImpl *g_grpc_client = NULL; string g_arion_master_address = EMPTY_STRING; string g_arion_master_port = "9090"; string g_arion_neighbor_table = "/sys/fs/bpf/endpoints_map"; +string g_arion_security_group_table = "/sys/fs/bpf/sg_cidr_map"; //TODO: let ArionMaster figure out group from ArionWing IP (in grpc channel) string g_arion_group = "group1"; @@ -100,7 +103,7 @@ int main(int argc, char *argv[]) { signal(SIGINT, signal_handler); signal(SIGTERM, signal_handler); - while ((option = getopt(argc, argv, "a:p:g:d:")) != -1) { + while ((option = getopt(argc, argv, "a:p:g:d")) != -1) { switch (option) { case 'a': g_arion_master_address = optarg; @@ -125,9 +128,12 @@ int main(int argc, 
char *argv[]) { } } + printf("Read arion master IP: [%s], arion master port: [%s], arion group name: [%s]\n", + g_arion_master_address.c_str(), g_arion_master_port.c_str(), g_arion_group.c_str()); // Create marl scheduler using all the logical processors available to the process. // Bind this scheduler to the main thread so we can call marl::schedule() marl::Scheduler::Config cfg_bind_hw_cores; + cfg_bind_hw_cores.setWorkerThreadCount(thread_pools_size * 2); marl::Scheduler task_scheduler(cfg_bind_hw_cores); task_scheduler.bind(); @@ -138,10 +144,16 @@ int main(int argc, char *argv[]) { marl::schedule([=] { g_grpc_client->RunClient(g_arion_master_address, g_arion_master_port, - g_arion_group, - g_arion_neighbor_table); + g_arion_group,\ + g_arion_neighbor_table, + g_arion_security_group_table); }); + /* + auto afm = af_xdp_user_multi_thread(); + pthread_t t; + pthread_create(&t, NULL, &af_xdp_user_multi_thread::run_af_xdp_multi_threaded, &afm); + */ pause(); cleanup();