Skip to content

Commit 48f4250

Browse files
authored
Merge pull request #393 from nervosnetwork/wasm-support-wss
feat: wasm support wss
2 parents 09ec7b7 + 22cc9bb commit 48f4250

File tree

3 files changed

+40
-15
lines changed

3 files changed

+40
-15
lines changed

tentacle/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ thiserror = "1.0"
3030
nohash-hasher = "0.2"
3131

3232
parking_lot = { version = "0.12", optional = true }
33-
tokio-tungstenite = { version = "0.24", optional = true }
33+
tokio-tungstenite = { version = "0.26", optional = true }
3434
httparse = { version = "1.9", optional = true }
3535
futures-timer = { version = "3.0.2", optional = true }
3636
async-std = { version = "1", features = ["unstable"], optional = true }

tentacle/src/transports/browser.rs

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,23 @@ use crate::{
4444
use futures::FutureExt;
4545
use wasm_bindgen::JsCast;
4646

47-
async fn connect(addr: Multiaddr, timeout: Duration) -> Result<(Multiaddr, BrowserStream)> {
47+
async fn connect(
48+
addr: Multiaddr,
49+
timeout: Duration,
50+
ty: TransportType,
51+
) -> Result<(Multiaddr, BrowserStream)> {
52+
let schema = match ty {
53+
TransportType::Ws => "ws",
54+
TransportType::Wss => "wss",
55+
_ => unreachable!(),
56+
};
4857
let url = match multiaddr_to_socketaddr(&addr) {
49-
Some(socket_address) => format!("ws://{}:{}", socket_address.ip(), socket_address.port()),
58+
Some(socket_address) => format!(
59+
"{}://{}:{}",
60+
schema,
61+
socket_address.ip(),
62+
socket_address.port()
63+
),
5064
None => {
5165
let mut iter = addr.iter().peekable();
5266

@@ -72,10 +86,10 @@ async fn connect(addr: Multiaddr, timeout: Duration) -> Result<(Multiaddr, Brows
7286

7387
match (proto1, proto2) {
7488
(Protocol::Dns4(domain), Protocol::Tcp(port)) => {
75-
break format!("ws://{}:{}", domain, port)
89+
break format!("{}://{}:{}", schema, domain, port)
7690
}
7791
(Protocol::Dns6(domain), Protocol::Tcp(port)) => {
78-
break format!("ws://{}:{}", domain, port)
92+
break format!("{}://{}:{}", schema, domain, port)
7993
}
8094
_ => return Err(TransportErrorKind::NotSupported(addr.clone())),
8195
}
@@ -127,14 +141,24 @@ impl TransportDial for BrowserTransport {
127141
type DialFuture = BrowserDialFuture;
128142

129143
fn dial(self, address: Multiaddr) -> Result<Self::DialFuture> {
130-
if !matches!(find_type(&address), TransportType::Ws) {
131-
return Err(TransportErrorKind::NotSupported(address));
132-
}
133-
let dial = crate::runtime::spawn(connect(address, self.timeout));
144+
match find_type(&address) {
145+
TransportType::Ws => {
146+
let dial = crate::runtime::spawn(connect(address, self.timeout, TransportType::Ws));
134147

135-
Ok(TransportFuture::new(Box::pin(async {
136-
dial.await.expect("oneshot channel panic")
137-
})))
148+
Ok(TransportFuture::new(Box::pin(async {
149+
dial.await.expect("oneshot channel panic")
150+
})))
151+
}
152+
TransportType::Wss => {
153+
let dial =
154+
crate::runtime::spawn(connect(address, self.timeout, TransportType::Wss));
155+
156+
Ok(TransportFuture::new(Box::pin(async {
157+
dial.await.expect("oneshot channel panic")
158+
})))
159+
}
160+
_ => Err(TransportErrorKind::NotSupported(address)),
161+
}
138162
}
139163
}
140164

tentacle/src/transports/ws.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use bytes::Bytes;
12
use futures::{future::ok, Sink, StreamExt, TryFutureExt};
23
use log::debug;
34
use std::{
@@ -101,7 +102,7 @@ impl TransportDial for WsTransport {
101102
pub struct WsStream {
102103
inner: WebSocketStream<TcpStream>,
103104
recv_buf: Vec<u8>,
104-
pending_ping: Option<Vec<u8>>,
105+
pending_ping: Option<Bytes>,
105106
already_send_close: bool,
106107
}
107108

@@ -182,7 +183,7 @@ impl AsyncRead for WsStream {
182183
match self.inner.poll_next_unpin(cx) {
183184
Poll::Ready(Some(Ok(t))) => {
184185
let data = match t {
185-
Message::Binary(data) => data,
186+
Message::Binary(data) => data.to_vec(),
186187
Message::Close(_) => return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
187188
Message::Ping(data) => {
188189
self.pending_ping = Some(data);
@@ -239,7 +240,7 @@ impl AsyncWrite for WsStream {
239240
match sink.as_mut().poll_ready(cx) {
240241
Poll::Ready(Ok(_)) => {
241242
sink.as_mut()
242-
.start_send(Message::Binary(buf.to_vec()))
243+
.start_send(Message::Binary(buf.to_vec().into()))
243244
.map_err::<io::Error, _>(|_| Into::into(io::ErrorKind::BrokenPipe))?;
244245
let _ignore = sink
245246
.as_mut()

0 commit comments

Comments
 (0)