This commit is contained in:
spacelin 2021-01-08 20:20:31 +08:00
parent 57a3e34cbb
commit 7f8f042814

View File

@ -10,7 +10,8 @@ use futures::{
channel::mpsc,
io::{AsyncRead, AsyncWrite},
ready,
stream::{IntoAsyncRead, TryStreamExt},
sink::Sink,
stream::{BoxStream, IntoAsyncRead, Stream, StreamExt, TryStreamExt},
};
use gloo_events::EventListener;
use js_sys::Reflect;
@ -26,7 +27,7 @@ use wasm_bindgen::{prelude::*, JsCast};
use wasm_bindgen_futures::JsFuture;
use wasm_streams::{
readable::{IntoStream, ReadableStream},
WritableStream,
writable::{IntoSink, WritableStream},
};
use web_sys::{window, EventTarget};
@ -44,8 +45,8 @@ type Item = io::Result<Vec<u8>>;
pub struct AsyncSerial {
// receiver: IntoAsyncRead<mpsc::Receiver<Item>>,
readable: IntoAsyncRead<IntoStream<'static>>,
sender: mpsc::Sender<Item>,
readable: IntoAsyncRead<BoxStream<'static, Item>>,
writable: IntoSink<'static>,
port: binding::SerialPort,
signals: binding::SerialOutputSignals,
timeout: Duration,
@ -80,14 +81,15 @@ impl AsyncSerial {
let _on_disconnect =
EventListener::new(&target, "disconnect", move |_event| log("on disconnect!"));
let readable = Reflect::get(&port, "readable".into()).expect("readable");
let writable = Reflect::get(&port, "writable".into()).expect("writable");
let readable = Reflect::get(&port, &"readable".into()).expect("readable");
let writable = Reflect::get(&port, &"writable".into()).expect("writable");
let readable = ReadableStream::from_raw(readable.unchecked_into());
let writable = WritableStream::from_raw(writable.unchecked_into());
Ok(AsyncSerial {
sender,
readable: readable.into_stream().map_ok(),
writable: writable.into_sink(),
readable: readable.into_stream().boxed().into_async_read(),
port,
signals: binding::SerialOutputSignals::new(),
timeout: Duration::from_secs(1),
@ -148,11 +150,11 @@ impl AsyncWrite for AsyncSerial {
) -> Poll<io::Result<usize>> {
if !self.writing {
self.writing = true;
self.readable
self.writable
.start_send(Ok(Vec::from(buf)))
.map_err(|_| Into::<io::Error>::into(io::ErrorKind::BrokenPipe))?
}
let r = ready!(self.readable.poll_ready(cx));
let r = ready!(self.writable.poll_ready(cx));
r.map_err(|_| Into::<io::Error>::into(io::ErrorKind::BrokenPipe))?;
self.writing = false;
Poll::Ready(Ok(buf.len()))