Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/dustin/go-humanize v1.0.0
github.com/enescakir/emoji v1.0.0
github.com/gammazero/deque v0.2.1
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e
github.com/hashicorp/go-version v1.2.1
github.com/hillu/go-yara/v4 v4.2.4
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHqu
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fzipp/gocyclo v0.3.1/go.mod h1:DJHO6AUmbdqj2ET4Z9iArSuwWgYDRryYt2wASxc7x3E=
github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
Expand Down
55 changes: 38 additions & 17 deletions internal/etw/processors/fs_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package processors

import (
"expvar"
"github.com/gammazero/deque"
"github.com/rabbitstack/fibratus/pkg/config"
"github.com/rabbitstack/fibratus/pkg/fs"
"github.com/rabbitstack/fibratus/pkg/handle"
Expand Down Expand Up @@ -60,9 +59,10 @@ type fsProcessor struct {
devPathResolver fs.DevPathResolver
config *config.Config

deq *deque.Deque[*kevent.Kevent]
mu sync.Mutex
purger *time.Ticker
// buckets stores stack walk events per stack id
buckets map[uint64][]*kevent.Kevent
mu sync.Mutex
purger *time.Ticker

quit chan struct{}
}
Expand All @@ -88,7 +88,7 @@ func newFsProcessor(
devMapper: devMapper,
devPathResolver: devPathResolver,
config: config,
deq: deque.New[*kevent.Kevent](100),
buckets: make(map[uint64][]*kevent.Kevent),
purger: time.NewTicker(time.Second * 5),
quit: make(chan struct{}, 1),
}
Expand Down Expand Up @@ -159,7 +159,15 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
if !kevent.IsCurrentProcDropped(e.PID) {
f.mu.Lock()
defer f.mu.Unlock()
f.deq.PushBack(e)

// append the event to the bucket indexed by stack id
id := e.StackID()
q, ok := f.buckets[id]
if !ok {
f.buckets[id] = []*kevent.Kevent{e}
} else {
f.buckets[id] = append(q, e)
}
}
case ktypes.FileOpEnd:
// get the CreateFile pending event by IRP identifier
Expand All @@ -169,6 +177,7 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
dispo = e.Kparams.MustGetUint64(kparams.FileExtraInfo)
status = e.Kparams.MustGetUint32(kparams.NTStatus)
)

if dispo > windows.FILE_MAXIMUM_DISPOSITION {
return e, nil
}
Expand All @@ -177,10 +186,12 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
return e, nil
}
delete(f.irps, irp)

// reset the wait status to allow passage of this event to
// the aggregator queue. Additionally, append params to it
ev.WaitEnqueue = false
fileObject := ev.Kparams.MustGetUint64(kparams.FileObject)

// try to get extended file info. If the file object is already
// present in the map, we'll reuse the existing file information
fileinfo, ok := f.files[fileObject]
Expand All @@ -191,9 +202,11 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
fileinfo = f.getFileInfo(filepath, opts)
f.files[fileObject] = fileinfo
}

if f.config.Kstream.EnableHandleKevents {
f.devPathResolver.AddPath(ev.GetParamAsString(kparams.FilePath))
}

ev.AppendParam(kparams.NTStatus, kparams.Status, status)
if fileinfo.Type != fs.Unknown {
ev.AppendEnum(kparams.FileType, uint32(fileinfo.Type), fs.FileTypes)
Expand All @@ -205,17 +218,20 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
// the events are delayed until the respective FileOpEnd
// event arrives, we enable stack tracing for CreateFile
// events. When the CreateFile event is generated, we store
// it pending IRP map. Subsequently, the stack walk event is
// generated which we put inside the queue. After FileOpEnd
// arrives, the previous stack walk for CreateFile is popped
// from the queue and the callstack parameter attached to the
// it in pending IRP map. Subsequently, the stack walk event
// is put inside the queue. After FileOpEnd event arrives,
// the previous stack walk for CreateFile is popped from
// the queue and the callstack parameter attached to the
// event.
if f.config.Kstream.StackEnrichment {
f.mu.Lock()
defer f.mu.Unlock()
i := f.deq.RIndex(func(evt *kevent.Kevent) bool { return evt.StackID() == ev.StackID() })
if i != -1 {
s := f.deq.Remove(i)

id := ev.StackID()
q, ok := f.buckets[id]
if ok && len(q) > 0 {
var s *kevent.Kevent
s, f.buckets[id] = q[len(q)-1], q[:len(q)-1]
callstack := s.Kparams.MustGetSlice(kparams.Callstack)
ev.AppendParam(kparams.Callstack, kparams.Slice, callstack)
}
Expand All @@ -228,6 +244,7 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
return ev, nil
}
}

return ev, nil
case ktypes.ReleaseFile:
fileReleaseCount.Add(1)
Expand Down Expand Up @@ -297,6 +314,7 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
return e, f.psnap.AddMmap(e)
}
}

return e, nil
}

Expand Down Expand Up @@ -336,13 +354,16 @@ func (f *fsProcessor) purge() {
select {
case <-f.purger.C:
f.mu.Lock()

// evict unmatched stack traces
for i := 0; i < f.deq.Len(); i++ {
evt := f.deq.At(i)
if time.Since(evt.Timestamp) > time.Second*30 {
f.deq.Remove(i)
for id, q := range f.buckets {
for i, evt := range q {
if time.Since(evt.Timestamp) > time.Second*30 {
f.buckets[id] = append(q[:i], q[i+1:]...)
}
}
}

f.mu.Unlock()
case <-f.quit:
return
Expand Down
72 changes: 53 additions & 19 deletions pkg/kevent/callstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package kevent

import (
"expvar"
"github.com/gammazero/deque"
"github.com/rabbitstack/fibratus/pkg/kevent/kparams"
"github.com/rabbitstack/fibratus/pkg/util/multierror"
"github.com/rabbitstack/fibratus/pkg/util/va"
Expand Down Expand Up @@ -378,9 +377,9 @@ func (s Callstack) CallsiteInsns(pid uint32, leading bool) []string {
// popped from the queue and enriched with return addresses
// which are later subject to symbolization.
type CallstackDecorator struct {
deq *deque.Deque[*Kevent]
q *Queue
mux sync.Mutex
buckets map[uint64][]*Kevent
q *Queue
mux sync.Mutex

flusher *time.Ticker
quit chan struct{}
Expand All @@ -392,19 +391,29 @@ type CallstackDecorator struct {
func NewCallstackDecorator(q *Queue) *CallstackDecorator {
c := &CallstackDecorator{
q: q,
deq: deque.New[*Kevent](100),
buckets: make(map[uint64][]*Kevent),
flusher: time.NewTicker(flusherInterval),
quit: make(chan struct{}, 1),
}

go c.doFlush()

return c
}

// Push pushes a new event to the queue.
func (cd *CallstackDecorator) Push(e *Kevent) {
cd.mux.Lock()
defer cd.mux.Unlock()
cd.deq.PushBack(e)

// append the event to the bucket indexed by stack id
id := e.StackID()
q, ok := cd.buckets[id]
if !ok {
cd.buckets[id] = []*Kevent{e}
} else {
cd.buckets[id] = append(q, e)
}
}

// Pop receives the stack walk event and pops the oldest
Expand All @@ -414,13 +423,25 @@ func (cd *CallstackDecorator) Push(e *Kevent) {
func (cd *CallstackDecorator) Pop(e *Kevent) *Kevent {
cd.mux.Lock()
defer cd.mux.Unlock()
i := cd.deq.Index(func(evt *Kevent) bool { return evt.StackID() == e.StackID() })
if i == -1 {

id := e.StackID()
q, ok := cd.buckets[id]
if !ok {
return e
}
evt := cd.deq.Remove(i)

var evt *Kevent
if len(q) > 0 {
evt, cd.buckets[id] = q[0], q[1:]
}

if evt == nil {
return e
}

callstack := e.Kparams.MustGetSlice(kparams.Callstack)
evt.AppendParam(kparams.Callstack, kparams.Slice, callstack)

return evt
}

Expand All @@ -429,6 +450,13 @@ func (cd *CallstackDecorator) Stop() {
cd.quit <- struct{}{}
}

// RemoveBucket removes the bucket and all enqueued events.
func (cd *CallstackDecorator) RemoveBucket(id uint64) {
cd.mux.Lock()
defer cd.mux.Unlock()
delete(cd.buckets, id)
}

func (cd *CallstackDecorator) doFlush() {
for {
select {
Expand All @@ -449,20 +477,26 @@ func (cd *CallstackDecorator) doFlush() {
func (cd *CallstackDecorator) flush() []error {
cd.mux.Lock()
defer cd.mux.Unlock()
if cd.deq.Len() == 0 {

if len(cd.buckets) == 0 {
return nil
}

errs := make([]error, 0)
for i := 0; i < cd.deq.Len(); i++ {
evt := cd.deq.At(i)
if time.Since(evt.Timestamp) < maxDequeFlushPeriod {
continue
}
callstackFlushes.Add(1)
err := cd.q.push(cd.deq.Remove(i))
if err != nil {
errs = append(errs, err)

for id, q := range cd.buckets {
for i, evt := range q {
if time.Since(evt.Timestamp) < maxDequeFlushPeriod {
continue
}
callstackFlushes.Add(1)
err := cd.q.push(evt)
if err != nil {
errs = append(errs, err)
}
cd.buckets[id] = append(q[:i], q[i+1:]...)
}
}

return errs
}
8 changes: 4 additions & 4 deletions pkg/kevent/callstack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestCallstackDecorator(t *testing.T) {
cd.Push(e)
cd.Push(e1)

assert.True(t, cd.deq.Len() == 2)
assert.Len(t, cd.buckets[e.StackID()], 2)

sw := &Kevent{
Type: ktypes.StackWalk,
Expand All @@ -129,7 +129,7 @@ func TestCallstackDecorator(t *testing.T) {
}

evt := cd.Pop(sw)
assert.True(t, cd.deq.Len() == 1)
assert.Len(t, cd.buckets[e.StackID()], 1)
assert.Equal(t, ktypes.CreateFile, evt.Type)
assert.True(t, evt.Kparams.Contains(kparams.Callstack))
assert.Equal(t, "C:\\Windows\\system32\\user32.dll", evt.GetParamAsString(kparams.FilePath))
Expand Down Expand Up @@ -164,11 +164,11 @@ func TestCallstackDecoratorFlush(t *testing.T) {
}

cd.Push(e)
assert.True(t, cd.deq.Len() == 1)
assert.Len(t, cd.buckets[e.StackID()], 1)
time.Sleep(time.Millisecond * 3100)

evt := <-q.Events()
assert.True(t, cd.deq.Len() == 0)
assert.Len(t, cd.buckets[e.StackID()], 0)
assert.Equal(t, ktypes.CreateFile, evt.Type)
assert.False(t, evt.Kparams.Contains(kparams.Callstack))
}
4 changes: 4 additions & 0 deletions pkg/kevent/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ func (q *Queue) push(e *Kevent) error {
enqueue = true
}
}
if q.stackEnrichment && e.IsTerminateThread() {
id := uint64(e.Kparams.MustGetPid() + e.Kparams.MustGetTid())
q.cd.RemoveBucket(id)
}
if enqueue || len(q.listeners) == 0 {
q.q <- e
keventsEnqueued.Add(1)
Expand Down