Skip to content

Commit f716e6e

Browse files
update
1 parent 0d0d5b2 commit f716e6e

File tree

6 files changed

+502
-6
lines changed

6 files changed

+502
-6
lines changed

tun_test10/src/.main.rs.swp

8 KB
Binary file not shown.

tun_test10/src/.server.rs.swp

44 KB
Binary file not shown.

tun_test10/src/main.rs

+16-1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ pub enum Command {
5151
proto: pnet::packet::ip::IpNextHeaderProtocol,
5252
val: Vec<u8>,
5353
},
54+
Cmd {
55+
key: String,
56+
},
5457
}
5558

5659
fn replyer_spawn(
@@ -72,6 +75,9 @@ fn replyer_spawn(
7275
}
7376
.await;
7477
}
78+
Cmd { key } => {
79+
println!(" Cmd");
80+
}
7581
}
7682
}
7783
})
@@ -122,6 +128,9 @@ fn target_spawn(
122128
}
123129
};
124130
}
131+
Cmd { key } => {
132+
println!(" Cmd-> {}", key);
133+
}
125134
}
126135
}
127136
})
@@ -183,8 +192,9 @@ async fn start(arc_map: &Arc<std::sync::Mutex<HashMap<IpAddr, Target>>> ) -> Res
183192

184193
let ipaddr = IpAddr::V4(header.get_destination());
185194

186-
//let tar_tx_ = if map.contains_key(&ipaddr) {
187195
let mut map = arc_map.lock().unwrap();
196+
//let map2 = Arc::clone(&arc_map);
197+
//let map = map2.lock().unwrap();
188198
let tar_tx_ = if map.contains_key(&ipaddr) {
189199
match map.get(&ipaddr) {
190200
Some(target) => &target.tx,
@@ -218,6 +228,11 @@ async fn start(arc_map: &Arc<std::sync::Mutex<HashMap<IpAddr, Target>>> ) -> Res
218228
};
219229

220230
tar_tx_.send(cmd).await.unwrap();
231+
232+
//let cmd2 = Command::Cmd {
233+
// key: "foo2".to_string(),
234+
//};
235+
//tar_tx_.send(cmd2).await.unwrap();
221236
}
222237
}
223238

tun_test10/src/server.rs

+122-5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ use std::collections::HashMap;
1818
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
1919

2020
use std::sync::{Arc, Mutex};
21+
//use tokio::sync::Mutex;
22+
//use std::sync::Arc;
23+
2124
use std::str::FromStr;
2225

2326

@@ -62,14 +65,17 @@ pub async fn start(arc_map: Arc<std::sync::Mutex<HashMap<IpAddr,super::Target>>>
6265
while let Ok((stream, _)) = listener.accept().await {
6366
let peer = stream.peer_addr().expect("connected streams should have a peer address");
6467
//tokio::spawn(accept_connection(arc_map.clone(), stream));
65-
tokio::spawn(accept_connection(arc_map.clone(), peer, stream));
68+
//tokio::spawn(accept_connection(arc_map.clone(), peer, stream));
69+
let handle = tokio::spawn(handle_connection(arc_map.clone(), peer, stream));
70+
//let _ = handle.await.unwrap();
6671

6772
}
6873

6974
Ok(())
7075
}
7176

72-
async fn accept_connection(arc_map: Arc<std::sync::Mutex<HashMap<IpAddr,super::Target>>>,peer: SocketAddr,stream: TcpStream) {
77+
async fn accept_connection(arc_map: Arc<std::sync::Mutex<HashMap<IpAddr,super::Target>>>,peer: SocketAddr,stream: TcpStream)
78+
-> Result<()> {
7379
/*
7480
let addr = stream.peer_addr().expect("connected streams should have a peer address");
7581
//info!("Peer address: {}", addr);
@@ -84,12 +90,15 @@ async fn accept_connection(arc_map: Arc<std::sync::Mutex<HashMap<IpAddr,super::T
8490
read.forward(write).await.expect("Failed to forward message");
8591
*/
8692

87-
if let Err(e) = handle_connection(peer, stream).await {
93+
94+
if let Err(e) = handle_connection(arc_map, peer, stream).await {
8895
match e {
8996
Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => (),
9097
err => error!("Error processing connection: {}", err),
9198
}
9299
}
100+
Ok(())
101+
93102
/*
94103
let ipaddr = IpAddr::V4(Ipv4Addr::new(10, 1, 0, 2));
95104
let mut map = arc_map.lock().unwrap();
@@ -108,7 +117,78 @@ async fn accept_connection(arc_map: Arc<std::sync::Mutex<HashMap<IpAddr,super::T
108117
}
109118

110119

111-
async fn handle_connection(peer: SocketAddr, stream: TcpStream) -> Result<()> {
120+
//async fn parser(arc_map: &Arc<std::sync::Mutex<HashMap<IpAddr,super::Target>>>, msg :&Message) -> Result<()> {
121+
async fn parser(arc_map: &Arc<std::sync::Mutex<HashMap<IpAddr,super::Target>>>, msg :&Message) {
122+
123+
println!("SERVER: MSG");
124+
125+
let ipaddr = IpAddr::V4(Ipv4Addr::new(10, 1, 0, 2));
126+
//let mut map = arc_map.lock().unwrap();
127+
let map = arc_map.lock().unwrap();
128+
if map.contains_key(&ipaddr) {
129+
//Box::pin(async move {
130+
match map.get(&ipaddr) {
131+
Some(target) => {
132+
//&target.tx,
133+
134+
println!("map ok ");
135+
//&target.tx,
136+
//let cmd = super::Command::Cmd {
137+
// key: "*command".to_string(),
138+
//};
139+
//let txc = target.tx.clone();
140+
//target.tx.send(cmd).await.unwrap();
141+
//target.tx.send(cmd);
142+
143+
//txc.send(cmd).await.unwrap();
144+
//txc.send(cmd).await;
145+
//txc.send(cmd);
146+
},
147+
None => {
148+
println!("map ng ");
149+
}
150+
}
151+
//});
152+
}
153+
//Ok(())
154+
155+
}
156+
/*
157+
async fn parser(arc_map: &Arc<std::sync::Mutex<HashMap<IpAddr,super::Target>>>, msg :&Message) -> Result<()> {
158+
159+
println!("SERVER: MSG");
160+
161+
let ipaddr = IpAddr::V4(Ipv4Addr::new(10, 1, 0, 2));
162+
163+
let mut map = arc_map.lock().unwrap();
164+
if map.contains_key(&ipaddr) {
165+
match map.get(&ipaddr) {
166+
Some(target) => {
167+
//&target.tx,
168+
println!("map ok ");
169+
let cmd = super::Command::Cmd {
170+
key: "*command".to_string(),
171+
};
172+
let txc = target.tx.clone();
173+
target.tx.send(cmd).await.unwrap();
174+
//target.tx.send(cmd);
175+
176+
//txc.send(cmd).await.unwrap();
177+
//txc.send(cmd);
178+
179+
},
180+
None => {
181+
println!("map ng ");
182+
}
183+
}
184+
};
185+
186+
Ok(())
187+
188+
}
189+
190+
*/
191+
async fn handle_connection(arc_map: Arc<std::sync::Mutex<HashMap<IpAddr,super::Target>>>,peer: SocketAddr, stream: TcpStream) -> Result<()> {
112192
let ws_stream = accept_async(stream).await.expect("Failed to accept");
113193
info!("New WebSocket connection: {}", peer);
114194
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
@@ -123,7 +203,44 @@ async fn handle_connection(peer: SocketAddr, stream: TcpStream) -> Result<()> {
123203
Some(msg) => {
124204
let msg = msg?;
125205
if msg.is_text() ||msg.is_binary() {
126-
ws_sender.send(msg).await?;
206+
207+
// parser(&arc_map, &msg).await;
208+
ws_sender.send(msg).await?;
209+
210+
let ipaddr = IpAddr::V4(Ipv4Addr::new(10, 1, 0, 2));
211+
//let map = arc_map.lock().unwrap();
212+
let map_lock = Arc::clone(&arc_map);
213+
let map = map_lock.lock().unwrap();
214+
let rtx_ = if map.contains_key(&ipaddr) {
215+
match map.get(&ipaddr) {
216+
Some(target) => {
217+
println!("map ok ");
218+
Some(target.tx.clone())
219+
},
220+
None => {
221+
println!("map ng ");
222+
None
223+
}
224+
}
225+
} else {
226+
None
227+
};
228+
229+
match rtx_ {
230+
231+
Some(tx) => {
232+
let cmd = super::Command::Cmd {
233+
key: "*command".to_string(),
234+
};
235+
//tx.send(cmd).await.unwrap();
236+
237+
},
238+
None => {
239+
continue;
240+
},
241+
242+
};
243+
127244
} else if msg.is_close() {
128245
break;
129246
}

tun_test10/src/server.rs.old1

+142
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
//! A simple echo server.
2+
//!
3+
//! You can test this out by running:
4+
//!
5+
//! cargo run --example server 127.0.0.1:12345
6+
//!
7+
//! And then in another window run:
8+
//!
9+
//! cargo run --example client ws://127.0.0.1:12345/
10+
11+
use std::{env };
12+
13+
//use futures_util::StreamExt;
14+
use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt};
15+
16+
use std::collections::HashMap;
17+
//use std::net::IpAddr;
18+
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
19+
20+
use std::sync::{Arc, Mutex};
21+
use std::str::FromStr;
22+
23+
24+
use futures_util::{SinkExt};
25+
use log::*;
26+
use std::{net::SocketAddr, time::Duration};
27+
use tokio::net::{TcpListener, TcpStream};
28+
use tokio_tungstenite::{accept_async, tungstenite::Error, tungstenite::Message, tungstenite::Result};
29+
//use tungstenite::{Message, Result};
30+
31+
//mod main;
32+
33+
/*
34+
#[tokio::main]
35+
async fn main() -> Result<(), Error> {
36+
let _ = env_logger::try_init();
37+
let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());
38+
39+
// Create the event loop and TCP listener we'll accept connections on.
40+
let try_socket = TcpListener::bind(&addr).await;
41+
let listener = try_socket.expect("Failed to bind");
42+
info!("Listening on: {}", addr);
43+
44+
while let Ok((stream, _)) = listener.accept().await {
45+
tokio::spawn(accept_connection(stream));
46+
}
47+
48+
Ok(())
49+
}
50+
*/
51+
52+
pub async fn start(arc_map: Arc<std::sync::Mutex<HashMap<IpAddr,super::Target>>>) -> Result<()> {
53+
//pub async fn start() -> Result<(), Error> {
54+
//let _ = env_logger::try_init();
55+
let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());
56+
57+
// Create the event loop and TCP listener we'll accept connections on.
58+
let try_socket = TcpListener::bind(&addr).await;
59+
let listener = try_socket.expect("Failed to bind");
60+
info!("Listening on: {}", addr);
61+
62+
while let Ok((stream, _)) = listener.accept().await {
63+
let peer = stream.peer_addr().expect("connected streams should have a peer address");
64+
//tokio::spawn(accept_connection(arc_map.clone(), stream));
65+
tokio::spawn(accept_connection(arc_map.clone(), peer, stream));
66+
67+
}
68+
69+
Ok(())
70+
}
71+
72+
async fn accept_connection(arc_map: Arc<std::sync::Mutex<HashMap<IpAddr,super::Target>>>,peer: SocketAddr,stream: TcpStream) {
73+
/*
74+
let addr = stream.peer_addr().expect("connected streams should have a peer address");
75+
//info!("Peer address: {}", addr);
76+
77+
let ws_stream = tokio_tungstenite::accept_async(stream)
78+
.await
79+
.expect("Error during the websocket handshake occurred");
80+
81+
//info!("New WebSocket connection: {}", addr);
82+
83+
let (write, read) = ws_stream.split();
84+
read.forward(write).await.expect("Failed to forward message");
85+
*/
86+
87+
if let Err(e) = handle_connection(peer, stream).await {
88+
match e {
89+
Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => (),
90+
err => error!("Error processing connection: {}", err),
91+
}
92+
}
93+
/*
94+
let ipaddr = IpAddr::V4(Ipv4Addr::new(10, 1, 0, 2));
95+
let mut map = arc_map.lock().unwrap();
96+
if map.contains_key(&ipaddr) {
97+
match map.get(&ipaddr) {
98+
Some(target) => { //&target.tx,
99+
println!(" is match.");
100+
},
101+
None => {
102+
println!(" is unreviewed.");
103+
return;
104+
}
105+
}
106+
}
107+
*/
108+
}
109+
110+
111+
async fn handle_connection(peer: SocketAddr, stream: TcpStream) -> Result<()> {
112+
let ws_stream = accept_async(stream).await.expect("Failed to accept");
113+
info!("New WebSocket connection: {}", peer);
114+
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
115+
let mut interval = tokio::time::interval(Duration::from_millis(1000));
116+
117+
// Echo incoming WebSocket messages and send a message periodically every second.
118+
119+
loop {
120+
tokio::select! {
121+
msg = ws_receiver.next() => {
122+
match msg {
123+
Some(msg) => {
124+
let msg = msg?;
125+
if msg.is_text() ||msg.is_binary() {
126+
ws_sender.send(msg).await?;
127+
} else if msg.is_close() {
128+
break;
129+
}
130+
}
131+
None => break,
132+
}
133+
}
134+
_ = interval.tick() => {
135+
ws_sender.send(Message::Text("tick".to_owned())).await?;
136+
}
137+
}
138+
}
139+
140+
Ok(())
141+
}
142+

0 commit comments

Comments
 (0)