clu_middleware_tron/
tcp.rs

1/*
2 *  @Author: José Sánchez-Gallego (gallegoj@uw.edu)
3 *  @Date: 2025-11-21
4 *  @Filename: tcp.rs
5 *  @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
6 */
7
8use std::time::Duration;
9
10use async_channel::{Receiver, Sender};
11use bytes::BytesMut;
12use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
13use tokio::net::TcpStream;
14use tokio::time::sleep;
15
16use crate::parser::{Reply, parse_reply};
17
/// Configuration options for the TCP client.
///
/// The type derives `Debug`, `Clone` and `PartialEq` so that a configuration can be
/// logged, duplicated for several clients, and compared in tests.
#[derive(Debug, Clone, PartialEq)]
pub struct TCPClientConfig {
    /// Hostname or IP address of the TCP server.
    pub host: String,
    /// Port number of the TCP server.
    pub port: u16,
    /// Whether the client should attempt to reconnect on disconnection.
    pub reconnect: bool,
    /// Delay in seconds before attempting to reconnect.
    ///
    /// Must be finite and non-negative, since it is converted to a
    /// [`std::time::Duration`] before sleeping.
    pub reconnect_delay: f32,
    /// Propagate parsed message to the RabbitMQ exchange.
    pub propagate_to_rabbitmq: bool,
}
31
32impl Default for TCPClientConfig {
33    /// Creates a default TCP client configuration.
34    fn default() -> Self {
35        Self {
36            host: String::from("127.0.0.1"),
37            port: 8080,
38            reconnect: false,
39            reconnect_delay: 5.0,
40            propagate_to_rabbitmq: false,
41        }
42    }
43}
44
45/// Starts a TCP client that connects to the specified host and port,
46/// reads incoming messages, and processes them using the [parse_reply] function.
47///
48/// # Arguments
49///
50/// * `config` - Configuration options for the TCP client.
51/// * `tcp_receiver` - An async channel receiver for sending messages to the TCP server.
52/// * `rabbitmq_sender` - An async channel sender for propagating parsed messages to RabbitMQ.
53///
54pub async fn start_tcp_client(
55    config: TCPClientConfig,
56    tcp_receiver: Receiver<BytesMut>,
57    rabbitmq_sender: Sender<Reply>,
58) -> Result<(), String> {
59    // Main connection loop. Reconnects if the connection is lost and reconnect is enabled.
60    loop {
61        // Attempt to connect to the TCP server.
62        let stream = match TcpStream::connect((config.host.as_str(), config.port)).await {
63            Ok(s) => {
64                log::debug!("Connected to TCP server at {}:{}", config.host, config.port);
65                s
66            }
67            Err(e) => {
68                log::error!(
69                    "Failed to connect to TCP server at {}:{}: {}",
70                    config.host,
71                    config.port,
72                    e
73                );
74
75                if config.reconnect {
76                    log::warn!("Reconnecting in {} seconds...", config.reconnect_delay);
77                    sleep(Duration::from_secs_f32(config.reconnect_delay)).await;
78                    continue;
79                } else {
80                    return Err("Failed to connect to TCP server".to_string());
81                }
82            }
83        };
84
85        // Split the stream into reader and writer and create buffered versions.
86        let (reader, writer) = stream.into_split();
87        let mut reader = BufReader::new(reader);
88        let mut writer = BufWriter::new(writer);
89
90        // Clone the receiver for the sending task. This is necessary because we are in a loop
91        // and if we reconnect the original receiver would be moved.
92        let tcp_receiver_clone = tcp_receiver.clone();
93
94        // Monitor the TCP queue for commands to send to the actor.
95        tokio::spawn(async move {
96            while let Ok(message) = tcp_receiver_clone.recv().await {
97                log::debug!("Processing command to send to actor: {:?}", message);
98                let message_lf = [message.as_ref(), b"\n"].concat();
99                if let Err(e) = writer.write_all(&message_lf).await {
100                    log::error!("Failed to send command to actor: {}", e);
101                    continue;
102                }
103                if let Err(e) = writer.flush().await {
104                    log::error!("Failed to flush command to actor: {}", e);
105                    continue;
106                }
107                log::debug!("Command sent to actor: {:?}", message);
108            }
109        });
110
111        // Read from the actor TCP stream. Parse replies from the actor and
112        // propagate them to RabbitMQ if configured.
113        loop {
114            // Read a line from the TCP stream. Handle EOF and errors.
115            let mut line: Vec<u8> = Vec::new();
116            match reader.read_until(b'\n', &mut line).await {
117                Ok(0) => {
118                    log::debug!("Connection closed by client");
119                    if config.reconnect {
120                        log::warn!("Reconnecting in {} seconds...", config.reconnect_delay);
121                        sleep(Duration::from_secs_f32(config.reconnect_delay)).await;
122                        break; // Break to outer loop to reconnect
123                    }
124                    return Ok(()); // connection closed
125                }
126
127                Ok(_) => {
128                    // Successfully read a line. Parse and process it.
129
130                    // Strip newline characters
131                    while let Some(&last) = line.last() {
132                        if last == b'\n' || last == b'\r' {
133                            line.pop();
134                        } else {
135                            break;
136                        }
137                    }
138
139                    log::debug!(
140                        "Received reply from {}:{}: {:?}",
141                        config.host,
142                        config.port,
143                        bytes::Bytes::from(line.clone())
144                    );
145
146                    if let Some(reply) = parse_reply(&line) {
147                        log::info!(
148                            "Parsed reply: user_id={}, command_id={}, code={}, keywords={}",
149                            reply.user_id,
150                            reply.command_id,
151                            reply.code,
152                            serde_json::to_string(&reply.keywords).unwrap()
153                        );
154
155                        if config.propagate_to_rabbitmq {
156                            log::debug!("Sending reply to RabbitMQ service for processing.");
157                            log::debug!("Reply: {:?}", reply);
158                            if let Err(e) = rabbitmq_sender.send(reply).await {
159                                log::error!("Failed to send reply to RabbitMQ queue: {}", e);
160                            } else {
161                                log::debug!("Reply sent to RabbitMQ service.");
162                            }
163                        }
164                    } else {
165                        log::warn!(
166                            "Failed to parse reply from {}:{}: {:?}",
167                            config.host,
168                            config.port,
169                            bytes::Bytes::from(line.clone())
170                        );
171                        continue;
172                    }
173                }
174
175                Err(e) => {
176                    log::error!("Failed to read from stream: {}", e);
177                    if config.reconnect {
178                        break;
179                    } else {
180                        return Err(e.to_string());
181                    }
182                }
183            }
184        }
185    }
186}