将Cargo.toml
文件修改为如下所示:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13[package] name = "server" version = "0.1.0" edition = "2022" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] tokio = { version = "1", features = ["full"] } warp = "0.3" futures-util = { version = "0.3", default-features = false, features = ["sink"] } tokio-stream = "0.1" pretty_env_logger = "0.4"
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139// #![deny(warnings)] use std::collections::HashMap; use std::convert::Infallible; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; use futures_util::{SinkExt, StreamExt, TryFutureExt}; use tokio::sync::{mpsc, RwLock}; use tokio_stream::wrappers::UnboundedReceiverStream; use warp::ws::{Message, WebSocket}; use warp::Filter; /// 我们的全局唯一用户 ID 计数器。 static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1); /// 我们当前连接用户的状态。 /// /// - 键是他们的 id /// - 值是 `warp::ws::Message` 的发送者 type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>; #[tokio::main] async fn main() { pretty_env_logger::init(); // 跟踪所有连接的用户,key是usize,value // 是一个 websocket 发送者。 let users = Users::default(); // 把我们的“状态”变成一个新的过滤器... //注释掉了 //let users = warp::any().map(move || users.clone()); // GET /chat -> websocket 升级 let chat = warp::path("chat") // `ws()` 过滤器将准备 Websocket 握手... .and(warp::ws()) .and(with_users(users.clone())) .map(|ws: warp::ws::Ws, users| { // 如果握手成功,这将调用我们的函数。 ws.on_upgrade(move |socket| user_connected(socket, users)) }); let files = warp::fs::dir("static"); let routes = chat.or(files); warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; } //包一下,让它可以克隆 fn with_users(users: Users) -> impl Filter<Extract = (Users,), Error = Infallible> + Clone { warp::any().map(move || users.clone()) } async fn user_connected(ws: WebSocket, users: Users) { // 使用计数器为该用户分配一个新的唯一 ID。 let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed); eprintln!("new chat user: {}", my_id); // 将套接字拆分为消息的发送者和接收者。 let (mut user_ws_tx, mut user_ws_rx) = ws.split(); // 使用无界通道来处理消息的缓冲和刷新 // 到 websocket... let (tx, rx) = mpsc::unbounded_channel(); let mut rx = UnboundedReceiverStream::new(rx); tokio::task::spawn(async move { while let Some(message) = rx.next().await { user_ws_tx .send(message) .unwrap_or_else(|e| { eprintln!("websocket send error: {}", e); }) .await; } }); // 将发件人保存在我们的已连接用户列表中。 users.write().await.insert(my_id, tx.clone()); // 返回一个 `Future`,它基本上是一个状态机管理 // 这个特定用户的连接。 // 用户每次发送消息,广播给 // 所有其他用户... while let Some(result) = user_ws_rx.next().await { let msg = match result { Ok(msg) => { msg } Err(e) => { eprintln!("websocket error(uid={}): {}", my_id, e); break; } }; //因为我不需要把用户每次发送消息,广播给所有其他用户,先注释掉了 user_message(my_id, msg, &users).await; } // 只要用户停留,user_ws_rx 流将继续处理 // 连接的。 一旦他们断开连接,然后... user_disconnected(my_id, &users).await; } async fn user_message(my_id: usize, msg: Message, users: &Users) { // 跳过任何非文本消息... let msg = if let Ok(s) = msg.to_str() { s } else { return; }; let new_msg = format!("<User#{}>: {}", my_id, msg); // 来自该用户的新消息,将其发送给其他所有人(相同的 uid 除外)... for (&uid, tx) in users.read().await.iter() { if my_id != uid { if let Err(_disconnected) = tx.send(Message::text(new_msg.clone())) { // tx 断开连接,我们的 `user_disconnected` 代码 // 应该发生在另一个任务中,仅此而已 // 在这里做。 } } } } async fn user_disconnected(my_id: usize, users: &Users) { eprintln!("good bye user: {}", my_id); // 流关闭,所以从用户列表中删除 users.write().await.remove(&my_id); }
最后
以上就是英俊小蜜蜂最近收集整理的关于Rust之Websocket 服务端【Warp】的全部内容,更多相关Rust之Websocket内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复