diff --git a/go.mod b/go.mod index dc16db376..0d8b05a38 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 github.com/dustin/go-humanize v1.0.0 github.com/enescakir/emoji v1.0.0 - github.com/gammazero/deque v0.2.1 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e github.com/hashicorp/go-version v1.2.1 github.com/hillu/go-yara/v4 v4.2.4 diff --git a/go.sum b/go.sum index d1ac1dbbc..e09b195d4 100644 --- a/go.sum +++ b/go.sum @@ -58,8 +58,6 @@ github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHqu github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fzipp/gocyclo v0.3.1/go.mod h1:DJHO6AUmbdqj2ET4Z9iArSuwWgYDRryYt2wASxc7x3E= -github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0= -github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= diff --git a/internal/etw/processors/fs_windows.go b/internal/etw/processors/fs_windows.go index c8e0c798d..abc5476e5 100644 --- a/internal/etw/processors/fs_windows.go +++ b/internal/etw/processors/fs_windows.go @@ -20,7 +20,6 @@ package processors import ( "expvar" - "github.com/gammazero/deque" "github.com/rabbitstack/fibratus/pkg/config" "github.com/rabbitstack/fibratus/pkg/fs" "github.com/rabbitstack/fibratus/pkg/handle" @@ -60,9 +59,10 @@ type fsProcessor struct { devPathResolver fs.DevPathResolver config *config.Config - deq *deque.Deque[*kevent.Kevent] - mu sync.Mutex - purger *time.Ticker + // buckets stores stack walk events per stack id + buckets map[uint64][]*kevent.Kevent + mu sync.Mutex + purger *time.Ticker quit chan struct{} } @@ -88,7 +88,7 @@ func newFsProcessor( devMapper: devMapper, devPathResolver: devPathResolver, config: config, - deq: deque.New[*kevent.Kevent](100), + buckets: make(map[uint64][]*kevent.Kevent), purger: time.NewTicker(time.Second * 5), quit: make(chan struct{}, 1), } @@ -159,7 +159,15 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) { if !kevent.IsCurrentProcDropped(e.PID) { f.mu.Lock() defer f.mu.Unlock() - f.deq.PushBack(e) + + // append the event to the bucket indexed by stack id + id := e.StackID() + q, ok := f.buckets[id] + if !ok { + f.buckets[id] = []*kevent.Kevent{e} + } else { + f.buckets[id] = append(q, e) + } } case ktypes.FileOpEnd: // get the CreateFile pending event by IRP identifier @@ -169,6 +177,7 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) { dispo = e.Kparams.MustGetUint64(kparams.FileExtraInfo) status = e.Kparams.MustGetUint32(kparams.NTStatus) ) + if dispo > windows.FILE_MAXIMUM_DISPOSITION { return e, nil } @@ -177,10 +186,12 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) { return e, nil } delete(f.irps, irp) + // reset the wait status to allow passage of this event to // the aggregator queue. Additionally, append params to it ev.WaitEnqueue = false fileObject := ev.Kparams.MustGetUint64(kparams.FileObject) + // try to get extended file info. If the file object is already // present in the map, we'll reuse the existing file information fileinfo, ok := f.files[fileObject] @@ -191,9 +202,11 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) { fileinfo = f.getFileInfo(filepath, opts) f.files[fileObject] = fileinfo } + if f.config.Kstream.EnableHandleKevents { f.devPathResolver.AddPath(ev.GetParamAsString(kparams.FilePath)) } + ev.AppendParam(kparams.NTStatus, kparams.Status, status) if fileinfo.Type != fs.Unknown { ev.AppendEnum(kparams.FileType, uint32(fileinfo.Type), fs.FileTypes) @@ -205,17 +218,20 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) { // the events are delayed until the respective FileOpEnd // event arrives, we enable stack tracing for CreateFile // events. When the CreateFile event is generated, we store - // it pending IRP map. Subsequently, the stack walk event is - // generated which we put inside the queue. After FileOpEnd - // arrives, the previous stack walk for CreateFile is popped - // from the queue and the callstack parameter attached to the + // it in pending IRP map. Subsequently, the stack walk event + // is put inside the queue. After FileOpEnd event arrives, + // the previous stack walk for CreateFile is popped from + // the queue and the callstack parameter attached to the // event. if f.config.Kstream.StackEnrichment { f.mu.Lock() defer f.mu.Unlock() - i := f.deq.RIndex(func(evt *kevent.Kevent) bool { return evt.StackID() == ev.StackID() }) - if i != -1 { - s := f.deq.Remove(i) + + id := ev.StackID() + q, ok := f.buckets[id] + if ok && len(q) > 0 { + var s *kevent.Kevent + s, f.buckets[id] = q[len(q)-1], q[:len(q)-1] callstack := s.Kparams.MustGetSlice(kparams.Callstack) ev.AppendParam(kparams.Callstack, kparams.Slice, callstack) } @@ -228,6 +244,7 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) { return ev, nil } } + return ev, nil case ktypes.ReleaseFile: fileReleaseCount.Add(1) @@ -297,6 +314,7 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) { return e, f.psnap.AddMmap(e) } } + return e, nil } @@ -336,13 +354,16 @@ func (f *fsProcessor) purge() { select { case <-f.purger.C: f.mu.Lock() + // evict unmatched stack traces - for i := 0; i < f.deq.Len(); i++ { - evt := f.deq.At(i) - if time.Since(evt.Timestamp) > time.Second*30 { - f.deq.Remove(i) + for id, q := range f.buckets { + for i, evt := range q { + if time.Since(evt.Timestamp) > time.Second*30 { + f.buckets[id] = append(q[:i], q[i+1:]...) + } } } + f.mu.Unlock() case <-f.quit: return diff --git a/pkg/kevent/callstack.go b/pkg/kevent/callstack.go index 5396868c5..611f23f9b 100644 --- a/pkg/kevent/callstack.go +++ b/pkg/kevent/callstack.go @@ -20,7 +20,6 @@ package kevent import ( "expvar" - "github.com/gammazero/deque" "github.com/rabbitstack/fibratus/pkg/kevent/kparams" "github.com/rabbitstack/fibratus/pkg/util/multierror" "github.com/rabbitstack/fibratus/pkg/util/va" @@ -378,9 +377,9 @@ func (s Callstack) CallsiteInsns(pid uint32, leading bool) []string { // popped from the queue and enriched with return addresses // which are later subject to symbolization. type CallstackDecorator struct { - deq *deque.Deque[*Kevent] - q *Queue - mux sync.Mutex + buckets map[uint64][]*Kevent + q *Queue + mux sync.Mutex flusher *time.Ticker quit chan struct{} @@ -392,11 +391,13 @@ type CallstackDecorator struct { func NewCallstackDecorator(q *Queue) *CallstackDecorator { c := &CallstackDecorator{ q: q, - deq: deque.New[*Kevent](100), + buckets: make(map[uint64][]*Kevent), flusher: time.NewTicker(flusherInterval), quit: make(chan struct{}, 1), } + go c.doFlush() + return c } @@ -404,7 +405,15 @@ func NewCallstackDecorator(q *Queue) *CallstackDecorator { func (cd *CallstackDecorator) Push(e *Kevent) { cd.mux.Lock() defer cd.mux.Unlock() - cd.deq.PushBack(e) + + // append the event to the bucket indexed by stack id + id := e.StackID() + q, ok := cd.buckets[id] + if !ok { + cd.buckets[id] = []*Kevent{e} + } else { + cd.buckets[id] = append(q, e) + } } // Pop receives the stack walk event and pops the oldest @@ -414,13 +423,25 @@ func (cd *CallstackDecorator) Push(e *Kevent) { func (cd *CallstackDecorator) Pop(e *Kevent) *Kevent { cd.mux.Lock() defer cd.mux.Unlock() - i := cd.deq.Index(func(evt *Kevent) bool { return evt.StackID() == e.StackID() }) - if i == -1 { + + id := e.StackID() + q, ok := cd.buckets[id] + if !ok { return e } - evt := cd.deq.Remove(i) + + var evt *Kevent + if len(q) > 0 { + evt, cd.buckets[id] = q[0], q[1:] + } + + if evt == nil { + return e + } + callstack := e.Kparams.MustGetSlice(kparams.Callstack) evt.AppendParam(kparams.Callstack, kparams.Slice, callstack) + return evt } @@ -429,6 +450,13 @@ func (cd *CallstackDecorator) Stop() { cd.quit <- struct{}{} } +// RemoveBucket removes the bucket and all enqueued events. +func (cd *CallstackDecorator) RemoveBucket(id uint64) { + cd.mux.Lock() + defer cd.mux.Unlock() + delete(cd.buckets, id) +} + func (cd *CallstackDecorator) doFlush() { for { select { @@ -449,20 +477,26 @@ func (cd *CallstackDecorator) doFlush() { func (cd *CallstackDecorator) flush() []error { cd.mux.Lock() defer cd.mux.Unlock() - if cd.deq.Len() == 0 { + + if len(cd.buckets) == 0 { return nil } + errs := make([]error, 0) - for i := 0; i < cd.deq.Len(); i++ { - evt := cd.deq.At(i) - if time.Since(evt.Timestamp) < maxDequeFlushPeriod { - continue - } - callstackFlushes.Add(1) - err := cd.q.push(cd.deq.Remove(i)) - if err != nil { - errs = append(errs, err) + + for id, q := range cd.buckets { + for i, evt := range q { + if time.Since(evt.Timestamp) < maxDequeFlushPeriod { + continue + } + callstackFlushes.Add(1) + err := cd.q.push(evt) + if err != nil { + errs = append(errs, err) + } + cd.buckets[id] = append(q[:i], q[i+1:]...) } } + return errs } diff --git a/pkg/kevent/callstack_test.go b/pkg/kevent/callstack_test.go index 5a30ba4ec..39468b8fb 100644 --- a/pkg/kevent/callstack_test.go +++ b/pkg/kevent/callstack_test.go @@ -113,7 +113,7 @@ func TestCallstackDecorator(t *testing.T) { cd.Push(e) cd.Push(e1) - assert.True(t, cd.deq.Len() == 2) + assert.Len(t, cd.buckets[e.StackID()], 2) sw := &Kevent{ Type: ktypes.StackWalk, @@ -129,7 +129,7 @@ func TestCallstackDecorator(t *testing.T) { } evt := cd.Pop(sw) - assert.True(t, cd.deq.Len() == 1) + assert.Len(t, cd.buckets[e.StackID()], 1) assert.Equal(t, ktypes.CreateFile, evt.Type) assert.True(t, evt.Kparams.Contains(kparams.Callstack)) assert.Equal(t, "C:\\Windows\\system32\\user32.dll", evt.GetParamAsString(kparams.FilePath)) @@ -164,11 +164,11 @@ func TestCallstackDecoratorFlush(t *testing.T) { } cd.Push(e) - assert.True(t, cd.deq.Len() == 1) + assert.Len(t, cd.buckets[e.StackID()], 1) time.Sleep(time.Millisecond * 3100) evt := <-q.Events() - assert.True(t, cd.deq.Len() == 0) + assert.Len(t, cd.buckets[e.StackID()], 0) assert.Equal(t, ktypes.CreateFile, evt.Type) assert.False(t, evt.Kparams.Contains(kparams.Callstack)) } diff --git a/pkg/kevent/queue.go b/pkg/kevent/queue.go index f1d4d85c9..974ab97f7 100644 --- a/pkg/kevent/queue.go +++ b/pkg/kevent/queue.go @@ -157,6 +157,10 @@ func (q *Queue) push(e *Kevent) error { enqueue = true } } + if q.stackEnrichment && e.IsTerminateThread() { + id := uint64(e.Kparams.MustGetPid() + e.Kparams.MustGetTid()) + q.cd.RemoveBucket(id) + } if enqueue || len(q.listeners) == 0 { q.q <- e keventsEnqueued.Add(1)