Skip to content

Commit

Permalink
fix: clear cache before call
Browse files Browse the repository at this point in the history
  • Loading branch information
rise0chen committed May 30, 2024
1 parent a1feab1 commit b37931b
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 2 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ name = "tcp-server"
path = "examples/tcp-server.rs"
required-features = ["tcp-server"]

[[example]]
name = "timeout"
path = "examples/timeout.rs"
required-features = ["tcp-server"]

[[example]]
name = "rtu-over-tcp-server"
path = "examples/rtu-over-tcp-server.rs"
Expand Down
105 changes: 105 additions & 0 deletions examples/timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use std::{net::SocketAddr, pin::Pin, time::Duration};

use futures::future;
use tokio::net::TcpListener;

use tokio_modbus::{
prelude::*,
server::tcp::{accept_tcp_connection, Server},
};

struct ExampleService {}

impl tokio_modbus::server::Service for ExampleService {
type Request = Request<'static>;
type Future = Pin<Box<dyn future::Future<Output = Result<Response, Exception>> + Send + Sync>>;

fn call(&self, req: Self::Request) -> Self::Future {
match req {
Request::ReadHoldingRegisters(_addr, cnt) => {
let data = vec![0; cnt as usize];
Box::pin(future::ready(Ok(Response::ReadHoldingRegisters(data))))
}
Request::WriteMultipleRegisters(addr, values) => Box::pin(async move {
tokio::time::sleep(Duration::from_millis(500)).await;
Ok(Response::WriteMultipleRegisters(addr, values.len() as u16))
}),
Request::WriteSingleRegister(addr, value) => Box::pin(future::ready(Ok(
Response::WriteSingleRegister(addr, value),
))),
_ => {
println!("SERVER: Exception::IllegalFunction - Unimplemented function code in request: {req:?}");
// TODO: We want to return a Modbus Exception response `IllegalFunction`. https://github.com/slowtec/tokio-modbus/issues/165
Box::pin(future::ready(Err(Exception::IllegalFunction)))
}
}
}
}

impl ExampleService {
fn new() -> Self {
Self {}
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let socket_addr = "127.0.0.1:5502".parse().unwrap();

tokio::select! {
_ = server_context(socket_addr) => unreachable!(),
_ = client_context(socket_addr) => println!("Exiting"),
}

Ok(())
}

async fn server_context(socket_addr: SocketAddr) -> anyhow::Result<()> {
println!("Starting up server on {socket_addr}");
let listener = TcpListener::bind(socket_addr).await?;
let server = Server::new(listener);
let new_service = |_socket_addr| Ok(Some(ExampleService::new()));
let on_connected = |stream, socket_addr| async move {
accept_tcp_connection(stream, socket_addr, new_service)
};
let on_process_error = |err| {
eprintln!("{err}");
};
server.serve(&on_connected, on_process_error).await?;
Ok(())
}

async fn client_context(socket_addr: SocketAddr) {
let task = async {
// Give the server some time for starting up
tokio::time::sleep(Duration::from_secs(1)).await;

println!("CLIENT: Connecting client...");
let mut ctx = tcp::connect(socket_addr).await.unwrap();

let read_reg = tokio::time::timeout(
Duration::from_millis(200),
ctx.read_holding_registers(0x01, 2),
)
.await;
println!("{:?}", read_reg); // Should success

let write_mul = tokio::time::timeout(
Duration::from_millis(200),
ctx.write_multiple_registers(0x01, &[0]),
)
.await;
println!("{:?}", write_mul); // Should timeout
tokio::time::sleep(Duration::from_millis(500)).await; // wait finish

let write_sig = tokio::time::timeout(
Duration::from_millis(200),
ctx.write_single_register(0x01, 0),
)
.await;
println!("{:?}", write_sig); // Should success

println!("CLIENT: Done.")
};
tokio::join!(task, tokio::time::sleep(Duration::from_secs(5)));
}
11 changes: 10 additions & 1 deletion src/service/rtu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@

use std::{fmt, io};

use core::pin::Pin;
use core::task::{Context, Poll};
use futures_util::task::noop_waker_ref;
use futures_util::{SinkExt as _, StreamExt as _};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_util::codec::Framed;

use crate::{
Expand Down Expand Up @@ -53,6 +56,12 @@ where
let req_adu = self.next_request_adu(req, disconnect);
let req_hdr = req_adu.hdr;

let mut buf = [0];
while let Poll::Ready(Ok(())) = Pin::new(self.framed.get_mut()).poll_read(
&mut Context::from_waker(noop_waker_ref()),
&mut ReadBuf::new(&mut buf),
) {}
self.framed.write_buffer_mut().clear();
self.framed.read_buffer_mut().clear();

self.framed.send(req_adu).await?;
Expand Down
11 changes: 10 additions & 1 deletion src/service/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ use std::{
sync::atomic::{AtomicU16, Ordering},
};

use core::pin::Pin;
use core::task::{Context, Poll};
use futures_util::task::noop_waker_ref;
use futures_util::{SinkExt as _, StreamExt as _};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_util::codec::Framed;

use crate::{
Expand Down Expand Up @@ -76,6 +79,12 @@ where
let req_adu = self.next_request_adu(req, disconnect);
let req_hdr = req_adu.hdr;

let mut buf = [0];
while let Poll::Ready(Ok(())) = Pin::new(self.framed.get_mut()).poll_read(
&mut Context::from_waker(noop_waker_ref()),
&mut ReadBuf::new(&mut buf),
) {}
self.framed.write_buffer_mut().clear();
self.framed.read_buffer_mut().clear();

self.framed.send(req_adu).await?;
Expand Down

0 comments on commit b37931b

Please sign in to comment.