Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ingestors/cocoapods.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,15 @@ func (ingestor *cocoapods) ingestURL(feedUrl string) []data.PackageVersion {
}

for _, item := range feed.Items {
if item.UpdatedParsed == nil {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "title": item.Title}).Warn("feed item missing updated date, skipping")
continue
}
nameAndVersion := strings.SplitN(item.Title, " ", 3)
if len(nameAndVersion) < 3 {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "title": item.Title}).Warn("unexpected feed item title format, skipping")
continue
}
results = append(results,
data.PackageVersion{
Platform: ingestor.Name(),
Expand Down
8 changes: 8 additions & 0 deletions ingestors/cpan.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,15 @@ func (ingestor *CPAN) ingestURL(feedUrl string) []data.PackageVersion {
}

for _, item := range feed.Items {
if item.PublishedParsed == nil {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "title": item.Title}).Warn("feed item missing published date, skipping")
continue
}
pieces := strings.Split(item.Title, "-")
if len(pieces) < 2 {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "title": item.Title}).Warn("unexpected feed item title format, skipping")
continue
}
results = append(results,
data.PackageVersion{
Platform: ingestor.Name(),
Expand Down
8 changes: 8 additions & 0 deletions ingestors/drupal.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func (ingestor *Drupal) Ingest() []data.PackageVersion {
var id string
if idAttr, exists := s.Attr("id"); exists {
parts := strings.SplitN(idAttr, "-", 2) // e.g. "node-1234"
if len(parts) < 2 {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "id": idAttr}).Warn("unexpected node id format, skipping")
return
}
id = parts[1]
}
packageResults := ingestor.getVersions(id, bookmark)
Expand Down Expand Up @@ -88,6 +92,10 @@ func (ingestor *Drupal) getVersions(id string, bookmark time.Time) []data.Packag
for _, item := range feed.Items {
createdAtTime, _ := time.Parse(time.RFC1123, item.Published)
nameAndVersion := strings.SplitN(item.Title, " ", 2) // e.g. ctools 7.x-1.19
if len(nameAndVersion) < 2 {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "title": item.Title}).Warn("unexpected release title format, skipping")
continue
}
if createdAtTime.After(bookmark) {
discoveryLag := time.Since(createdAtTime)
results = append(results,
Expand Down
8 changes: 8 additions & 0 deletions ingestors/elm.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,16 @@ func (ingestor *Elm) ingestURL(feedUrl string) []data.PackageVersion {
return results
}
for _, item := range feed.Items {
if item.PublishedParsed == nil {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "title": item.Title}).Warn("feed item missing published date, skipping")
continue
}
parsed, _ := url.Parse(item.Link)
parts := strings.Split(parsed.Path, "/")
if len(parts) < 5 {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "link": item.Link}).Warn("unexpected feed item path format, skipping")
continue
}
discoveryLag := time.Since(*item.PublishedParsed)
results = append(results,
data.PackageVersion{
Expand Down
8 changes: 8 additions & 0 deletions ingestors/hackage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,15 @@ func (ingestor *Hackage) ingestURL(feedUrl string) []data.PackageVersion {
}

for _, item := range feed.Items {
if item.PublishedParsed == nil {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "title": item.Title}).Warn("feed item missing published date, skipping")
continue
}
nameAndVersion := strings.SplitN(item.Title, " ", 2)
if len(nameAndVersion) < 2 {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "title": item.Title}).Warn("unexpected feed item title format, skipping")
continue
}
results = append(results,
data.PackageVersion{
Platform: ingestor.Name(),
Expand Down
7 changes: 5 additions & 2 deletions ingestors/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import (
"github.com/mmcdole/gofeed"
)

// httpClient is the package-level HTTP client used by
// depperGetUrlWithHeaders to send requests. Its Timeout is 30 seconds.
var httpClient = &http.Client{Timeout: 30 * time.Second}

// depperGetUrl requests url without any additional request headers.
// It is a convenience wrapper that hands an empty header map to
// depperGetUrlWithHeaders and returns that call's response and error as-is.
func depperGetUrl(url string) (*http.Response, error) {
	noHeaders := map[string]string{}
	return depperGetUrlWithHeaders(url, noHeaders)
}

func depperGetUrlWithHeaders(url string, headers map[string]string) (*http.Response, error) {
client := &http.Client{}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
Expand All @@ -24,7 +27,7 @@ func depperGetUrlWithHeaders(url string, headers map[string]string) (*http.Respo
req.Header.Set(key, value)
}

return client.Do(req)
return httpClient.Do(req)
}

func depperGetFeed(url string) (feed *gofeed.Feed, err error) {
Expand Down
8 changes: 8 additions & 0 deletions ingestors/packagist.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,15 @@ func (ingestor *Packagist) ingestURL(feedUrl string) []data.PackageVersion {
}

for _, item := range feed.Items {
if item.PublishedParsed == nil {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "title": item.Title}).Warn("feed item missing published date, skipping")
continue
}
nameAndVersion := strings.SplitN(item.GUID, " ", 2)
if len(nameAndVersion) < 2 {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "title": item.Title}).Warn("unexpected feed item GUID format, skipping")
continue
}
results = append(results,
data.PackageVersion{
Platform: ingestor.Name(),
Expand Down
8 changes: 8 additions & 0 deletions ingestors/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,16 @@ func (ingestor *Pub) ingestURL(feedUrl string) []data.PackageVersion {
}

for _, item := range feed.Items {
if item.UpdatedParsed == nil {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "title": item.Title}).Warn("feed item missing updated date, skipping")
continue
}
// version of name is the title, for example v0.0.2 of foobar_flutter
nameAndVersion := strings.SplitN(item.Title, " ", 3)
if len(nameAndVersion) < 3 {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "title": item.Title}).Warn("unexpected feed item title format, skipping")
continue
}
results = append(results,
data.PackageVersion{
Platform: ingestor.Name(),
Expand Down
20 changes: 20 additions & 0 deletions ingestors/pypi_rss.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ func (ingestor *PyPiRss) getUpdates() []data.PackageVersion {
}

for _, item := range feed.Items {
if item.PublishedParsed == nil {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "title": item.Title}).Warn("feed item missing published date, skipping")
continue
}
if len(strings.SplitN(item.Title, " ", 2)) < 2 {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "title": item.Title}).Warn("unexpected feed item title format, skipping")
continue
}
results = append(results, createUpdateItemPackageVersion(item))
}

Expand All @@ -100,11 +108,19 @@ func (ingestor *PyPiRss) getNewPackages() []data.PackageVersion {

// Get releases for items not yet seen
for _, item := range feed.Items {
if item.PublishedParsed == nil {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "title": item.Title}).Warn("feed item missing published date, skipping")
continue
}
if !item.PublishedParsed.After(bookmark) {
continue
}

linkBits := strings.Split(item.Link, "/")
if len(linkBits) < 2 {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "link": item.Link}).Warn("unexpected feed item link format, skipping")
continue
}
packageName := linkBits[len(linkBits)-2]

results = append(results, ingestor.getReleases(packageName)...)
Expand All @@ -129,6 +145,10 @@ func (ingestor *PyPiRss) getReleases(packageName string) []data.PackageVersion {
}

for _, item := range feed.Items {
if item.PublishedParsed == nil {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "title": item.Title}).Warn("feed item missing published date, skipping")
continue
}
results = append(results,
data.PackageVersion{
Platform: "pypi",
Expand Down
5 changes: 5 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func (depper *Depper) registerIngestors() {
func (depper *Depper) registerIngestor(ingestor ingestors.PollingIngestor) {
c := cron.New()
ingestAndPublish := func() {
defer func() {
if r := recover(); r != nil {
log.WithFields(log.Fields{"ingestor": ingestor.Name(), "panic": r}).Error("ingestor panicked")
}
}()
span := tracer.StartSpan("ingest_and_publish")
span.SetTag("ingestor", ingestor.Name())
defer span.Finish()
Expand Down
6 changes: 3 additions & 3 deletions publishers/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ type Pipeline struct {
}

func NewPipeline() *Pipeline {
pipeline := &Pipeline{}
pipeline := &Pipeline{
queue: make(chan publishing, maxQueueSize),
}
go pipeline.run()

return pipeline
Expand All @@ -35,8 +37,6 @@ func (pipeline *Pipeline) Publish(ttl time.Duration, packageVersion data.Package
}

func (pipeline *Pipeline) run() {
pipeline.queue = make(chan publishing, maxQueueSize)

for publishing := range pipeline.queue {
pipeline.process(publishing)
}
Expand Down
6 changes: 4 additions & 2 deletions publishers/sidekiq.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func randomHex(n int) string {
id := make([]byte, n)
_, err := io.ReadFull(rand.Reader, id)
if err != nil {
log.WithFields(log.Fields{"error": err})
log.WithFields(log.Fields{"error": err}).Error("failed to generate random job ID")
}
return hex.EncodeToString(id)
}
Expand All @@ -59,5 +59,7 @@ func (lib *Sidekiq) Publish(packageVersion data.PackageVersion) {
log.WithFields(log.Fields{"publisher": "sidekiq"}).Error(err)
return
}
redis.Client.LPush(context.Background(), fmt.Sprintf("queue:%s", job.Queue), string(encoded))
if err := redis.Client.LPush(context.Background(), fmt.Sprintf("queue:%s", job.Queue), string(encoded)).Err(); err != nil {
log.WithFields(log.Fields{"publisher": "sidekiq"}).Error(err)
}
}