commit b1d35d5ce12e805d319db40a3e6628d9240653e6 Author: ZeroZipp Date: Thu May 21 23:45:42 2026 +0200 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1b72444 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/Cargo.lock +/target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..34f4c37 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "client" +version = "0.1.0" +edition = "2024" + +[dependencies.serde] +features = ["derive"] +version = "1.0.228" + +[dependencies.tokio] +features = ["full"] +version = "1.52.3" + +[dependencies.tokio-tungstenite] +features = ["native-tls"] +version = "0.29.0" + +[dependencies] +env_logger = "0.11.10" +futures = "0.3.32" +log = "0.4.29" +serde_json = "1.0.150" diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..7d4f41a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,20 @@ +# Build stage +FROM rust:alpine AS builder +RUN apk add --no-cache pkgconfig +RUN apk add --no-cache openssl-dev musl-dev +RUN apk add --no-cache openssl-libs-static +ARG build="cargo build --release" +COPY Cargo.toml /app/Cargo.toml +COPY src /app/src +WORKDIR "/app" +RUN $build + +# Runtime stage +FROM alpine:latest +RUN apk add --no-cache libgcc +RUN apk add --no-cache openssl-libs-static +ARG binary="/app/target/release/client" +ARG target="/usr/local/bin/client" +COPY --from=builder $binary $target +WORKDIR "/root" +CMD ["client"] diff --git a/src/app/config.rs b/src/app/config.rs new file mode 100644 index 0000000..0ad3fd7 --- /dev/null +++ b/src/app/config.rs @@ -0,0 +1,60 @@ +pub struct Config { + pub token: String, + pub device_id: String, + pub base_url: String, + pub max_output_bytes: u64, + pub writer_channel_capacity: usize, +} + +impl Config { + const DEFAULT_BASE_URL: &str = "ws://127.0.0.1:8000"; + const DEFAULT_DEVICE_ID: &str = "device"; + const DEFAULT_MAX_OUTPUT_BYTES: u64 = 10 * 1024 * 1024; + const DEFAULT_WRITER_CHANNEL_CAPACITY: usize = 32; + + pub fn from_env() -> Result { + let token = std::env::var("TOKEN") + .map_err(|_| "TOKEN environment variable is required".to_string())?; + + let device_id = + std::env::var("DEVICE_ID").unwrap_or_else(|_| Self::DEFAULT_DEVICE_ID.to_string()); + validate_device_id(&device_id).map_err(|e| format!("Invalid DEVICE_ID: {e}"))?; + + let base_url = + std::env::var("BASE_URL").unwrap_or_else(|_| Self::DEFAULT_BASE_URL.to_string()); + + Ok(Self { + token, + device_id, + base_url, + max_output_bytes: parse_env("MAX_OUTPUT_BYTES", Self::DEFAULT_MAX_OUTPUT_BYTES), + writer_channel_capacity: parse_env( + "WRITER_CHANNEL_CAPACITY", + Self::DEFAULT_WRITER_CHANNEL_CAPACITY, + ), + }) + } +} + +fn parse_env(key: &str, default: T) -> T { + std::env::var(key) + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(default) +} + +fn validate_device_id(id: &str) -> Result<(), String> { + if id.is_empty() { + return Err("cannot be empty".into()); + } + if id.len() > 64 { + return Err("too long (max 64 characters)".into()); + } + if !id + .chars() + .all(|c| c.is_alphanumeric() || c == '-' || c == '_') + { + return Err("contains invalid characters (allowed: a-z A-Z 0-9 - _)".into()); + } + Ok(()) +} diff --git a/src/app/protocol.rs b/src/app/protocol.rs new file mode 100644 index 0000000..5b35312 --- /dev/null +++ b/src/app/protocol.rs @@ -0,0 +1,20 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum ClientMsg { + ExecResult { + exec_id: String, + exit_code: i32, + stdout: String, + stderr: String, + }, +} + +#[derive(Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum ServerMsg { + AuthOk, + AuthError { reason: String }, + Exec { exec_id: String, command: String }, +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..3aaef68 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,274 @@ +mod app { + pub mod config; + pub mod protocol; +} + +use std::{process::Stdio, sync::Arc, time::Duration}; + +use app::config::Config; +use app::protocol::{ClientMsg, ServerMsg}; +use futures::{ + SinkExt, StreamExt, + stream::{SplitSink, SplitStream}, +}; +use log::{error, info}; +use tokio::{ + io::AsyncRead, + io::AsyncReadExt, + process::Command, + sync::{mpsc, oneshot}, +}; +use tokio_tungstenite::{ + connect_async_with_config, + tungstenite::{Message, Utf8Bytes, client::IntoClientRequest, http::HeaderValue}, +}; + +type Sock = + tokio_tungstenite::WebSocketStream>; +type Writer = SplitSink; + +enum OutgoingMessage { + Application(ClientMsg), + Pong(Vec), +} + +#[tokio::main] +async fn main() { + env_logger::init(); + + let config = match Config::from_env() { + Ok(config) => Arc::new(config), + Err(e) => { + error!("Configuration error: {e}"); + std::process::exit(1); + } + }; + + loop { + match run_client(&config).await { + Ok(()) => info!("Disconnected. Reconnecting in 5s..."), + Err(e) => error!("Error: {e}. Reconnecting in 5s..."), + } + tokio::time::sleep(Duration::from_secs(5)).await; + } +} + +async fn run_client(config: &Arc) -> Result<(), String> { + let mut socket = connect_to_server(config).await?; + authenticate(&mut socket).await?; + + let (sink, stream) = socket.split(); + let (outgoing_tx, writer_closed) = spawn_writer_task(sink, config.writer_channel_capacity); + + process_messages(stream, outgoing_tx, writer_closed, config).await +} + +async fn connect_to_server(config: &Config) -> Result { + let url = format!("{}/connect", config.base_url); + let mut request = url.into_client_request().map_err(|e| e.to_string())?; + let headers = request.headers_mut(); + headers.insert( + "Authorization", + HeaderValue::from_str(&format!("Bearer {}", config.token)).map_err(|e| e.to_string())?, + ); + headers.insert( + "X-Device-ID", + HeaderValue::from_str(&config.device_id).map_err(|e| e.to_string())?, + ); + + let (socket, _) = connect_async_with_config(request, None, false) + .await + .map_err(|e| e.to_string())?; + info!("Connected."); + Ok(socket) +} + +async fn authenticate(socket: &mut Sock) -> Result<(), String> { + match read_protocol_message(socket).await? { + ServerMsg::AuthOk => { + info!("Authenticated."); + Ok(()) + } + ServerMsg::AuthError { reason } => Err(format!("Auth failed: {reason}")), + ServerMsg::Exec { .. } => Err("Unexpected message during auth".into()), + } +} + +async fn read_protocol_message(socket: &mut Sock) -> Result { + loop { + match socket.next().await { + Some(Ok(Message::Text(text))) => return parse_server_message(&text), + Some(Ok(Message::Ping(data))) => { + socket + .send(Message::Pong(data)) + .await + .map_err(|e| e.to_string())?; + } + Some(Ok(Message::Close(_))) | None => return Err("Connection closed".into()), + Some(Err(error)) => return Err(error.to_string()), + _ => {} + } + } +} + +fn parse_server_message(text: &str) -> Result { + serde_json::from_str(text).map_err(|e| e.to_string()) +} + +async fn process_messages( + mut stream: SplitStream, + outgoing_tx: mpsc::Sender, + mut writer_closed: oneshot::Receiver<()>, + config: &Arc, +) -> Result<(), String> { + loop { + tokio::select! { + _ = &mut writer_closed => return Err("Write half closed".into()), + next_message = stream.next() => { + handle_stream_message(next_message, &outgoing_tx, config.max_output_bytes).await?; + } + } + } +} + +fn spawn_writer_task( + sink: Writer, + capacity: usize, +) -> (mpsc::Sender, oneshot::Receiver<()>) { + let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(capacity); + let (closed_tx, closed_rx) = oneshot::channel::<()>(); + + tokio::spawn(async move { + let mut sink = sink; + let _closed = closed_tx; + + while let Some(message) = outgoing_rx.recv().await { + if write_message(&mut sink, message).await.is_err() { + break; + } + } + }); + + (outgoing_tx, closed_rx) +} + +async fn write_message(sink: &mut Writer, message: OutgoingMessage) -> Result<(), String> { + match message { + OutgoingMessage::Application(message) => { + let text = serde_json::to_string(&message).map_err(|e| e.to_string())?; + sink.send(Message::Text(Utf8Bytes::from(text))) + .await + .map_err(|e| e.to_string()) + } + OutgoingMessage::Pong(payload) => sink + .send(Message::Pong(payload.into())) + .await + .map_err(|e| e.to_string()), + } +} + +async fn send_outgoing( + outgoing_tx: &mpsc::Sender, + message: OutgoingMessage, +) -> Result<(), String> { + outgoing_tx.send(message).await.map_err(|e| e.to_string()) +} + +async fn handle_stream_message( + message: Option>, + outgoing_tx: &mpsc::Sender, + max_output_bytes: u64, +) -> Result<(), String> { + match message { + Some(Ok(Message::Text(text))) => { + handle_server_message(parse_server_message(&text)?, outgoing_tx, max_output_bytes).await + } + Some(Ok(Message::Ping(payload))) => { + send_outgoing(outgoing_tx, OutgoingMessage::Pong(payload.to_vec())).await + } + Some(Ok(Message::Close(_))) | None => Err("Connection closed".into()), + Some(Err(error)) => Err(error.to_string()), + Some(Ok(_)) => Ok(()), + } +} + +async fn handle_server_message( + message: ServerMsg, + outgoing_tx: &mpsc::Sender, + max_output_bytes: u64, +) -> Result<(), String> { + match message { + ServerMsg::Exec { exec_id, command } => { + spawn_command_task(exec_id, command, outgoing_tx.clone(), max_output_bytes); + Ok(()) + } + ServerMsg::AuthError { reason } => Err(reason), + ServerMsg::AuthOk => Ok(()), + } +} + +fn spawn_command_task( + exec_id: String, + command: String, + outgoing_tx: mpsc::Sender, + max_output_bytes: u64, +) { + tokio::spawn(async move { + info!("Executing [{exec_id}]: {command}"); + let (exit_code, stdout, stderr) = execute_command(&command, max_output_bytes).await; + let result = ClientMsg::ExecResult { + exec_id, + exit_code, + stdout, + stderr, + }; + + if let Err(error) = send_outgoing(&outgoing_tx, OutgoingMessage::Application(result)).await + { + error!("Failed to send command result: {error}"); + } + }); +} + +async fn execute_command(command: &str, max_output_bytes: u64) -> (i32, String, String) { + let mut child = match Command::new("sh") + .arg("-c") + .arg(command) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + { + Ok(c) => c, + Err(e) => return (-1, String::new(), e.to_string()), + }; + + let Some(stdout) = child.stdout.take() else { + return (-1, String::new(), "stdout pipe unavailable".to_string()); + }; + let Some(stderr) = child.stderr.take() else { + return (-1, String::new(), "stderr pipe unavailable".to_string()); + }; + + let (stdout_data, stderr_data) = tokio::join!( + read_limited_output(stdout, max_output_bytes), + read_limited_output(stderr, max_output_bytes), + ); + + child.kill().await.ok(); + let exit_code = child.wait().await.ok().and_then(|s| s.code()).unwrap_or(-1); + + ( + exit_code, + String::from_utf8_lossy(&stdout_data).into_owned(), + String::from_utf8_lossy(&stderr_data).into_owned(), + ) +} + +async fn read_limited_output(reader: R, max_output_bytes: u64) -> Vec +where + R: AsyncRead + Unpin, +{ + let mut buffer = Vec::new(); + let _ = reader.take(max_output_bytes).read_to_end(&mut buffer).await; + buffer +}