first commit

This commit is contained in:
2026-05-21 23:45:42 +02:00
commit b1d35d5ce1
6 changed files with 398 additions and 0 deletions
+2
View File
@@ -0,0 +1,2 @@
/Cargo.lock
/target
+22
View File
@@ -0,0 +1,22 @@
[package]
name = "client"
version = "0.1.0"
edition = "2024"
[dependencies.serde]
features = ["derive"]
version = "1.0.228"
[dependencies.tokio]
features = ["full"]
version = "1.52.3"
[dependencies.tokio-tungstenite]
features = ["native-tls"]
version = "0.29.0"
[dependencies]
env_logger = "0.11.10"
futures = "0.3.32"
log = "0.4.29"
serde_json = "1.0.150"
+20
View File
@@ -0,0 +1,20 @@
# Build stage
FROM rust:alpine AS builder
RUN apk add --no-cache pkgconfig
RUN apk add --no-cache openssl-dev musl-dev
RUN apk add --no-cache openssl-libs-static
ARG build="cargo build --release"
COPY Cargo.toml /app/Cargo.toml
COPY src /app/src
WORKDIR "/app"
RUN $build
# Runtime stage
FROM alpine:latest
RUN apk add --no-cache libgcc
RUN apk add --no-cache openssl-libs-static
ARG binary="/app/target/release/client"
ARG target="/usr/local/bin/client"
COPY --from=builder $binary $target
WORKDIR "/root"
CMD ["client"]
+60
View File
@@ -0,0 +1,60 @@
pub struct Config {
pub token: String,
pub device_id: String,
pub base_url: String,
pub max_output_bytes: u64,
pub writer_channel_capacity: usize,
}
impl Config {
const DEFAULT_BASE_URL: &str = "ws://127.0.0.1:8000";
const DEFAULT_DEVICE_ID: &str = "device";
const DEFAULT_MAX_OUTPUT_BYTES: u64 = 10 * 1024 * 1024;
const DEFAULT_WRITER_CHANNEL_CAPACITY: usize = 32;
pub fn from_env() -> Result<Self, String> {
let token = std::env::var("TOKEN")
.map_err(|_| "TOKEN environment variable is required".to_string())?;
let device_id =
std::env::var("DEVICE_ID").unwrap_or_else(|_| Self::DEFAULT_DEVICE_ID.to_string());
validate_device_id(&device_id).map_err(|e| format!("Invalid DEVICE_ID: {e}"))?;
let base_url =
std::env::var("BASE_URL").unwrap_or_else(|_| Self::DEFAULT_BASE_URL.to_string());
Ok(Self {
token,
device_id,
base_url,
max_output_bytes: parse_env("MAX_OUTPUT_BYTES", Self::DEFAULT_MAX_OUTPUT_BYTES),
writer_channel_capacity: parse_env(
"WRITER_CHANNEL_CAPACITY",
Self::DEFAULT_WRITER_CHANNEL_CAPACITY,
),
})
}
}
fn parse_env<T: std::str::FromStr>(key: &str, default: T) -> T {
std::env::var(key)
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(default)
}
fn validate_device_id(id: &str) -> Result<(), String> {
if id.is_empty() {
return Err("cannot be empty".into());
}
if id.len() > 64 {
return Err("too long (max 64 characters)".into());
}
if !id
.chars()
.all(|c| c.is_alphanumeric() || c == '-' || c == '_')
{
return Err("contains invalid characters (allowed: a-z A-Z 0-9 - _)".into());
}
Ok(())
}
+20
View File
@@ -0,0 +1,20 @@
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ClientMsg {
ExecResult {
exec_id: String,
exit_code: i32,
stdout: String,
stderr: String,
},
}
#[derive(Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ServerMsg {
AuthOk,
AuthError { reason: String },
Exec { exec_id: String, command: String },
}
+274
View File
@@ -0,0 +1,274 @@
mod app {
pub mod config;
pub mod protocol;
}
use std::{process::Stdio, sync::Arc, time::Duration};
use app::config::Config;
use app::protocol::{ClientMsg, ServerMsg};
use futures::{
SinkExt, StreamExt,
stream::{SplitSink, SplitStream},
};
use log::{error, info};
use tokio::{
io::AsyncRead,
io::AsyncReadExt,
process::Command,
sync::{mpsc, oneshot},
};
use tokio_tungstenite::{
connect_async_with_config,
tungstenite::{Message, Utf8Bytes, client::IntoClientRequest, http::HeaderValue},
};
type Sock =
tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
type Writer = SplitSink<Sock, Message>;
enum OutgoingMessage {
Application(ClientMsg),
Pong(Vec<u8>),
}
#[tokio::main]
async fn main() {
env_logger::init();
let config = match Config::from_env() {
Ok(config) => Arc::new(config),
Err(e) => {
error!("Configuration error: {e}");
std::process::exit(1);
}
};
loop {
match run_client(&config).await {
Ok(()) => info!("Disconnected. Reconnecting in 5s..."),
Err(e) => error!("Error: {e}. Reconnecting in 5s..."),
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
async fn run_client(config: &Arc<Config>) -> Result<(), String> {
let mut socket = connect_to_server(config).await?;
authenticate(&mut socket).await?;
let (sink, stream) = socket.split();
let (outgoing_tx, writer_closed) = spawn_writer_task(sink, config.writer_channel_capacity);
process_messages(stream, outgoing_tx, writer_closed, config).await
}
async fn connect_to_server(config: &Config) -> Result<Sock, String> {
let url = format!("{}/connect", config.base_url);
let mut request = url.into_client_request().map_err(|e| e.to_string())?;
let headers = request.headers_mut();
headers.insert(
"Authorization",
HeaderValue::from_str(&format!("Bearer {}", config.token)).map_err(|e| e.to_string())?,
);
headers.insert(
"X-Device-ID",
HeaderValue::from_str(&config.device_id).map_err(|e| e.to_string())?,
);
let (socket, _) = connect_async_with_config(request, None, false)
.await
.map_err(|e| e.to_string())?;
info!("Connected.");
Ok(socket)
}
async fn authenticate(socket: &mut Sock) -> Result<(), String> {
match read_protocol_message(socket).await? {
ServerMsg::AuthOk => {
info!("Authenticated.");
Ok(())
}
ServerMsg::AuthError { reason } => Err(format!("Auth failed: {reason}")),
ServerMsg::Exec { .. } => Err("Unexpected message during auth".into()),
}
}
async fn read_protocol_message(socket: &mut Sock) -> Result<ServerMsg, String> {
loop {
match socket.next().await {
Some(Ok(Message::Text(text))) => return parse_server_message(&text),
Some(Ok(Message::Ping(data))) => {
socket
.send(Message::Pong(data))
.await
.map_err(|e| e.to_string())?;
}
Some(Ok(Message::Close(_))) | None => return Err("Connection closed".into()),
Some(Err(error)) => return Err(error.to_string()),
_ => {}
}
}
}
fn parse_server_message(text: &str) -> Result<ServerMsg, String> {
serde_json::from_str(text).map_err(|e| e.to_string())
}
async fn process_messages(
mut stream: SplitStream<Sock>,
outgoing_tx: mpsc::Sender<OutgoingMessage>,
mut writer_closed: oneshot::Receiver<()>,
config: &Arc<Config>,
) -> Result<(), String> {
loop {
tokio::select! {
_ = &mut writer_closed => return Err("Write half closed".into()),
next_message = stream.next() => {
handle_stream_message(next_message, &outgoing_tx, config.max_output_bytes).await?;
}
}
}
}
fn spawn_writer_task(
sink: Writer,
capacity: usize,
) -> (mpsc::Sender<OutgoingMessage>, oneshot::Receiver<()>) {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(capacity);
let (closed_tx, closed_rx) = oneshot::channel::<()>();
tokio::spawn(async move {
let mut sink = sink;
let _closed = closed_tx;
while let Some(message) = outgoing_rx.recv().await {
if write_message(&mut sink, message).await.is_err() {
break;
}
}
});
(outgoing_tx, closed_rx)
}
async fn write_message(sink: &mut Writer, message: OutgoingMessage) -> Result<(), String> {
match message {
OutgoingMessage::Application(message) => {
let text = serde_json::to_string(&message).map_err(|e| e.to_string())?;
sink.send(Message::Text(Utf8Bytes::from(text)))
.await
.map_err(|e| e.to_string())
}
OutgoingMessage::Pong(payload) => sink
.send(Message::Pong(payload.into()))
.await
.map_err(|e| e.to_string()),
}
}
async fn send_outgoing(
outgoing_tx: &mpsc::Sender<OutgoingMessage>,
message: OutgoingMessage,
) -> Result<(), String> {
outgoing_tx.send(message).await.map_err(|e| e.to_string())
}
async fn handle_stream_message(
message: Option<Result<Message, tokio_tungstenite::tungstenite::Error>>,
outgoing_tx: &mpsc::Sender<OutgoingMessage>,
max_output_bytes: u64,
) -> Result<(), String> {
match message {
Some(Ok(Message::Text(text))) => {
handle_server_message(parse_server_message(&text)?, outgoing_tx, max_output_bytes).await
}
Some(Ok(Message::Ping(payload))) => {
send_outgoing(outgoing_tx, OutgoingMessage::Pong(payload.to_vec())).await
}
Some(Ok(Message::Close(_))) | None => Err("Connection closed".into()),
Some(Err(error)) => Err(error.to_string()),
Some(Ok(_)) => Ok(()),
}
}
async fn handle_server_message(
message: ServerMsg,
outgoing_tx: &mpsc::Sender<OutgoingMessage>,
max_output_bytes: u64,
) -> Result<(), String> {
match message {
ServerMsg::Exec { exec_id, command } => {
spawn_command_task(exec_id, command, outgoing_tx.clone(), max_output_bytes);
Ok(())
}
ServerMsg::AuthError { reason } => Err(reason),
ServerMsg::AuthOk => Ok(()),
}
}
fn spawn_command_task(
exec_id: String,
command: String,
outgoing_tx: mpsc::Sender<OutgoingMessage>,
max_output_bytes: u64,
) {
tokio::spawn(async move {
info!("Executing [{exec_id}]: {command}");
let (exit_code, stdout, stderr) = execute_command(&command, max_output_bytes).await;
let result = ClientMsg::ExecResult {
exec_id,
exit_code,
stdout,
stderr,
};
if let Err(error) = send_outgoing(&outgoing_tx, OutgoingMessage::Application(result)).await
{
error!("Failed to send command result: {error}");
}
});
}
async fn execute_command(command: &str, max_output_bytes: u64) -> (i32, String, String) {
let mut child = match Command::new("sh")
.arg("-c")
.arg(command)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Ok(c) => c,
Err(e) => return (-1, String::new(), e.to_string()),
};
let Some(stdout) = child.stdout.take() else {
return (-1, String::new(), "stdout pipe unavailable".to_string());
};
let Some(stderr) = child.stderr.take() else {
return (-1, String::new(), "stderr pipe unavailable".to_string());
};
let (stdout_data, stderr_data) = tokio::join!(
read_limited_output(stdout, max_output_bytes),
read_limited_output(stderr, max_output_bytes),
);
child.kill().await.ok();
let exit_code = child.wait().await.ok().and_then(|s| s.code()).unwrap_or(-1);
(
exit_code,
String::from_utf8_lossy(&stdout_data).into_owned(),
String::from_utf8_lossy(&stderr_data).into_owned(),
)
}
async fn read_limited_output<R>(reader: R, max_output_bytes: u64) -> Vec<u8>
where
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
let _ = reader.take(max_output_bytes).read_to_end(&mut buffer).await;
buffer
}