From 2827620cfe25a8278e4f6e0d269bbfc80cb1e93a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Wed, 20 May 2026 13:40:37 +0000 Subject: [PATCH 1/3] fix(publish): pre-register published repo key before task submission MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit apiPublishRepoOrSnapshot appended published.Key() to resources inside the task closure, after maybeRunTaskInBackground had already been called. The task's locked-resource set is fixed at submission time, so that append had no effect — the published repo key was never registered as a resource. Two concurrent POST /api/publish/{prefix} requests for the same prefix/distribution therefore did not conflict in the task queue: both ran in parallel, each loaded an empty PublishedRepoCollection from the DB, both passed CheckDuplicate, and the second Add silently overwrote the first. Fix: compute the published repo key ("U{storagePrefix}>>{distribution}") from the already-known storage/prefix/distribution values and append it to resources before calling maybeRunTaskInBackground, so concurrent creates for the same destination are serialised by the task queue. The now-dead append inside the closure is removed. --- api/publish.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/api/publish.go b/api/publish.go index 67b260d47..b0cedfc08 100644 --- a/api/publish.go +++ b/api/publish.go @@ -300,6 +300,17 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { collection := collectionFactory.PublishedRepoCollection() + // Pre-register the published repo key in resources so that concurrent + // POST requests for the same prefix/distribution are serialized by the + // task queue rather than racing on CheckDuplicate + Add. + if b.Distribution != "" { + storagePrefix := prefix + if storage != "" { + storagePrefix = storage + ":" + prefix + } + resources = append(resources, "U"+storagePrefix+">>"+b.Distribution) + } + taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"", b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`)) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { @@ -332,8 +343,6 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) } - resources = append(resources, string(published.Key())) - if b.Origin != "" { published.Origin = b.Origin } From 2a5992c74eb6f396335fb62f57dde9376825f364 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Wed, 20 May 2026 13:41:58 +0000 Subject: [PATCH 2/3] fix(publish): reload published inside task for source-management endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Affected endpoints: apiPublishAddSource, apiPublishSetSources, apiPublishUpdateSource, apiPublishRemoveSource, apiPublishDropChanges. All five handlers shared the same flawed pattern: they loaded the published repo from the DB and mutated it (ObtainRevision / DropRevision) outside the task closure, before the task lock was acquired. Each task closure then just wrote back the already-mutated, pre-lock object. Because the task queue serialises tasks that share a resource key, two concurrent requests appear safe — but each task closure holds a stale copy of the object captured before the lock was taken: Request A loads published: revision = {} Request B loads published: revision = {} <- same DB state A mutates: revision = {main: snap1} B mutates: revision = {contrib: snap2} Task A runs: saves {main: snap1} OK Task B runs: saves {contrib: snap2} <- clobbers A's change Fix: perform only a shallow ByStoragePrefixDistribution outside the task (for the early 404 response, resource key, and task name). Inside the task closure a dedicated taskCollectionFactory is created, the published repo is re-read fresh from the DB (after the lock is acquired), and LoadComplete + all mutations + Update are executed against that authoritative copy. --- api/publish.go | 234 +++++++++++++++++++++++++++++-------------------- 1 file changed, 138 insertions(+), 96 deletions(-) diff --git a/api/publish.go b/api/publish.go index b0cedfc08..73e049fc9 100644 --- a/api/publish.go +++ b/api/publish.go @@ -648,43 +648,52 @@ func apiPublishAddSource(c *gin.Context) { storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) + if c.Bind(&b) != nil { + return + } + collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly (no LoadComplete) to verify existence and obtain the + // resource key and task name. The actual mutation is performed inside + // the task on a freshly loaded copy to prevent lost-update races. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to create: %s", err)) - return - } + resources := []string{string(published.Key())} + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() - if c.Bind(&b) != nil { - return - } + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to create: %s", err) + } - revision := published.ObtainRevision() - sources := revision.Sources + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create: %s", err) + } - component := b.Component - name := b.Name + revision := published.ObtainRevision() + sources := revision.Sources - _, exists := sources[component] - if exists { - AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("unable to create: Component '%s' already exists", component)) - return - } + component := b.Component + name := b.Name - sources[component] = name + _, exists := sources[component] + if exists { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("unable to create: Component '%s' already exists", component) + } - resources := []string{string(published.Key())} - taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) - maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + sources[component] = name + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -766,39 +775,48 @@ func apiPublishSetSources(c *gin.Context) { storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) + if c.Bind(&b) != nil { + return + } + collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } + resources := []string{string(published.Key())} + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() - if c.Bind(&b) != nil { - return - } + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } - revision := published.ObtainRevision() - sources := make(map[string]string, len(b)) - revision.Sources = sources + revision := published.ObtainRevision() + sources := make(map[string]string, len(b)) + revision.Sources = sources - for _, source := range b { - component := source.Component - name := source.Name - sources[component] = name - } + for _, source := range b { + component := source.Component + name := source.Name + sources[component] = name + } - resources := []string{string(published.Key())} - taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) - maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -831,24 +849,33 @@ func apiPublishDropChanges(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and DropRevision happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err)) - return - } - - published.DropRevision() - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + published.DropRevision() + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -884,51 +911,58 @@ func apiPublishUpdateSource(c *gin.Context) { param := slashEscape(c.Params.ByName("prefix")) storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) - component := slashEscape(c.Params.ByName("component")) + urlComponent := slashEscape(c.Params.ByName("component")) + + // Default component to the URL path segment; the body may rename it. + b.Component = urlComponent + if c.Bind(&b) != nil { + return + } collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } + resources := []string{string(published.Key())} + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() - revision := published.ObtainRevision() - sources := revision.Sources + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err) + } - _, exists := sources[component] - if !exists { - AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: Component '%s' does not exist", component)) - return - } + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } - b.Component = component - b.Name = revision.Sources[component] + revision := published.ObtainRevision() + sources := revision.Sources - if c.Bind(&b) != nil { - return - } + _, exists := sources[urlComponent] + if !exists { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: Component '%s' does not exist", urlComponent) + } - if b.Component != component { - delete(sources, component) - } + if b.Component != urlComponent { + delete(sources, urlComponent) + } - component = b.Component - name := b.Name - sources[component] = name + newComponent := b.Component + name := b.Name + sources[newComponent] = name - resources := []string{string(published.Key())} - taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) - maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -965,33 +999,41 @@ func apiPublishRemoveSource(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err)) - return - } + resources := []string{string(published.Key())} + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() - revision := published.ObtainRevision() - sources := revision.Sources + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } - _, exists := sources[component] - if !exists { - AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: Component '%s' does not exist", component)) - return - } + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + revision := published.ObtainRevision() + sources := revision.Sources - delete(sources, component) + _, exists := sources[component] + if !exists { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: Component '%s' does not exist", component) + } - resources := []string{string(published.Key())} - taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) - maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + delete(sources, component) + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } From b7969c7a2d18dc8e41fb9945460440f1e32bda27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Wed, 20 May 2026 13:42:10 +0000 Subject: [PATCH 3/3] fix(publish): reload published inside task for update/switch endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Affected endpoints: apiPublishUpdateSwitch (PUT), apiPublishUpdate (POST). Both handlers loaded the published repo and mutated scalar fields (Label, Origin, SkipContents, SkipBz2, AcquireByHash, SignedBy, MultiDist, Version) outside the task closure, before the lock was acquired. Inside the task, LoadComplete only refreshed sourceItems — it did not reload scalar fields or the Revision. Two concurrent requests therefore each operated on a stale base: Request A loads published (Label="old"), sets Label="A" Request B loads published (Label="old"), sets Label="B" Task A runs: Update() + Publish() + collection.Update() -> saves Label="A" Task B runs: Update() on B's stale copy -> saves Label="B", silently discarding A's Label change and potentially reconciling a Revision built against the pre-A state. Fix: remove all field mutations and the LoadComplete call from the HTTP handler. Inside the task, a fresh taskCollectionFactory is created, the published repo is re-read via ByStoragePrefixDistribution + LoadComplete (obtaining the current DB state after the lock is held), and then all field mutations are applied before Update / Publish / collection.Update. --- api/publish.go | 163 ++++++++++++++++++++++++++----------------------- 1 file changed, 85 insertions(+), 78 deletions(-) diff --git a/api/publish.go b/api/publish.go index 73e049fc9..4e4b75f84 100644 --- a/api/publish.go +++ b/api/publish.go @@ -492,46 +492,50 @@ func apiPublishUpdateSwitch(c *gin.Context) { return } - if b.SkipContents != nil { - published.SkipContents = *b.SkipContents - } - - if b.SkipBz2 != nil { - published.SkipBz2 = *b.SkipBz2 - } - - if b.AcquireByHash != nil { - published.AcquireByHash = *b.AcquireByHash - } - - if b.SignedBy != nil { - published.SignedBy = *b.SignedBy - } - - if b.MultiDist != nil { - published.MultiDist = *b.MultiDist - } - - if b.Label != nil { - published.Label = *b.Label - } - - if b.Origin != nil { - published.Origin = *b.Origin - } - - if b.Version != nil { - published.Version = *b.Version - } - + // Field mutations and fresh DB load are deferred to inside the task so + // they always operate on a consistent state after the lock is held. resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.LoadComplete(published, collectionFactory) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } + // Apply field mutations on the freshly loaded object. + if b.SkipContents != nil { + published.SkipContents = *b.SkipContents + } + if b.SkipBz2 != nil { + published.SkipBz2 = *b.SkipBz2 + } + if b.AcquireByHash != nil { + published.AcquireByHash = *b.AcquireByHash + } + if b.SignedBy != nil { + published.SignedBy = *b.SignedBy + } + if b.MultiDist != nil { + published.MultiDist = *b.MultiDist + } + if b.Label != nil { + published.Label = *b.Label + } + if b.Origin != nil { + published.Origin = *b.Origin + } + if b.Version != nil { + published.Version = *b.Version + } + revision := published.ObtainRevision() sources := revision.Sources @@ -543,17 +547,17 @@ func apiPublishUpdateSwitch(c *gin.Context) { } } - result, err := published.Update(collectionFactory, out) + result, err := published.Update(taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = collection.Update(published) + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -561,7 +565,7 @@ func apiPublishUpdateSwitch(c *gin.Context) { if b.SkipCleanup == nil || !*b.SkipCleanup { cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources)) cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...) - err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out) + err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } @@ -1105,64 +1109,67 @@ func apiPublishUpdate(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and field mutations happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } - - if b.SkipContents != nil { - published.SkipContents = *b.SkipContents - } - - if b.SkipBz2 != nil { - published.SkipBz2 = *b.SkipBz2 - } - - if b.AcquireByHash != nil { - published.AcquireByHash = *b.AcquireByHash - } - - if b.SignedBy != nil { - published.SignedBy = *b.SignedBy - } - - if b.MultiDist != nil { - published.MultiDist = *b.MultiDist - } + resources := []string{string(published.Key())} + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) + maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() - if b.Label != nil { - published.Label = *b.Label - } + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } - if b.Origin != nil { - published.Origin = *b.Origin - } + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } - if b.Version != nil { - published.Version = *b.Version - } + // Apply field mutations on the freshly loaded object. + if b.SkipContents != nil { + published.SkipContents = *b.SkipContents + } + if b.SkipBz2 != nil { + published.SkipBz2 = *b.SkipBz2 + } + if b.AcquireByHash != nil { + published.AcquireByHash = *b.AcquireByHash + } + if b.SignedBy != nil { + published.SignedBy = *b.SignedBy + } + if b.MultiDist != nil { + published.MultiDist = *b.MultiDist + } + if b.Label != nil { + published.Label = *b.Label + } + if b.Origin != nil { + published.Origin = *b.Origin + } + if b.Version != nil { + published.Version = *b.Version + } - resources := []string{string(published.Key())} - taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) - maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - result, err := published.Update(collectionFactory, out) + result, err := published.Update(taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = collection.Update(published) + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -1170,7 +1177,7 @@ func apiPublishUpdate(c *gin.Context) { if b.SkipCleanup == nil || !*b.SkipCleanup { cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources)) cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...) - err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out) + err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) }