diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..cafd2d66 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "metrics-exporter-influx"] + path = metrics-exporter-influx + url = https://github.com/aizensoosuke/metrics-exporter-influx.git diff --git a/Cargo.lock b/Cargo.lock index f84cd003..682e4acb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -52,6 +52,18 @@ dependencies = [ "subtle", ] +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -141,6 +153,15 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "ascii-canvas" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8824ecca2e851cec16968d54a01dd372ef8f95b244fb84b84e70128be347c3c6" +dependencies = [ + "term", +] + [[package]] name = "asn1-rs" version = "0.6.2" @@ -180,6 +201,149 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + +[[package]] +name = "async-attributes" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" +dependencies = [ + "quote", + "syn 1.0.109", +] + +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-compression" +version = "0.4.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b37fc50485c4f3f736a4fb14199f6d5f5ba008d7f28fe710306c92780f004c07" +dependencies = [ + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "async-executor" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb812ffb58524bdd10860d7d974e2f01cc0950c2438a74ee5ec2e2280c6c4ffa" +dependencies = [ + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "pin-project-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" +dependencies = [ + "async-channel 2.3.1", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + +[[package]] +name = "async-io" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1237c0ae75a0f3765f58910ff9cdd0a12eeb39ab2f4c7de23262f337f0aacbb3" +dependencies = [ + "async-lock", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite", + "parking", + "polling", + "rustix", + "slab", + "tracing", + "windows-sys 0.59.0", +] + +[[package]] +name = "async-lock" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +dependencies = [ + "event-listener 5.4.0", + "event-listener-strategy", + "pin-project-lite", +] + +[[package]] +name = "async-object-pool" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "333c456b97c3f2d50604e8b2624253b7f787208cb72eb75e64b0ad11b221652c" +dependencies = [ + "async-std", +] + +[[package]] +name = "async-process" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cde3f4e40e6021d7acffc90095cbd6dc54cb593903d1de5832f435eb274b85dc" +dependencies = [ + "async-channel 2.3.1", + "async-io", + "async-lock", + "async-signal", + "async-task", + "blocking", + "cfg-if", + "event-listener 5.4.0", + "futures-lite", + "rustix", + "tracing", +] + [[package]] name = "async-recursion" version = "1.1.1" @@ -191,6 +355,58 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "async-signal" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7605a4e50d4b06df3898d5a70bf5fde51ed9059b0434b73105193bc27acce0d" +dependencies = [ + "async-io", + "async-lock", + "atomic-waker", + "cfg-if", + "futures-core", + "futures-io", + "rustix", + "signal-hook-registry", + "slab", + "windows-sys 0.59.0", +] + +[[package]] +name = "async-std" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "730294c1c08c2e0f85759590518f6333f0d5a0a766a27d519c1b244c3dfd8a24" +dependencies = [ + "async-attributes", + "async-channel 1.9.0", + "async-global-executor", + "async-io", + "async-lock", + "async-process", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.88" @@ -226,7 +442,7 @@ dependencies = [ "miniz_oxide", "object", "rustc-demangle", - "windows-targets 0.52.6", + "windows-targets", ] [[package]] @@ -235,6 +451,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf" +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "base64" version = "0.22.1" @@ -247,6 +469,17 @@ version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89e25b6adfb930f02d1981565a6e5d9c547ac15a96606256d3b59040e5cd4ca3" +[[package]] +name = "basic-cookies" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67bd8fd42c16bdb08688243dc5f0cc117a3ca9efeeaba3a345a18a6159ad96f7" +dependencies = [ + "lalrpop", + "lalrpop-util", + "regex", +] + [[package]] name = "bimap" version = "0.6.3" @@ -262,6 +495,21 @@ dependencies = [ "serde", ] +[[package]] +name = "bit-set" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1" +dependencies = [ + "bit-vec", +] + +[[package]] +name = "bit-vec" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" + [[package]] name = "bitflags" version = "1.3.2" @@ -295,6 +543,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea" +dependencies = [ + "async-channel 2.3.1", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + [[package]] name = "btparse" version = "0.2.0" @@ -445,6 +706,15 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -500,6 +770,36 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + +[[package]] +name = "crunchy" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43da5946c66ffcc7745f48db692ffbb10a83bfe0afd96235c5c2a4fb23994929" + [[package]] name = "crypto-bigint" version = "0.5.5" @@ -653,6 +953,27 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs-next" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" +dependencies = [ + "cfg-if", + "dirs-sys-next", +] + +[[package]] +name = "dirs-sys-next" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" +dependencies = [ + "libc 0.2.172", + "redox_users", + "winapi", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -742,6 +1063,21 @@ dependencies = [ "zeroize", ] +[[package]] +name = "ena" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d248bdd43ce613d87415282f69b9bb99d947d290b10962dd6c56233312c2ad5" +dependencies = [ + "log", +] + +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + [[package]] name = "enum-display" version = "0.1.4" @@ -803,6 +1139,49 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "errno" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cea14ef9355e3beab063703aa9dab15afd25f0667c341310c1e5274bb1d0da18" +dependencies = [ + "libc 0.2.172", + "windows-sys 0.59.0", +] + +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name = "event-listener" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener 5.4.0", + "pin-project-lite", +] + +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "ff" version = "0.13.1" @@ -819,6 +1198,12 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flarch" version = "0.9.2" @@ -834,6 +1219,7 @@ dependencies = [ "futures", "js-sys", "log", + "metrics", "rand 0.8.5", "regex", "rmp-serde", @@ -854,13 +1240,23 @@ dependencies = [ "webrtc", ] +[[package]] +name = "flate2" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "flcrypto" version = "0.9.2" dependencies = [ "anyhow", "async-recursion", - "base64", + "base64 0.22.1", "bytes", "ed25519-dalek", "enum_dispatch", @@ -892,9 +1288,10 @@ dependencies = [ "flmodules", "flnode", "log", - "thiserror 2.0.12", + "reqwest", + "serde", + "serde_json", "tokio", - "webrtc-util", ] [[package]] @@ -925,8 +1322,9 @@ dependencies = [ "flmodules", "futures", "getrandom 0.2.16", - "itertools", + "itertools 0.14.0", "log", + "metrics", "names", "num-bigint", "rand 0.8.5", @@ -958,7 +1356,7 @@ dependencies = [ "flmodules", "futures", "getrandom 0.2.16", - "itertools", + "itertools 0.14.0", "log", "names", "rand 0.8.5", @@ -982,6 +1380,8 @@ dependencies = [ "flarch", "flmodules", "log", + "metrics", + "metrics-exporter-influx", "thiserror 2.0.12", "tokio", ] @@ -1049,6 +1449,19 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-lite" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.31" @@ -1145,14 +1558,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" [[package]] -name = "group" -version = "0.13.0" +name = "gloo-timers" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0f9ef7462f7c099f518d754361858f86d8a07af53ba9af0fe635bbccb151a63" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" dependencies = [ - "ff", - "rand_core 0.6.4", - "subtle", + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "group" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0f9ef7462f7c099f518d754361858f86d8a07af53ba9af0fe635bbccb151a63" +dependencies = [ + "ff", + "rand_core 0.6.4", + "subtle", ] [[package]] @@ -1161,6 +1586,15 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038" +dependencies = [ + "ahash", +] + [[package]] name = "hashbrown" version = "0.15.3" @@ -1173,6 +1607,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f154ce46856750ed433c8649605bf7ed2de3bc35fd9d2a9f30cddd873c80cb08" + [[package]] name = "hex" version = "0.4.3" @@ -1197,6 +1637,17 @@ dependencies = [ "digest", ] +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.3.1" @@ -1208,6 +1659,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.1" @@ -1215,7 +1677,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http", + "http 1.3.1", ] [[package]] @@ -1226,8 +1688,8 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "pin-project-lite", ] @@ -1237,6 +1699,63 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + +[[package]] +name = "httpmock" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08ec9586ee0910472dec1a1f0f8acf52f0fdde93aea74d70d4a3107b4be0fd5b" +dependencies = [ + "assert-json-diff", + "async-object-pool", + "async-std", + "async-trait", + "base64 0.21.7", + "basic-cookies", + "crossbeam-utils", + "form_urlencoded", + "futures-util", + "hyper 0.14.32", + "lazy_static", + "levenshtein", + "log", + "regex", + "serde", + "serde_json", + "serde_regex", + "similar", + "tokio", + "url", +] + +[[package]] +name = "hyper" +version = "0.14.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.6.0" @@ -1246,8 +1765,8 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "httparse", "itoa", "pin-project-lite", @@ -1263,8 +1782,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d191583f3da1305256f22463b9bb0471acad48a4e534a5218b9963e9c1f59b2" dependencies = [ "futures-util", - "http", - "hyper", + "http 1.3.1", + "hyper 1.6.0", "hyper-util", "rustls", "rustls-pki-types", @@ -1276,17 +1795,21 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.11" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497bbc33a26fdd4af9ed9c70d63f61cf56a938375fbb32df34db9b1cd6d643f2" +checksum = "dc2fdfdbff08affe55bb779f33b053aa1fe5dd5b54c257343c17edfa55711bdb" dependencies = [ + "base64 0.22.1", "bytes", "futures-channel", + "futures-core", "futures-util", - "http", - "http-body", - "hyper", + "http 1.3.1", + "http-body 1.0.1", + "hyper 1.6.0", + "ipnet", "libc 0.2.172", + "percent-encoding", "pin-project-lite", "socket2", "tokio", @@ -1489,12 +2012,40 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" +[[package]] +name = "iri-string" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc5ebe9c3a1a7a5127f920a418f7585e9e758e911d0466ed004f393b0e380b2" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +dependencies = [ + "either", +] + +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.14.0" @@ -1544,12 +2095,58 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + +[[package]] +name = "lalrpop" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55cb077ad656299f160924eb2912aa147d7339ea7d69e1b5517326fdcec3c1ca" +dependencies = [ + "ascii-canvas", + "bit-set", + "ena", + "itertools 0.11.0", + "lalrpop-util", + "petgraph", + "pico-args", + "regex", + "regex-syntax 0.8.5", + "string_cache", + "term", + "tiny-keccak", + "unicode-xid", + "walkdir", +] + +[[package]] +name = "lalrpop-util" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "507460a910eb7b32ee961886ff48539633b788a36b65692b95f225b844c82553" +dependencies = [ + "regex-automata 0.4.9", +] + [[package]] name = "lazy_static" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "levenshtein" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db13adb97ab515a3691f56e4dbab09283d0b86cb45abd991d8634a9d6f501760" + [[package]] name = "libc" version = "0.2.172" @@ -1569,12 +2166,28 @@ dependencies = [ "tokio", ] +[[package]] +name = "libredox" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +dependencies = [ + "bitflags 2.9.0", + "libc 0.2.172", +] + [[package]] name = "linked-hash-map" version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +[[package]] +name = "linux-raw-sys" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" + [[package]] name = "litemap" version = "0.8.0" @@ -1596,6 +2209,9 @@ name = "log" version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +dependencies = [ + "value-bag", +] [[package]] name = "lru-slab" @@ -1603,6 +2219,24 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +[[package]] +name = "mach2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b955cdeb2a02b9117f121ce63aa52d08ade45de53e48fe6a38b39c10f6f709" +dependencies = [ + "libc 0.2.172", +] + +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "md-5" version = "0.10.6" @@ -1628,6 +2262,70 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metrics" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5" +dependencies = [ + "ahash", + "metrics-macros", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-influx" +version = "0.2.2" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "http 0.2.12", + "httpmock", + "indexmap 1.9.3", + "itertools 0.13.0", + "metrics", + "metrics-util", + "quanta 0.12.5", + "reqwest", + "tempfile", + "thiserror 1.0.69", + "tokio", + "tokio-retry", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "metrics-macros" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b4faf00617defe497754acde3024865bc143d44a86799b24e191ecff91354f" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + +[[package]] +name = "metrics-util" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4de2ed6e491ed114b40b732e4d1659a9d53992ebd87490c44a6ffe23739d973e" +dependencies = [ + "aho-corasick", + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.13.1", + "indexmap 1.9.3", + "metrics", + "num_cpus", + "ordered-float", + "quanta 0.11.1", + "radix_trie", + "sketches-ddsketch", +] + [[package]] name = "mime" version = "0.3.17" @@ -1679,6 +2377,21 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "new_debug_unreachable" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" + +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nix" version = "0.26.4" @@ -1702,6 +2415,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-bigint" version = "0.4.6" @@ -1737,6 +2460,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc 0.2.172", +] + [[package]] name = "object" version = "0.36.7" @@ -1773,6 +2506,21 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "ordered-float" +version = "3.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1e1c390732d15f1d48471625cd92d154e66db2c56645e29a9cd26f4699f72dc" +dependencies = [ + "num-traits", +] + +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "p256" version = "0.13.2" @@ -1797,6 +2545,12 @@ dependencies = [ "sha2", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.3" @@ -1817,7 +2571,7 @@ dependencies = [ "libc 0.2.172", "redox_syscall", "smallvec", - "windows-targets 0.52.6", + "windows-targets", ] [[package]] @@ -1832,7 +2586,7 @@ version = "3.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38af38e8470ac9dee3ce1bae1af9c1671fffc44ddfd8bd1d0a3445bf349a8ef3" dependencies = [ - "base64", + "base64 0.22.1", "serde", ] @@ -1851,6 +2605,51 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "petgraph" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset", + "indexmap 2.9.0", +] + +[[package]] +name = "phf_shared" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5" +dependencies = [ + "siphasher", +] + +[[package]] +name = "pico-args" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5be167a7af36ee22fe3115051bc51f6e6c7054c9348e28deb4f49bd6f705a315" + +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -1876,6 +2675,17 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "piper" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066" +dependencies = [ + "atomic-waker", + "fastrand", + "futures-io", +] + [[package]] name = "pkcs8" version = "0.10.2" @@ -1886,6 +2696,21 @@ dependencies = [ "spki", ] +[[package]] +name = "polling" +version = "3.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b53a684391ad002dd6a596ceb6c74fd004fdce75f4be2e3f615068abbea5fd50" +dependencies = [ + "cfg-if", + "concurrent-queue", + "hermit-abi", + "pin-project-lite", + "rustix", + "tracing", + "windows-sys 0.59.0", +] + [[package]] name = "polyval" version = "0.6.2" @@ -1937,6 +2762,12 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "precomputed-hash" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" + [[package]] name = "primeorder" version = "0.13.6" @@ -1955,6 +2786,37 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quanta" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" +dependencies = [ + "crossbeam-utils", + "libc 0.2.172", + "mach2", + "once_cell", + "raw-cpuid 10.7.0", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + +[[package]] +name = "quanta" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e" +dependencies = [ + "crossbeam-utils", + "libc 0.2.172", + "once_cell", + "raw-cpuid 11.5.0", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quinn" version = "0.11.8" @@ -2025,6 +2887,16 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -2084,6 +2956,24 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags 1.3.2", +] + +[[package]] +name = "raw-cpuid" +version = "11.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6df7ab838ed27997ba19a4664507e6f82b41fe6e20be42929332156e5e85146" +dependencies = [ + "bitflags 2.9.0", +] + [[package]] name = "rcgen" version = "0.13.2" @@ -2107,6 +2997,17 @@ dependencies = [ "bitflags 2.9.0", ] +[[package]] +name = "redox_users" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" +dependencies = [ + "getrandom 0.2.16", + "libredox", + "thiserror 1.0.69", +] + [[package]] name = "regex" version = "1.11.1" @@ -2115,8 +3016,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.9", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -2127,9 +3037,15 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.5", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.5" @@ -2138,18 +3054,20 @@ checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "reqwest" -version = "0.12.15" +version = "0.12.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d19c46a6fdd48bc4dab94b6103fccc55d34c67cc0ad04653aad4ea2a07cd7bbb" +checksum = "a2f8e5513d63f2e5b386eb5106dc67eaf3f84e95258e210489136b8b92ad6119" dependencies = [ - "base64", + "async-compression", + "base64 0.22.1", "bytes", + "futures-channel", "futures-core", "futures-util", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-rustls", "hyper-util", "ipnet", @@ -2161,7 +3079,6 @@ dependencies = [ "pin-project-lite", "quinn", "rustls", - "rustls-pemfile", "rustls-pki-types", "serde", "serde_json", @@ -2171,14 +3088,14 @@ dependencies = [ "tokio-rustls", "tokio-util", "tower", + "tower-http", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots 0.26.11", - "windows-registry", + "webpki-roots 1.0.0", ] [[package]] @@ -2283,6 +3200,19 @@ dependencies = [ "nom", ] +[[package]] +name = "rustix" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" +dependencies = [ + "bitflags 2.9.0", + "errno", + "libc 0.2.172", + "linux-raw-sys", + "windows-sys 0.59.0", +] + [[package]] name = "rustls" version = "0.23.27" @@ -2309,15 +3239,6 @@ dependencies = [ "security-framework", ] -[[package]] -name = "rustls-pemfile" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" -dependencies = [ - "rustls-pki-types", -] - [[package]] name = "rustls-pki-types" version = "1.12.0" @@ -2473,6 +3394,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_regex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8136f1a4ea815d7eac4101cfd0b16dc0cb5e1fe1b8609dfd728058656b7badf" +dependencies = [ + "regex", + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -2491,7 +3422,7 @@ version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa" dependencies = [ - "base64", + "base64 0.22.1", "chrono", "hex", "indexmap 1.9.3", @@ -2549,6 +3480,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shared" version = "0.9.1" @@ -2603,6 +3543,24 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "similar" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbbb5d9659141646ae647b42fe094daf6c6192d1620870b449d9557f748b2daa" + +[[package]] +name = "siphasher" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" + +[[package]] +name = "sketches-ddsketch" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85636c14b73d81f541e525f585c0a2109e6744e1565b5c1668e31c70c10ed65c" + [[package]] name = "slab" version = "0.4.9" @@ -2653,6 +3611,18 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "string_cache" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf776ba3fa74f83bf4b63c3dcbbf82173db2632ed8452cb2d891d33f459de70f" +dependencies = [ + "new_debug_unreachable", + "parking_lot", + "phf_shared", + "precomputed-hash", +] + [[package]] name = "strsim" version = "0.11.1" @@ -2665,7 +3635,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea256fb46a13f9204e9dee9982997b2c3097db175a9fddaa8350310d03c4d5a3" dependencies = [ - "base64", + "base64 0.22.1", "crc", "lazy_static", "md-5", @@ -2735,6 +3705,30 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "tempfile" +version = "3.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1" +dependencies = [ + "fastrand", + "getrandom 0.3.3", + "once_cell", + "rustix", + "windows-sys 0.59.0", +] + +[[package]] +name = "term" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f" +dependencies = [ + "dirs-next", + "rustversion", + "winapi", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -2775,6 +3769,16 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "time" version = "0.3.41" @@ -2806,6 +3810,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinystr" version = "0.8.1" @@ -2860,6 +3873,17 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand 0.8.5", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.2" @@ -2925,6 +3949,24 @@ dependencies = [ "tower-service", ] +[[package]] +name = "tower-http" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" +dependencies = [ + "bitflags 2.9.0", + "bytes", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "iri-string", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.3" @@ -2943,10 +3985,23 @@ version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ + "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "tracing-core" version = "0.1.33" @@ -2954,6 +4009,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -2970,7 +4055,7 @@ checksum = "4793cb5e56680ecbb1d843515b23b6de9a75eb04b66643e256a396d43be33c13" dependencies = [ "bytes", "data-encoding", - "http", + "http 1.3.1", "httparse", "log", "rand 0.9.1", @@ -2988,7 +4073,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0044fdae001dd8a1e247ea6289abf12f4fcea1331a2364da512f9cd680bbd8cb" dependencies = [ "async-trait", - "base64", + "base64 0.22.1", "futures", "log", "md-5", @@ -3020,6 +4105,12 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "universal-hash" version = "0.5.1" @@ -3074,6 +4165,18 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + +[[package]] +name = "value-bag" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "943ce29a8a743eb10d6082545d861b24f9d1b160b7d741e0f2cdf726bec909c5" + [[package]] name = "version_check" version = "0.9.5" @@ -3548,7 +4651,7 @@ dependencies = [ "windows-interface", "windows-link", "windows-result", - "windows-strings 0.4.0", + "windows-strings", ] [[package]] @@ -3579,40 +4682,20 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" -[[package]] -name = "windows-registry" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4286ad90ddb45071efd1a66dfa43eb02dd0dfbae1545ad6cc3c51cf34d7e8ba3" -dependencies = [ - "windows-result", - "windows-strings 0.3.1", - "windows-targets 0.53.0", -] - [[package]] name = "windows-result" -version = "0.3.2" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c64fd11a4fd95df68efcfee5f44a294fe71b8bc6a91993e2791938abcc712252" +checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" dependencies = [ "windows-link", ] [[package]] name = "windows-strings" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87fa48cc5d406560701792be122a10132491cff9d0aeb23583cc2dcafc847319" -dependencies = [ - "windows-link", -] - -[[package]] -name = "windows-strings" -version = "0.4.0" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2ba9642430ee452d5a7aa78d72907ebe8cfda358e8cb7918a2050581322f97" +checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" dependencies = [ "windows-link", ] @@ -3623,7 +4706,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.6", + "windows-targets", ] [[package]] @@ -3632,7 +4715,7 @@ version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" dependencies = [ - "windows-targets 0.52.6", + "windows-targets", ] [[package]] @@ -3641,30 +4724,14 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm 0.52.6", - "windows_aarch64_msvc 0.52.6", - "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm 0.52.6", - "windows_i686_msvc 0.52.6", - "windows_x86_64_gnu 0.52.6", - "windows_x86_64_gnullvm 0.52.6", - "windows_x86_64_msvc 0.52.6", -] - -[[package]] -name = "windows-targets" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1e4c7e8ceaaf9cb7d7507c974735728ab453b67ef8f18febdd7c11fe59dca8b" -dependencies = [ - "windows_aarch64_gnullvm 0.53.0", - "windows_aarch64_msvc 0.53.0", - "windows_i686_gnu 0.53.0", - "windows_i686_gnullvm 0.53.0", - "windows_i686_msvc 0.53.0", - "windows_x86_64_gnu 0.53.0", - "windows_x86_64_gnullvm 0.53.0", - "windows_x86_64_msvc 0.53.0", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", ] [[package]] @@ -3673,96 +4740,48 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86b8d5f90ddd19cb4a147a5fa63ca848db3df085e25fee3cc10b39b6eebae764" - [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" -[[package]] -name = "windows_aarch64_msvc" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7651a1f62a11b8cbd5e0d42526e55f2c99886c77e007179efff86c2b137e66c" - [[package]] name = "windows_i686_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" -[[package]] -name = "windows_i686_gnu" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1dc67659d35f387f5f6c479dc4e28f1d4bb90ddd1a5d3da2e5d97b42d6272c3" - [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" -[[package]] -name = "windows_i686_gnullvm" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ce6ccbdedbf6d6354471319e781c0dfef054c81fbc7cf83f338a4296c0cae11" - [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" -[[package]] -name = "windows_i686_msvc" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "581fee95406bb13382d2f65cd4a908ca7b1e4c2f1917f143ba16efe98a589b5d" - [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" -[[package]] -name = "windows_x86_64_gnu" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e55b5ac9ea33f2fc1716d1742db15574fd6fc8dadc51caab1c16a3d3b4190ba" - [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a6e035dd0599267ce1ee132e51c27dd29437f63325753051e71dd9e42406c57" - [[package]] name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" -[[package]] -name = "windows_x86_64_msvc" -version = "0.53.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" - [[package]] name = "wit-bindgen-rt" version = "0.39.0" diff --git a/Makefile b/Makefile index 5d2a1b90..9eb67da2 100644 --- a/Makefile +++ b/Makefile @@ -123,6 +123,23 @@ docker_dev: docker push fledgre/$$cli:dev; \ done +deploy: + @test "${DEVBOX_SHELL_ENABLED}" == "1" || (echo "Devbox not enabled. Aborting..."; exit 1) + @echo "Building fledger with musl, then deploying to XDC..." + cd cli && cargo build --release --target=x86_64-unknown-linux-musl -p fledger && cargo build --release --target=x86_64-unknown-linux-musl -p flsignal + @./deploy-binaries.sh + +test_realm: + /usr/bin/mktemp -d -t fledger-XXXX + ./target-common/x86_64-unknown-linux-musl/release/fledger --config "$(shell /usr/bin/mktemp -d -t fledger-XXXX)" --disable-turn-stun --signal-url ws://localhost:8765 realm create simulation --cond-pass + +test_signal: + ./target-common/x86_64-unknown-linux-musl/release/flsignal -v --max-list-len 25 + +test_just_fetch_once: + rm -rf /tmp/fledger + ./target-common/x86_64-unknown-linux-musl/release/fledger --config "$(shell /usr/bin/mktemp -d -t fledger-XXXX)" --disable-turn-stun --signal-url ws://localhost:8765 simulation just-fetch-once + clean: for c in ${CARGOS}; do \ echo "Cleaning $$c"; \ diff --git a/cli/fledger/Cargo.toml b/cli/fledger/Cargo.toml index 82570990..d3d04765 100644 --- a/cli/fledger/Cargo.toml +++ b/cli/fledger/Cargo.toml @@ -17,11 +17,17 @@ flmodules = { path = "../../flmodules", version = "0.9" } flnode = { path = "../../flnode", version = "0.9" } flcrypto = { path = "../../flcrypto", version = "0.9" } -anyhow = {version = "1", features = ["backtrace"]} +anyhow = { version = "1", features = ["backtrace"] } clap = "4" clap-verbosity-flag = "3" env_logger = "0.11" log = "0.4" -thiserror = "2" tokio = "1" -webrtc-util = "0.10" +serde_json = "1.0.140" + +reqwest = { version = "0.12.19", features = [ + "json", + "rustls-tls", + "blocking", +], default-features = false } +serde = { version = "1.0.219", features = ["derive"] } diff --git a/cli/fledger/src/hermes/api.rs b/cli/fledger/src/hermes/api.rs new file mode 100644 index 00000000..6e463789 --- /dev/null +++ b/cli/fledger/src/hermes/api.rs @@ -0,0 +1,143 @@ +use crate::hermes::snapshot::Snapshot; +use crate::hermes::update_response::UpdateResponse; +use crate::state::{Page, SimulationState}; +use anyhow::Error; +use reqwest::Method; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::HashMap; +use tokio::time::Instant; + +// Command and Control server for the Fledger CLI +// +// Hermes is both a command and control server and a metrics collector for the Fledger CLI. +// It has a dashboard that allows users to monitor the state of their Fledger nodes, +// view logs, and control the nodes remotely. +#[derive(Default, Clone, Debug)] +pub struct HermesApi { + client: reqwest::blocking::Client, + node_id: u32, +} + +#[derive(Deserialize, Debug, Clone, Default)] +pub struct LostTargetPagesResponse { + pub lost_target_pages: Vec, +} + +impl HermesApi { + pub fn update(&self, state: SimulationState) -> Result { + let snapshot = Snapshot::make(state.clone()); + let url = format!("https://fledger.yohan.ch/api/nodes/{}", state.node_id); + let response = self.api_request(Method::PUT, url, &snapshot)?; + let bot_state: UpdateResponse = serde_json::from_str(&response)?; + Ok(bot_state) + } + + pub fn create_node(&mut self, experiment_id: u32, node_name: String) -> u32 { + let mut data = HashMap::new(); + data.insert("name", node_name.clone()); + let url = format!( + "https://fledger.yohan.ch/api/experiments/{}/nodes", + experiment_id + ); + + let text = self.api_request(Method::POST, url, &data).unwrap(); + let json: Value = serde_json::from_str(&text).unwrap(); + let id_opt = json["id"].as_u64(); + let id = if id_opt.is_none() { + panic!("Failed to create node: no ID returned"); + } else { + id_opt.unwrap() as u32 + }; + self.node_id = id; + id + } + + pub fn get_lost_target_pages( + &self, + experiment_id: u32, + ) -> Result { + let url = format!( + "https://fledger.yohan.ch/api/experiments/{}/lost-target-pages", + experiment_id, + ); + let response = self.api_request(Method::GET, url, &())?; + let lost_target_pages: LostTargetPagesResponse = + serde_json::from_str(&response).map_err(|e| Error::new(e))?; + Ok(lost_target_pages) + } + + pub fn start_fetching(&self, experiment_id: u32) -> Result<(), Error> { + let url = format!( + "https://fledger.yohan.ch/api/experiments/{}/start-fetching", + experiment_id + ); + self.api_request(Method::GET, url, &())?; + Ok(()) + } + + pub fn store_target_pages( + &self, + experiment_id: u32, + target_pages: Vec, + ) -> Result<(), Error> { + let mut data = HashMap::new(); + data.insert("target_pages", target_pages); + let url = format!( + "https://fledger.yohan.ch/api/experiments/{}/store-target-pages", + experiment_id + ); + self.api_request(Method::POST, url, &data)?; + Ok(()) + } + + pub fn set_node_target_pages(&self, target_pages: Vec) -> Result { + let mut data = HashMap::new(); + let ids = target_pages + .iter() + .map(|page| page.id.clone()) + .collect::>(); + data.insert("stored_targets", ids); + let url = format!( + "https://fledger.yohan.ch/api/nodes/{}/set-target-pages", + self.node_id.clone(), + ); + let response = self.api_request(Method::POST, url, &data)?; + let target_to_fetch: UpdateResponse = serde_json::from_str(&response)?; + Ok(target_to_fetch) + } + + fn api_request( + &self, + method: Method, + url: String, + data: &T, + ) -> Result { + let start = Instant::now(); + match self + .client + .request(method, url) + .json(data) + .header("Accept", "application/json") + .header( + "Authorization", + "Bearer 1|d4EeHkRPlqwpgLpALyTor5FxHI4NWg1LXJtf5NZBfd82aa17", + ) + .send() + { + Ok(resp) => { + let text = resp.text()?; + log::info!( + "Successful API request in {}ms: {}", + start.elapsed().as_millis(), + text.clone() + ); + Ok(text) + } + Err(err) => { + log::error!("Error: {}", err); + Err(Error::new(err)) + } + } + } +} diff --git a/cli/fledger/src/hermes/mod.rs b/cli/fledger/src/hermes/mod.rs new file mode 100644 index 00000000..27057798 --- /dev/null +++ b/cli/fledger/src/hermes/mod.rs @@ -0,0 +1,3 @@ +pub mod api; +pub mod snapshot; +pub mod update_response; diff --git a/cli/fledger/src/hermes/snapshot.rs b/cli/fledger/src/hermes/snapshot.rs new file mode 100644 index 00000000..7980f4a3 --- /dev/null +++ b/cli/fledger/src/hermes/snapshot.rs @@ -0,0 +1,139 @@ +use crate::state::{Page, SimulationState}; +use serde::Serialize; + +#[derive(Serialize, Default, Debug, Clone)] +pub struct Snapshot { + node_status: String, + pages_stored: Vec, + evil_no_forward: bool, + + timed_metrics: Vec<(String, u32)>, + timeless_metrics: Vec<(String, u32)>, +} + +struct MetricsBuilder { + metrics: Vec<(String, u32)>, +} + +impl MetricsBuilder { + pub fn new() -> Self { + MetricsBuilder { + metrics: Vec::new(), + } + } + + pub fn add_metric(&mut self, key: String, value: u32) { + self.metrics.push((key, value)); + } + + pub fn build(self) -> Vec<(String, u32)> { + self.metrics.clone() + } +} + +impl Snapshot { + pub fn make(simulation_state: SimulationState) -> Self { + let mut timed_metrics = MetricsBuilder::new(); + timed_metrics.add_metric( + "pages_stored".to_owned(), + simulation_state.pages_stored.len() as u32, + ); + + let mut timeless_metrics = MetricsBuilder::new(); + timeless_metrics.add_metric( + "connected_nodes_total".to_string(), + simulation_state.connected_nodes_total, + ); + timeless_metrics.add_metric( + "pages_stored_total".to_string(), + simulation_state.pages_stored.len() as u32, + ); + timeless_metrics.add_metric( + "ds_size_bytes".to_string(), + simulation_state.ds_size_bytes as u32, + ); + timeless_metrics.add_metric( + "evil_no_forward".to_string(), + simulation_state.evil_no_forward as u32, + ); + timeless_metrics.add_metric( + "store_flo_total".to_string(), + simulation_state.ds_metrics.store_flo_total, + ); + timeless_metrics.add_metric( + "request_flo_metas_sent_total".to_string(), + simulation_state.ds_metrics.request_flo_metas_sent_total, + ); + timeless_metrics.add_metric( + "flo_value_sent_total".to_string(), + simulation_state.ds_metrics.flo_value_sent_total, + ); + timeless_metrics.add_metric( + "flo_value_sent_blocked_total".to_string(), + simulation_state.ds_metrics.flo_value_sent_blocked_total, + ); + timeless_metrics.add_metric( + "available_flos_sent_total".to_string(), + simulation_state.ds_metrics.available_flos_sent_total, + ); + timeless_metrics.add_metric( + "available_flos_sent_blocked_total".to_string(), + simulation_state + .ds_metrics + .available_flos_sent_blocked_total, + ); + timeless_metrics.add_metric( + "flos_sent_total".to_string(), + simulation_state.ds_metrics.flos_sent_total, + ); + timeless_metrics.add_metric( + "flos_sent_blocked_total".to_string(), + simulation_state.ds_metrics.flos_sent_blocked_total, + ); + timeless_metrics.add_metric( + "max_flo_metas_received_in_available_flos".to_string(), + simulation_state + .ds_metrics + .max_flo_metas_received_in_available_flos, + ); + timeless_metrics.add_metric( + "max_flo_metas_requested_in_request_flos".to_string(), + simulation_state + .ds_metrics + .max_flo_metas_requested_in_request_flos, + ); + timeless_metrics.add_metric( + "max_flo_ids_received_in_request_flos".to_string(), + simulation_state + .ds_metrics + .max_flo_ids_received_in_request_flos, + ); + timeless_metrics.add_metric( + "max_flos_sent_in_flos".to_string(), + simulation_state.ds_metrics.max_flos_sent_in_flos, + ); + timeless_metrics.add_metric( + "max_flos_received_in_flos".to_string(), + simulation_state.ds_metrics.max_flos_received_in_flos, + ); + + timeless_metrics.add_metric( + "target_successfully_fetched_total".to_string(), + simulation_state.target_successfully_fetched_total, + ); + + timeless_metrics.add_metric( + "fetch_requests_total".to_string(), + simulation_state.fetch_requests_total, + ); + + Snapshot { + node_status: simulation_state.node_status, + pages_stored: simulation_state.pages_stored, + evil_no_forward: simulation_state.evil_no_forward, + + timed_metrics: timed_metrics.build(), + timeless_metrics: timeless_metrics.build(), + } + } +} diff --git a/cli/fledger/src/hermes/update_response.rs b/cli/fledger/src/hermes/update_response.rs new file mode 100644 index 00000000..e22f95cb --- /dev/null +++ b/cli/fledger/src/hermes/update_response.rs @@ -0,0 +1,8 @@ +use serde::Deserialize; + +#[derive(Deserialize, Debug, Clone, Default)] +pub struct UpdateResponse { + pub target_page_ids: Option>, +} + +impl UpdateResponse {} diff --git a/cli/fledger/src/influx/api.rs b/cli/fledger/src/influx/api.rs new file mode 100644 index 00000000..122116f8 --- /dev/null +++ b/cli/fledger/src/influx/api.rs @@ -0,0 +1,36 @@ +use tokio::time::Instant; + +#[derive(Default, Debug, Clone)] +pub struct InfluxApi { + client: reqwest::blocking::Client, +} + +impl InfluxApi { + pub fn write(&self, influx_data: String) -> Result { + let start = Instant::now(); + match self. + client + .post("https://influxdb.abehssera.com/api/v2/write?org=fledger&bucket=fledger&precision=ns") + .body(influx_data) + .header( + "Authorization", + "Token F7y_RJHnXA0szQHDhEiuRDAw7B2etGywSc-wdMK-BJtkXwplqXe5ogCcXDEJJR18ZvWJ87kwxckl6n1lFu9B-Q==", + ) + .send() + { + Ok(resp) => { + let text = resp.text()?; + log::info!( + "Successful write to influxdb ({}ms): {}", + start.elapsed().as_millis(), + text.clone(), + ); + Ok(text) + }, + Err(err) => { + log::error!("Error when writing to influxdb: {}", err); + Err(err.into()) + }, + } + } +} diff --git a/cli/fledger/src/influx/lines.rs b/cli/fledger/src/influx/lines.rs new file mode 100644 index 00000000..39d6756c --- /dev/null +++ b/cli/fledger/src/influx/lines.rs @@ -0,0 +1,88 @@ +use crate::state::SimulationState; + +// Influx Line Protocol for Fledger +pub struct InfluxLines { + node_name: String, +} + +impl InfluxLines { + fn create_influx_line(&self, measurement: String, value: u32) -> String { + format!("{measurement},node_name={} value={value}", self.node_name) + } + + pub fn make_influx_data(node_name: String, stats: SimulationState) -> String { + let generator = Self { node_name }; + + let mut lines = Vec::new(); + lines.push(generator.create_influx_line( + "fledger_realm_storage_bytes".to_string(), + stats.ds_size_bytes as u32, + )); + lines.push(generator.create_influx_line( + "fledger_pages_total".to_string(), + stats.pages_stored.len() as u32, + )); + lines.push(generator.create_influx_line( + "fledger_connected_total".to_string(), + stats.connected_nodes_total, + )); + // lines.push(generator.create_influx_line( + // "fledger_forwarded_flo_requests_total".to_string(), + // stats.request_flos_received_total, + // )); + lines.push(generator.create_influx_line( + "fledger_forwarded_flo_meta_requests_total".to_string(), + stats.ds_metrics.available_flos_sent_total, + )); + lines.push(generator.create_influx_line( + "fledger_blocked_flo_meta_requests_total".to_string(), + stats.ds_metrics.available_flos_sent_blocked_total, + )); + lines.push(generator.create_influx_line( + "fledger_flos_metas_received_from_neighbour".to_string(), + stats.ds_metrics.max_flo_metas_received_in_available_flos, + )); + lines.push(generator.create_influx_line( + "fledger_flos_requested_from_neighbour".to_string(), + stats.ds_metrics.max_flo_metas_requested_in_request_flos, + )); + lines.push(generator.create_influx_line( + "fledger_flos_ids_received_from_neighbour".to_string(), + stats.ds_metrics.max_flo_ids_received_in_request_flos, + )); + lines.push(generator.create_influx_line( + "fledger_forwarded_flo_requests_total".to_string(), + stats.ds_metrics.flos_sent_total, + )); + lines.push(generator.create_influx_line( + "fledger_flos_sent_to_neighbour".to_string(), + stats.ds_metrics.max_flos_sent_in_flos, + )); + lines.push(generator.create_influx_line( + "fledger_blocked_flo_requests_total".to_string(), + stats.ds_metrics.flos_sent_blocked_total, + )); + lines.push(generator.create_influx_line( + "fledger_flo_value_sent_total".to_string(), + stats.ds_metrics.flo_value_sent_total, + )); + lines.push(generator.create_influx_line( + "fledger_flo_value_blocked_total".to_string(), + stats.ds_metrics.flo_value_sent_blocked_total, + )); + lines.push(generator.create_influx_line( + "fledger_blocked_flo_requests_total".to_string(), + stats.ds_metrics.flos_sent_blocked_total, + )); + lines.push(generator.create_influx_line( + "fledger_flos_received_from_neighbour".to_string(), // received + stats.ds_metrics.max_flos_received_in_flos, + )); + lines.push(generator.create_influx_line( + "fledger_ds_store_flo_total".to_string(), + stats.ds_metrics.store_flo_total, + )); + + lines.join("\n") + } +} diff --git a/cli/fledger/src/influx/mod.rs b/cli/fledger/src/influx/mod.rs new file mode 100644 index 00000000..e6735683 --- /dev/null +++ b/cli/fledger/src/influx/mod.rs @@ -0,0 +1,2 @@ +pub mod api; +pub mod lines; diff --git a/cli/fledger/src/main.rs b/cli/fledger/src/main.rs index fd99ee76..2495f91e 100644 --- a/cli/fledger/src/main.rs +++ b/cli/fledger/src/main.rs @@ -13,9 +13,17 @@ use flmodules::{ use flnode::{node::Node, version::VERSION_STRING}; use page::{Page, PageCommands}; use realm::{RealmCommands, RealmHandler}; +use simulation::{SimulationCommand, SimulationHandler}; +mod hermes; +mod influx; mod page; mod realm; +mod simulation; +mod simulation_chat; +mod simulation_dht; +mod simulation_realm; +mod state; /// Fledger node CLI binary #[derive(Parser, Debug, Clone)] @@ -62,6 +70,10 @@ pub struct Args { #[arg(long, default_value = "false")] log_gossip: bool, + /// Log dht connections + #[arg(long, default_value = "false")] + log_dht_connections: bool, + /// Log random router #[arg(long, default_value = "false")] log_random: bool, @@ -74,6 +86,21 @@ pub struct Args { #[arg(long, default_value = "false")] log_dht_storage: bool, + /// Wait a random amount of ms, bounded by + /// bootwait_max, before starting the node. + /// This allows the signaling server to be + /// less overwhelmed + #[arg(long, default_value = "0")] + bootwait_max: u64, + + /// Run the node but refuse to forward data + #[arg(long, default_value = "false")] + evil_noforward: bool, + + /// Delay between loop iterations in ms + #[arg(long, default_value = "1000")] + loop_delay: u32, + #[command(subcommand)] command: Option, } @@ -94,6 +121,8 @@ enum Commands { Crypto {}, /// Prints the statistics of the different modules and then quits Stats {}, + /// Simulation tasks + Simulation(SimulationCommand), } struct Fledger { @@ -112,31 +141,13 @@ async fn main() -> anyhow::Result<()> { logger.parse_env("RUST_LOG"); logger.try_init().expect("Failed to initialize logger"); - let storage = DataStorageFile::new(args.config.clone(), "fledger".into()); - let mut node_config = Node::get_config(storage.clone_box())?; - args.name - .as_ref() - .map(|name| node_config.info.name = name.clone()); - log::info!( "Starting app with version {}/{}", SIGNAL_VERSION, VERSION_STRING ); - let cc = if args.disable_turn_stun { - ConnectionConfig::from_signal(&args.signal_url) - } else { - ConnectionConfig::new( - args.signal_url.clone().into(), - Some(HostLogin::from_url(&args.stun_url)), - Some(HostLogin::from_login_url(&args.turn_url)?), - ) - }; - log::debug!("Connecting to websocket at {:?}", cc); - let network = network_start(node_config.clone(), cc).await?; - let node = Node::start(Box::new(storage), node_config, network.broker).await?; - Fledger::run(node, args).await + Fledger::run(args).await } pub enum FledgerState { @@ -148,25 +159,62 @@ pub enum FledgerState { } impl Fledger { - async fn run(node: Node, args: Args) -> anyhow::Result<()> { + async fn make_node(args: Args) -> anyhow::Result { + let storage = DataStorageFile::new(args.config.clone(), "fledger".into()); + let mut node_config = Node::get_config(storage.clone_box())?; + args.name + .as_ref() + .map(|name| node_config.info.name = name.clone()); + + let cc = if args.disable_turn_stun { + ConnectionConfig::from_signal(&args.signal_url) + } else { + ConnectionConfig::new( + args.signal_url.clone().into(), + Some(HostLogin::from_url(&args.stun_url)), + Some(HostLogin::from_login_url(&args.turn_url)?), + ) + }; + log::debug!("Connecting to websocket at {:?}", cc); + let network = network_start(node_config.clone(), cc).await?; + let node = Node::start(Box::new(storage), node_config, network.broker).await?; + Ok(node) + } + + async fn make_fledger(args: Args) -> anyhow::Result { + let node = Self::make_node(args.clone()).await?; let nc = &node.node_config.info; log::info!("Started node {}: {}", nc.get_id(), nc.name); - let mut f = Fledger { + Ok(Fledger { ds: node.dht_storage.as_ref().unwrap().clone(), dr: node.dht_router.as_ref().unwrap().clone(), node, args, - }; + }) + } - match f.args.command.clone() { + async fn run(args: Args) -> anyhow::Result<()> { + match args.command.clone() { Some(cmd) => match cmd { - Commands::Realm { command } => RealmHandler::run(f, command).await, + Commands::Realm { command } => { + RealmHandler::run(Self::make_fledger(args).await?, command).await + } Commands::Crypto {} => todo!(), Commands::Stats {} => todo!(), - Commands::Page { command } => Page::run(f, command).await, + Commands::Page { command } => { + Page::run(Self::make_fledger(args).await?, command).await + } + Commands::Simulation(command) => { + SimulationHandler::run(Self::make_fledger(args).await?, command).await + } }, - None => f.loop_node(FledgerState::Forever).await, + None => { + Self::make_fledger(args) + .await? + .loop_node(FledgerState::Forever) + .await + } } } @@ -182,6 +230,7 @@ impl Fledger { FledgerState::Duration(i) => log::info!("Just hanging around {i} seconds"), FledgerState::Forever => log::info!("Looping forever"), } + loop { count += 1; @@ -209,19 +258,22 @@ impl Fledger { } { return Ok(()); } + self.ds.sync()?; - println!( - "dht-connections: {}/{}", - self.dr.stats.borrow().active, - self.dr - .stats - .borrow() - .all_nodes - .iter() - .map(|n| n.to_string()) - .collect::>() - .join(", ") - ); + if self.args.log_dht_connections { + log::info!( + "dht-connections: {}/{}", + self.dr.stats.borrow().active, + self.dr + .stats + .borrow() + .all_nodes + .iter() + .map(|n| n.to_string()) + .collect::>() + .join(", ") + ); + } } } @@ -231,7 +283,7 @@ impl Fledger { log::info!( "Nodes are: {}", self.node - .nodes_online()? + .nodes_connected()? .iter() .map(|n| format!("{}/{}", n.name, n.get_id())) .collect::>() diff --git a/cli/fledger/src/realm.rs b/cli/fledger/src/realm.rs index f013374f..9e46495d 100644 --- a/cli/fledger/src/realm.rs +++ b/cli/fledger/src/realm.rs @@ -49,6 +49,7 @@ impl RealmHandler { cond_pass: bool, ) -> anyhow::Result<()> { f.loop_node(crate::FledgerState::Connected(1)).await?; + //f.loop_node(crate::FledgerState::Duration(1)).await?; let config = RealmConfig { max_space: max_space.unwrap_or(1000000), @@ -70,10 +71,10 @@ impl RealmHandler { "danu".to_string(), INDEX_HTML.to_string(), None, - cond.clone(), - signers.clone(), + Condition::Pass, + vec![], ) - .root_tag("danu".to_string(), None, cond.clone(), signers) + .root_tag("danu".to_string(), None, Condition::Pass, vec![]) .build() .await?; diff --git a/cli/fledger/src/simulation.rs b/cli/fledger/src/simulation.rs new file mode 100644 index 00000000..a907f4bc --- /dev/null +++ b/cli/fledger/src/simulation.rs @@ -0,0 +1,134 @@ +use crate::simulation_chat::simulation::SimulationChat; +use crate::simulation_dht::simulation::SimulationDht; +use crate::simulation_realm::simulation::SimulationRealm; +use crate::Fledger; +use clap::{arg, Args, Subcommand}; +use flarch::random; +use flarch::tasks::wait_ms; + +#[derive(Args, Debug, Clone)] +pub struct SimulationCommand { + /// Print new messages as they come + #[arg(long, default_value = "false")] + pub print_new_messages: bool, + + #[command(subcommand)] + pub subcommand: SimulationSubcommand, +} + +#[derive(Subcommand, Debug, Clone)] +pub enum SimulationSubcommand { + Chat { + /// Send a simulation_chat message upon node creation + #[arg(long)] + send_msg: Option, + + /// Wait for a simulation_chat message with the given body. + /// log "RECV_CHAT_MSG TRIGGERED" upon message received, at log level info + #[arg(long)] + recv_msg: Option, + }, + + DhtJoinRealm {}, + + DhtCreatePages { + #[arg(long)] + filler_amount: u32, + + #[arg(long)] + target_amount: u32, + + #[arg(long)] + page_size: u32, + + #[arg(long)] + pages_propagation_delay: u32, + + #[arg(long)] + connection_delay: u32, + + #[arg(long)] + experiment_id: u32, + }, + + DhtFetchPages { + #[arg(long, default_value = "1200000")] + propagation_timeout_ms: u32, + + #[arg(long, default_value = "600000")] + timeout_ms: u32, + + #[arg(long, default_value = "false")] + enable_sync: bool, + + #[arg(long)] + experiment_id: u32, + + #[arg(long, default_value = "false")] + with_local_blacklists: bool, + }, + + JustFetchOnce {}, +} + +pub struct SimulationHandler {} + +impl SimulationHandler { + pub async fn run(f: Fledger, command: SimulationCommand) -> anyhow::Result<()> { + // wait a random amount of time before running a simulation + // to avoid overloading the signaling server + if f.args.bootwait_max != 0 { + let randtime = random::() % f.args.bootwait_max; + log::info!("Waiting {}ms before running this node...", randtime); + wait_ms(randtime).await; + } + + let loop_delay = f.args.loop_delay; + match command.subcommand.clone() { + SimulationSubcommand::Chat { send_msg, recv_msg } => { + SimulationChat::run_chat(f, command, send_msg, recv_msg).await + } + SimulationSubcommand::DhtJoinRealm {} => SimulationRealm::run_dht_join_realm(f).await, + SimulationSubcommand::DhtCreatePages { + filler_amount, + target_amount, + page_size, + pages_propagation_delay, + connection_delay, + experiment_id, + } => { + SimulationDht::run_create_pages( + f, + filler_amount, + target_amount, + page_size, + pages_propagation_delay, + connection_delay, + experiment_id, + ) + .await + } + SimulationSubcommand::DhtFetchPages { + propagation_timeout_ms, + timeout_ms, + enable_sync, + experiment_id, + with_local_blacklists, + } => { + let evil_noforward = f.args.evil_noforward.clone(); + SimulationDht::run_fetch_pages( + f, + loop_delay, + enable_sync, + propagation_timeout_ms, + timeout_ms, + experiment_id, + evil_noforward, + with_local_blacklists, + ) + .await + } + SimulationSubcommand::JustFetchOnce {} => SimulationDht::just_fetch_once(f).await, + } + } +} diff --git a/cli/fledger/src/simulation_chat/mod.rs b/cli/fledger/src/simulation_chat/mod.rs new file mode 100644 index 00000000..47a12cbb --- /dev/null +++ b/cli/fledger/src/simulation_chat/mod.rs @@ -0,0 +1 @@ +pub mod simulation; diff --git a/cli/fledger/src/simulation_chat/simulation.rs b/cli/fledger/src/simulation_chat/simulation.rs new file mode 100644 index 00000000..68f6b3d8 --- /dev/null +++ b/cli/fledger/src/simulation_chat/simulation.rs @@ -0,0 +1,70 @@ +use crate::simulation::SimulationCommand; +use crate::Fledger; +use flarch::nodeids::U256; +use flarch::tasks::wait_ms; +use flmodules::gossip_events::core::Event; + +#[derive(Clone)] +pub struct SimulationChat {} + +impl SimulationChat { + pub async fn run_chat( + mut f: Fledger, + simulation_args: SimulationCommand, + send_msg: Option, + recv_msg: Option, + ) -> anyhow::Result<()> { + f.loop_node(crate::FledgerState::Connected(1)).await?; + + if let Some(ref msg) = recv_msg { + log::info!("Waiting for simulation_chat message {}.", msg); + } + + if let Some(ref msg) = send_msg { + log::info!("Sending simulation_chat message {}.", msg); + f.node.add_chat_message(msg.into()).await?; + } + + let mut acked_msg_ids: Vec = Vec::new(); + + loop { + wait_ms(1000).await; + + if simulation_args.print_new_messages { + Self::log_new_messages(&f, &mut acked_msg_ids); + } + + if let Some(ref msg) = recv_msg { + let gossip = f.node.gossip.as_ref(); + if gossip + .unwrap() + .chat_events() + .iter() + .any(|ev| ev.msg.eq(msg)) + { + log::info!("SIMULATION END"); + f.loop_node(crate::FledgerState::Forever).await?; + return Ok(()); + } + } + } + } + + fn log_new_messages(f: &Fledger, acked_msg_ids: &mut Vec) { + let chat_events = f.node.gossip.as_ref().unwrap().chat_events(); + let chats: Vec<&Event> = chat_events + .iter() + .filter(|ev| !acked_msg_ids.contains(&ev.get_id())) + .collect(); + + if chats.len() <= 0 { + log::debug!("... No new message"); + } else { + log::info!("--- New Messages ---"); + for chat in chats { + acked_msg_ids.push(chat.get_id()); + log::info!(" [{}] {}", chat.src, chat.msg); + } + } + } +} diff --git a/cli/fledger/src/simulation_dht/mod.rs b/cli/fledger/src/simulation_dht/mod.rs new file mode 100644 index 00000000..47a12cbb --- /dev/null +++ b/cli/fledger/src/simulation_dht/mod.rs @@ -0,0 +1 @@ +pub mod simulation; diff --git a/cli/fledger/src/simulation_dht/simulation.rs b/cli/fledger/src/simulation_dht/simulation.rs new file mode 100644 index 00000000..c99cb726 --- /dev/null +++ b/cli/fledger/src/simulation_dht/simulation.rs @@ -0,0 +1,359 @@ +use crate::hermes::api::HermesApi; +use crate::state::{Page, SimulationState}; +use crate::Fledger; +use flarch::tasks::{time::timeout, wait_ms}; +use flcrypto::tofrombytes::ToFromBytes; +use flmodules::flo::realm::RealmID; +use flmodules::{ + dht_storage::realm_view::RealmView, + flo::{ + blob::{BlobAccess, BlobPage}, + flo::{FloID, FloWrapper}, + realm::GlobalID, + }, +}; +use std::collections::HashSet; +use std::str::FromStr; +use std::time::{Duration, Instant}; + +#[derive(Clone)] +pub struct SimulationDht {} + +impl SimulationDht { + fn get_page_content(page: &FloWrapper) -> String { + String::from_utf8(page.datas().iter().next().unwrap().1.clone().to_vec()).unwrap() + } + + fn get_page_name(page: &FloWrapper) -> String { + page.values().iter().next().unwrap().1.clone() + } + + fn log_page_info(flo_page: &FloWrapper) { + let page_content = Self::get_page_content(flo_page); + log::info!( + "page {}/{}/{} | {} | {} ({}B -> {}B)", + flo_page.flo_id(), + flo_page.realm_id(), + flo_page.version(), + flo_page.values().iter().next().unwrap().1, + page_content.chars().take(50).collect::(), + page_content.size(), + flo_page.size(), + ); + } + + async fn create_flo_page( + rv: &mut RealmView, + name: &str, + content: String, + ) -> anyhow::Result> { + let flo_page = rv + .create_http(name, content, None, flcrypto::access::Condition::Pass, &[]) + .await?; + Self::log_page_info(&flo_page); + Ok(flo_page) + } + + async fn settle_and_sync(f: &mut Fledger) -> anyhow::Result<()> { + // ds.settle, ds.sync + f.node + .dht_storage + .as_mut() + .unwrap() + .broker + .settle(Vec::new()) + .await?; + f.node.dht_storage.as_mut().unwrap().sync()?; + Ok(()) + } + + fn make_page_id(realm_id: RealmID, page_id: FloID) -> GlobalID { + GlobalID::new(realm_id.clone(), page_id.clone()) + } + + async fn fetch_page(f: &mut Fledger, id: GlobalID) -> anyhow::Result> { + f.node.dht_storage.as_mut().unwrap().get_flo(&id).await + } + + pub async fn run_create_pages( + mut f: Fledger, + filler_amount: u32, + target_amount: u32, + page_size: u32, + propagation_delay: u32, + connection_delay: u32, + experiment_id: u32, + ) -> anyhow::Result<()> { + f.loop_node(crate::FledgerState::DHTAvailable).await?; + log::info!("DHT CONNECTED"); + + log::info!("[Waiting for connections to settle]"); + log::info!("{} ms", connection_delay); + wait_ms(connection_delay as u64).await; + + //let router = f.node.dht_router.unwrap(); + let mut rv = RealmView::new_first(f.node.dht_storage.clone().unwrap()).await?; + + log::info!("[Create filler pages]"); + for i in 0..filler_amount { + wait_ms(500).await; + Self::create_flo_page( + &mut rv, + &format!("filler-{}", i.to_string()), + String::from_utf8(vec![b'-'; page_size as usize])?, + ) + .await?; + } + + log::info!("[Waiting for fillers to settle]"); + log::info!("{} ms", propagation_delay); + Self::settle_and_sync(&mut f).await?; + wait_ms(propagation_delay as u64).await; + + log::info!("[Create target_pages simulation flo page]"); + let mut target_pages = vec![]; + for i in 0..target_amount { + wait_ms(3000).await; + target_pages.push( + Self::create_flo_page( + &mut rv, + &format!("target-{}", i.to_string()), + String::from_utf8(vec![b'o'; page_size as usize])?, + ) + .await?, + ); + } + + log::info!("[Waiting for targets to settle]"); + log::info!("{} ms", propagation_delay); + Self::settle_and_sync(&mut f).await?; + wait_ms(propagation_delay as u64).await; + + let pages = target_pages + .iter() + .map(|page| Page { + id: page.flo_id().to_string(), + name: Self::get_page_name(page), + }) + .collect::>(); + + let hermes = HermesApi::default(); + hermes.store_target_pages(experiment_id, pages)?; + + let mut all_targets_propagated = false; + let start_instant = Instant::now(); + while !all_targets_propagated { + let lost_target_pages = hermes.get_lost_target_pages(experiment_id)?; + + if start_instant.elapsed() > Duration::from_secs(60 * 15) { + log::warn!("PROPAGATION TIMEOUT REACHED (15min)"); + log::info!("SIMULATION END"); + return Err(anyhow::anyhow!("timeout")); + } + + if lost_target_pages.lost_target_pages.is_empty() { + all_targets_propagated = true; + } else { + let pages_to_repropagate = target_pages + .iter() + .filter(|page| { + lost_target_pages + .lost_target_pages + .contains(&page.flo_id().to_string().clone()) + }) + .collect::>>(); + for page in pages_to_repropagate { + log::info!("REPROPAGATE: {}", Self::get_page_name(page)); + f.ds.store_flo(page.flo().clone())?; + wait_ms(500).await + } + wait_ms(15000).await; + } + } + + hermes.start_fetching(experiment_id)?; + log::info!("SIMULATION END"); + + f.loop_node(crate::FledgerState::Forever).await?; + Ok(()) + } + + pub async fn run_fetch_pages( + mut f: Fledger, + loop_delay: u32, + enable_sync: bool, + propagation_timeout_ms: u32, + timeout_ms: u32, + experiment_id: u32, + evil_noforward: bool, + with_local_blacklists: bool, + ) -> anyhow::Result<()> { + let mut start_instant = Instant::now(); + + unsafe { + if with_local_blacklists { + log::info!("enabling local blacklists"); + flmodules::dht_storage::messages::LOCAL_BLACKLISTS = true; + flmodules::dht_router::messages::LOCAL_BLACKLISTS = true; + } + } + + let node_name = f.node.node_config.info.name.clone(); + let mut state = SimulationState::new(experiment_id, node_name); + + let timeout_result = timeout( + Duration::from_millis(timeout_ms.into()), + f.loop_node(crate::FledgerState::DHTAvailable), + ) + .await; + + if timeout_result.is_err() { + log::warn!("SIMULATION TIMEOUT WHILE CONNECTING TO DHT"); + log::info!("SIMULATION END"); + state.timeout(); + return Err(timeout_result.unwrap_err().into()); + } + + log::info!("DHT CONNECTED"); + + let realm_id = RealmView::new_first(f.node.dht_storage.as_ref().unwrap().clone()) + .await? + .realm + .realm_id(); + + let mut target_page_ids = HashSet::new(); + let mut fetched_page_ids = HashSet::new(); + + let mut iteration = 0u32; + + loop { + if target_page_ids.is_empty() + && start_instant.elapsed().as_millis() > propagation_timeout_ms as u128 + { + log::warn!("PROPAGATION TIMEOUT REACHED ({}ms)", propagation_timeout_ms); + log::info!("SIMULATION END"); + state.timeout(); + state.update_and_upload(&mut f).await; + f.loop_node(crate::FledgerState::Forever).await?; + + return Ok(()); + } + + if !target_page_ids.is_empty() + && start_instant.elapsed().as_millis() > timeout_ms as u128 + { + log::warn!("SIMULATION TIMEOUT REACHED ({}ms)", timeout_ms); + log::info!("SIMULATION END"); + state.timeout(); + state.update_and_upload(&mut f).await; + f.loop_node(crate::FledgerState::Forever).await?; + + return Ok(()); + } + + wait_ms(loop_delay.into()).await; + iteration += 1; + + if enable_sync { + f.node.dht_storage.as_mut().unwrap().sync()?; + } + + if iteration == 1 { + // initial full metrics upload, just to get some info on hermes + let _ = state.update_and_upload(&mut f).await; + } + + if iteration % 10 == 0 && target_page_ids.is_empty() { + // target propagation not over + // sending light metrics (only stored target pages) + // and checking if propagation ended + let response = state.send_target_pages(&mut f).await; + if response.target_page_ids.is_some() + && !response.target_page_ids.as_ref().unwrap().is_empty() + { + // target propagation ended, it's now time to fetch. + // resetting the timeout + log::info!( + "TIMEOUT RESET AFTER {}s, STARTING TO FETCH PAGES", + start_instant.elapsed().as_secs() + ); + start_instant = Instant::now(); + for target_page_id in response.target_page_ids.unwrap() { + target_page_ids.insert(target_page_id.clone()); + } + + unsafe { + if evil_noforward && !flmodules::dht_storage::messages::EVIL_NO_FORWARD { + log::info!("becoming a malicious node"); + log::info!("SIMULATION END"); + flmodules::dht_storage::messages::EVIL_NO_FORWARD = true; + flmodules::dht_router::messages::EVIL_NO_FORWARD = true; + } + } + } + } + + if iteration % 30 == 0 && !target_page_ids.is_empty() { + // fetching phase - full metrics upload + let _ = state.update_and_upload(&mut f).await; + } + + for page_id in target_page_ids.clone() { + if fetched_page_ids.contains(&page_id.clone()) { + continue; + } + let flo_id = FloID::from_str(&page_id.clone())?; + let global_page_id = Self::make_page_id(realm_id.clone(), flo_id); + let page = Self::fetch_page(&mut f, global_page_id).await; + state.increment_fetch_requests_total(); + if page.is_ok() { + fetched_page_ids.insert(page_id.clone()); + } + } + + // log fetched and target pages + log::info!("fetched pages: {fetched_page_ids:?}"); + log::info!("target pages: {target_page_ids:?}"); + + state.target_successfully_fetched_total = fetched_page_ids.len() as u32; + + if !target_page_ids.is_empty() && fetched_page_ids.is_superset(&target_page_ids) { + log::info!("all target pages fetched."); + log::info!("SIMULATION END"); + state.success(); + state.update_and_upload(&mut f).await; + f.loop_node(crate::FledgerState::Forever).await?; + } + } + } + + pub async fn just_fetch_once(mut f: Fledger) -> anyhow::Result<()> { + f.loop_node(crate::FledgerState::DHTAvailable).await?; + f.loop_node(crate::FledgerState::Connected(2)).await?; + + let realm_id = RealmView::new_first(f.node.dht_storage.as_ref().unwrap().clone()) + .await? + .realm + .realm_id(); + + let flo_id = + FloID::from_str("5efe0a6143df5641af9d6036ba8da82222bb30211c21ba5ec236851efda38420")?; + let global_page_id = Self::make_page_id(realm_id.clone(), flo_id); + + for i in 0..10 { + wait_ms(1000).await; + + let page = Self::fetch_page(&mut f, global_page_id.clone()).await; + + if page.is_ok() { + log::info!("God made a miracle"); + } else { + log::info!("Fetch {i} done."); + } + } + + f.loop_node(crate::FledgerState::Forever).await?; + + Ok(()) + } +} diff --git a/cli/fledger/src/simulation_realm/mod.rs b/cli/fledger/src/simulation_realm/mod.rs new file mode 100644 index 00000000..47a12cbb --- /dev/null +++ b/cli/fledger/src/simulation_realm/mod.rs @@ -0,0 +1 @@ +pub mod simulation; diff --git a/cli/fledger/src/simulation_realm/simulation.rs b/cli/fledger/src/simulation_realm/simulation.rs new file mode 100644 index 00000000..0b81434f --- /dev/null +++ b/cli/fledger/src/simulation_realm/simulation.rs @@ -0,0 +1,14 @@ +use crate::Fledger; + +#[derive(Clone)] +pub struct SimulationRealm {} + +impl SimulationRealm { + pub async fn run_dht_join_realm(mut f: Fledger) -> anyhow::Result<()> { + f.loop_node(crate::FledgerState::DHTAvailable).await?; + log::info!("SIMULATION END"); + + f.loop_node(crate::FledgerState::Forever).await?; + Ok(()) + } +} diff --git a/cli/fledger/src/state.rs b/cli/fledger/src/state.rs new file mode 100644 index 00000000..f58e5319 --- /dev/null +++ b/cli/fledger/src/state.rs @@ -0,0 +1,158 @@ +use crate::hermes::api::HermesApi; +use crate::hermes::update_response::UpdateResponse; +use crate::influx::api::InfluxApi; +use crate::influx::lines::InfluxLines; +use crate::Fledger; +use flcrypto::tofrombytes::ToFromBytes; +use flmodules::dht_storage::broker::DHTStorage; +use flmodules::dht_storage::messages::DsMetrics; +use flmodules::flo::blob::{BlobAccess, BlobPage}; +use serde::{Deserialize, Serialize}; +use std::any::type_name; + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct Page { + pub name: String, + pub id: String, +} + +#[derive(Clone, Debug, Default)] +pub struct SimulationState { + pub experiment_id: u32, + pub node_id: u32, + pub node_name: String, + pub node_status: String, + + pub pages_stored: Vec, + pub connected_nodes_total: u32, + pub ds_size_bytes: u64, + pub evil_no_forward: bool, + pub target_successfully_fetched_total: u32, + pub fetch_requests_total: u32, + + pub ds_metrics: DsMetrics, + + pub api: HermesApi, + pub influx: InfluxApi, +} + +impl SimulationState { + pub fn new(experiment_id: u32, node_name: String) -> Self { + let mut state = Self::default(); + + let node_id = state.api.create_node(experiment_id, node_name.clone()); + + state.experiment_id = experiment_id; + state.node_id = node_id; + state.node_name = node_name; + state.node_status = "active".to_string(); + state + } + + pub fn increment_fetch_requests_total(&mut self) { + self.fetch_requests_total += 1; + } + + pub async fn refresh_pages(&mut self, ds: &mut DHTStorage) { + let pages_stored = ds + .get_flos() + .await + .unwrap_or_else(|e| { + log::error!("failed to get flos {e}"); + vec![] + }) + .iter() + .filter(|flo| flo.flo_type() == type_name::()) + .map(|flo| { + let page = BlobPage::from_rmp_bytes(&flo.flo_type(), &flo.data()).unwrap(); + Page { + id: flo.flo_id().to_string(), + name: page.0.values().iter().next().unwrap().1.clone(), + } + }) + .collect::>(); + + self.pages_stored = pages_stored.clone(); + log::info!( + "pages stored: {}", + pages_stored + .iter() + .map(|page| page.name.clone()) + .collect::>() + .join(", ") + ); + } + + pub async fn get_stored_targets(&mut self, ds: &mut DHTStorage) -> Vec { + ds.get_flos() + .await + .unwrap_or_else(|e| { + log::error!("failed to get flos {e}"); + vec![] + }) + .iter() + .filter(|flo| flo.flo_type() == type_name::()) + .map(|flo| { + let page = BlobPage::from_rmp_bytes(&flo.flo_type(), &flo.data()).unwrap(); + Page { + id: flo.flo_id().to_string(), + name: page.0.values().iter().next().unwrap().1.clone(), + } + }) + .filter(|page| page.name.starts_with("target")) + .collect::>() + } + + pub fn success(&mut self) { + self.node_status = "success".to_string(); + } + + pub fn timeout(&mut self) { + self.node_status = "timeout".to_string(); + } + + pub async fn update_and_upload(&mut self, f: &mut Fledger) -> UpdateResponse { + let connected_nodes_total = f + .node + .dht_router + .clone() + .unwrap() + .stats + .borrow() + .active + .clone() as u32; + let ds = f.node.dht_storage.as_mut().unwrap(); + let ds_size = ds.stats.borrow().realm_stats.iter().next().unwrap().1.size; + self.ds_metrics = ds.stats.borrow().experiment_stats.clone(); + self.connected_nodes_total = connected_nodes_total; + self.ds_size_bytes = ds_size; + self.evil_no_forward = f.args.evil_noforward.clone(); + self.refresh_pages(ds).await; + + self.upload() + } + + pub async fn send_target_pages(&mut self, f: &mut Fledger) -> UpdateResponse { + let target_pages = self + .get_stored_targets(f.node.dht_storage.as_mut().unwrap()) + .await; + self.api + .set_node_target_pages(target_pages) + .unwrap_or_else(|e| { + log::error!("failed to set target pages: {e}"); + UpdateResponse::default() + }) + } + + pub fn upload(&mut self) -> UpdateResponse { + let node_name = self.node_name.clone(); + let _ = self + .influx + .write(InfluxLines::make_influx_data(node_name, self.clone())); + + self.api.update(self.clone()).unwrap_or_else(|e| { + log::error!("failed to update simulation state: {e}"); + UpdateResponse::default() + }) + } +} diff --git a/cli/flsignal/Cargo.toml b/cli/flsignal/Cargo.toml index 7dd1c14d..c6f13e0d 100644 --- a/cli/flsignal/Cargo.toml +++ b/cli/flsignal/Cargo.toml @@ -12,13 +12,16 @@ keywords = ["network", "signalling", "webrtc"] categories = ["network-programming"] [dependencies] -flmodules = {path = "../../flmodules", version = "0.9"} -flarch = {path = "../../flarch", version = "0.9"} +flmodules = { path = "../../flmodules", version = "0.9" } +flarch = { path = "../../flarch", version = "0.9" } -anyhow = {version = "1", features = ["backtrace"]} +anyhow = { version = "1", features = ["backtrace"] } clap = "4" clap-verbosity-flag = "3" env_logger = "0.11" log = "0.4" thiserror = "2" tokio = "1" + +metrics = "0.21.1" +metrics-exporter-influx = { version = "0.2.2", path = "../../metrics-exporter-influx" } diff --git a/cli/flsignal/src/main.rs b/cli/flsignal/src/main.rs index 1be7e90e..0204f58d 100644 --- a/cli/flsignal/src/main.rs +++ b/cli/flsignal/src/main.rs @@ -1,4 +1,4 @@ -use std::str::FromStr; +use std::{fs::File, str::FromStr, time::Duration}; use clap::Parser; use flarch::web_rtc::web_socket_server::WebSocketServer; @@ -6,6 +6,7 @@ use flmodules::{ flo::realm::RealmID, network::signal::{SignalConfig, SignalOut, SignalServer}, }; +use metrics_exporter_influx::{InfluxBuilder, InfluxRecorderHandle}; /// Fledger signalling server #[derive(Parser, Debug)] @@ -24,6 +25,18 @@ struct Args { max_list_len: Option, } +fn setup_metrics(node_name: String) -> InfluxRecorderHandle { + log::info!("Setting up metrics"); + let metrics_file = File::create(format!("/tmp/{}.metrics", node_name)) + .expect(format!("could not create /tmp/{}.metrics", node_name).as_ref()); + return InfluxBuilder::new() + .with_duration(Duration::from_secs(10)) + .with_writer(metrics_file) + .add_global_tag("node_name", node_name) + .install() + .expect("could not setup influx recorder"); +} + #[tokio::main] async fn main() -> anyhow::Result<()> { let args = Args::parse(); @@ -33,6 +46,8 @@ async fn main() -> anyhow::Result<()> { logger.parse_env("RUST_LOG"); logger.try_init().expect("Failed to initialize logger"); + let _influx = setup_metrics("flsignal".into()); + let wss = WebSocketServer::new(8765).await?; let system_realm = args.system_realm.and_then(|sr| RealmID::from_str(&sr).ok()); log::info!("System realm config is: {:?}", system_realm); diff --git a/deploy-binaries.sh b/deploy-binaries.sh new file mode 100755 index 00000000..e809d3bb --- /dev/null +++ b/deploy-binaries.sh @@ -0,0 +1,40 @@ +#!/bin/bash + +if test "$#" -ne 0; then + echo "usage: $0" + exit 1 +fi + +if ! test -f "target-common/x86_64-unknown-linux-musl/release/flsignal"; then + echo "ERROR: you did not compile flsignal with musl. The experiment will not work." + echo "Aborting..." + exit 1 +fi + +if ! test -f "target-common/x86_64-unknown-linux-musl/release/fledger"; then + echo "ERROR: you did not compile fledger with musl. The experiment will not work." + echo "Aborting..." + exit 1 +fi + +echo "Copy binaries for local experiments..." +cp \ + target-common/x86_64-unknown-linux-musl/release/fledger \ + target-common/x86_64-unknown-linux-musl/release/flsignal \ + /var/fledger/ + +echo "Uploading fledger and flsignal binaries..." +scp \ + target-common/x86_64-unknown-linux-musl/release/fledger \ + target-common/x86_64-unknown-linux-musl/release/flsignal \ + fledger:/usr/share/caddy/ + +# echo "Pushing fledger and flsignal binaries to github..." +# cp \ +# target-common/x86_64-unknown-linux-musl/release/fledger \ +# target-common/x86_64-unknown-linux-musl/release/flsignal \ +# ../fledger-binaries/ +# cd ../fledger-binaries/ || exit 1 +# git add . || exit 1 +# git commit -m "deploy" || exit 1 +# git push || exit 1 diff --git a/flarch/Cargo.toml b/flarch/Cargo.toml index 39e152a5..0983fe8d 100644 --- a/flarch/Cargo.toml +++ b/flarch/Cargo.toml @@ -16,7 +16,7 @@ node = ["flmacro/node"] [dependencies] flmacro = { version = "0.9", path = "../flmacro" } -anyhow = {version = "1", features = ["backtrace"]} +anyhow = { version = "1", features = ["backtrace"] } bytes = "1.9" chrono = "0.4" enum-display = "0.1" @@ -37,6 +37,7 @@ serde = { version = "1", features = ["derive"] } sha2 = "0.10" rmp-serde = "1" btparse = "0.2" +metrics = "0.21.1" # For libc [target.'cfg(target_family="unix")'.dependencies] @@ -52,26 +53,26 @@ wasm-bindgen-test = "0.3" wasmtimer = "0.4" serde-wasm-bindgen = { version = "0.6" } web-sys = { version = "0.3", features = [ - 'Window', - "Storage", - "console", - "MessageEvent", - "RtcConfiguration", - "RtcDataChannel", - "RtcDataChannelEvent", - "RtcDataChannelState", - "RtcIceCandidate", - "RtcIceCandidateInit", - "RtcIceConnectionState", - "RtcIceGatheringState", - "RtcPeerConnection", - "RtcPeerConnectionIceEvent", - "RtcSdpType", - "RtcSessionDescriptionInit", - "RtcSignalingState", - "ErrorEvent", - "MessageEvent", - "WebSocket", + 'Window', + "Storage", + "console", + "MessageEvent", + "RtcConfiguration", + "RtcDataChannel", + "RtcDataChannelEvent", + "RtcDataChannelState", + "RtcIceCandidate", + "RtcIceCandidateInit", + "RtcIceConnectionState", + "RtcIceGatheringState", + "RtcPeerConnection", + "RtcPeerConnectionIceEvent", + "RtcSdpType", + "RtcSessionDescriptionInit", + "RtcSignalingState", + "ErrorEvent", + "MessageEvent", + "WebSocket", ] } # [dev-dependencies] diff --git a/flarch/src/broker.rs b/flarch/src/broker.rs index 872f7abe..b092dbd2 100644 --- a/flarch/src/broker.rs +++ b/flarch/src/broker.rs @@ -42,6 +42,7 @@ use std::{ }; use futures::lock::Mutex; +use metrics::counter; use thiserror::Error; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; @@ -713,6 +714,11 @@ impl Intern { .map(|(_, msg)| msg.clone()) .collect(); + counter!( + "flarch_broker_tap_out_bytes", + msgs.clone().iter().map(|msg| size_of_val(msg) as u64).sum() + ); + let type_str = self.type_str(); for (i, ss) in self.subsystems.iter_mut() { if let Err(e) = ss.send_tap_out(msgs.clone()).await { @@ -738,6 +744,11 @@ impl Intern { .map(|(_, msg)| msg.clone()) .collect(); + counter!( + "flarch_broker_tap_in_bytes", + msgs.iter().map(|msg| size_of_val(msg) as u64).sum() + ); + let type_str = self.type_str(); for (i, ss) in self.subsystems.iter_mut() { if let Err(e) = ss.send_tap_in(msgs.clone()).await { diff --git a/flarch/src/nodeids.rs b/flarch/src/nodeids.rs index 6c1e6abd..37ea350c 100644 --- a/flarch/src/nodeids.rs +++ b/flarch/src/nodeids.rs @@ -24,7 +24,7 @@ pub struct U256(#[serde_as(as = "Hex")] [u8; 32]); impl fmt::Display for U256 { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - for index in 0..8 { + for index in 0..32 { f.write_fmt(format_args!("{:02x}", self.0[index]))?; } Ok(()) diff --git a/flarch/src/web_rtc/libc/web_socket_client.rs b/flarch/src/web_rtc/libc/web_socket_client.rs index 5251aa52..c61f4035 100644 --- a/flarch/src/web_rtc/libc/web_socket_client.rs +++ b/flarch/src/web_rtc/libc/web_socket_client.rs @@ -9,6 +9,8 @@ use crate::broker::{Broker, SubsystemHandler}; use crate::tasks::wait_ms; use crate::web_rtc::websocket::{BrokerWSClient, WSClientIn, WSClientOut, WSError}; +use metrics::counter; + pub struct WebSocketClient { url: String, write: Option>, tungstenite::Message>>, @@ -76,6 +78,7 @@ impl WebSocketClient { impl SubsystemHandler for WebSocketClient { async fn messages(&mut self, msgs: Vec) -> Vec { for msg in msgs { + counter!("flarch_ws_client_sent_bytes", size_of_val(&msg) as u64); match msg { WSClientIn::Message(msg) => { if let Some(mut write) = self.write.as_mut() { diff --git a/flarch/src/web_rtc/libc/web_socket_server.rs b/flarch/src/web_rtc/libc/web_socket_server.rs index 27e35c99..f852dafc 100644 --- a/flarch/src/web_rtc/libc/web_socket_server.rs +++ b/flarch/src/web_rtc/libc/web_socket_server.rs @@ -19,6 +19,8 @@ use crate::{ web_rtc::websocket::BrokerWSServer, }; +use metrics::counter; + pub struct WebSocketServer { connections: Arc>>, conn_thread: JoinHandle<()>, @@ -69,6 +71,7 @@ impl WebSocketServer { impl SubsystemHandler for WebSocketServer { async fn messages(&mut self, from_broker: Vec) -> Vec { for msg in from_broker { + counter!("flarch_ws_server_recv_bytes", size_of_val(&msg) as u64); match msg { WSServerIn::Message(id, msg) => { let mut connections = self.connections.lock().await; diff --git a/flbrowser/Cargo.lock b/flbrowser/Cargo.lock index 8f95c4a1..f7ee359c 100644 --- a/flbrowser/Cargo.lock +++ b/flbrowser/Cargo.lock @@ -52,6 +52,18 @@ dependencies = [ "subtle", ] +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -794,6 +806,7 @@ dependencies = [ "futures", "js-sys", "log", + "metrics", "rand 0.8.5", "regex", "rmp-serde", @@ -890,6 +903,7 @@ dependencies = [ "getrandom 0.2.16", "itertools", "log", + "metrics", "names", "num-bigint", "rand 0.8.5", @@ -1556,6 +1570,28 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metrics" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5" +dependencies = [ + "ahash", + "metrics-macros", + "portable-atomic", +] + +[[package]] +name = "metrics-macros" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b4faf00617defe497754acde3024865bc143d44a86799b24e191ecff91354f" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "mime" version = "0.3.17" diff --git a/flmodules/Cargo.toml b/flmodules/Cargo.toml index e9aeb0e8..913efa6a 100644 --- a/flmodules/Cargo.toml +++ b/flmodules/Cargo.toml @@ -46,10 +46,11 @@ serde_json = "1" names = { version = "0.14", default-features = false } bytes = { version = "1.7.1", features = ["serde"] } reqwest = { version = "0.12", features = [ - "stream", - "rustls-tls", + "stream", + "rustls-tls", ], default-features = false } num-bigint = { version = "0.4.6", features = ["serde"] } +metrics = "0.21.1" [dev-dependencies] flmodules = { path = ".", features = ["testing"] } diff --git a/flmodules/src/dht_router/messages.rs b/flmodules/src/dht_router/messages.rs index 38ada69d..63f1ab38 100644 --- a/flmodules/src/dht_router/messages.rs +++ b/flmodules/src/dht_router/messages.rs @@ -1,13 +1,17 @@ +use std::collections::{HashMap, HashSet}; + use flarch::{ broker::{SubsystemHandler, TranslateFrom, TranslateInto}, nodeids::{NodeID, U256}, platform_async_trait, }; -use rand::seq::SliceRandom; +use itertools::Itertools; +use rand::{seq::SliceRandom, Rng}; use serde::{Deserialize, Serialize}; use tokio::sync::watch; use crate::{ + dht_storage::messages::MessageClosest, nodeconfig::NodeInfo, router::messages::{NetworkWrapper, RouterIn, RouterOut}, timer::TimerMessage, @@ -18,6 +22,9 @@ use super::{ kademlia::*, }; +pub static mut EVIL_NO_FORWARD: bool = false; +pub static mut LOCAL_BLACKLISTS: bool = false; + /// These are the messages which will be exchanged between the nodes for this /// module. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] @@ -71,6 +78,11 @@ pub(super) struct Messages { // This is different than core.active, because there can be connections from other // modules, or connections from another node. connected: Vec, + + // readFlo id -> node id + requests_in_flight: HashMap, + + blacklisted_nodes: HashSet, } impl Messages { @@ -82,6 +94,8 @@ impl Messages { core: Kademlia::new(root, cfg), tx: Some(tx), connected: vec![], + requests_in_flight: HashMap::new(), + blacklisted_nodes: HashSet::new(), }, rx, ) @@ -231,22 +245,137 @@ impl Messages { } } + fn get_readflo_id(&self, msg: NetworkWrapper) -> Option { + if msg.module == "DHTStorage" { + match msg.unwrap_yaml("DHTStorage") { + Some(msg_readflo) => match msg_readflo { + MessageClosest::ReadFlo(_id, request_id) => { + log::warn!("sending MessageClosest::ReadFlo {}", msg.msg.clone()); + Some(request_id.clone()) + } + _ => None, + }, + None => None, + } + } else { + None + } + } + + fn blacklist_bad_nodes(&mut self) { + unsafe { + if !self::LOCAL_BLACKLISTS { + return; + } + } + + let in_flight = self.requests_in_flight.clone(); + in_flight + .iter() + .map(|(_request_id, node_id)| node_id) + .counts() + .iter() + .for_each(|(node_id_request, count)| { + let node_id1 = (*node_id_request).clone(); + if count.clone() > 10 { + // made 10 requests to this node + // remove the node from requests_in_flight to reset the counter + // and blacklist the node + log::warn!("blacklisting {node_id1}."); + let request_ids = self + .requests_in_flight + .clone() + .iter() + .filter(|(_request_id, node_id2)| node_id1 == **node_id2) + .map(|(request_id, _node_id)| request_id) + .copied() + .collect_vec(); + for value in request_ids { + self.requests_in_flight.remove_entry(&value); + } + self.core.remove_node(&node_id1); + self.blacklisted_nodes.insert(node_id1); + } + }); + } + + fn randomly_whitelist(&mut self) { + unsafe { + if !self::LOCAL_BLACKLISTS { + return; + } + } + + let mut rng = rand::thread_rng(); + if !self.blacklisted_nodes.is_empty() && rng.gen_bool(0.05) { + let len = self.blacklisted_nodes.len(); + if len > 0 { + let random_index = rng.gen_range(0..len); + let node_id_opt = self.blacklisted_nodes.iter().nth(random_index).clone(); + if let Some(node_id) = node_id_opt { + log::warn!("whitelisting {node_id}."); + self.core.add_node(node_id.clone()); + self.blacklisted_nodes.remove(&(node_id.clone())); + } + } + } + } + + fn log_requests_in_flight(&self) { + unsafe { + if !self::LOCAL_BLACKLISTS { + return; + } + } + + log::info!("counts:"); + self.requests_in_flight + .iter() + .map(|tuple| tuple.1) + .counts() + .iter() + .for_each(|item| log::info!(" {} -> {}", item.0, item.1)) + } + fn message_closest( - &self, + &mut self, orig: NodeID, last_hop: NodeID, key: U256, msg: NetworkWrapper, ) -> Vec { - match self + let readflo_id_opt = self.get_readflo_id(msg.clone()); + + self.blacklist_bad_nodes(); + + let closest = self .closest_or_connected(key.clone(), Some(&last_hop)) .first() - { - Some(&next_hop) => vec![ - ModuleMessage::Closest(orig, key, msg.clone()).wrapper_network(next_hop), - DHTRouterOut::MessageRouting(orig, last_hop, next_hop, key, msg).into(), - ], + .copied(); + + self.randomly_whitelist(); + + match closest.clone() { + Some(next_hop) => { + if self.blacklisted_nodes.contains(&next_hop) { + log::error!("IMPOSSIBLE?: sending a message to a blacklisted node."); + } + + if let Some(readflo_id) = readflo_id_opt { + log::info!("NEXT HOP: {}", next_hop); + self.requests_in_flight + .insert(readflo_id.clone(), next_hop.clone()); + self.log_requests_in_flight(); + } + vec![ + ModuleMessage::Closest(orig, key, msg.clone()).wrapper_network(next_hop), + DHTRouterOut::MessageRouting(orig, last_hop, next_hop, key, msg).into(), + ] + } None => { + if readflo_id_opt.is_some() { + log::warn!("NO NEXT HOP!"); + } if key == self.core.root { vec![DHTRouterOut::MessageDest(orig, last_hop, msg).into()] } else { diff --git a/flmodules/src/dht_storage/broker.rs b/flmodules/src/dht_storage/broker.rs index 336e4b05..a3b17814 100644 --- a/flmodules/src/dht_storage/broker.rs +++ b/flmodules/src/dht_storage/broker.rs @@ -47,6 +47,7 @@ pub enum DHTStorageIn { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum DHTStorageOut { + Flos(Vec), FloValue(FloCuckoo), FloValues(Vec), RealmIDs(Vec), diff --git a/flmodules/src/dht_storage/core.rs b/flmodules/src/dht_storage/core.rs index 2e10a02f..2b997782 100644 --- a/flmodules/src/dht_storage/core.rs +++ b/flmodules/src/dht_storage/core.rs @@ -175,6 +175,10 @@ impl RealmStorage { .collect() } + pub fn get_flos(&self) -> Vec { + return self.flos.iter().map(|flo| flo.1.flo.clone()).collect_vec(); + } + pub fn store_cuckoo_ids(&mut self, parent: &FloID, cuckoos: Vec) { for cuckoo in cuckoos { self.store_cuckoo_id(parent, cuckoo); @@ -256,6 +260,11 @@ impl RealmStorage { self.realm_config.max_space ); } + // log::warn!( + // "realm storage : current({}) / max({})", + // self.size, + // self.realm_config.max_space + // ); is_new_flo } diff --git a/flmodules/src/dht_storage/messages.rs b/flmodules/src/dht_storage/messages.rs index c08f511b..2dfba8a9 100644 --- a/flmodules/src/dht_storage/messages.rs +++ b/flmodules/src/dht_storage/messages.rs @@ -47,7 +47,8 @@ pub enum MessageClosest { // which have enough place left. StoreFlo(Flo), // Request a Flo. The FloID is in the DHTRouting::Request. - ReadFlo(RealmID), + // The U256 is a random id identifying this specific request + ReadFlo(RealmID, U256), // Request Cuckoos for the given ID. The FloID is in the DHTRouting::Request. GetCuckooIDs(RealmID), // Store the Cuckoo-ID in the relevant Flo. The DHTRouting::Request(key) is the @@ -61,7 +62,8 @@ pub enum MessageClosest { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum MessageDest { // Returns the result of the requested Flo, including any available Cuckoo-IDs. - FloValue(FloCuckoo), + // The optional u256 is an optional identifier of the readflo request id + FloValue(FloCuckoo, Option), // Indicates this Flo is not in the closest node. UnknownFlo(GlobalID), // The Cuckoo-IDs stored next to the Flo composed of the GlobalID @@ -87,10 +89,58 @@ pub struct RealmStats { pub config: RealmConfig, } +#[derive(Serialize, Debug, Default, Clone)] +pub struct DsMetrics { + pub store_flo_total: u32, + pub request_flo_metas_sent_total: u32, // TODO: what? + pub flo_value_sent_total: u32, + pub flo_value_sent_blocked_total: u32, + pub available_flos_sent_total: u32, + pub available_flos_sent_blocked_total: u32, + pub flos_sent_total: u32, + pub flos_sent_blocked_total: u32, + pub max_flo_metas_received_in_available_flos: u32, + pub max_flo_metas_requested_in_request_flos: u32, + pub max_flo_ids_received_in_request_flos: u32, + pub max_flos_sent_in_flos: u32, + pub max_flos_received_in_flos: u32, +} + +/** + * It is necessary to call `self.refresh_stats()` after each change of a stat, + * and using methods would surely be worse than using macros, unless + * we would use a method for each stat. + * + * Since we want it to be as easy as possible to add new stats, we use macros. + */ +macro_rules! increment_stat { + ($self:ident, $stat:expr) => { + $stat += 1; + $self.refresh_stats(); + }; +} + +// macro_rules! absolute_stat { +// ($self:ident, $stat:expr, $value:expr) => { +// $stat = $value; +// $self.refresh_stats(); +// }; +// } + +macro_rules! max_stat { + ($self:ident, $stat:expr, $value:expr) => { + if $value > $stat { + $stat = $value; + $self.refresh_stats(); + } + }; +} + #[derive(Debug, Default, Clone)] pub struct Stats { pub realm_stats: HashMap, pub system_realms: Vec, + pub experiment_stats: DsMetrics, } /// The message handling part, but only for DHTStorage messages. @@ -101,10 +151,15 @@ pub struct Messages { our_id: NodeID, ds: Box, tx: Option>, + + experiment_stats: DsMetrics, } +pub static mut EVIL_NO_FORWARD: bool = false; +pub static mut LOCAL_BLACKLISTS: bool = false; + impl Messages { - /// Returns a new chat module. + /// Returns a new simulation_chat module. pub fn new( ds: Box, config: DHTConfig, @@ -120,6 +175,8 @@ impl Messages { nodes: vec![], ds, tx: Some(tx), + + experiment_stats: DsMetrics::new(), }; msgs.store(); (msgs, rx) @@ -159,7 +216,7 @@ impl Messages { DHTStorageIn::StoreFlo(flo) => self.store_flo(flo), DHTStorageIn::ReadFlo(id) => vec![match self.read_flo(&id) { Some(df) => DHTStorageOut::FloValue(df.clone()).into(), - None => MessageClosest::ReadFlo(id.realm_id().clone()) + None => MessageClosest::ReadFlo(id.realm_id().clone(), U256::rnd()) .to_intern_out(id.flo_id().clone().into()) // .inspect(|msg| log::info!("{} sends {msg:?}", self.our_id)) .expect("Creating ReadFlo message"), @@ -190,7 +247,8 @@ impl Messages { .iter() .flat_map(|realm| realm.1.get_all_flo_cuckoos()) .collect::>(), - ).into()] + ) + .into()] } } } @@ -207,7 +265,7 @@ impl Messages { MessageClosest::StoreFlo(flo) => { return self.store_flo(flo); } - MessageClosest::ReadFlo(rid) => { + MessageClosest::ReadFlo(rid, request_id) => { // log::info!( // "{} got request for {}/{} from {}", // self.our_id, @@ -222,10 +280,16 @@ impl Messages { .and_then(|realm| realm.get_flo_cuckoo(&fid)) { // log::info!("sends flo {:?}", fc.0); - return MessageDest::FloValue(fc) - .to_intern_out(origin) - // .inspect(|msg| log::info!("{} sends {msg:?}", self.our_id)) - .map_or(vec![], |msg| vec![msg]); + if unsafe { !EVIL_NO_FORWARD } { + increment_stat!(self, self.experiment_stats.flo_value_sent_total); + return MessageDest::FloValue(fc, Some(request_id)) + .to_intern_out(origin) + // .inspect(|msg| log::info!("{} sends {msg:?}", self.our_id)) + .map_or(vec![], |msg| vec![msg]); + } else { + increment_stat!(self, self.experiment_stats.flo_value_sent_blocked_total); + return vec![]; + } } } MessageClosest::GetCuckooIDs(rid) => { @@ -251,7 +315,7 @@ impl Messages { fn msg_dest(&mut self, msg: MessageDest) -> Vec { match msg { - MessageDest::FloValue(fc) => { + MessageDest::FloValue(fc, _) => { // log::info!("{} stores {:?}", self.our_id, flo.0); self.store_flo(fc.0.clone()); self.realms @@ -289,6 +353,7 @@ impl Messages { self.realms.keys().cloned().collect(), )], MessageNeighbour::AvailableRealmIDs(realm_ids) => { + increment_stat!(self, self.experiment_stats.request_flo_metas_sent_total); let accepted_realms = realm_ids .into_iter() .filter(|rid| self.config.accepts_realm(&rid)) @@ -304,31 +369,80 @@ impl Messages { ) .collect() } - MessageNeighbour::RequestFloMetas(realm_id) => self - .realms - .get(&realm_id) - .map(|realm| realm.get_flo_metas()) - .map_or(vec![], |fm| { - vec![MessageNeighbour::AvailableFlos(realm_id, fm)] - }), - MessageNeighbour::AvailableFlos(realm_id, flo_metas) => self - .realms - .get(&realm_id) - .and_then(|realm| realm.sync_available(&flo_metas)) - .map_or(vec![], |needed| { - vec![MessageNeighbour::RequestFlos(realm_id, needed)] - }), - MessageNeighbour::RequestFlos(realm_id, flo_ids) => self - .realms - .get(&realm_id) - .map(|realm| { - flo_ids - .iter() - .filter_map(|id| realm.get_flo_cuckoo(id)) - .collect::>() - }) - .map_or(vec![], |flos| vec![MessageNeighbour::Flos(flos)]), + MessageNeighbour::RequestFloMetas(realm_id) => { + if unsafe { !EVIL_NO_FORWARD } { + increment_stat!(self, self.experiment_stats.available_flos_sent_total); + self.realms + .get(&realm_id) + .map(|realm| realm.get_flo_metas()) + .map_or(vec![], |fm| { + vec![MessageNeighbour::AvailableFlos(realm_id, fm)] + }) + } else { + increment_stat!( + self, + self.experiment_stats.available_flos_sent_blocked_total + ); + vec![] + } + } + MessageNeighbour::AvailableFlos(realm_id, flo_metas) => { + max_stat!( + self, + self.experiment_stats + .max_flo_metas_received_in_available_flos, + flo_metas.len() as u32 + ); + + self.realms + .get(&realm_id) + .and_then(|realm| realm.sync_available(&flo_metas)) + .map_or(vec![], |needed| { + max_stat!( + self, + self.experiment_stats + .max_flo_metas_requested_in_request_flos, + needed.len() as u32 + ); + vec![MessageNeighbour::RequestFlos(realm_id, needed)] + }) + } + MessageNeighbour::RequestFlos(realm_id, flo_ids) => { + max_stat!( + self, + self.experiment_stats.max_flo_ids_received_in_request_flos, + flo_ids.len() as u32 + ); + if unsafe { !EVIL_NO_FORWARD } { + increment_stat!(self, self.experiment_stats.flos_sent_total); + self.realms + .get(&realm_id) + .map(|realm| { + flo_ids + .iter() + .filter_map(|id| realm.get_flo_cuckoo(id)) + .collect::>() + }) + .map_or(vec![], |flos| { + //log::info!("Flos to send: {}", flos.len()); + max_stat!( + self, + self.experiment_stats.max_flos_sent_in_flos, + flos.len() as u32 + ); + vec![MessageNeighbour::Flos(flos)] + }) + } else { + increment_stat!(self, self.experiment_stats.flos_sent_blocked_total); + vec![] + } + } MessageNeighbour::Flos(flo_cuckoos) => { + max_stat!( + self, + self.experiment_stats.max_flos_received_in_flos, + flo_cuckoos.len() as u32 + ); for (flo, cuckoos) in flo_cuckoos { self.store_flo(flo.clone()); self.realms.get_mut(&flo.realm_id()).map(|realm| { @@ -352,6 +466,7 @@ impl Messages { } fn store_flo(&mut self, flo: Flo) -> Vec { + increment_stat!(self, self.experiment_stats.store_flo_total); let mut res = vec![]; if self.upsert_flo(flo.clone()) { // log::info!("{}: store_flo", self.our_id); @@ -372,7 +487,7 @@ impl Messages { // Either its realm is already known, or it is a new realm. // When 'true' is returned, then the flo has been stored. fn upsert_flo(&mut self, flo: Flo) -> bool { - // log::trace!( + // log::info!( // "{} store_flo {}({}/{}) {}", // self.our_id, // flo.flo_type(), @@ -427,15 +542,23 @@ impl Messages { } fn store(&mut self) { - self.tx.clone().map(|tx| { - tx.send(Stats::from_realms(&self.realms, self.config.realms.clone())) - .is_err() - .then(|| self.tx = None) - }); + self.refresh_stats(); serde_yaml::to_string(&self.realms) .ok() .map(|s| (*self.ds).set(MODULE_NAME, &s)); } + + fn refresh_stats(&mut self) { + self.tx.clone().map(|tx| { + tx.send(Stats::from_realms( + &self.realms, + self.config.realms.clone(), + self.experiment_stats.clone(), + )) + .is_err() + .then(|| self.tx = None) + }); + } } #[platform_async_trait()] @@ -454,19 +577,30 @@ impl SubsystemHandler for Messages { .map_or(vec![], |msg| vec![msg]) } }) - // .inspect(|msg| log::debug!("{_id}: Out: {msg:?}")) + //.inspect(|msg| log::debug!("{_id}: Out: {msg:?}")) .collect() } } +impl DsMetrics { + pub fn new() -> Self { + Self::default() + } +} + impl Stats { - fn from_realms(realms: &HashMap, system_realms: Vec) -> Self { + fn from_realms( + realms: &HashMap, + system_realms: Vec, + experiment_stats: DsMetrics, + ) -> Self { Self { realm_stats: realms .iter() .map(|(id, realm)| (id.clone(), RealmStats::from_realm(realm))) .collect(), system_realms, + experiment_stats, } } } @@ -592,11 +726,11 @@ mod tests { let out = NetworkWrapper::wrap_yaml( MODULE_NAME, - &MessageDest::FloValue((fr.flo().clone(), vec![])), + &MessageDest::FloValue((fr.flo().clone(), vec![]), None), ) .unwrap(); - if let MessageDest::FloValue(flo) = out.unwrap_yaml(MODULE_NAME).unwrap() { + if let MessageDest::FloValue(flo, _) = out.unwrap_yaml(MODULE_NAME).unwrap() { let fr2 = TryInto::::try_into(flo.0)?; assert_eq!(fr, fr2); } else { diff --git a/flmodules/src/dht_storage/realm_view.rs b/flmodules/src/dht_storage/realm_view.rs index be12d9f9..52699ed0 100644 --- a/flmodules/src/dht_storage/realm_view.rs +++ b/flmodules/src/dht_storage/realm_view.rs @@ -397,16 +397,16 @@ impl RealmView { self.dht_storage.convert(flo.cond(), &flo.realm_id()).await } - pub async fn update_realm(&mut self) -> anyhow::Result<&FloRealm>{ + pub async fn update_realm(&mut self) -> anyhow::Result<&FloRealm> { self.realm = self.dht_storage.get_flo(&self.realm.global_id()).await?; Ok(&self.realm) } - pub async fn update_all(&mut self) -> anyhow::Result<()>{ + pub async fn update_all(&mut self) -> anyhow::Result<()> { self.update_pages().await?; self.update_tags().await?; self.update_realm().await?; - + Ok(()) } } diff --git a/flmodules/src/flo/blob.rs b/flmodules/src/flo/blob.rs index 6d23725f..ce6be70f 100644 --- a/flmodules/src/flo/blob.rs +++ b/flmodules/src/flo/blob.rs @@ -171,6 +171,10 @@ impl FloBlobTag { pub fn blob_id(&self) -> BlobID { (*self.flo_id()).into() } + + pub fn values(&self) -> &HashMap { + (&(*self.get_blob().values())).into() + } } impl BlobAccess for FloBlobTag { @@ -195,6 +199,20 @@ impl Blob { datas: HashMap::new(), } } + + pub fn make( + blob_type: String, + links: HashMap>, + values: HashMap, + datas: HashMap, + ) -> Self { + Self { + blob_type, + links, + values, + datas, + } + } } impl BlobFamily for FloBlob {} diff --git a/flmodules/src/flo/flo.rs b/flmodules/src/flo/flo.rs index 12283187..b194bf2a 100644 --- a/flmodules/src/flo/flo.rs +++ b/flmodules/src/flo/flo.rs @@ -59,7 +59,7 @@ pub struct Flo { // The data signature is verifiable by the latest cond. data_signature: ConditionSignature, // Because I thought that ed25519 signatures have a random nonce, when in - // fact the implementation of ed25519-dalek uses a nonce derived from the + // fact the implementation of ed25519-dalek uses a nonce derived from the // message, probably to avoid nonce-reuse... nonce: U256, // The first condition of this Flo, together with a signature on itself. diff --git a/flmodules/src/gossip_events/broker.rs b/flmodules/src/gossip_events/broker.rs index 6537de11..13ca23a3 100644 --- a/flmodules/src/gossip_events/broker.rs +++ b/flmodules/src/gossip_events/broker.rs @@ -80,7 +80,7 @@ impl Gossip { Ok(()) } - /// Gets a copy of all chat events stored in the module. + /// Gets a copy of all simulation_chat events stored in the module. pub fn chat_events(&self) -> Vec { self.storage.borrow().events(Category::TextMessage) } diff --git a/flmodules/src/gossip_events/messages.rs b/flmodules/src/gossip_events/messages.rs index b6b10252..2d088c3a 100644 --- a/flmodules/src/gossip_events/messages.rs +++ b/flmodules/src/gossip_events/messages.rs @@ -32,7 +32,7 @@ pub enum GossipOut { } /// The first module to use the random_connections is a copy of the previous -/// chat. +/// simulation_chat. /// Now it holds events of multiple categories and exchanges them between the /// nodes. pub struct Messages { @@ -57,7 +57,7 @@ impl std::fmt::Debug for Messages { } impl Messages { - /// Returns a new chat module. + /// Returns a new simulation_chat module. pub fn new( id: NodeID, storage: Box, diff --git a/flmodules/src/network/broker.rs b/flmodules/src/network/broker.rs index aac054e3..2422951e 100644 --- a/flmodules/src/network/broker.rs +++ b/flmodules/src/network/broker.rs @@ -11,6 +11,7 @@ use core::panic; use itertools::concat; +use metrics::counter; use std::{fmt, time::Duration}; use thiserror::Error; use tokio::sync::{mpsc::UnboundedReceiver, watch}; @@ -171,6 +172,8 @@ impl Messages { /// This can be either messages requested by this node, or connection /// setup requests from another node. async fn msg_ws(&mut self, msg: WSClientOut) -> Vec { + counter!("network_broker_recv_ws_bytes", size_of_val(&msg) as u64); + let msg_node_str = match msg { WSClientOut::Message(msg) => msg, WSClientOut::Error(e) => { @@ -230,6 +233,8 @@ impl Messages { } async fn msg_call(&mut self, msg: NetworkIn) -> anyhow::Result> { + counter!("network_broker_recv_call_bytes", size_of_val(&msg) as u64); + match msg { NetworkIn::MessageToNode(id, msg_str) => { log::trace!( @@ -266,6 +271,11 @@ impl Messages { } async fn msg_node(&mut self, id: U256, msg_nc: NCOutput) -> Vec { + counter!( + "network_broker_recv_node_bytes", + size_of_val(&msg_nc) as u64 + ); + match msg_nc { NCOutput::Connected(_) => vec![NetworkOut::Connected(id)], NCOutput::Disconnected(_) => vec![NetworkOut::Disconnected(id)], @@ -346,6 +356,7 @@ impl SubsystemHandler for Messages { "{}: Processing message {msg}", self.node_config.info.get_id() ); + counter!("network_recv_bytes", size_of_val(&msg) as u64); match msg { NetworkIn::WebSocket(ws) => out.extend(self.msg_ws(ws).await), NetworkIn::WebRTC(WebRTCConnOutput::Message(id, msg)) => { @@ -396,6 +407,7 @@ impl NetworkWebRTC { loop { let msg = self.tap.recv().await; if let Some(msg_reply) = msg { + counter!("network_recv_bytes", size_of_val(&msg_reply) as u64); return msg_reply; } } @@ -405,6 +417,7 @@ impl NetworkWebRTC { /// The message is of type [`NetworkIn`], as this is what the user can /// send to the [`Network`]. pub fn send(&mut self, msg: NetworkIn) -> anyhow::Result<()> { + counter!("network_broker_rtc_sent_bytes", size_of_val(&msg) as u64); self.broker_net.emit_msg_in(msg) } diff --git a/flmodules/src/template/messages.rs b/flmodules/src/template/messages.rs index 15204491..0dc517c7 100644 --- a/flmodules/src/template/messages.rs +++ b/flmodules/src/template/messages.rs @@ -40,7 +40,7 @@ pub struct Messages { } impl Messages { - /// Returns a new chat module. + /// Returns a new simulation_chat module. pub fn new( storage: Box, cfg: TemplateConfig, diff --git a/flmodules/src/web_proxy/messages.rs b/flmodules/src/web_proxy/messages.rs index 936be393..f2b49733 100644 --- a/flmodules/src/web_proxy/messages.rs +++ b/flmodules/src/web_proxy/messages.rs @@ -52,7 +52,7 @@ pub struct Messages { } impl Messages { - /// Returns a new chat module. + /// Returns a new simulation_chat module. pub fn new( ds: Box, cfg: WebProxyConfig, diff --git a/flnode/src/node.rs b/flnode/src/node.rs index 68c9fd7a..7bc2f756 100644 --- a/flnode/src/node.rs +++ b/flnode/src/node.rs @@ -259,7 +259,7 @@ impl Node { } } - /// Adds a new chat message that will be broadcasted to the system. + /// Adds a new simulation_chat message that will be broadcasted to the system. pub async fn add_chat_message(&mut self, msg: String) -> anyhow::Result<()> { if let Some(g) = self.gossip.as_mut() { let event = core::Event { diff --git a/metrics-exporter-influx b/metrics-exporter-influx new file mode 160000 index 00000000..e79ff4ac --- /dev/null +++ b/metrics-exporter-influx @@ -0,0 +1 @@ +Subproject commit e79ff4ac29d34e3593cad950d6693b8684841b34