clu_middleware_tron/tcp.rs
1/*
2 * @Author: José Sánchez-Gallego (gallegoj@uw.edu)
3 * @Date: 2025-11-21
4 * @Filename: tcp.rs
5 * @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
6 */
7
8use std::time::Duration;
9
10use async_channel::{Receiver, Sender};
11use bytes::BytesMut;
12use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
13use tokio::net::TcpStream;
14use tokio::time::sleep;
15
16use crate::parser::{Reply, parse_reply};
17
18/// Configuration options for the TCP client.
19pub struct TCPClientConfig {
20 /// Hostname or IP address of the TCP server.
21 pub host: String,
22 /// Port number of the TCP server.
23 pub port: u16,
24 /// Whether the client should attempt to reconnect on disconnection.
25 pub reconnect: bool,
26 /// Delay in seconds before attempting to reconnect.
27 pub reconnect_delay: f32,
28 /// Propagate parsed message to the RabbitMQ exchange.
29 pub propagate_to_rabbitmq: bool,
30}
31
32impl Default for TCPClientConfig {
33 /// Creates a default TCP client configuration.
34 fn default() -> Self {
35 Self {
36 host: String::from("127.0.0.1"),
37 port: 8080,
38 reconnect: false,
39 reconnect_delay: 5.0,
40 propagate_to_rabbitmq: false,
41 }
42 }
43}
44
45/// Starts a TCP client that connects to the specified host and port,
46/// reads incoming messages, and processes them using the [parse_reply] function.
47///
48/// # Arguments
49///
50/// * `config` - Configuration options for the TCP client.
51/// * `tcp_receiver` - An async channel receiver for sending messages to the TCP server.
52/// * `rabbitmq_sender` - An async channel sender for propagating parsed messages to RabbitMQ.
53///
54pub async fn start_tcp_client(
55 config: TCPClientConfig,
56 tcp_receiver: Receiver<BytesMut>,
57 rabbitmq_sender: Sender<Reply>,
58) -> Result<(), String> {
59 // Main connection loop. Reconnects if the connection is lost and reconnect is enabled.
60 loop {
61 // Attempt to connect to the TCP server.
62 let stream = match TcpStream::connect((config.host.as_str(), config.port)).await {
63 Ok(s) => {
64 log::debug!("Connected to TCP server at {}:{}", config.host, config.port);
65 s
66 }
67 Err(e) => {
68 log::error!(
69 "Failed to connect to TCP server at {}:{}: {}",
70 config.host,
71 config.port,
72 e
73 );
74
75 if config.reconnect {
76 log::warn!("Reconnecting in {} seconds...", config.reconnect_delay);
77 sleep(Duration::from_secs_f32(config.reconnect_delay)).await;
78 continue;
79 } else {
80 return Err("Failed to connect to TCP server".to_string());
81 }
82 }
83 };
84
85 // Split the stream into reader and writer and create buffered versions.
86 let (reader, writer) = stream.into_split();
87 let mut reader = BufReader::new(reader);
88 let mut writer = BufWriter::new(writer);
89
90 // Clone the receiver for the sending task. This is necessary because we are in a loop
91 // and if we reconnect the original receiver would be moved.
92 let tcp_receiver_clone = tcp_receiver.clone();
93
94 // Monitor the TCP queue for commands to send to the actor.
95 tokio::spawn(async move {
96 while let Ok(message) = tcp_receiver_clone.recv().await {
97 log::debug!("Processing command to send to actor: {:?}", message);
98 let message_lf = [message.as_ref(), b"\n"].concat();
99 if let Err(e) = writer.write_all(&message_lf).await {
100 log::error!("Failed to send command to actor: {}", e);
101 continue;
102 }
103 if let Err(e) = writer.flush().await {
104 log::error!("Failed to flush command to actor: {}", e);
105 continue;
106 }
107 log::debug!("Command sent to actor: {:?}", message);
108 }
109 });
110
111 // Read from the actor TCP stream. Parse replies from the actor and
112 // propagate them to RabbitMQ if configured.
113 loop {
114 // Read a line from the TCP stream. Handle EOF and errors.
115 let mut line: Vec<u8> = Vec::new();
116 match reader.read_until(b'\n', &mut line).await {
117 Ok(0) => {
118 log::debug!("Connection closed by client");
119 if config.reconnect {
120 log::warn!("Reconnecting in {} seconds...", config.reconnect_delay);
121 sleep(Duration::from_secs_f32(config.reconnect_delay)).await;
122 break; // Break to outer loop to reconnect
123 }
124 return Ok(()); // connection closed
125 }
126
127 Ok(_) => {
128 // Successfully read a line. Parse and process it.
129
130 // Strip newline characters
131 while let Some(&last) = line.last() {
132 if last == b'\n' || last == b'\r' {
133 line.pop();
134 } else {
135 break;
136 }
137 }
138
139 log::debug!(
140 "Received reply from {}:{}: {:?}",
141 config.host,
142 config.port,
143 bytes::Bytes::from(line.clone())
144 );
145
146 if let Some(reply) = parse_reply(&line) {
147 log::info!(
148 "Parsed reply: user_id={}, command_id={}, code={}, keywords={}",
149 reply.user_id,
150 reply.command_id,
151 reply.code,
152 serde_json::to_string(&reply.keywords).unwrap()
153 );
154
155 if config.propagate_to_rabbitmq {
156 log::debug!("Sending reply to RabbitMQ service for processing.");
157 log::debug!("Reply: {:?}", reply);
158 if let Err(e) = rabbitmq_sender.send(reply).await {
159 log::error!("Failed to send reply to RabbitMQ queue: {}", e);
160 } else {
161 log::debug!("Reply sent to RabbitMQ service.");
162 }
163 }
164 } else {
165 log::warn!(
166 "Failed to parse reply from {}:{}: {:?}",
167 config.host,
168 config.port,
169 bytes::Bytes::from(line.clone())
170 );
171 continue;
172 }
173 }
174
175 Err(e) => {
176 log::error!("Failed to read from stream: {}", e);
177 if config.reconnect {
178 break;
179 } else {
180 return Err(e.to_string());
181 }
182 }
183 }
184 }
185 }
186}