From 65d93d819b7eacea4ddd1babfdc58ad7004e405a Mon Sep 17 00:00:00 2001 From: Noel Jackson Date: Tue, 3 Feb 2026 17:08:20 +0100 Subject: [PATCH] fix(packages/container): data race when uploading container blobs concurrently (#36524) Co-authored-by: wxiaoguang --- models/packages/package_blob.go | 11 ++++- models/packages/package_blob_test.go | 51 ++++++++++++++++++++ routers/api/packages/container/blob.go | 13 ++++- services/packages/container/blob_uploader.go | 8 +-- 4 files changed, 75 insertions(+), 8 deletions(-) create mode 100644 models/packages/package_blob_test.go diff --git a/models/packages/package_blob.go b/models/packages/package_blob.go index d9c30b6533..e765bbf0c2 100644 --- a/models/packages/package_blob.go +++ b/models/packages/package_blob.go @@ -43,13 +43,15 @@ func GetOrInsertBlob(ctx context.Context, pb *PackageBlob) (*PackageBlob, bool, existing := &PackageBlob{} - has, err := e.Where(builder.Eq{ + hashCond := builder.Eq{ "size": pb.Size, "hash_md5": pb.HashMD5, "hash_sha1": pb.HashSHA1, "hash_sha256": pb.HashSHA256, "hash_sha512": pb.HashSHA512, - }).Get(existing) + } + + has, err := e.Where(hashCond).Get(existing) if err != nil { return nil, false, err } @@ -57,6 +59,11 @@ func GetOrInsertBlob(ctx context.Context, pb *PackageBlob) (*PackageBlob, bool, return existing, true, nil } if _, err = e.Insert(pb); err != nil { + // Handle race condition: another request may have inserted the same blob + // between our SELECT and INSERT. Retry the SELECT to get the existing blob. + if has, _ = e.Where(hashCond).Get(existing); has { + return existing, true, nil + } return nil, false, err } return pb, false, nil diff --git a/models/packages/package_blob_test.go b/models/packages/package_blob_test.go new file mode 100644 index 0000000000..8b636b4ee0 --- /dev/null +++ b/models/packages/package_blob_test.go @@ -0,0 +1,51 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package packages + +import ( + "testing" + + "code.gitea.io/gitea/models/unittest" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +func TestGetOrInsertBlobConcurrent(t *testing.T) { + require.NoError(t, unittest.PrepareTestDatabase()) + + testBlob := PackageBlob{ + Size: 123, + HashMD5: "md5", + HashSHA1: "sha1", + HashSHA256: "sha256", + HashSHA512: "sha512", + } + + const numGoroutines = 3 + var wg errgroup.Group + results := make([]*PackageBlob, numGoroutines) + existed := make([]bool, numGoroutines) + for idx := range numGoroutines { + wg.Go(func() error { + blob := testBlob // Create a copy of the test blob for each goroutine + var err error + results[idx], existed[idx], err = GetOrInsertBlob(t.Context(), &blob) + return err + }) + } + require.NoError(t, wg.Wait()) + + // then: all GetOrInsertBlob succeeds with the same blob ID, and only one indicates it did not exist before + existedCount := 0 + assert.NotNil(t, results[0]) + for i := range numGoroutines { + assert.Equal(t, results[0].ID, results[i].ID) + if existed[i] { + existedCount++ + } + } + assert.Equal(t, numGoroutines-1, existedCount) +} diff --git a/routers/api/packages/container/blob.go b/routers/api/packages/container/blob.go index 4b7bcee9d0..da509bc7a7 100644 --- a/routers/api/packages/container/blob.go +++ b/routers/api/packages/container/blob.go @@ -26,9 +26,18 @@ import ( // saveAsPackageBlob creates a package blob from an upload // The uploaded blob gets stored in a special upload version to link them to the package/image -func saveAsPackageBlob(ctx context.Context, hsr packages_module.HashedSizeReader, pci *packages_service.PackageCreationInfo) (*packages_model.PackageBlob, error) { //nolint:unparam // PackageBlob is never used +// There will be concurrent uploading for the same blob, so it needs a global lock per blob hash +func saveAsPackageBlob(ctx context.Context, hsr packages_module.HashedSizeReader, pci *packages_service.PackageCreationInfo) (*packages_model.PackageBlob, error) { //nolint:unparam //returned PackageBlob is never used pb := packages_service.NewPackageBlob(hsr) + err := globallock.LockAndDo(ctx, "container-blob:"+pb.HashSHA256, func(ctx context.Context) error { + var err error + pb, err = saveAsPackageBlobInternal(ctx, hsr, pci, pb) + return err + }) + return pb, err +} +func saveAsPackageBlobInternal(ctx context.Context, hsr packages_module.HashedSizeReader, pci *packages_service.PackageCreationInfo, pb *packages_model.PackageBlob) (*packages_model.PackageBlob, error) { exists := false contentStore := packages_module.NewContentStore() @@ -67,7 +76,7 @@ func saveAsPackageBlob(ctx context.Context, hsr packages_module.HashedSizeReader return createFileForBlob(ctx, uploadVersion, pb) }) if err != nil { - if !exists { + if !exists && pb != nil { // pb can be nil if GetOrInsertBlob failed if err := contentStore.Delete(packages_module.BlobHash256Key(pb.HashSHA256)); err != nil { log.Error("Error deleting package blob from content store: %v", err) } diff --git a/services/packages/container/blob_uploader.go b/services/packages/container/blob_uploader.go index 27bc4a5421..17139c3706 100644 --- a/services/packages/container/blob_uploader.go +++ b/services/packages/container/blob_uploader.go @@ -63,10 +63,10 @@ func NewBlobUploader(ctx context.Context, id string) (*BlobUploader, error) { } return &BlobUploader{ - model, - hash, - f, - false, + PackageBlobUpload: model, + MultiHasher: hash, + file: f, + reading: false, }, nil }