Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions src/howl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ use flatbuffers::FlatBufferBuilder;
use isis_streaming_data_types::flatbuffers_generated::events_ev44::{
Event44Message, Event44MessageArgs, finish_event_44_message_buffer,
};
use isis_streaming_data_types::flatbuffers_generated::pulse_metadata_pu00::{
Pu00Message, Pu00MessageArgs, finish_pu_00_message_buffer,
};
use isis_streaming_data_types::flatbuffers_generated::run_start_pl72::{
RunStart, RunStartArgs, SpectraDetectorMapping, SpectraDetectorMappingArgs,
finish_run_start_buffer,
Expand Down Expand Up @@ -126,6 +129,18 @@ fn produce_messages(
.try_into()
.expect("This will fail after April 11th, 2262");

match producer.send(
BaseRecord::to(conf.event_topic)
.key("")
.payload(generate_fake_metadata(fbb, now_nanos))
.timestamp(now_nanos / 1_000_000),
) {
Ok(_) => {}
Err(err) => {
error!("Failed to send messages: {}", err.0);
}
}

for _ in 0..conf.messages_per_frame {
match producer.send(
BaseRecord::to(conf.event_topic)
Expand Down Expand Up @@ -222,6 +237,21 @@ fn generate_fake_events<'a>(
fbb.finished_data()
}

fn generate_fake_metadata<'a>(fbb: &'a mut FlatBufferBuilder<'_>, timestamp_ns: i64) -> &'a [u8] {
fbb.reset();
let args = Pu00MessageArgs {
reference_time: timestamp_ns,
message_id: 0,
source_name: Some(fbb.create_string("saluki")),
period_number: Some(1),
vetos: Some(0),
proton_charge: Some(0.1),
};
let pu00 = Pu00Message::create(fbb, &args);
finish_pu_00_message_buffer(fbb, pu00);
fbb.finished_data()
}

pub struct HowlConfig<'a> {
pub broker: &'a str,
pub event_topic: &'a str,
Expand All @@ -243,14 +273,18 @@ pub fn howl(conf: &HowlConfig) {
.as_nanos()
.try_into()
.expect("This will fail after April 11th, 2262");

let ev44_size =
generate_fake_events(&mut fbb, &mut rng, 0, conf.event_message_config, now_nanos).len()
as u32;

debug!("ev44 size is {ev44_size} bytes");

// calculate rate
let rate_bytes_per_sec = ev44_size * conf.messages_per_frame * conf.frames_per_second;
let pu00_size = generate_fake_metadata(&mut fbb, now_nanos).len() as u32;
debug!("pu00 size is {pu00_size} bytes");

// calculate overall rate (with both ev44 and pu00)
let rate_bytes_per_sec = ev44_size * conf.messages_per_frame * conf.frames_per_second
+ pu00_size * conf.frames_per_second;
debug!("bytes per second: {rate_bytes_per_sec}");

let rate_mbit_per_sec = (rate_bytes_per_sec as f64 / (1024. * 1024.)) * 8.0;
Expand All @@ -259,6 +293,7 @@ pub fn howl(conf: &HowlConfig) {
println!(
"Attempting to simulate data rate: {rate_mbit_per_sec:.3} Mbit/s ({rate_mebibits_per_sec:.3} MiB/s)"
);
println!("Each pu00 is {pu00_size} bytes");
println!("Each ev44 is {ev44_size} bytes");

let producer: ThreadedProducer<DefaultProducerContext> = ClientConfig::new()
Expand Down Expand Up @@ -291,6 +326,7 @@ pub fn howl(conf: &HowlConfig) {
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Failed to get system time");
debug!("Target time: {target_time:?}");

loop {
target_time += target_frame_time;
debug!("New target: {target_time:?}");
Expand Down