Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 251 additions & 0 deletions peers/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"log"
"net/http"
"net/netip"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -64,3 +66,252 @@ backend st_src_global
}
})
}

func TestE2EWriter(t *testing.T) {
writerCh := make(chan *Writer, 1)
a := Peer{HandlerSource: func() Handler {
return &writerE2EHandler{writerCh: writerCh}
}}

l := testutil.TCPListener(t)
go a.Serve(l)

cfg := testutil.HAProxyConfig{
FrontendPort: fmt.Sprintf("%d", testutil.TCPPort(t)),
CustomFrontendConfig: `
http-request track-sc0 src table st_blocklist
http-request deny deny_status 403 if { sc0_get_gpc0 gt 0 }
`,
CustomConfig: `
backend st_blocklist
stick-table type ip size 200k expire 5m store gpc0 peers mypeers
`,
PeerAddr: l.Addr().String(),
}

t.Run("push entry blocks request", func(t *testing.T) {
cfg.Run(t)

var w *Writer
select {
case w = <-writerCh:
case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for HAProxy peer connection")
}

time.Sleep(500 * time.Millisecond)

resp, err := http.Get("http://127.0.0.1:" + cfg.FrontendPort)
if err != nil {
t.Fatal(err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("expected 200 before push, got %d", resp.StatusCode)
}

tableDef := &sticktable.Definition{
StickTableID: 0,
Name: "st_blocklist",
KeyType: sticktable.KeyTypeIPv4Address,
KeyLength: 4,
DataTypes: []sticktable.DataTypeDefinition{
{DataType: sticktable.DataTypeGPC0},
},
Expiry: 300000,
}

if err := w.SendTableDefinition(tableDef); err != nil {
t.Fatal(err)
}

key := sticktable.IPv4AddressKey(netip.MustParseAddr("127.0.0.1"))
gpc0 := sticktable.UnsignedIntegerData(1)
entry := &sticktable.EntryUpdate{
StickTable: tableDef,
Key: &key,
Data: []sticktable.MapData{&gpc0},
}
if err := w.SendEntry(entry); err != nil {
t.Fatal(err)
}

time.Sleep(1 * time.Second)

resp, err = http.Get("http://127.0.0.1:" + cfg.FrontendPort)
if err != nil {
t.Fatal(err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusForbidden {
t.Errorf("expected 403 after push, got %d", resp.StatusCode)
}
})
}

type writerE2EHandler struct {
writerCh chan *Writer
once sync.Once
}

func (h *writerE2EHandler) HandleUpdate(_ context.Context, u *sticktable.EntryUpdate) {
log.Println(u)
}

func (h *writerE2EHandler) HandleHandshake(ctx context.Context, _ *Handshake) {
h.once.Do(func() {
h.writerCh <- WriterFromContext(ctx)
})
}

func (h *writerE2EHandler) Close() error { return nil }

func TestE2EWriterTimedEntry(t *testing.T) {
writerCh := make(chan *Writer, 1)
a := Peer{HandlerSource: func() Handler {
return &writerE2EHandler{writerCh: writerCh}
}}

l := testutil.TCPListener(t)
go a.Serve(l)

cfg := testutil.HAProxyConfig{
FrontendPort: fmt.Sprintf("%d", testutil.TCPPort(t)),
CustomConfig: `
backend st_timed
stick-table type ip size 200k expire 5m peers mypeers
`,
BackendConfig: `
http-request set-var(txn.lookup_ip) str(127.0.0.2)
http-request return status 200 content-type "text/plain" hdr X-Expire %[var(txn.lookup_ip),table_expire(st_timed)] string "OK\n"
`,
PeerAddr: l.Addr().String(),
}

t.Run("push timed entry with 60s expiry", func(t *testing.T) {
cfg.Run(t)

var w *Writer
select {
case w = <-writerCh:
case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for HAProxy peer connection")
}

time.Sleep(500 * time.Millisecond)

tableDef := &sticktable.Definition{
StickTableID: 0,
Name: "st_timed",
KeyType: sticktable.KeyTypeIPv4Address,
KeyLength: 4,
Expiry: 300000,
}

if err := w.SendTableDefinition(tableDef); err != nil {
t.Fatal(err)
}

key := sticktable.IPv4AddressKey(netip.MustParseAddr("127.0.0.2"))
entry := &sticktable.EntryUpdate{
StickTable: tableDef,
Key: &key,
WithExpiry: true,
Expiry: 60000,
}
if err := w.SendEntry(entry); err != nil {
t.Fatal(err)
}

time.Sleep(1 * time.Second)

resp, err := http.Get("http://127.0.0.1:" + cfg.FrontendPort)
if err != nil {
t.Fatal(err)
}
resp.Body.Close()

xexpire := resp.Header.Get("X-Expire")
t.Logf("X-Expire: %s", xexpire)

if xexpire == "" || xexpire == "0" {
t.Errorf("expected non-zero X-Expire header, got %q", xexpire)
}
})
}

func TestE2EWriterBulkEntries(t *testing.T) {
writerCh := make(chan *Writer, 1)
a := Peer{HandlerSource: func() Handler {
return &writerE2EHandler{writerCh: writerCh}
}}

l := testutil.TCPListener(t)
go a.Serve(l)

cfg := testutil.HAProxyConfig{
FrontendPort: fmt.Sprintf("%d", testutil.TCPPort(t)),
CustomConfig: `
backend st_bulk
stick-table type ip size 200k expire 5m peers mypeers
`,
BackendConfig: `
http-request return status 200 content-type "text/plain" hdr X-Count %[table_cnt(st_bulk)] string "OK\n"
`,
PeerAddr: l.Addr().String(),
}

t.Run("push 20 entries", func(t *testing.T) {
cfg.Run(t)

var w *Writer
select {
case w = <-writerCh:
case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for HAProxy peer connection")
}

time.Sleep(500 * time.Millisecond)

tableDef := &sticktable.Definition{
StickTableID: 0,
Name: "st_bulk",
KeyType: sticktable.KeyTypeIPv4Address,
KeyLength: 4,
Expiry: 300000,
}

if err := w.SendTableDefinition(tableDef); err != nil {
t.Fatal(err)
}

for i := 0; i < 20; i++ {
ip := netip.AddrFrom4([4]byte{10, 0, 0, byte(i + 1)})
key := sticktable.IPv4AddressKey(ip)
entry := &sticktable.EntryUpdate{
StickTable: tableDef,
Key: &key,
WithExpiry: true,
Expiry: 60000,
}
if err := w.SendEntry(entry); err != nil {
t.Fatalf("sending entry %d (%s): %v", i, ip, err)
}
}

time.Sleep(1 * time.Second)

resp, err := http.Get("http://127.0.0.1:" + cfg.FrontendPort)
if err != nil {
t.Fatal(err)
}
resp.Body.Close()

xcount := resp.Header.Get("X-Count")
t.Logf("X-Count: %s", xcount)

if xcount != "20" {
t.Errorf("expected X-Count=20, got %q", xcount)
}
})
}
7 changes: 7 additions & 0 deletions peers/example/push/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module github.com/dropmorepackets/haproxy-go/peers/example/push

go 1.21

replace github.com/dropmorepackets/haproxy-go => ../../../

require github.com/dropmorepackets/haproxy-go v0.0.0-00010101000000-000000000000
62 changes: 62 additions & 0 deletions peers/example/push/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// push is an example that demonstrates how to push stick table entries
// to HAProxy over an existing peer connection. When HAProxy connects to
// this peer, the handler uses WriterFromContext to obtain a Writer and
// sends a table definition followed by entry updates.
package main

import (
"context"
"log"
"net/netip"

"github.com/dropmorepackets/haproxy-go/peers"
"github.com/dropmorepackets/haproxy-go/peers/sticktable"
)

func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)

err := peers.ListenAndServe(":21000", peers.HandlerFunc(func(ctx context.Context, u *sticktable.EntryUpdate) {
log.Println("received:", u.String())

// Get the writer for this connection to push entries back.
w := peers.WriterFromContext(ctx)

// Define the stick table we want to push to.
// Matches: stick-table type ip size 200k expire 5m store gpc0 peers local-peers
tableDef := &sticktable.Definition{
StickTableID: 0,
Name: "my_blocklist",
KeyType: sticktable.KeyTypeIPv4Address,
KeyLength: 4,
DataTypes: []sticktable.DataTypeDefinition{
{DataType: sticktable.DataTypeGPC0},
},
Expiry: 300000, // 5 minutes in ms
}

if err := w.SendTableDefinition(tableDef); err != nil {
log.Printf("error sending table definition: %v", err)
return
}

// Push an entry marking an IP as blocked (gpc0 = 1).
key := sticktable.IPv4AddressKey(netip.MustParseAddr("10.0.0.1"))
gpc0 := sticktable.UnsignedIntegerData(1)
entry := &sticktable.EntryUpdate{
StickTable: tableDef,
Key: &key,
Data: []sticktable.MapData{&gpc0},
}

if err := w.SendEntry(entry); err != nil {
log.Printf("error sending entry: %v", err)
return
}

log.Println("pushed blocklist entry for 10.0.0.1")
}))
if err != nil {
log.Fatal(err)
}
}
15 changes: 14 additions & 1 deletion peers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"net"
"sync"
)

type Peer struct {
Expand Down Expand Up @@ -59,7 +60,10 @@ func (a *Peer) Serve(l net.Listener) error {
// Wrap the context to provide access to the underlying connection.
// TODO(tim): Do we really want this?
ctx := context.WithValue(a.BaseContext, connectionKey, nc)
p := newProtocolClient(ctx, nc, a.HandlerSource())
wmu := &sync.Mutex{}
w := newWriter(nc, wmu)
ctx = context.WithValue(ctx, writerKey, w)
p := newProtocolClient(ctx, nc, a.HandlerSource(), wmu, w.bufferedWriter())
go func() {
defer nc.Close()
defer p.Close()
Expand All @@ -75,10 +79,19 @@ type contextKey string

const (
connectionKey = contextKey("connection")
writerKey = contextKey("writer")
)

// Connection returns the underlying connection used in calls
// to function in a Handler.
func Connection(ctx context.Context) net.Conn {
return ctx.Value(connectionKey).(net.Conn)
}

// WriterFromContext returns the Writer associated with the current peer
// connection. Use this inside a Handler to push stick table updates back
// to HAProxy over the same connection that HAProxy established to us.
// Panics if called outside a handler context.
func WriterFromContext(ctx context.Context) *Writer {
return ctx.Value(writerKey).(*Writer)
}
Loading
Loading