Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions vulnfeeds/cmd/converters/cve/nvd-cve-osv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,15 @@ func main() {
logger.Info("GCS Upload Pool initialized", slog.String("bucket", *outputBucket))
}

var retryList []models.NVDCVE
var retryMu sync.Mutex

jobs := make(chan models.NVDCVE, *workers)
var wg sync.WaitGroup

for range *workers {
wg.Add(1)
go worker(&wg, jobs, gcsHelper, *outDir, vpRepoCache, repoTagsCache)
go worker(&wg, jobs, gcsHelper, *outDir, vpRepoCache, repoTagsCache, &retryList, &retryMu)
}

for _, file := range files {
Expand All @@ -170,6 +173,25 @@ func main() {

close(jobs)
wg.Wait()

if len(retryList) > 0 {
logger.Info("Retrying records that failed with rate limit errors (429)", slog.Int("count", len(retryList)))
if repoTagsCache != nil {
repoTagsCache.ClearRateLimited()
}
retryJobs := make(chan models.NVDCVE, len(retryList))
for _, cve := range retryList {
retryJobs <- cve
}
close(retryJobs)

var retryWg sync.WaitGroup
for range *workers {
retryWg.Add(1)
go worker(&retryWg, retryJobs, gcsHelper, *outDir, vpRepoCache, repoTagsCache, nil, nil)
}
retryWg.Wait()
}
timesBlocked := int64(0)
if gcsHelper != nil {
timesBlocked = gcsHelper.GetTimesBlocked()
Expand Down Expand Up @@ -215,18 +237,25 @@ func processCVE(cve models.NVDCVE, vpRepoCache *c.VPRepoCache, repoTagsCache git
return vuln, finalMetrics, outcome
}

func worker(wg *sync.WaitGroup, jobs <-chan models.NVDCVE, gcsHelper *gcs.Helper, outDir string, vpRepoCache *c.VPRepoCache, repoTagsCache git.RepoTagsCache) {
func worker(wg *sync.WaitGroup, jobs <-chan models.NVDCVE, gcsHelper *gcs.Helper, outDir string, vpRepoCache *c.VPRepoCache, repoTagsCache git.RepoTagsCache, retryList *[]models.NVDCVE, retryMu *sync.Mutex) {
defer wg.Done()
for cve := range jobs {
vuln, metrics, outcome := processCVE(cve, vpRepoCache, repoTagsCache)
totalConversionsCount.Add(1)
if retryList != nil {
totalConversionsCount.Add(1)
}
cveID := string(cve.ID)
if outcome == models.Error {
var notes []string
if metrics != nil {
notes = metrics.Notes
}
logger.Error("Error generating OSV record", slog.String("cve", cveID), slog.String("outcome", outcome.String()), slog.Any("notes", notes))
if retryList != nil && retryMu != nil {
retryMu.Lock()
*retryList = append(*retryList, cve)
retryMu.Unlock()
}

continue // Don't attempt to output files if there was an error
}
Expand Down
4 changes: 2 additions & 2 deletions vulnfeeds/cmd/mirrors/cpe-repo-gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func analyzeCPEDictionary(cpes []CPE) (productToRepo c.VendorProductToRepoMap, d
logger.Info("Disregarding derived repo", slog.String("repo", repo), slog.String("vendor", vp.Vendor), slog.String("product", vp.Product), slog.Any("err", err))
continue
}
if !git.ValidRepoAndHasUsableRefs(repo) {
if valid, _ := git.ValidRepoAndHasUsableRefs(repo); !valid {
logger.Info("Disregarding derived repo as unusable", slog.String("repo", repo), slog.String("vendor", vp.Vendor), slog.String("product", vp.Product))
continue
}
Expand All @@ -407,7 +407,7 @@ func validateRepos(prm c.VendorProductToRepoMap) (validated c.VendorProductToRep
entryCount++
// As a side-effect, this also omits any with no repos.
for _, r := range prm[vp] {
if !git.ValidRepoAndHasUsableRefs(r) {
if valid, _ := git.ValidRepoAndHasUsableRefs(r); !valid {
logger.Info("Invalid repo", slog.Int("count", entryCount), slog.Int("total", len(prm)), slog.String("repo", r), slog.String("vendor", vp.Vendor), slog.String("product", vp.Product))
continue
}
Expand Down
27 changes: 21 additions & 6 deletions vulnfeeds/conversion/versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,7 @@ func (c *VPRepoCache) MaybeUpdate(vp *VendorProduct, repo string) {
return
}
// Avoid polluting the cache with existent-but-useless repos.
if git.ValidRepoAndHasUsableRefs(repo) {
if valid, _ := git.ValidRepoAndHasUsableRefs(repo); valid {
c.m[*vp] = append(c.m[*vp], repo)
}
}
Expand Down Expand Up @@ -1232,11 +1232,7 @@ func ReposFromReferences(cache *VPRepoCache, vp *VendorProduct, refs []models.Re
continue
}
// If it's any other repo-shaped URL, it's only useful if it has tags.
if (err == nil && !git.ValidRepo(repo)) || (err != nil && !git.ValidRepoAndHasUsableRefs(repo)) {
if repoTagsCache != nil {
repoTagsCache.SetInvalid(repo)
}

if isValid, _ := validateRepo(repo, err == nil, repoTagsCache); !isValid {
continue
}
repos = append(repos, repo)
Expand Down Expand Up @@ -1281,3 +1277,22 @@ func ReposFromReferencesCVEList(refs []models.Reference, tagDenyList []string, m

return repos
}

// validateRepo checks repo remotely and records a negative outcome in cache.
//
// When isCommit is true only git.ValidRepo is consulted; otherwise the repo
// must also pass git.ValidRepoAndHasUsableRefs. If the check fails and cache is
// non-nil, the repo is marked rate limited when the error looks like an HTTP
// 429 response, and invalid in every other case (including a failed check that
// produced no error). The validity flag and the error from the underlying
// check are returned unchanged.
func validateRepo(repo string, isCommit bool, cache git.RepoTagsCache) (bool, error) {
	check := git.ValidRepoAndHasUsableRefs
	if isCommit {
		check = git.ValidRepo
	}

	ok, checkErr := check(repo)
	if ok || cache == nil {
		return ok, checkErr
	}

	switch {
	case checkErr == nil:
		// The check failed without an error, so there is nothing to classify.
		cache.SetInvalid(repo)
	case errors.Is(checkErr, git.ErrRateLimit),
		strings.Contains(checkErr.Error(), "429"),
		strings.Contains(checkErr.Error(), "Too Many Requests"):
		// Kept apart from the invalid set so the mark can be cleared later.
		cache.SetRateLimited(repo)
	default:
		cache.SetInvalid(repo)
	}

	return ok, checkErr
}
101 changes: 71 additions & 30 deletions vulnfeeds/git/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ type RepoTagsCache interface {
Get(repo string) (RepoTagsMap, bool)
Set(repo string, tags RepoTagsMap)
SetInvalid(repo string)
SetRateLimited(repo string)
IsInvalid(repo string) bool
ClearRateLimited()
SetCanonicalLink(repo string, canonicalLink string)
GetCanonicalLink(repo string) (string, bool)
}
Expand All @@ -92,6 +94,7 @@ type InMemoryRepoTagsCache struct {

m map[string]RepoTagsMap
invalid map[string]bool
rateLimited map[string]bool
canonicalLink map[string]string
}

Expand Down Expand Up @@ -124,16 +127,34 @@ func (c *InMemoryRepoTagsCache) SetInvalid(repo string) {
c.invalid[repo] = true
}

// SetRateLimited marks repo as rate limited, creating the backing map on
// first use. The write lock is held for the duration of the update.
func (c *InMemoryRepoTagsCache) SetRateLimited(repo string) {
	c.Lock()
	defer c.Unlock()

	if c.rateLimited != nil {
		c.rateLimited[repo] = true
		return
	}
	c.rateLimited = map[string]bool{repo: true}
}

// IsInvalid reports whether repo is currently marked as rate limited or as
// invalid. Either mark is enough for a true result.
func (c *InMemoryRepoTagsCache) IsInvalid(repo string) bool {
	c.RLock()
	defer c.RUnlock()

	// Reading from a nil map yields false, so neither map needs a nil guard.
	return c.rateLimited[repo] || c.invalid[repo]
}

// ClearRateLimited drops every rate-limited mark by swapping in an empty map,
// so IsInvalid stops reporting those repos on that account. Entries in the
// invalid set are left alone.
func (c *InMemoryRepoTagsCache) ClearRateLimited() {
	empty := make(map[string]bool)

	c.Lock()
	c.rateLimited = empty
	c.Unlock()
}

func (c *InMemoryRepoTagsCache) SetCanonicalLink(repo string, canonicalLink string) {
c.Lock()
defer c.Unlock()
Expand All @@ -156,6 +177,8 @@ func (c *InMemoryRepoTagsCache) GetCanonicalLink(repo string) (string, bool) {

// RedisRepoTagsCache is a RepoTagsCache implementation that talks to Redis
// through redisClient and additionally tracks rate-limited repos in memory.
type RedisRepoTagsCache struct {
	// redisClient is the Redis connection used by the cache methods.
	redisClient *redis.Client
	// mu guards rateLimited.
	mu sync.RWMutex
	// rateLimited holds the repos marked via SetRateLimited. It is a plain
	// in-process map; SetRateLimited and ClearRateLimited never touch Redis.
	rateLimited map[string]bool
}

func (c *RedisRepoTagsCache) Get(repo string) (RepoTagsMap, bool) {
Expand Down Expand Up @@ -199,7 +222,23 @@ func (c *RedisRepoTagsCache) SetInvalid(repo string) {
c.redisClient.Set(ctx, key, "true", ttl)
}

// SetRateLimited marks repo as rate limited in the in-process map, creating
// the map on first use. Nothing is written to Redis.
func (c *RedisRepoTagsCache) SetRateLimited(repo string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.rateLimited != nil {
		c.rateLimited[repo] = true
		return
	}
	c.rateLimited = map[string]bool{repo: true}
}

func (c *RedisRepoTagsCache) IsInvalid(repo string) bool {
c.mu.RLock()
if c.rateLimited != nil && c.rateLimited[repo] {
c.mu.RUnlock()
return true
}
c.mu.RUnlock()

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

Expand All @@ -212,6 +251,12 @@ func (c *RedisRepoTagsCache) IsInvalid(repo string) bool {
return val == "true"
}

// ClearRateLimited drops every in-process rate-limited mark by swapping in an
// empty map. Nothing stored in Redis is modified.
func (c *RedisRepoTagsCache) ClearRateLimited() {
	empty := make(map[string]bool)

	c.mu.Lock()
	c.rateLimited = empty
	c.mu.Unlock()
}

func (c *RedisRepoTagsCache) SetCanonicalLink(repo string, canonicalLink string) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
Expand Down Expand Up @@ -246,12 +291,16 @@ func NewRepoTagsCache() RepoTagsCache {
Protocol: 2,
})

return &RedisRepoTagsCache{redisClient: client}
return &RedisRepoTagsCache{
redisClient: client,
rateLimited: make(map[string]bool),
}
}

return &InMemoryRepoTagsCache{
m: make(map[string]RepoTagsMap),
invalid: make(map[string]bool),
rateLimited: make(map[string]bool),
canonicalLink: make(map[string]string),
}
}
Expand Down Expand Up @@ -412,16 +461,14 @@ func RepoTags(repoURL string, repoTagsCache RepoTagsCache) (tags Tags, e error)
}
}
// Cache miss.
var refs []*plumbing.Reference
var err error
if os.Getenv("GITTER_HOST") != "" {
refs, err = gitterRepoRefs(repoURL)
} else {
refs, err = RemoteRepoRefsWithRetry(repoURL, 3)
}
refs, err := getRepoRefs(repoURL)
if err != nil {
if repoTagsCache != nil {
repoTagsCache.SetInvalid(repoURL)
if errors.Is(err, ErrRateLimit) || strings.Contains(err.Error(), "429") || strings.Contains(err.Error(), "Too Many Requests") {
repoTagsCache.SetRateLimited(repoURL)
} else {
repoTagsCache.SetInvalid(repoURL)
}
}

return tags, err
Expand Down Expand Up @@ -547,34 +594,28 @@ func RefBranches(refs []*plumbing.Reference) (branches []*plumbing.Reference) {
return branches
}

// Validate the repo by attempting to query its references.
// *** Does external calls to verify repos ***
func ValidRepo(repoURL string) bool {
func getRepoRefs(repoURL string) ([]*plumbing.Reference, error) {
if os.Getenv("GITTER_HOST") != "" {
_, err := gitterRepoRefs(repoURL)
return err == nil
return gitterRepoRefs(repoURL)
}
_, err := RemoteRepoRefsWithRetry(repoURL, 3)

return err == nil
return RemoteRepoRefsWithRetry(repoURL, 3)
}

// Otherwise functional repos that don't have any tags are not valid.
// Validate the repo by attempting to query its references.
// *** Does external calls to verify repos ***
func ValidRepoAndHasUsableRefs(repoURL string) (valid bool) {
if os.Getenv("GITTER_HOST") != "" {
refs, err := gitterRepoRefs(repoURL)
if err != nil || len(refs) == 0 {
return false
}
// ValidRepo reports whether the references of repoURL could be fetched.
// On failure it returns false together with the error from getRepoRefs, so
// callers can tell why the lookup failed.
func ValidRepo(repoURL string) (bool, error) {
	if _, err := getRepoRefs(repoURL); err != nil {
		return false, err
	}

	return true, nil
}

return len(RefTags(refs)) > 0
}
refs, err := RemoteRepoRefsWithRetry(repoURL, 3)
// Return false if there's an error, or if the repo has no refs (e.g. is empty)
// Otherwise functional repos that don't have any tags are not valid.
// *** Does external calls to verify repos ***
func ValidRepoAndHasUsableRefs(repoURL string) (bool, error) {
refs, err := getRepoRefs(repoURL)
if err != nil || len(refs) == 0 {
return false
return false, err
}
// Repos with no tags aren't useful.
return len(RefTags(refs)) > 0

return len(RefTags(refs)) > 0, nil
}
8 changes: 4 additions & 4 deletions vulnfeeds/git/repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func TestValidRepo(t *testing.T) {
if time.Now().Before(tc.disableExpiryDate) {
t.Skipf("test %q: TestValidRepo(%q) has been skipped due to known outage and will be reenabled on %s.", tc.description, tc.repoURL, tc.disableExpiryDate)
}
got := ValidRepoAndHasUsableRefs(tc.repoURL)
got, _ := ValidRepoAndHasUsableRefs(tc.repoURL)
if diff := cmp.Diff(got, tc.expectedResult); diff != "" {
t.Errorf("test %q: ValidRepo(%q) was incorrect: %s", tc.description, tc.repoURL, diff)
t.Logf("Confirm that %s is reachable with `git ls-remote %s`", tc.repoURL, tc.repoURL)
Expand All @@ -396,7 +396,7 @@ func TestInvalidRepos(t *testing.T) {
testutils.SetupGitVCR(t)
redundantRepos := []string{}
for _, repo := range models.InvalidRepos {
if !ValidRepoAndHasUsableRefs(repo) {
if valid, _ := ValidRepoAndHasUsableRefs(repo); !valid {
redundantRepos = append(redundantRepos, repo)
}
}
Expand Down Expand Up @@ -537,10 +537,10 @@ func TestValidRepoWithGitter(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
responseStatus = tc.status
if got := ValidRepo(tc.repo); got != tc.wantValidRepo {
if got, _ := ValidRepo(tc.repo); got != tc.wantValidRepo {
t.Errorf("ValidRepo(%q) = %v, want %v", tc.repo, got, tc.wantValidRepo)
}
if got := ValidRepoAndHasUsableRefs(tc.repo); got != tc.wantValidRepoAndHasRefs {
if got, _ := ValidRepoAndHasUsableRefs(tc.repo); got != tc.wantValidRepoAndHasRefs {
t.Errorf("ValidRepoAndHasUsableRefs(%q) = %v, want %v", tc.repo, got, tc.wantValidRepoAndHasRefs)
}
})
Expand Down
Loading