From 73d949426582ae2c2d979affafde128b308f298b Mon Sep 17 00:00:00 2001 From: rabbitstack Date: Sun, 9 Feb 2025 21:15:23 +0100 Subject: [PATCH] refactor(event): Improve callstack decorator Keep the stack walk events indexed by stack id. The stack id is a tuple of pid,tid associated with the thread executing the code. For every unique stack id, the queue of pending events is maintained. When the corresponding stack walk event arrives, the oldest event is pulled from the queue and enriched with stack addresses. --- go.mod | 1 - go.sum | 2 - internal/etw/processors/fs_windows.go | 55 +++++++++++++------- pkg/kevent/callstack.go | 72 ++++++++++++++++++++------- pkg/kevent/callstack_test.go | 8 +-- pkg/kevent/queue.go | 4 ++ 6 files changed, 99 insertions(+), 43 deletions(-) diff --git a/go.mod b/go.mod index dc16db376..0d8b05a38 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 github.com/dustin/go-humanize v1.0.0 github.com/enescakir/emoji v1.0.0 - github.com/gammazero/deque v0.2.1 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e github.com/hashicorp/go-version v1.2.1 github.com/hillu/go-yara/v4 v4.2.4 diff --git a/go.sum b/go.sum index d1ac1dbbc..e09b195d4 100644 --- a/go.sum +++ b/go.sum @@ -58,8 +58,6 @@ github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHqu github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fzipp/gocyclo v0.3.1/go.mod h1:DJHO6AUmbdqj2ET4Z9iArSuwWgYDRryYt2wASxc7x3E= -github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0= -github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= diff --git a/internal/etw/processors/fs_windows.go b/internal/etw/processors/fs_windows.go index c8e0c798d..abc5476e5 100644 --- a/internal/etw/processors/fs_windows.go +++ b/internal/etw/processors/fs_windows.go @@ -20,7 +20,6 @@ package processors import ( "expvar" - "github.com/gammazero/deque" "github.com/rabbitstack/fibratus/pkg/config" "github.com/rabbitstack/fibratus/pkg/fs" "github.com/rabbitstack/fibratus/pkg/handle" @@ -60,9 +59,10 @@ type fsProcessor struct { devPathResolver fs.DevPathResolver config *config.Config - deq *deque.Deque[*kevent.Kevent] - mu sync.Mutex - purger *time.Ticker + // buckets stores stack walk events per stack id + buckets map[uint64][]*kevent.Kevent + mu sync.Mutex + purger *time.Ticker quit chan struct{} } @@ -88,7 +88,7 @@ func newFsProcessor( devMapper: devMapper, devPathResolver: devPathResolver, config: config, - deq: deque.New[*kevent.Kevent](100), + buckets: make(map[uint64][]*kevent.Kevent), purger: time.NewTicker(time.Second * 5), quit: make(chan struct{}, 1), } @@ -159,7 +159,15 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) { if !kevent.IsCurrentProcDropped(e.PID) { f.mu.Lock() defer f.mu.Unlock() - f.deq.PushBack(e) + + // append the event to the bucket indexed by stack id + id := e.StackID() + q, ok := f.buckets[id] + if !ok { + f.buckets[id] = []*kevent.Kevent{e} + } else { + f.buckets[id] = append(q, e) + } } case ktypes.FileOpEnd: // get the CreateFile pending event by IRP identifier @@ -169,6 +177,7 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) { dispo = e.Kparams.MustGetUint64(kparams.FileExtraInfo) status = e.Kparams.MustGetUint32(kparams.NTStatus) ) + if dispo > windows.FILE_MAXIMUM_DISPOSITION { return e, nil } @@ -177,10 +186,12 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) { return e, nil } delete(f.irps, irp) + // reset the wait status to allow passage of this event to // the aggregator queue. Additionally, append params to it ev.WaitEnqueue = false fileObject := ev.Kparams.MustGetUint64(kparams.FileObject) + // try to get extended file info. If the file object is already // present in the map, we'll reuse the existing file information fileinfo, ok := f.files[fileObject] @@ -191,9 +202,11 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) { fileinfo = f.getFileInfo(filepath, opts) f.files[fileObject] = fileinfo } + if f.config.Kstream.EnableHandleKevents { f.devPathResolver.AddPath(ev.GetParamAsString(kparams.FilePath)) } + ev.AppendParam(kparams.NTStatus, kparams.Status, status) if fileinfo.Type != fs.Unknown { ev.AppendEnum(kparams.FileType, uint32(fileinfo.Type), fs.FileTypes) @@ -205,17 +218,20 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) { // the events are delayed until the respective FileOpEnd // event arrives, we enable stack tracing for CreateFile // events. When the CreateFile event is generated, we store - // it pending IRP map. Subsequently, the stack walk event is - // generated which we put inside the queue. After FileOpEnd - // arrives, the previous stack walk for CreateFile is popped - // from the queue and the callstack parameter attached to the + // it in pending IRP map. Subsequently, the stack walk event + // is put inside the queue. After FileOpEnd event arrives, + // the previous stack walk for CreateFile is popped from + // the queue and the callstack parameter attached to the // event. if f.config.Kstream.StackEnrichment { f.mu.Lock() defer f.mu.Unlock() - i := f.deq.RIndex(func(evt *kevent.Kevent) bool { return evt.StackID() == ev.StackID() }) - if i != -1 { - s := f.deq.Remove(i) + + id := ev.StackID() + q, ok := f.buckets[id] + if ok && len(q) > 0 { + var s *kevent.Kevent + s, f.buckets[id] = q[len(q)-1], q[:len(q)-1] callstack := s.Kparams.MustGetSlice(kparams.Callstack) ev.AppendParam(kparams.Callstack, kparams.Slice, callstack) } @@ -228,6 +244,7 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) { return ev, nil } } + return ev, nil case ktypes.ReleaseFile: fileReleaseCount.Add(1) @@ -297,6 +314,7 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) { return e, f.psnap.AddMmap(e) } } + return e, nil } @@ -336,13 +354,16 @@ func (f *fsProcessor) purge() { select { case <-f.purger.C: f.mu.Lock() + // evict unmatched stack traces - for i := 0; i < f.deq.Len(); i++ { - evt := f.deq.At(i) - if time.Since(evt.Timestamp) > time.Second*30 { - f.deq.Remove(i) + for id, q := range f.buckets { + for i, evt := range q { + if time.Since(evt.Timestamp) > time.Second*30 { + f.buckets[id] = append(q[:i], q[i+1:]...) + } } } + f.mu.Unlock() case <-f.quit: return diff --git a/pkg/kevent/callstack.go b/pkg/kevent/callstack.go index 5396868c5..611f23f9b 100644 --- a/pkg/kevent/callstack.go +++ b/pkg/kevent/callstack.go @@ -20,7 +20,6 @@ package kevent import ( "expvar" - "github.com/gammazero/deque" "github.com/rabbitstack/fibratus/pkg/kevent/kparams" "github.com/rabbitstack/fibratus/pkg/util/multierror" "github.com/rabbitstack/fibratus/pkg/util/va" @@ -378,9 +377,9 @@ func (s Callstack) CallsiteInsns(pid uint32, leading bool) []string { // popped from the queue and enriched with return addresses // which are later subject to symbolization. type CallstackDecorator struct { - deq *deque.Deque[*Kevent] - q *Queue - mux sync.Mutex + buckets map[uint64][]*Kevent + q *Queue + mux sync.Mutex flusher *time.Ticker quit chan struct{} @@ -392,11 +391,13 @@ type CallstackDecorator struct { func NewCallstackDecorator(q *Queue) *CallstackDecorator { c := &CallstackDecorator{ q: q, - deq: deque.New[*Kevent](100), + buckets: make(map[uint64][]*Kevent), flusher: time.NewTicker(flusherInterval), quit: make(chan struct{}, 1), } + go c.doFlush() + return c } @@ -404,7 +405,15 @@ func NewCallstackDecorator(q *Queue) *CallstackDecorator { func (cd *CallstackDecorator) Push(e *Kevent) { cd.mux.Lock() defer cd.mux.Unlock() - cd.deq.PushBack(e) + + // append the event to the bucket indexed by stack id + id := e.StackID() + q, ok := cd.buckets[id] + if !ok { + cd.buckets[id] = []*Kevent{e} + } else { + cd.buckets[id] = append(q, e) + } } // Pop receives the stack walk event and pops the oldest @@ -414,13 +423,25 @@ func (cd *CallstackDecorator) Push(e *Kevent) { func (cd *CallstackDecorator) Pop(e *Kevent) *Kevent { cd.mux.Lock() defer cd.mux.Unlock() - i := cd.deq.Index(func(evt *Kevent) bool { return evt.StackID() == e.StackID() }) - if i == -1 { + + id := e.StackID() + q, ok := cd.buckets[id] + if !ok { return e } - evt := cd.deq.Remove(i) + + var evt *Kevent + if len(q) > 0 { + evt, cd.buckets[id] = q[0], q[1:] + } + + if evt == nil { + return e + } + callstack := e.Kparams.MustGetSlice(kparams.Callstack) evt.AppendParam(kparams.Callstack, kparams.Slice, callstack) + return evt } @@ -429,6 +450,13 @@ func (cd *CallstackDecorator) Stop() { cd.quit <- struct{}{} } +// RemoveBucket removes the bucket and all enqueued events. +func (cd *CallstackDecorator) RemoveBucket(id uint64) { + cd.mux.Lock() + defer cd.mux.Unlock() + delete(cd.buckets, id) +} + func (cd *CallstackDecorator) doFlush() { for { select { @@ -449,20 +477,26 @@ func (cd *CallstackDecorator) doFlush() { func (cd *CallstackDecorator) flush() []error { cd.mux.Lock() defer cd.mux.Unlock() - if cd.deq.Len() == 0 { + + if len(cd.buckets) == 0 { return nil } + errs := make([]error, 0) - for i := 0; i < cd.deq.Len(); i++ { - evt := cd.deq.At(i) - if time.Since(evt.Timestamp) < maxDequeFlushPeriod { - continue - } - callstackFlushes.Add(1) - err := cd.q.push(cd.deq.Remove(i)) - if err != nil { - errs = append(errs, err) + + for id, q := range cd.buckets { + for i, evt := range q { + if time.Since(evt.Timestamp) < maxDequeFlushPeriod { + continue + } + callstackFlushes.Add(1) + err := cd.q.push(evt) + if err != nil { + errs = append(errs, err) + } + cd.buckets[id] = append(q[:i], q[i+1:]...) } } + return errs } diff --git a/pkg/kevent/callstack_test.go b/pkg/kevent/callstack_test.go index 5a30ba4ec..39468b8fb 100644 --- a/pkg/kevent/callstack_test.go +++ b/pkg/kevent/callstack_test.go @@ -113,7 +113,7 @@ func TestCallstackDecorator(t *testing.T) { cd.Push(e) cd.Push(e1) - assert.True(t, cd.deq.Len() == 2) + assert.Len(t, cd.buckets[e.StackID()], 2) sw := &Kevent{ Type: ktypes.StackWalk, @@ -129,7 +129,7 @@ func TestCallstackDecorator(t *testing.T) { } evt := cd.Pop(sw) - assert.True(t, cd.deq.Len() == 1) + assert.Len(t, cd.buckets[e.StackID()], 1) assert.Equal(t, ktypes.CreateFile, evt.Type) assert.True(t, evt.Kparams.Contains(kparams.Callstack)) assert.Equal(t, "C:\\Windows\\system32\\user32.dll", evt.GetParamAsString(kparams.FilePath)) @@ -164,11 +164,11 @@ func TestCallstackDecoratorFlush(t *testing.T) { } cd.Push(e) - assert.True(t, cd.deq.Len() == 1) + assert.Len(t, cd.buckets[e.StackID()], 1) time.Sleep(time.Millisecond * 3100) evt := <-q.Events() - assert.True(t, cd.deq.Len() == 0) + assert.Len(t, cd.buckets[e.StackID()], 0) assert.Equal(t, ktypes.CreateFile, evt.Type) assert.False(t, evt.Kparams.Contains(kparams.Callstack)) } diff --git a/pkg/kevent/queue.go b/pkg/kevent/queue.go index f1d4d85c9..974ab97f7 100644 --- a/pkg/kevent/queue.go +++ b/pkg/kevent/queue.go @@ -157,6 +157,10 @@ func (q *Queue) push(e *Kevent) error { enqueue = true } } + if q.stackEnrichment && e.IsTerminateThread() { + id := uint64(e.Kparams.MustGetPid() + e.Kparams.MustGetTid()) + q.cd.RemoveBucket(id) + } if enqueue || len(q.listeners) == 0 { q.q <- e keventsEnqueued.Add(1)