Skip to content

Commit

Permalink
ios: add non-blocking l2cap read and write APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
oriolsr committed Aug 22, 2024
1 parent cff0f2a commit 36243ed
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 23 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ serde = { version = "1.0.143", optional = true, features = ["derive"] }
tracing = { version = "0.1.36", default-features = false }

[dev-dependencies]
tokio = { version = "1.20.1", features = ["macros", "rt-multi-thread", "time"] }
tokio = { version = "1.20.1", features = ["macros", "rt-multi-thread", "time", "sync"] }
tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }

[target.'cfg(not(target_os = "linux"))'.dependencies]
Expand Down Expand Up @@ -59,6 +59,7 @@ java-spaghetti = "0.2.0"
async-channel = "2.2.0"

[target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies]
async-channel = "2.2.0"
async-broadcast = "0.5.1"
objc = "0.2.7"
objc_id = "0.1.1"
Expand Down
13 changes: 9 additions & 4 deletions src/corebluetooth/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use futures_core::Stream;
use futures_lite::StreamExt;
use objc_foundation::{INSArray, INSFastEnumeration, INSString, NSArray};
use objc_id::ShareId;
use tracing::debug;

use super::delegates::{PeripheralDelegate, PeripheralEvent};
use super::l2cap_channel::{L2capChannelReader, L2capChannelWriter};
use super::types::{CBPeripheral, CBL2CAPChannel, CBPeripheralState, CBService, CBUUID};
use super::types::{CBPeripheral, CBPeripheralState, CBService, CBUUID};
use crate::device::ServicesChanged;
use crate::error::ErrorKind;
use crate::pairing::PairingAgent;
Expand Down Expand Up @@ -227,6 +228,7 @@ impl DeviceImpl {
return Err(ErrorKind::NotConnected.into());
}

debug!("open_l2cap_channel {:?}", self.peripheral);
self.peripheral.open_l2cap_channel(psm);

let l2capchannel = loop {
Expand All @@ -244,9 +246,12 @@ impl DeviceImpl {
}
};

// Here you would implement the conversion from CBL2CAPChannel to L2capChannelReader and L2capChannelWriter
// For now, we'll return a placeholder implementation
Err(ErrorKind::NotSupported.into())
debug!("open_l2cap_channel success {:?}", self.peripheral);

let reader = L2capChannelReader::new(l2capchannel.clone());
let writer = L2capChannelWriter::new(l2capchannel);

Ok((reader, writer))
}
}

Expand Down
183 changes: 171 additions & 12 deletions src/corebluetooth/l2cap_channel.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,99 @@
use std::fmt;
use std::sync::Arc;
use async_channel::{Receiver, Sender, TryRecvError, TrySendError};
use std::thread;
use std::time::Duration;

use crate::Result;
use super::types::{CBL2CAPChannel, NSStreamStatus}; // Adjust the import based on your project structure
use crate::error::{Error, ErrorKind};
use objc_id::{Id, Shared};
use tracing::debug;

/// Utility struct to close the channel on drop.
pub(super) struct L2capCloser {
channel: Id<CBL2CAPChannel, Shared>,
}

impl L2capCloser {
fn close(&self) {
self.channel.input_stream().close();
self.channel.output_stream().close();
}
}

impl Drop for L2capCloser {
fn drop(&mut self) {
self.close()
}
}

/// The reader side of an L2CAP channel.
pub struct L2capChannelReader {
_private: (),
stream: Receiver<Vec<u8>>,
closer: Arc<L2capCloser>,
}

impl L2capChannelReader {
/// Creates a new L2capChannelReader.
pub fn new(channel: Id<CBL2CAPChannel, Shared>) -> Self {
let (sender, receiver) = async_channel::bounded(16);
let closer = Arc::new(L2capCloser { channel: channel.clone() });

channel.input_stream().open();

thread::spawn(move || read_loop(channel, sender));

Self {
stream: receiver,
closer,
}
}

/// Reads data from the L2CAP channel into the provided buffer.
#[inline]
pub async fn read(&mut self, _buf: &mut [u8]) -> Result<usize> {
todo!()
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let packet = self.stream
.recv()
.await
.map_err(|_| Error::new(ErrorKind::ConnectionFailed, None, "channel is closed".to_string()))?;

if packet.len() > buf.len() {
return Err(Error::new(
ErrorKind::InvalidParameter,
None,
"Buffer is too small".to_string(),
));
}

buf[..packet.len()].copy_from_slice(&packet);
Ok(packet.len())
}

pub fn try_read(&mut self, _buf: &mut [u8]) -> Result<usize> {
todo!()
/// Attempts to read data from the L2CAP channel into the provided buffer without blocking.
#[inline]
pub fn try_read(&mut self, buf: &mut [u8]) -> Result<usize> {
let packet = self.stream.try_recv().map_err(|e| match e {
TryRecvError::Empty => Error::new(ErrorKind::NotReady, None, "no received packet in queue".to_string()),
TryRecvError::Closed => Error::new(ErrorKind::ConnectionFailed, None, "channel is closed".to_string()),
})?;

if packet.len() > buf.len() {
return Err(Error::new(
ErrorKind::InvalidParameter,
None,
"Buffer is too small".to_string(),
));
}

buf[..packet.len()].copy_from_slice(&packet);
Ok(packet.len())
}

/// Closes the L2CAP channel reader.
pub async fn close(&mut self) -> Result<()> {
todo!()
self.closer.close();
Ok(())
}
}

Expand All @@ -27,21 +103,49 @@ impl fmt::Debug for L2capChannelReader {
}
}

/// The writer side of an L2CAP channel.
pub struct L2capChannelWriter {
_private: (),
stream: Sender<Vec<u8>>,
closer: Arc<L2capCloser>,
}

impl L2capChannelWriter {
pub async fn write(&mut self, _packet: &[u8]) -> Result<()> {
todo!()
/// Creates a new L2capChannelWriter.
pub fn new(channel: Id<CBL2CAPChannel, Shared>) -> Self {
let (sender, receiver) = async_channel::bounded(16);
let closer = Arc::new(L2capCloser { channel: channel.clone() });

channel.output_stream().open();

thread::spawn(move || write_loop(channel, receiver));

Self {
stream: sender,
closer,
}

}

/// Writes data to the L2CAP channel.
pub async fn write(&mut self, packet: &[u8]) -> Result<()> {
self.stream
.send(packet.to_vec())
.await
.map_err(|_| Error::new(ErrorKind::ConnectionFailed, None, "channel is closed".to_string()))
}

pub fn try_write(&mut self, _packet: &[u8]) -> Result<()> {
todo!()
/// Attempts to write data to the L2CAP channel without blocking.
pub fn try_write(&mut self, packet: &[u8]) -> Result<()> {
self.stream.try_send(packet.to_vec()).map_err(|e| match e {
TrySendError::Closed(_) => Error::new(ErrorKind::ConnectionFailed, None, "channel is closed".to_string()),
TrySendError::Full(_) => Error::new(ErrorKind::NotReady, None, "No buffer space for write".to_string()),
})
}

/// Closes the L2CAP channel writer.
pub async fn close(&mut self) -> Result<()> {
todo!()
self.closer.close();
Ok(())
}
}

Expand All @@ -50,3 +154,58 @@ impl fmt::Debug for L2capChannelWriter {
f.write_str("L2capChannelWriter")
}
}

/// Continuously reads from the L2CAP channel and sends the data through the provided sender.
///
/// This function is used to handle the asynchronous nature of L2CAP communication.
/// It runs in a separate thread to continuously poll the input stream for new data,
/// allowing the main thread to perform other tasks without blocking on I/O operations.
/// This approach improves responsiveness and efficiency in handling incoming data.
fn read_loop(channel: Id<CBL2CAPChannel, Shared>, sender: Sender<Vec<u8>>) {
#[allow(unused_mut)]
let mut buf = vec![0u8; 1024];
loop {
let stream_status = channel.input_stream().stream_status();
debug!("Read Loop Stream Status: {:?}", stream_status);

if stream_status == NSStreamStatus(0) || stream_status == NSStreamStatus(6) {
break;
}
if !channel.input_stream().has_bytes_available() || stream_status != NSStreamStatus(2) {
thread::sleep(Duration::from_millis(10));
continue;
}
let res = channel.input_stream().read(buf.as_ptr(), buf.len());
if res < 0 {
debug!("Read Loop Error: Stream read failed");
break;
}
let size = res.try_into().unwrap();
let packet = unsafe { Vec::<u8>::from_raw_parts(buf.as_ptr() as *mut u8, size, size) };
if sender.send_blocking(packet.to_vec()).is_err() {
debug!("Read Loop Error: Sender is closed");
break;
}
core::mem::forget(packet);
}
debug!("rx_thread_end");
}

/// Continuously receives data from the provided receiver and writes it to the L2CAP channel.
///
/// This function is used for managing outgoing data in a non-blocking manner.
/// By running in a separate thread, it allows the main application to queue up data
/// for sending without waiting for each write operation to complete. This improves
/// overall performance and responsiveness, especially when dealing with potentially
/// slow or unreliable Bluetooth connections.
fn write_loop(channel: Id<CBL2CAPChannel, Shared>, receiver: Receiver<Vec<u8>>) {
while let Ok(packet) = receiver.recv_blocking() {
let objc_packet = unsafe { core::slice::from_raw_parts(packet.as_ptr() as *const u8, packet.len()) };
let res = channel.output_stream().write(objc_packet, packet.len());
if res < 0 {
debug!("Write Loop Error: Stream write failed");
break;
}
}
debug!("tx_thread_end");
}
6 changes: 0 additions & 6 deletions src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,6 @@ impl Device {
self.0.services().await
}

/// Open l2cap channel given psm.
#[inline]
pub async fn open_l2cap_channel(&self, psm: u16) -> Result<objc_id::ShareId<sys::types::CBL2CAPChannel>> {
self.0.open_l2cap_channel(psm).await
}

/// Asynchronously blocks until a GATT services changed packet is received
///
/// # Platform specific
Expand Down

0 comments on commit 36243ed

Please sign in to comment.