diff --git a/.github/workflows/binary-release.yml b/.github/workflows/binary-release.yml index 7298936..4af586f 100644 --- a/.github/workflows/binary-release.yml +++ b/.github/workflows/binary-release.yml @@ -37,7 +37,11 @@ jobs: run: | go mod download go test -v ./... - + + - name: Security scan (gosec) + run: | + docker run --rm -v ${{ github.workspace }}:/src -w /src securego/gosec:v2.22.4 gosec ./... + - name: Build for multiple platforms run: | mkdir -p dist diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml index a379fbd..7c8d79b 100644 --- a/.github/workflows/docker-image.yml +++ b/.github/workflows/docker-image.yml @@ -31,7 +31,7 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v6 + uses: actions/checkout@v4 with: fetch-depth: 0 # Fetch all history for proper versioning @@ -49,32 +49,33 @@ jobs: echo "TIMESTAMP=$(date -u +%Y-%m-%dT%H:%M:%SZ)" >> $GITHUB_ENV - name: Set up Go - uses: actions/setup-go@v6 + uses: actions/setup-go@v5 with: go-version: '1.24' - name: Static analysis (gofmt, go vet) run: | - docker run --rm -v ${{ github.workspace }}:/src -w /src golang:1.24 sh -c "gofmt -l . || true; go vet ./... || true" + docker run --rm -v ${{ github.workspace }}:/src -w /src golang:1.24 sh -c \ + 'GOFMT_OUT=$(gofmt -l .); [ -z "$GOFMT_OUT" ] || (echo "$GOFMT_OUT"; exit 1); go vet ./...' - name: Security scan (gosec) run: | - docker run --rm -v ${{ github.workspace }}:/src -w /src securego/gosec:latest gosec ./... + docker run --rm -v ${{ github.workspace }}:/src -w /src securego/gosec:v2.22.4 gosec ./... - name: Run unit tests (inside Docker) run: | docker run --rm -v ${{ github.workspace }}:/src -w /src golang:1.24 go test -v -race ./... - name: Set up QEMU - uses: docker/setup-qemu-action@v4 + uses: docker/setup-qemu-action@v3 - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v4 + uses: docker/setup-buildx-action@v3 - name: Log in to the Container registry # Only login when we'll be pushing (main branch or tags, never on PR) if: (github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/v')) && github.event_name != 'pull_request' - uses: docker/login-action@v4 + uses: docker/login-action@v3 with: registry: ${{ env.REGISTRY }} username: ${{ github.actor }} @@ -82,7 +83,7 @@ jobs: - name: Extract metadata (tags, labels) for Docker id: meta - uses: docker/metadata-action@v6 + uses: docker/metadata-action@v5 with: images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} tags: | @@ -95,7 +96,7 @@ jobs: type=sha,prefix=,suffix=,format=short - name: Build and push Docker image - uses: docker/build-push-action@v7 + uses: docker/build-push-action@v6 with: context: . platforms: linux/amd64,linux/arm64,linux/arm/v7 diff --git a/fanout.go b/fanout.go index 109515c..709b91b 100644 --- a/fanout.go +++ b/fanout.go @@ -6,6 +6,7 @@ package main import ( "bytes" "context" + cryptorand "crypto/rand" "encoding/json" "errors" "fmt" @@ -16,6 +17,7 @@ import ( "net/http" "net/url" "os" + "sort" "strconv" "strings" "sync" @@ -70,6 +72,13 @@ var ( metricsEnabled bool httpClient *http.Client + // configuredTargets holds the parsed TARGETS list, populated at startup. + configuredTargets []string + + // echoModeHeader and echoModeResponse cache ECHO_MODE_* env vars. + echoModeHeader bool + echoModeResponse string + requestsTotal = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "fanout_requests_total", @@ -167,6 +176,7 @@ func init() { for k, v := range entry.Context { parts = append(parts, fmt.Sprintf("%s=%s", k, v)) } + sort.Strings(parts) contextStr = " " + strings.Join(parts, " ") } log.Printf("[%s]%s %s", entry.Level, contextStr, entry.Message) @@ -272,7 +282,26 @@ func init() { maxRetries = defaultMaxRetries } - rand.Seed(time.Now().UnixNano()) + // Cache TARGETS at startup, validating each entry is an absolute http/https URL. + if t := os.Getenv("TARGETS"); t != "" && t != "localonly" { + for _, u := range strings.Split(t, ",") { + trimmed := strings.TrimSpace(u) + if trimmed == "" { + continue + } + parsed, err := url.Parse(trimmed) + if err != nil || !parsed.IsAbs() || parsed.Host == "" || + (parsed.Scheme != "http" && parsed.Scheme != "https") { + log.Printf("WARNING: Skipping invalid target URL %q (must be absolute http/https with non-empty host)", trimmed) + continue + } + configuredTargets = append(configuredTargets, trimmed) + } + } + + // Cache echo mode config. + echoModeHeader = strings.ToLower(os.Getenv("ECHO_MODE_HEADER")) == "true" + echoModeResponse = strings.ToLower(os.Getenv("ECHO_MODE_RESPONSE")) } func logDebug(format string, args ...interface{}) { @@ -333,7 +362,7 @@ func cloneHeaders(original http.Header) http.Header { cloned := make(http.Header) for k, vv := range original { if sensitiveHeaders[http.CanonicalHeaderKey(k)] { - logWarn("Sensitive header detected and propagated: %s", k) + logDebug("Sensitive header detected and forwarded: %s", k) } cloned[k] = vv } @@ -354,11 +383,11 @@ func echoHandler(w http.ResponseWriter, r *http.Request) { "body": string(bodyBytes), } - if os.Getenv("ECHO_MODE_HEADER") == "true" { + if echoModeHeader { w.Header().Set("X-Echo-Mode", "active") } - switch os.Getenv("ECHO_MODE_RESPONSE") { + switch echoModeResponse { case "full": w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(echoData); err != nil { @@ -393,13 +422,26 @@ func healthCheck(w http.ResponseWriter, r *http.Request) { } } +// Response holds the result of a single fan-out request to one target. +// Latency is expressed in seconds as a float64 for easy consumption. type Response struct { - Target string `json:"target"` - Status int `json:"status"` - Body string `json:"body,omitempty"` - Error string `json:"error,omitempty"` - Latency time.Duration `json:"latency"` - Attempts int `json:"attempts,omitempty"` + Target string `json:"target"` + Status int `json:"status"` + Body string `json:"body,omitempty"` + Error string `json:"error,omitempty"` + Latency float64 `json:"latency_seconds"` + Attempts int `json:"attempts,omitempty"` + Truncated bool `json:"truncated,omitempty"` +} + +// generateRequestID returns a UUID v4-like hex string using crypto/rand. +// Falls back to a timestamp string if the random source fails. +func generateRequestID() string { + b := make([]byte, 16) + if _, err := cryptorand.Read(b); err != nil { + return fmt.Sprintf("%d", time.Now().UnixNano()) + } + return fmt.Sprintf("%08x-%04x-%04x-%04x-%012x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:]) } func multiplex(w http.ResponseWriter, r *http.Request) { @@ -418,6 +460,14 @@ func multiplex(w http.ResponseWriter, r *http.Request) { return } + // Set or generate X-Request-ID for correlation across all fan-out targets. + requestID := r.Header.Get("X-Request-ID") + if requestID == "" { + requestID = generateRequestID() + r.Header.Set("X-Request-ID", requestID) + } + w.Header().Set("X-Request-ID", requestID) + getBody := r.GetBody var bodyBytes []byte @@ -427,9 +477,9 @@ func multiplex(w http.ResponseWriter, r *http.Request) { // into memory up to maxBodySize so retries can recreate the body safely. if getBody == nil { bodyBytes, readErr = io.ReadAll(io.LimitReader(r.Body, maxBodySize+1)) - if err := r.Body.Close(); err != nil { - logWarn("Failed to close request body after read: %v", err) - } + if err := r.Body.Close(); err != nil { + logWarn("Failed to close request body after read: %v", err) + } if readErr != nil { logError("Error reading request body: %v", readErr) writeJSONError(w, "Failed to read request body", http.StatusBadRequest) @@ -440,6 +490,9 @@ func multiplex(w http.ResponseWriter, r *http.Request) { writeJSONError(w, "Payload too large", http.StatusRequestEntityTooLarge) return } + if metricsEnabled { + bodySize.WithLabelValues(r.URL.Path).Observe(float64(len(bodyBytes))) + } // Provide a GetBody closure for retries getBody = func() (io.ReadCloser, error) { @@ -454,9 +507,9 @@ func multiplex(w http.ResponseWriter, r *http.Request) { return } bodyBytes, readErr = io.ReadAll(io.LimitReader(bodyReader, maxBodySize+1)) - if err := bodyReader.Close(); err != nil { - logWarn("Failed to close body reader after size check: %v", err) - } + if err := bodyReader.Close(); err != nil { + logWarn("Failed to close body reader after size check: %v", err) + } if readErr != nil { logError("Error reading body for size check: %v", readErr) @@ -468,38 +521,34 @@ func multiplex(w http.ResponseWriter, r *http.Request) { writeJSONError(w, "Payload too large", http.StatusRequestEntityTooLarge) return } + if metricsEnabled { + bodySize.WithLabelValues(r.URL.Path).Observe(float64(len(bodyBytes))) + } } else { if metricsEnabled { bodySize.WithLabelValues(r.URL.Path).Observe(float64(r.ContentLength)) } } - targetsEnv := os.Getenv("TARGETS") - targets := strings.Split(targetsEnv, ",") - if len(targets) == 0 || (len(targets) == 1 && targets[0] == "") { + if len(configuredTargets) == 0 { logError("No targets configured (TARGETS env var is empty or not set)") writeJSONError(w, "No targets configured", http.StatusServiceUnavailable) return } - responses := make([]Response, 0, len(targets)) + responses := make([]Response, 0, len(configuredTargets)) var wg sync.WaitGroup - respChan := make(chan Response, len(targets)) + respChan := make(chan Response, len(configuredTargets)) - for _, target := range targets { - targetURL := strings.TrimSpace(target) - if targetURL == "" { - logWarn("Skipping empty target URL found in TARGETS list") - continue - } + for _, target := range configuredTargets { wg.Add(1) go func(target string) { defer wg.Done() start := time.Now() resp := sendRequest(ctx, httpClient, target, r, getBody, bodyBytes) - resp.Latency = time.Since(start) + resp.Latency = time.Since(start).Seconds() respChan <- resp - }(targetURL) + }(target) } go func() { @@ -570,8 +619,8 @@ func sendRequest(ctx context.Context, client *http.Client, target string, origin } } - // target was validated by caller (must be absolute http/https with host). Suppress gosec SSRF warning. - // #nosec G704 -- validated target URL in multiplex + // target is validated as an absolute http/https URL in init() when configuredTargets is built. + // #nosec G107 -- validated target URL at startup req, err := http.NewRequestWithContext(ctx, originalReq.Method, target, bodyReader) if err != nil { resp.Status = http.StatusInternalServerError @@ -596,7 +645,7 @@ func sendRequest(ctx context.Context, client *http.Client, target string, origin req.Header.Set("X-Retry-Count", strconv.Itoa(attempts)) } - response, err = client.Do(req) // #nosec G704 -- target validated by caller + response, err = client.Do(req) // #nosec G107 -- target validated at startup if err != nil { if isRetryableError(err) && attempts < maxRetries { @@ -648,13 +697,15 @@ func sendRequest(ctx context.Context, client *http.Client, target string, origin if response == nil { resp.Status = http.StatusServiceUnavailable - resp.Error = fmt.Sprintf("Request failed after %d attempts (no response received)", attempts+1) + resp.Error = fmt.Sprintf("Request failed after %d attempts (no response received)", attempts) logErrorWithContext(map[string]string{"target": target}, "%s", resp.Error) return resp } defer response.Body.Close() - respBody, readErr := io.ReadAll(io.LimitReader(response.Body, maxBodySize)) + // Read one extra byte so we can distinguish an exact-maxBodySize response + // from a genuinely truncated one (io.LimitReader alone cannot tell the difference). + respBody, readErr := io.ReadAll(io.LimitReader(response.Body, maxBodySize+1)) if readErr != nil { resp.Status = http.StatusInternalServerError resp.Error = fmt.Sprintf("Failed to read response body: %v", readErr) @@ -665,6 +716,12 @@ func sendRequest(ctx context.Context, client *http.Client, target string, origin return resp } + if int64(len(respBody)) > maxBodySize { + resp.Truncated = true + respBody = respBody[:maxBodySize] + logWarnWithContext(map[string]string{"target": target}, "Response body truncated at limit (%d bytes)", maxBodySize) + } + resp.Status = response.StatusCode resp.Body = string(respBody) resp.Attempts = attempts + 1 @@ -674,7 +731,7 @@ func sendRequest(ctx context.Context, client *http.Client, target string, origin "target": target, "status": strconv.Itoa(resp.Status), "attempts": strconv.Itoa(resp.Attempts), - "latency": resp.Latency.String(), + "latency": time.Since(startTime).String(), }, "Request completed successfully", ) @@ -687,27 +744,16 @@ func isRetryableError(err error) bool { return false } - // Prefer typed checks for net errors and url errors + // Prefer typed checks for net errors and url errors. + // Note: net.Error.Temporary() is deprecated since Go 1.18 and intentionally omitted. var nerr net.Error - if errors.As(err, &nerr) { - if nerr.Timeout() || nerr.Temporary() { - return true - } + if errors.As(err, &nerr) && nerr.Timeout() { + return true } var ue *url.Error - if errors.As(err, &ue) { - // If the wrapped error is a net.Error with timeout/temporary, treat as retryable - var innerNetErr net.Error - if errors.As(ue.Err, &innerNetErr) { - if innerNetErr.Timeout() || innerNetErr.Temporary() { - return true - } - } - // Some url.Errors include "timeout" in the string; treat as retryable - if ue.Timeout() { - return true - } + if errors.As(err, &ue) && ue.Timeout() { + return true } // Fallback: substring matching for common transient messages @@ -730,18 +776,10 @@ func addJitter(d time.Duration) time.Duration { return time.Duration(jitter) } -func min(a, b time.Duration) time.Duration { - if a < b { - return a - } - return b -} - func writeJSON(w http.ResponseWriter, v interface{}) error { encoder := json.NewEncoder(w) encoder.SetEscapeHTML(false) if err := encoder.Encode(v); err != nil { - w.WriteHeader(http.StatusInternalServerError) logError("Error encoding JSON: %v", err) return err } @@ -754,22 +792,45 @@ func writeJSONError(w http.ResponseWriter, message string, status int) { if err := writeJSON(w, map[string]string{"error": message}); err != nil { logError("Failed to write JSON error response: %v", err) } - } func versionHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") if err := writeJSON(w, map[string]string{ - "version": Version, - "git_commit": GitCommit, - "build_time": BuildTime, - }); err != nil { - logError("Failed to write version response: %v", err) - } - + "version": Version, + "git_commit": GitCommit, + "build_time": BuildTime, + }); err != nil { + logError("Failed to write version response: %v", err) + } } func main() { + // Handle CLI flags before any server setup so they run cleanly without side effects. + if len(os.Args) > 1 { + switch os.Args[1] { + case "-version": + fmt.Printf("FanOut %s (commit: %s, built: %s)\n", Version, GitCommit, BuildTime) + os.Exit(0) + case "-healthcheck": + port := os.Getenv("PORT") + if port == "" { + port = "8080" + } + hcClient := &http.Client{Timeout: 5 * time.Second} + // #nosec G107 -- localhost-only health probe, port sourced from env + hcResp, err := hcClient.Get("http://localhost:" + port + "/health") + if err != nil { + os.Exit(1) + } + hcResp.Body.Close() + if hcResp.StatusCode != http.StatusOK { + os.Exit(1) + } + os.Exit(0) + } + } + log.SetOutput(os.Stdout) logInfo("Starting FanOut service version %s (commit: %s, built: %s)", Version, GitCommit, BuildTime) @@ -780,19 +841,12 @@ func main() { http.HandleFunc(endpointPath, echoHandler) logInfo("Running in ECHO MODE (Endpoint: %s)", endpointPath) } else { - targetList := strings.Split(targets, ",") - validTargets := 0 - for _, t := range targetList { - if strings.TrimSpace(t) != "" { - validTargets++ - } - } - if validTargets == 0 { + if len(configuredTargets) == 0 { logError("FATAL: No valid target URLs configured in TARGETS environment variable.") os.Exit(1) } http.HandleFunc(endpointPath, multiplex) - logInfo("Running in MULTIPLEX MODE (Endpoint: %s) with %d targets", endpointPath, validTargets) + logInfo("Running in MULTIPLEX MODE (Endpoint: %s) with %d targets", endpointPath, len(configuredTargets)) } http.HandleFunc("/health", healthCheck) @@ -811,11 +865,6 @@ func main() { port = "8080" } - if len(os.Args) > 1 && os.Args[1] == "-version" { - fmt.Printf("FanOut %s (commit: %s, built: %s)\n", Version, GitCommit, BuildTime) - os.Exit(0) - } - var displayMaxBody uint64 if maxBodySize < 0 { displayMaxBody = 0 diff --git a/fanout_additional_test.go b/fanout_additional_test.go index 8d7b65c..b7222a5 100644 --- a/fanout_additional_test.go +++ b/fanout_additional_test.go @@ -1,11 +1,11 @@ package main import ( - "bytes" "encoding/json" + "io" "net/http" "net/http/httptest" - "os" + "strings" "testing" ) @@ -18,14 +18,15 @@ func TestMultiplexNoGetBody(t *testing.T) { })) defer server.Close() - // Set TARGETS to the mock server - origTargets := os.Getenv("TARGETS") - defer os.Setenv("TARGETS", origTargets) - os.Setenv("TARGETS", server.URL) + // Set configuredTargets directly (TARGETS env var is cached at init time, not per-request) + origTargets := configuredTargets + defer func() { configuredTargets = origTargets }() + configuredTargets = []string{server.URL} - // Create a request WITHOUT GetBody (http.NewRequest leaves GetBody nil) - body := []byte("hello") - req, err := http.NewRequest("POST", "/fanout", bytes.NewReader(body)) + // http.NewRequest sets GetBody automatically for *bytes.Reader / *bytes.Buffer. + // Wrap in io.NopCloser so the body is a plain io.ReadCloser and GetBody stays nil, + // which forces multiplex into the pre-read code path. + req, err := http.NewRequest("POST", "/fanout", io.NopCloser(strings.NewReader("hello"))) if err != nil { t.Fatalf("Failed to create request: %v", err) } diff --git a/fanout_test.go b/fanout_test.go index 0bbc161..ba17fa4 100644 --- a/fanout_test.go +++ b/fanout_test.go @@ -9,7 +9,6 @@ import ( "io" "net/http" "net/http/httptest" - "os" "reflect" "strings" "testing" @@ -160,8 +159,8 @@ func TestWriteJSON(t *testing.T) { Body: "OK", Attempts: 0, // Due to omitempty tag, this won't appear in JSON when 0 }, - // Updated expected value - removed attempts since it has omitempty tag - expected: `{"target":"http://example.com","status":200,"body":"OK","latency":0}`, + // Updated expected value - removed attempts since it has omitempty tag; latency_seconds is float64 + expected: `{"target":"http://example.com","status":200,"body":"OK","latency_seconds":0}`, }, } @@ -266,17 +265,16 @@ func TestHealthCheck(t *testing.T) { // TestEchoHandlerSimpleMode tests the echo handler in simple mode func TestEchoHandlerSimpleMode(t *testing.T) { - // Save and restore original env vars - originalHeader := os.Getenv("ECHO_MODE_HEADER") - originalResponse := os.Getenv("ECHO_MODE_RESPONSE") + // Save and restore cached echo mode vars (env vars are read at init time, not per-request) + origHeader := echoModeHeader + origResponse := echoModeResponse defer func() { - os.Setenv("ECHO_MODE_HEADER", originalHeader) - os.Setenv("ECHO_MODE_RESPONSE", originalResponse) + echoModeHeader = origHeader + echoModeResponse = origResponse }() - // Set environment for this test - os.Setenv("ECHO_MODE_HEADER", "false") - os.Setenv("ECHO_MODE_RESPONSE", "simple") + echoModeHeader = false + echoModeResponse = "simple" // Create a request with a test body body := []byte(`{"test":"data"}`) @@ -308,17 +306,16 @@ func TestEchoHandlerSimpleMode(t *testing.T) { // TestEchoHandlerFullMode tests the echo handler in full mode func TestEchoHandlerFullMode(t *testing.T) { - // Save and restore original env vars - originalHeader := os.Getenv("ECHO_MODE_HEADER") - originalResponse := os.Getenv("ECHO_MODE_RESPONSE") + // Save and restore cached echo mode vars (env vars are read at init time, not per-request) + origHeader := echoModeHeader + origResponse := echoModeResponse defer func() { - os.Setenv("ECHO_MODE_HEADER", originalHeader) - os.Setenv("ECHO_MODE_RESPONSE", originalResponse) + echoModeHeader = origHeader + echoModeResponse = origResponse }() - // Set environment for this test - os.Setenv("ECHO_MODE_HEADER", "true") - os.Setenv("ECHO_MODE_RESPONSE", "full") + echoModeHeader = true + echoModeResponse = "full" // Create a request with a test body body := []byte(`{"test":"data"}`) @@ -383,7 +380,9 @@ func TestSendRequest(t *testing.T) { t.Fatalf("Failed to create request: %v", err) } - // Set app-level retry count for test + // Set app-level retry count for test (restored on exit) + origMaxRetries := maxRetries + defer func() { maxRetries = origMaxRetries }() maxRetries = 2 // Call sendRequest with mock server URL @@ -416,7 +415,9 @@ func TestSendRequestNetworkError(t *testing.T) { t.Fatalf("Failed to create request: %v", err) } - // Set app-level retry count for test + // Set app-level retry count for test (restored on exit) + origMaxRetries := maxRetries + defer func() { maxRetries = origMaxRetries }() maxRetries = 1 // Call sendRequest with a non-existent endpoint (will cause error)