diff --git a/vulnfeeds/cmd/converters/cve/nvd-cve-osv/main.go b/vulnfeeds/cmd/converters/cve/nvd-cve-osv/main.go index d587f98e806..84431b7bbcf 100644 --- a/vulnfeeds/cmd/converters/cve/nvd-cve-osv/main.go +++ b/vulnfeeds/cmd/converters/cve/nvd-cve-osv/main.go @@ -140,12 +140,15 @@ func main() { logger.Info("GCS Upload Pool initialized", slog.String("bucket", *outputBucket)) } + var retryList []models.NVDCVE + var retryMu sync.Mutex + jobs := make(chan models.NVDCVE, *workers) var wg sync.WaitGroup for range *workers { wg.Add(1) - go worker(&wg, jobs, gcsHelper, *outDir, vpRepoCache, repoTagsCache) + go worker(&wg, jobs, gcsHelper, *outDir, vpRepoCache, repoTagsCache, &retryList, &retryMu) } for _, file := range files { @@ -170,6 +173,25 @@ func main() { close(jobs) wg.Wait() + + if len(retryList) > 0 { + logger.Info("Retrying records that failed with rate limit errors (429)", slog.Int("count", len(retryList))) + if repoTagsCache != nil { + repoTagsCache.ClearRateLimited() + } + retryJobs := make(chan models.NVDCVE, len(retryList)) + for _, cve := range retryList { + retryJobs <- cve + } + close(retryJobs) + + var retryWg sync.WaitGroup + for range *workers { + retryWg.Add(1) + go worker(&retryWg, retryJobs, gcsHelper, *outDir, vpRepoCache, repoTagsCache, nil, nil) + } + retryWg.Wait() + } timesBlocked := int64(0) if gcsHelper != nil { timesBlocked = gcsHelper.GetTimesBlocked() @@ -215,11 +237,13 @@ func processCVE(cve models.NVDCVE, vpRepoCache *c.VPRepoCache, repoTagsCache git return vuln, finalMetrics, outcome } -func worker(wg *sync.WaitGroup, jobs <-chan models.NVDCVE, gcsHelper *gcs.Helper, outDir string, vpRepoCache *c.VPRepoCache, repoTagsCache git.RepoTagsCache) { +func worker(wg *sync.WaitGroup, jobs <-chan models.NVDCVE, gcsHelper *gcs.Helper, outDir string, vpRepoCache *c.VPRepoCache, repoTagsCache git.RepoTagsCache, retryList *[]models.NVDCVE, retryMu *sync.Mutex) { defer wg.Done() for cve := range jobs { vuln, metrics, outcome := processCVE(cve, vpRepoCache, repoTagsCache) - totalConversionsCount.Add(1) + if retryList != nil { + totalConversionsCount.Add(1) + } cveID := string(cve.ID) if outcome == models.Error { var notes []string @@ -227,6 +251,11 @@ func worker(wg *sync.WaitGroup, jobs <-chan models.NVDCVE, gcsHelper *gcs.Helper notes = metrics.Notes } logger.Error("Error generating OSV record", slog.String("cve", cveID), slog.String("outcome", outcome.String()), slog.Any("notes", notes)) + if retryList != nil && retryMu != nil { + retryMu.Lock() + *retryList = append(*retryList, cve) + retryMu.Unlock() + } continue // Don't attempt to output files if there was an error } diff --git a/vulnfeeds/cmd/mirrors/cpe-repo-gen/main.go b/vulnfeeds/cmd/mirrors/cpe-repo-gen/main.go index 153a0fab545..27e853df36d 100644 --- a/vulnfeeds/cmd/mirrors/cpe-repo-gen/main.go +++ b/vulnfeeds/cmd/mirrors/cpe-repo-gen/main.go @@ -385,7 +385,7 @@ func analyzeCPEDictionary(cpes []CPE) (productToRepo c.VendorProductToRepoMap, d logger.Info("Disregarding derived repo", slog.String("repo", repo), slog.String("vendor", vp.Vendor), slog.String("product", vp.Product), slog.Any("err", err)) continue } - if !git.ValidRepoAndHasUsableRefs(repo) { + if valid, _ := git.ValidRepoAndHasUsableRefs(repo); !valid { logger.Info("Disregarding derived repo as unusable", slog.String("repo", repo), slog.String("vendor", vp.Vendor), slog.String("product", vp.Product)) continue } @@ -407,7 +407,7 @@ func validateRepos(prm c.VendorProductToRepoMap) (validated c.VendorProductToRep entryCount++ // As a side-effect, this also omits any with no repos. for _, r := range prm[vp] { - if !git.ValidRepoAndHasUsableRefs(r) { + if valid, _ := git.ValidRepoAndHasUsableRefs(r); !valid { logger.Info("Invalid repo", slog.Int("count", entryCount), slog.Int("total", len(prm)), slog.String("repo", r), slog.String("vendor", vp.Vendor), slog.String("product", vp.Product)) continue } diff --git a/vulnfeeds/conversion/versions.go b/vulnfeeds/conversion/versions.go index 6a8b6dbbed7..61f1a9b4ab9 100644 --- a/vulnfeeds/conversion/versions.go +++ b/vulnfeeds/conversion/versions.go @@ -1058,7 +1058,7 @@ func (c *VPRepoCache) MaybeUpdate(vp *VendorProduct, repo string) { return } // Avoid polluting the cache with existent-but-useless repos. - if git.ValidRepoAndHasUsableRefs(repo) { + if valid, _ := git.ValidRepoAndHasUsableRefs(repo); valid { c.m[*vp] = append(c.m[*vp], repo) } } @@ -1232,11 +1232,7 @@ func ReposFromReferences(cache *VPRepoCache, vp *VendorProduct, refs []models.Re continue } // If it's any other repo-shaped URL, it's only useful if it has tags. - if (err == nil && !git.ValidRepo(repo)) || (err != nil && !git.ValidRepoAndHasUsableRefs(repo)) { - if repoTagsCache != nil { - repoTagsCache.SetInvalid(repo) - } - + if isValid, _ := validateRepo(repo, err == nil, repoTagsCache); !isValid { continue } repos = append(repos, repo) @@ -1281,3 +1277,22 @@ func ReposFromReferencesCVEList(refs []models.Reference, tagDenyList []string, m return repos } + +func validateRepo(repo string, isCommit bool, cache git.RepoTagsCache) (bool, error) { + var valid bool + var err error + if isCommit { + valid, err = git.ValidRepo(repo) + } else { + valid, err = git.ValidRepoAndHasUsableRefs(repo) + } + if !valid && cache != nil { + if err != nil && (errors.Is(err, git.ErrRateLimit) || strings.Contains(err.Error(), "429") || strings.Contains(err.Error(), "Too Many Requests")) { + cache.SetRateLimited(repo) + } else { + cache.SetInvalid(repo) + } + } + + return valid, err +} diff --git a/vulnfeeds/git/repository.go b/vulnfeeds/git/repository.go index 714adb316a7..d14e8d6123f 100644 --- a/vulnfeeds/git/repository.go +++ b/vulnfeeds/git/repository.go @@ -82,7 +82,9 @@ type RepoTagsCache interface { Get(repo string) (RepoTagsMap, bool) Set(repo string, tags RepoTagsMap) SetInvalid(repo string) + SetRateLimited(repo string) IsInvalid(repo string) bool + ClearRateLimited() SetCanonicalLink(repo string, canonicalLink string) GetCanonicalLink(repo string) (string, bool) } @@ -92,6 +94,7 @@ type InMemoryRepoTagsCache struct { m map[string]RepoTagsMap invalid map[string]bool + rateLimited map[string]bool canonicalLink map[string]string } @@ -124,9 +127,21 @@ func (c *InMemoryRepoTagsCache) SetInvalid(repo string) { c.invalid[repo] = true } +func (c *InMemoryRepoTagsCache) SetRateLimited(repo string) { + c.Lock() + defer c.Unlock() + if c.rateLimited == nil { + c.rateLimited = make(map[string]bool) + } + c.rateLimited[repo] = true +} + func (c *InMemoryRepoTagsCache) IsInvalid(repo string) bool { c.RLock() defer c.RUnlock() + if c.rateLimited != nil && c.rateLimited[repo] { + return true + } if c.invalid == nil { return false } @@ -134,6 +149,12 @@ func (c *InMemoryRepoTagsCache) IsInvalid(repo string) bool { return c.invalid[repo] } +func (c *InMemoryRepoTagsCache) ClearRateLimited() { + c.Lock() + defer c.Unlock() + c.rateLimited = make(map[string]bool) +} + func (c *InMemoryRepoTagsCache) SetCanonicalLink(repo string, canonicalLink string) { c.Lock() defer c.Unlock() @@ -156,6 +177,8 @@ func (c *InMemoryRepoTagsCache) GetCanonicalLink(repo string) (string, bool) { type RedisRepoTagsCache struct { redisClient *redis.Client + mu sync.RWMutex + rateLimited map[string]bool } func (c *RedisRepoTagsCache) Get(repo string) (RepoTagsMap, bool) { @@ -199,7 +222,23 @@ func (c *RedisRepoTagsCache) SetInvalid(repo string) { c.redisClient.Set(ctx, key, "true", ttl) } +func (c *RedisRepoTagsCache) SetRateLimited(repo string) { + c.mu.Lock() + defer c.mu.Unlock() + if c.rateLimited == nil { + c.rateLimited = make(map[string]bool) + } + c.rateLimited[repo] = true +} + func (c *RedisRepoTagsCache) IsInvalid(repo string) bool { + c.mu.RLock() + if c.rateLimited != nil && c.rateLimited[repo] { + c.mu.RUnlock() + return true + } + c.mu.RUnlock() + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() @@ -212,6 +251,12 @@ func (c *RedisRepoTagsCache) IsInvalid(repo string) bool { return val == "true" } +func (c *RedisRepoTagsCache) ClearRateLimited() { + c.mu.Lock() + defer c.mu.Unlock() + c.rateLimited = make(map[string]bool) +} + func (c *RedisRepoTagsCache) SetCanonicalLink(repo string, canonicalLink string) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() @@ -246,12 +291,16 @@ func NewRepoTagsCache() RepoTagsCache { Protocol: 2, }) - return &RedisRepoTagsCache{redisClient: client} + return &RedisRepoTagsCache{ + redisClient: client, + rateLimited: make(map[string]bool), + } } return &InMemoryRepoTagsCache{ m: make(map[string]RepoTagsMap), invalid: make(map[string]bool), + rateLimited: make(map[string]bool), canonicalLink: make(map[string]string), } } @@ -412,16 +461,14 @@ func RepoTags(repoURL string, repoTagsCache RepoTagsCache) (tags Tags, e error) } } // Cache miss. - var refs []*plumbing.Reference - var err error - if os.Getenv("GITTER_HOST") != "" { - refs, err = gitterRepoRefs(repoURL) - } else { - refs, err = RemoteRepoRefsWithRetry(repoURL, 3) - } + refs, err := getRepoRefs(repoURL) if err != nil { if repoTagsCache != nil { - repoTagsCache.SetInvalid(repoURL) + if errors.Is(err, ErrRateLimit) || strings.Contains(err.Error(), "429") || strings.Contains(err.Error(), "Too Many Requests") { + repoTagsCache.SetRateLimited(repoURL) + } else { + repoTagsCache.SetInvalid(repoURL) + } } return tags, err @@ -547,34 +594,28 @@ func RefBranches(refs []*plumbing.Reference) (branches []*plumbing.Reference) { return branches } -// Validate the repo by attempting to query it's references. -// *** Does external calls to verify repos *** -func ValidRepo(repoURL string) bool { +func getRepoRefs(repoURL string) ([]*plumbing.Reference, error) { if os.Getenv("GITTER_HOST") != "" { - _, err := gitterRepoRefs(repoURL) - return err == nil + return gitterRepoRefs(repoURL) } - _, err := RemoteRepoRefsWithRetry(repoURL, 3) - return err == nil + return RemoteRepoRefsWithRetry(repoURL, 3) } -// Otherwise functional repos that don't have any tags are not valid. +// Validate the repo by attempting to query it's references. // *** Does external calls to verify repos *** -func ValidRepoAndHasUsableRefs(repoURL string) (valid bool) { - if os.Getenv("GITTER_HOST") != "" { - refs, err := gitterRepoRefs(repoURL) - if err != nil || len(refs) == 0 { - return false - } +func ValidRepo(repoURL string) (bool, error) { + _, err := getRepoRefs(repoURL) + return err == nil, err +} - return len(RefTags(refs)) > 0 - } - refs, err := RemoteRepoRefsWithRetry(repoURL, 3) - // Return false if there's an error, or if the repo has no refs (e.g. is empty) +// Otherwise functional repos that don't have any tags are not valid. +// *** Does external calls to verify repos *** +func ValidRepoAndHasUsableRefs(repoURL string) (bool, error) { + refs, err := getRepoRefs(repoURL) if err != nil || len(refs) == 0 { - return false + return false, err } - // Repos with no tags aren't useful. - return len(RefTags(refs)) > 0 + + return len(RefTags(refs)) > 0, nil } diff --git a/vulnfeeds/git/repository_test.go b/vulnfeeds/git/repository_test.go index 89dd8075b26..9578c7b71b8 100644 --- a/vulnfeeds/git/repository_test.go +++ b/vulnfeeds/git/repository_test.go @@ -383,7 +383,7 @@ func TestValidRepo(t *testing.T) { if time.Now().Before(tc.disableExpiryDate) { t.Skipf("test %q: TestValidRepo(%q) has been skipped due to known outage and will be reenabled on %s.", tc.description, tc.repoURL, tc.disableExpiryDate) } - got := ValidRepoAndHasUsableRefs(tc.repoURL) + got, _ := ValidRepoAndHasUsableRefs(tc.repoURL) if diff := cmp.Diff(got, tc.expectedResult); diff != "" { t.Errorf("test %q: ValidRepo(%q) was incorrect: %s", tc.description, tc.repoURL, diff) t.Logf("Confirm that %s is reachable with `git ls-remote %s`", tc.repoURL, tc.repoURL) @@ -396,7 +396,7 @@ func TestInvalidRepos(t *testing.T) { testutils.SetupGitVCR(t) redundantRepos := []string{} for _, repo := range models.InvalidRepos { - if !ValidRepoAndHasUsableRefs(repo) { + if valid, _ := ValidRepoAndHasUsableRefs(repo); !valid { redundantRepos = append(redundantRepos, repo) } } @@ -537,10 +537,10 @@ func TestValidRepoWithGitter(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { responseStatus = tc.status - if got := ValidRepo(tc.repo); got != tc.wantValidRepo { + if got, _ := ValidRepo(tc.repo); got != tc.wantValidRepo { t.Errorf("ValidRepo(%q) = %v, want %v", tc.repo, got, tc.wantValidRepo) } - if got := ValidRepoAndHasUsableRefs(tc.repo); got != tc.wantValidRepoAndHasRefs { + if got, _ := ValidRepoAndHasUsableRefs(tc.repo); got != tc.wantValidRepoAndHasRefs { t.Errorf("ValidRepoAndHasUsableRefs(%q) = %v, want %v", tc.repo, got, tc.wantValidRepoAndHasRefs) } })