|
| 1 | +use aya::include_bytes_aligned; |
| 2 | +use aya::maps::perf::AsyncPerfEventArray; |
| 3 | +use aya::programs::TracePoint; |
| 4 | +use aya::util::online_cpus; |
| 5 | +use aya::Bpf; |
| 6 | +use aya_log::BpfLogger; |
| 7 | +use bytes::BytesMut; |
| 8 | +use log::debug; |
| 9 | +use log::error; |
| 10 | +use signals_common::Signal; |
| 11 | +use std::mem::size_of; |
| 12 | +use std::sync::mpsc::SyncSender; |
| 13 | +use tokio::task; |
| 14 | + |
| 15 | +pub struct SignalEmitter { |
| 16 | + bpf: Bpf, |
| 17 | + dst: SyncSender<Signal>, |
| 18 | +} |
| 19 | + |
| 20 | +impl SignalEmitter { |
| 21 | + // new returns a new Signal that loads a bpf program from disk and loads it |
| 22 | + // into the kernel. signals are written to the destination channel. |
| 23 | + pub fn new(dst: SyncSender<Signal>) -> Result<Self, anyhow::Error> { |
| 24 | + #[cfg(debug_assertions)] |
| 25 | + let mut bpf = Bpf::load(include_bytes_aligned!( |
| 26 | + "../../target/bpfel-unknown-none/debug/signals" |
| 27 | + ))?; |
| 28 | + |
| 29 | + #[cfg(not(debug_assertions))] |
| 30 | + let mut bpf = Bpf::load(include_bytes_aligned!( |
| 31 | + "../../target/bpfel-unknown-none/release/signals" |
| 32 | + ))?; |
| 33 | + |
| 34 | + BpfLogger::init(&mut bpf).unwrap(); |
| 35 | + |
| 36 | + let program: &mut TracePoint = bpf.program_mut("signals").unwrap().try_into()?; |
| 37 | + program.load()?; |
| 38 | + program.attach("signal", "signal_generate")?; |
| 39 | + |
| 40 | + Ok(Self { bpf, dst }) |
| 41 | + } |
| 42 | + |
| 43 | + // attach starts to read signal events from the kernel and pipe them through the |
| 44 | + // destination channel. spawns a task per cpu, each task process events from its |
| 45 | + // own perf array. |
| 46 | + pub fn attach(&mut self) -> Result<(), anyhow::Error> { |
| 47 | + let cpus = online_cpus()?; |
| 48 | + let mut senders: Vec<SyncSender<Signal>> = vec![]; |
| 49 | + for _ in 0..cpus.len() { |
| 50 | + senders.push(self.dst.clone()); |
| 51 | + } |
| 52 | + |
| 53 | + let signal_struct_size: usize = size_of::<Signal>(); |
| 54 | + let mut perf_array = AsyncPerfEventArray::try_from(self.bpf.map_mut("SIGNALS")?)?; |
| 55 | + for cpu_id in cpus { |
| 56 | + debug!("spawning task for cpu {}", cpu_id); |
| 57 | + let mut parray = perf_array.open(cpu_id, None)?; |
| 58 | + let txcopy = senders.pop().unwrap(); |
| 59 | + task::spawn(async move { |
| 60 | + debug!("task for cpu awaiting for events {}", cpu_id); |
| 61 | + let mut buffers = (0..100) |
| 62 | + .map(|_| BytesMut::with_capacity(signal_struct_size)) |
| 63 | + .collect::<Vec<_>>(); |
| 64 | + |
| 65 | + loop { |
| 66 | + let events = match parray.read_events(&mut buffers).await { |
| 67 | + Ok(events) => events, |
| 68 | + Err(error) => { |
| 69 | + error!("fail to read events from the perf, bailing out: {}", error); |
| 70 | + return; |
| 71 | + } |
| 72 | + }; |
| 73 | + |
| 74 | + if events.lost > 0 { |
| 75 | + error!("queues are getting full, lost {} signals", events.lost); |
| 76 | + } |
| 77 | + |
| 78 | + for i in 0..events.read { |
| 79 | + let buf = &mut buffers[i]; |
| 80 | + let ptr = buf.as_ptr() as *const Signal; |
| 81 | + let signal = unsafe { ptr.read_unaligned() }; |
| 82 | + match txcopy.send(signal) { |
| 83 | + Ok(_) => continue, |
| 84 | + Err(err) => error!("failed to send signal internally: {}", err), |
| 85 | + } |
| 86 | + } |
| 87 | + } |
| 88 | + }); |
| 89 | + } |
| 90 | + |
| 91 | + Ok(()) |
| 92 | + } |
| 93 | +} |
0 commit comments