Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 5 additions & 36 deletions pkg/event/event_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ package event
import (
"encoding/binary"
"fmt"
"os"
"strings"
"sync"
"unsafe"

"github.com/rabbitstack/fibratus/pkg/event/params"
"github.com/rabbitstack/fibratus/pkg/sys"
"github.com/rabbitstack/fibratus/pkg/sys/etw"
Expand All @@ -29,10 +34,6 @@ import (
"github.com/rabbitstack/fibratus/pkg/util/hostname"
"github.com/rabbitstack/fibratus/pkg/util/ntstatus"
"golang.org/x/sys/windows"
"os"
"strings"
"sync"
"unsafe"
)

var (
Expand Down Expand Up @@ -153,18 +154,6 @@ func (e *Event) IsDropped(capture bool) bool {
// current process is dropped.
func IsCurrentProcDropped(pid uint32) bool { return DropCurrentProc && pid == currentPid }

// DelayKey returns the value that is used to
// store and reference delayed events in the event
// backlog state. The delayed event is indexed by
// the sequence identifier.
func (e *Event) DelayKey() uint64 {
switch e.Type {
case CreateHandle, CloseHandle:
return e.Params.MustGetUint64(params.HandleObject)
}
return 0
}

// IsNetworkTCP determines whether the event pertains to network TCP events.
func (e *Event) IsNetworkTCP() bool {
return e.Category == Net && !e.IsNetworkUDP()
Expand Down Expand Up @@ -401,26 +390,6 @@ func (e *Event) PartialKey() uint64 {
return 0
}

// BacklogKey represents the key used to index the events in the backlog store.
func (e *Event) BacklogKey() uint64 {
switch e.Type {
case CreateHandle, CloseHandle:
return e.Params.MustGetUint64(params.HandleObject)
}
return 0
}

// CopyState adds parameters, tags, or process state from the provided event.
func (e *Event) CopyState(evt *Event) {
switch evt.Type {
case CloseHandle:
if evt.Params.Contains(params.ImagePath) {
e.Params.Append(params.ImagePath, params.UnicodeString, evt.GetParamAsString(params.ImagePath))
}
_ = e.Params.SetValue(params.HandleObjectName, evt.GetParamAsString(params.HandleObjectName))
}
}

// Summary returns a brief summary of this event. Various important substrings
// in the summary text are highlighted by surrounding them inside <code> HTML tags.
func (e *Event) Summary() string {
Expand Down
58 changes: 0 additions & 58 deletions pkg/event/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,8 @@ package event

import (
"expvar"
"github.com/golang/groupcache/lru"
"github.com/rabbitstack/fibratus/pkg/util/multierror"
)

// backlogCacheSize specifies the max size of the backlog cache.
// When the backlog cache size is reached, the oldest entries are
// removed from the cache.
const backlogCacheSize = 800

// eventsEnqueued counts the number of events that are pushed to the queue
var eventsEnqueued = expvar.NewInt("eventsource.events.enqueued")

Expand All @@ -52,7 +45,6 @@ type Listener interface {
type Queue struct {
q chan *Event
listeners []Listener
backlog *backlog
decorator *StackwalkDecorator
stackEnrichment bool
enqueueAlways bool
Expand All @@ -63,7 +55,6 @@ func NewQueue(size int, stackEnrichment bool, enqueueAlways bool) *Queue {
q := &Queue{
q: make(chan *Event, size),
listeners: make([]Listener, 0),
backlog: newBacklog(backlogCacheSize),
stackEnrichment: stackEnrichment,
enqueueAlways: enqueueAlways,
}
Expand All @@ -76,7 +67,6 @@ func NewQueueWithChannel(ch chan *Event, stackEnrichment bool, enqueueAlways boo
q := &Queue{
q: ch,
listeners: make([]Listener, 0),
backlog: newBacklog(backlogCacheSize),
stackEnrichment: stackEnrichment,
enqueueAlways: enqueueAlways,
}
Expand Down Expand Up @@ -128,14 +118,6 @@ func (q *Queue) Push(e *Event) error {
e = q.decorator.Pop(e)
}
}
if isEventDelayed(e) {
q.backlog.put(e)
return nil
}
evt := q.backlog.pop(e)
if evt != nil {
return multierror.Wrap(q.push(evt), q.push(e))
}
// drop stack walk events
if e.IsStackWalk() {
return nil
Expand Down Expand Up @@ -167,43 +149,3 @@ func (q *Queue) push(e *Event) error {
}
return nil
}

func isEventDelayed(e *Event) bool {
return e.IsCreateHandle()
}

type backlog struct {
cache *lru.Cache
}

func newBacklog(size int) *backlog {
return &backlog{cache: lru.New(size)}
}

func (b *backlog) put(evt *Event) {
if b.cache.Len() > backlogCacheSize {
b.cache.RemoveOldest()
}
key := evt.BacklogKey()
if key != 0 {
b.cache.Add(key, evt)
}
}

func (b *backlog) pop(evt *Event) *Event {
key := evt.BacklogKey()
if key == 0 {
return nil
}
ev, ok := b.cache.Get(key)
if !ok {
return nil
}
b.cache.Remove(key)
e := ev.(*Event)
e.CopyState(evt)
return e
}

func (b *backlog) size() int { return b.cache.Len() }
func (b *backlog) empty() bool { return b.size() == 0 }
53 changes: 3 additions & 50 deletions pkg/event/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ package event

import (
"errors"
"testing"
"time"

"github.com/rabbitstack/fibratus/pkg/event/params"
"github.com/rabbitstack/fibratus/pkg/fs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"reflect"
"testing"
"time"
)

// AddParamListener receives the event and appends a parameter to it
Expand Down Expand Up @@ -190,49 +189,3 @@ func TestQueuePush(t *testing.T) {
})
}
}

func TestPushBacklog(t *testing.T) {
e := &Event{
Type: CreateHandle,
Tid: 2484,
PID: 859,
Category: Handle,
Params: Params{
params.HandleID: {Name: params.HandleID, Type: params.Uint32, Value: uint32(21)},
params.HandleObjectTypeID: {Name: params.HandleObjectTypeID, Type: params.AnsiString, Value: "Key"},
params.HandleObject: {Name: params.HandleObject, Type: params.Uint64, Value: uint64(18446692422059208560)},
params.HandleObjectName: {Name: params.HandleObjectName, Type: params.UnicodeString, Value: ""},
},
Metadata: make(Metadata),
}

q := NewQueue(100, false, true)
q.RegisterListener(&DummyListener{})

require.NoError(t, q.Push(e))
require.Len(t, q.Events(), 0)
require.False(t, q.backlog.empty())

e1 := &Event{
Type: CloseHandle,
Tid: 2484,
PID: 859,
Category: Handle,
Params: Params{
params.HandleID: {Name: params.HandleID, Type: params.Uint32, Value: uint32(21)},
params.HandleObjectTypeID: {Name: params.HandleObjectTypeID, Type: params.AnsiString, Value: "Key"},
params.HandleObject: {Name: params.HandleObject, Type: params.Uint64, Value: uint64(18446692422059208560)},
params.HandleObjectName: {Name: params.HandleObjectName, Type: params.UnicodeString, Value: `\REGISTRY\MACHINE\SYSTEM\ControlSet001\Services\Tcpip\Parameters\Interfaces\{b677c565-6ca5-45d3-b618-736b4e09b036}`},
},
Metadata: make(Metadata),
}

require.NoError(t, q.Push(e1))
require.True(t, q.backlog.empty())

ev := <-q.Events()
require.NotNil(t, ev)
assert.Equal(t, `\REGISTRY\MACHINE\SYSTEM\ControlSet001\Services\Tcpip\Parameters\Interfaces\{b677c565-6ca5-45d3-b618-736b4e09b036}`, ev.GetParamAsString(params.HandleObjectName))

require.True(t, reflect.DeepEqual(e1, <-q.Events()))
}
2 changes: 0 additions & 2 deletions rules/macros/macros.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,6 @@
- macro: load_driver
expr: >
(load_module and (image.name iendswith '.sys' or image.is_driver))
or
(create_handle and handle.type = 'Driver')
description: |
Detects the loading of the kernel driver. Image load events are published when
executable images, DLLs, or driver PE objects are loaded. On the contrary, we can
Expand Down
Loading