diff --git a/src/howl.rs b/src/howl.rs index b1776a1..453712d 100644 --- a/src/howl.rs +++ b/src/howl.rs @@ -5,6 +5,9 @@ use flatbuffers::FlatBufferBuilder; use isis_streaming_data_types::flatbuffers_generated::events_ev44::{ Event44Message, Event44MessageArgs, finish_event_44_message_buffer, }; +use isis_streaming_data_types::flatbuffers_generated::pulse_metadata_pu00::{ + Pu00Message, Pu00MessageArgs, finish_pu_00_message_buffer, +}; use isis_streaming_data_types::flatbuffers_generated::run_start_pl72::{ RunStart, RunStartArgs, SpectraDetectorMapping, SpectraDetectorMappingArgs, finish_run_start_buffer, @@ -126,6 +129,18 @@ fn produce_messages( .try_into() .expect("This will fail after April 11th, 2262"); + match producer.send( + BaseRecord::to(conf.event_topic) + .key("") + .payload(generate_fake_metadata(fbb, now_nanos)) + .timestamp(now_nanos / 1_000_000), + ) { + Ok(_) => {} + Err(err) => { + error!("Failed to send messages: {}", err.0); + } + } + for _ in 0..conf.messages_per_frame { match producer.send( BaseRecord::to(conf.event_topic) @@ -222,6 +237,21 @@ fn generate_fake_events<'a>( fbb.finished_data() } +fn generate_fake_metadata<'a>(fbb: &'a mut FlatBufferBuilder<'_>, timestamp_ns: i64) -> &'a [u8] { + fbb.reset(); + let args = Pu00MessageArgs { + reference_time: timestamp_ns, + message_id: 0, + source_name: Some(fbb.create_string("saluki")), + period_number: Some(1), + vetos: Some(0), + proton_charge: Some(0.1), + }; + let pu00 = Pu00Message::create(fbb, &args); + finish_pu_00_message_buffer(fbb, pu00); + fbb.finished_data() +} + pub struct HowlConfig<'a> { pub broker: &'a str, pub event_topic: &'a str, @@ -243,14 +273,18 @@ pub fn howl(conf: &HowlConfig) { .as_nanos() .try_into() .expect("This will fail after April 11th, 2262"); + let ev44_size = generate_fake_events(&mut fbb, &mut rng, 0, conf.event_message_config, now_nanos).len() as u32; - debug!("ev44 size is {ev44_size} bytes"); - // calculate rate - let rate_bytes_per_sec = ev44_size * conf.messages_per_frame * conf.frames_per_second; + let pu00_size = generate_fake_metadata(&mut fbb, now_nanos).len() as u32; + debug!("pu00 size is {pu00_size} bytes"); + + // calculate overall rate (with both ev44 and pu00) + let rate_bytes_per_sec = ev44_size * conf.messages_per_frame * conf.frames_per_second + + pu00_size * conf.frames_per_second; debug!("bytes per second: {rate_bytes_per_sec}"); let rate_mbit_per_sec = (rate_bytes_per_sec as f64 / (1024. * 1024.)) * 8.0; @@ -259,6 +293,7 @@ pub fn howl(conf: &HowlConfig) { println!( "Attempting to simulate data rate: {rate_mbit_per_sec:.3} Mbit/s ({rate_mebibits_per_sec:.3} MiB/s)" ); + println!("Each pu00 is {pu00_size} bytes"); println!("Each ev44 is {ev44_size} bytes"); let producer: ThreadedProducer = ClientConfig::new() @@ -291,6 +326,7 @@ pub fn howl(conf: &HowlConfig) { .duration_since(SystemTime::UNIX_EPOCH) .expect("Failed to get system time"); debug!("Target time: {target_time:?}"); + loop { target_time += target_frame_time; debug!("New target: {target_time:?}");