Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ samply = { git = "https://github.com/CodSpeedHQ/samply-codspeed", branch = "cods

[target.'cfg(target_os = "linux")'.dependencies]
procfs = "0.17.0"
# Enable the memtrack `ebpf` feature only on Linux so `codspeed tool memtrack`
# actually runs there. macOS builds keep memtrack as the IPC-client shell with
# no eBPF code; `codspeed tool memtrack` on macOS bails with a clear error.
memtrack = { path = "crates/memtrack", features = ["ebpf"] }

[dev-dependencies]
temp-env = { version = "0.3.6", features = ["async_closure"] }
Expand Down
67 changes: 66 additions & 1 deletion crates/exec-harness/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use clap::ValueEnum;
use clap::{Parser, ValueEnum};
use prelude::*;
use serde::{Deserialize, Serialize};
use std::ffi::OsString;
use std::io::{self, BufRead};

pub mod analysis;
Expand Down Expand Up @@ -84,3 +85,67 @@ pub fn execute_benchmarks(

Ok(())
}

/// Top-level CLI parser for the exec-harness entry point. The same parser is
/// used both by the standalone `exec-harness` binary (via [`run_cli`]) and by
/// `codspeed tool exec-harness` in the main CLI.
// NOTE: this is a clap derive parser, so the `///` comments on the fields are
// rendered as `--help` text. Keep them verbatim; use `//` for maintainer notes.
#[derive(Parser, Debug)]
#[command(
    name = "exec-harness",
    version,
    about = "CodSpeed exec harness - wraps commands with performance instrumentation"
)]
pub struct CliArgs {
    /// Optional benchmark name, else the command will be used as the name
    #[arg(long)]
    name: Option<String>,

    // Hidden from the help output; can also be supplied through the
    // `CODSPEED_RUNNER_MODE` environment variable.
    /// Set by the runner, should be coherent with the executor being used
    #[arg(short, long, global = true, env = "CODSPEED_RUNNER_MODE", hide = true)]
    measurement_mode: Option<MeasurementMode>,

    // Walltime options are declared in `walltime::WalltimeExecutionArgs` and
    // flattened into this parser.
    #[command(flatten)]
    walltime_args: walltime::WalltimeExecutionArgs,

    // Trailing var-arg: swallows every remaining token, including ones that
    // start with a hyphen.
    /// The command and arguments to execute.
    /// Use "-" as the only argument to read a JSON payload from stdin.
    #[arg(trailing_var_arg = true, allow_hyphen_values = true)]
    command: Vec<String>,
}

/// Parse `argv` as exec-harness CLI args (first element is the program name)
/// and run the harness. Initializes `env_logger` on the way in so logs from
/// the harness reach the runner's captured output.
pub fn run_cli<I, T>(argv: I) -> Result<()>
where
I: IntoIterator<Item = T>,
T: Into<OsString> + Clone,
{
env_logger::builder()
.parse_env(env_logger::Env::new().filter_or("CODSPEED_LOG", "info"))
.format(|buf, record| {
use std::io::Write;
writeln!(buf, "{}", record.args())
})
.try_init()
.ok();

debug!("Starting exec-harness with pid {}", std::process::id());

let args = CliArgs::parse_from(argv);
let measurement_mode = args.measurement_mode;

let commands = match args.command.as_slice() {
[single] if single == "-" => read_commands_from_stdin()?,
[] => bail!("No command provided"),
_ => vec![BenchmarkCommand {
command: args.command,
name: args.name,
walltime_args: args.walltime_args,
}],
};

execute_benchmarks(commands, measurement_mode)?;

Ok(())
}
57 changes: 1 addition & 56 deletions crates/exec-harness/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,5 @@
use clap::Parser;
use exec_harness::prelude::*;
use exec_harness::walltime::WalltimeExecutionArgs;
use exec_harness::{
BenchmarkCommand, MeasurementMode, execute_benchmarks, read_commands_from_stdin,
};

/// Entry point of the standalone `exec-harness` binary.
///
/// Argument parsing, logger setup and benchmark execution all live in
/// [`exec_harness::run_cli`]; this function only forwards the process
/// arguments (program name included) so the binary shares a single
/// implementation with the library entry point.
fn main() -> Result<()> {
    exec_harness::run_cli(std::env::args_os())
}
241 changes: 241 additions & 0 deletions crates/memtrack/src/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
//! Shared CLI entry for memtrack.
//!
//! Used by both the standalone `codspeed-memtrack` binary and by
//! `codspeed tool memtrack` in the main runner CLI.

use clap::Parser;
use ipc_channel::ipc;
use std::ffi::OsString;
use std::os::unix::process::CommandExt;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use runner_shared::artifacts::{ArtifactExt, MemtrackArtifact, MemtrackEvent, MemtrackWriter};

use crate::prelude::*;
use crate::{MemtrackIpcMessage, Tracker, handle_ipc_message};

// Root clap parser for memtrack; every action is a subcommand of `Commands`.
#[derive(Parser, Debug)]
#[command(
    name = "memtrack",
    version,
    about = "Track memory allocations using eBPF",
    long_about = None
)]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}

// Subcommands accepted by the memtrack CLI (wired in through `Cli::command`).
// The `///` comments below are picked up by clap as help text, so maintainer
// notes in this enum use plain `//` comments instead.
#[derive(Parser, Debug)]
enum Commands {
    /// Track memory allocations for a command
    Track {
        /// Command to execute and track
        // Positional; passed as one string to `bash -c` by `track_command`.
        command: String,

        /// Output folder for the allocations data
        // Defaults to the current directory.
        #[arg(short, long, default_value = ".")]
        output: PathBuf,

        /// Optional IPC server name for receiving control commands
        // When absent, `track_command` sets up no IPC channel.
        #[arg(long)]
        ipc_server: Option<String>,
    },
}

/// Get the original user's UID and GID when running under sudo.
/// Returns None if not running under sudo or if the environment variables are not set.
fn get_user_uid_gid() -> Option<(u32, u32)> {
    // Reads one environment variable and parses it as a numeric id; a missing,
    // non-unicode or non-numeric value yields `None`.
    let read_id = |var_name: &str| -> Option<u32> { std::env::var(var_name).ok()?.parse().ok() };

    // `SUDO_GID` is only consulted once `SUDO_UID` has been read successfully.
    read_id("SUDO_UID").and_then(|uid| read_id("SUDO_GID").map(|gid| (uid, gid)))
}

/// Parse `argv` as memtrack CLI args (first element is the program name) and
/// run the requested subcommand. Initializes `env_logger` on the way in.
pub fn run_cli<I, T>(argv: I) -> Result<()>
where
    I: IntoIterator<Item = T>,
    T: Into<OsString> + Clone,
{
    // Log level comes from `CODSPEED_LOG` (falling back to "info"), without
    // timestamps. `try_init().ok()` ignores the error returned when a logger
    // is already installed.
    let log_env = env_logger::Env::new().filter_or("CODSPEED_LOG", "info");
    env_logger::builder()
        .parse_env(log_env)
        .format_timestamp(None)
        .try_init()
        .ok();

    let parsed = Cli::parse_from(argv);

    let exit_status = match parsed.command {
        Commands::Track {
            command,
            output,
            ipc_server,
        } => {
            debug!("Starting memtrack for command: {command}");

            track_command(&command, ipc_server, &output).context("Failed to track command")?
        }
    };

    // On success this never returns `Ok`: the process exits with the tracked
    // command's exit code, or 1 when no code is available.
    std::process::exit(exit_status.code().unwrap_or(1))
}

/// Runs `cmd_string` through `bash -c`, tracks the spawned process with a
/// [`Tracker`], and streams the resulting events into a file inside `out_dir`.
///
/// Work is split across helper threads:
/// - an optional IPC thread that forwards incoming messages to
///   [`handle_ipc_message`] (only when `ipc_server_name` is given),
/// - a drain thread that moves events from the tracker's receiver into an
///   in-process channel,
/// - a writer thread that persists those events through a [`MemtrackWriter`].
///
/// Returns the exit status of the tracked command.
///
/// # Errors
///
/// Fails when the IPC connection, tracker creation, process spawn, output file
/// creation, event writing or thread joins fail, when the tracker mutex is
/// poisoned, or when the tracker reports a non-zero dropped-event count.
fn track_command(
    cmd_string: &str,
    ipc_server_name: Option<String>,
    out_dir: &Path,
) -> anyhow::Result<std::process::ExitStatus> {
    // First, establish IPC connection if needed to avoid timeouts on the runner because
    // creating the Tracker instance takes some time.
    let ipc_channel = if let Some(server_name) = ipc_server_name {
        debug!("Connecting to IPC server: {server_name}");

        let (tx, rx) = ipc::channel::<MemtrackIpcMessage>()?;
        let sender = ipc::IpcSender::connect(server_name)?;
        sender.send(tx)?;

        Some(rx)
    } else {
        None
    };

    let tracker = Tracker::new()?;
    let tracker_arc = Arc::new(Mutex::new(tracker));

    // Spawn IPC handler thread with the now-available tracker
    let ipc_handle = if let Some(rx) = ipc_channel {
        let tracker_clone = tracker_arc.clone();
        Some(thread::spawn(move || {
            while let Ok(msg) = rx.recv() {
                handle_ipc_message(msg, &tracker_clone);
            }
        }))
    } else {
        None
    };

    // Start the target command using bash to handle shell syntax
    let mut cmd = Command::new("bash");
    cmd.arg("-c").arg(cmd_string);

    // Drop privileges if running under sudo. This is required to avoid permission issues
    // when the target command tries to access files or directories that the current user
    // does not have permission to access.
    if let Some((uid, gid)) = get_user_uid_gid() {
        debug!("Running under sudo, dropping privileges to uid={uid}, gid={gid}");
        cmd.uid(uid).gid(gid);
    }

    let mut child = cmd
        .spawn()
        .map_err(|e| anyhow!("Failed to spawn child process: {e}"))?;
    // NOTE(review): `Child::id` is a `u32`; the cast assumes the pid fits in `i32`.
    let root_pid = child.id() as i32;
    // The IPC thread shares this mutex, so it can already be poisoned here if that
    // thread panicked while holding it. Report it as an error (same handling as the
    // dropped-event read below) instead of panicking.
    let event_rx = tracker_arc
        .lock()
        .map_err(|_| anyhow!("tracker mutex poisoned"))?
        .track(root_pid)?;
    debug!("Spawned child with pid {root_pid}");

    // Generate output file name and create file for streaming events
    let file_name = MemtrackArtifact::file_name(Some(root_pid));
    let out_file = std::fs::File::create(out_dir.join(file_name))?;

    let (write_tx, write_rx) = channel::<MemtrackEvent>();

    // Stage A: Fast drain thread - This is required so that we immediately clear the ring buffer
    // because it only has a limited size. The signal is per-call (was a function-scoped
    // `static` when this lived in `main.rs`; in library form a second call would otherwise
    // see the previous run's `false` and skip the regular draining loop entirely).
    let drain_events = Arc::new(AtomicBool::new(true));
    let drain_events_for_thread = drain_events.clone();
    let write_tx_clone = write_tx.clone();
    let drain_thread = thread::spawn(move || {
        // Regular draining loop
        while drain_events_for_thread.load(Ordering::Relaxed) {
            let Ok(event) = event_rx.recv_timeout(Duration::from_millis(100)) else {
                continue;
            };
            // A send error means the writer side is gone; its own error is
            // surfaced when the writer thread is joined.
            let _ = write_tx_clone.send(event);
        }

        // Final aggressive drain - keep trying until truly empty
        loop {
            match event_rx.try_recv() {
                Ok(event) => {
                    let _ = write_tx_clone.send(event);
                }
                Err(_) => {
                    // Sleep briefly and try once more to catch late arrivals
                    thread::sleep(Duration::from_millis(50));
                    if let Ok(event) = event_rx.try_recv() {
                        let _ = write_tx_clone.send(event);
                    } else {
                        break;
                    }
                }
            }
        }
    });

    // Stage B: Writer thread - Immediately writes the events to disk
    let writer_thread = thread::spawn(move || -> anyhow::Result<()> {
        let mut writer = MemtrackWriter::new(out_file)?;

        // Number of events written, only used for the final log line.
        let mut i = 0;
        while let Ok(first) = write_rx.recv() {
            writer.write_event(&first)?;
            i += 1;

            // Drain any backlog in a tight loop (batching)
            while let Ok(ev) = write_rx.try_recv() {
                writer.write_event(&ev)?;
                i += 1;
            }
        }
        writer.finish()?;

        info!("Wrote {i} memtrack events to disk");

        Ok(())
    });

    // Wait for the command to complete
    let status = child.wait().context("Failed to wait for command")?;
    debug!("Command exited with status: {status}");

    // Wait for drain thread to finish
    debug!("Waiting for the drain thread to finish");
    drain_events.store(false, Ordering::Relaxed);
    drain_thread
        .join()
        .map_err(|_| anyhow::anyhow!("Failed to join drain thread"))?;

    // Wait for writer thread to finish and propagate errors.
    // The drain thread's sender clone is gone once that thread has been joined;
    // dropping the last sender here closes the channel so the writer loop ends.
    debug!("Waiting for the writer thread to finish");
    drop(write_tx);
    writer_thread
        .join()
        .map_err(|_| anyhow::anyhow!("Failed to join writer thread"))??;

    // Read the eBPF dropped-event counter after the run is complete.
    // A non-zero value means the ring buffer overflowed and the trace is
    // incomplete.
    let dropped_events = tracker_arc
        .lock()
        .map_err(|_| anyhow!("tracker mutex poisoned"))?
        .dropped_events_count()
        .context("Failed to read memtrack dropped-event counter")?;
    if dropped_events > 0 {
        bail!(
            "Memtrack ring buffer overflowed: {dropped_events} events lost, aborting since the trace is incomplete.\n\
            Try reducing the benchmark's allocation rate (fewer iterations or smaller inputs), \
            or report it at https://github.com/CodSpeedHQ/codspeed/issues."
        );
    }

    // IPC thread will exit when channel closes; dropping the handle detaches the
    // thread rather than joining it.
    drop(ipc_handle);

    Ok(status)
}
Loading