Fix container parallel upload bugs ()

This PR should be replaced by  in v1.23. The aim of creating this
PR is to fix it in 1.22 because globallock hasn't been introduced.

Fix 
Fix 
Fix 
This commit is contained in:
Lunny Xiao 2024-09-12 11:11:03 +08:00 committed by GitHub
parent b3af359cc6
commit 30d989d411
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 31 additions and 9 deletions
routers/api/packages/container

@ -10,23 +10,29 @@ import (
"fmt"
"os"
"strings"
"sync"
"code.gitea.io/gitea/models/db"
packages_model "code.gitea.io/gitea/models/packages"
container_model "code.gitea.io/gitea/models/packages/container"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/log"
packages_module "code.gitea.io/gitea/modules/packages"
container_module "code.gitea.io/gitea/modules/packages/container"
"code.gitea.io/gitea/modules/sync"
"code.gitea.io/gitea/modules/util"
packages_service "code.gitea.io/gitea/services/packages"
)
var uploadVersionMutex sync.Mutex
// TODO: use clustered lock
var uploadVersionMutex = sync.NewExclusivePool()
// saveAsPackageBlob creates a package blob from an upload
// The uploaded blob gets stored in a special upload version to link them to the package/image
func saveAsPackageBlob(ctx context.Context, hsr packages_module.HashedSizeReader, pci *packages_service.PackageCreationInfo) (*packages_model.PackageBlob, error) {
pkgPath := pci.PackageInfo.Owner.LowerName + "/" + pci.PackageInfo.Name
uploadVersionMutex.CheckIn(pkgPath)
defer uploadVersionMutex.CheckOut(pkgPath)
pb := packages_service.NewPackageBlob(hsr)
exists := false
@ -80,6 +86,10 @@ func saveAsPackageBlob(ctx context.Context, hsr packages_module.HashedSizeReader
// mountBlob mounts the specific blob to a different package
func mountBlob(ctx context.Context, pi *packages_service.PackageInfo, pb *packages_model.PackageBlob) error {
pkgPath := pi.Owner.LowerName + "/" + pi.Name
uploadVersionMutex.CheckIn(pkgPath)
defer uploadVersionMutex.CheckOut(pkgPath)
uploadVersion, err := getOrCreateUploadVersion(ctx, pi)
if err != nil {
return err
@ -93,9 +103,6 @@ func mountBlob(ctx context.Context, pi *packages_service.PackageInfo, pb *packag
func getOrCreateUploadVersion(ctx context.Context, pi *packages_service.PackageInfo) (*packages_model.PackageVersion, error) {
var uploadVersion *packages_model.PackageVersion
// FIXME: Replace usage of mutex with database transaction
// https://github.com/go-gitea/gitea/pull/21862
uploadVersionMutex.Lock()
err := db.WithTx(ctx, func(ctx context.Context) error {
created := true
p := &packages_model.Package{
@ -140,7 +147,6 @@ func getOrCreateUploadVersion(ctx context.Context, pi *packages_service.PackageI
return nil
})
uploadVersionMutex.Unlock()
return uploadVersion, err
}
@ -172,10 +178,14 @@ func createFileForBlob(ctx context.Context, pv *packages_model.PackageVersion, p
return nil
}
func deleteBlob(ctx context.Context, ownerID int64, image, digest string) error {
func deleteBlob(ctx context.Context, owner *user_model.User, image, digest string) error {
pkgPath := owner.LowerName + "/" + image
uploadVersionMutex.CheckIn(pkgPath)
defer uploadVersionMutex.CheckOut(pkgPath)
return db.WithTx(ctx, func(ctx context.Context) error {
pfds, err := container_model.GetContainerBlobs(ctx, &container_model.BlobSearchOptions{
OwnerID: ownerID,
OwnerID: owner.ID,
Image: image,
Digest: digest,
})

@ -24,6 +24,7 @@ import (
packages_module "code.gitea.io/gitea/modules/packages"
container_module "code.gitea.io/gitea/modules/packages/container"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/sync"
"code.gitea.io/gitea/modules/util"
"code.gitea.io/gitea/routers/api/packages/helper"
auth_service "code.gitea.io/gitea/services/auth"
@ -540,7 +541,7 @@ func DeleteBlob(ctx *context.Context) {
return
}
if err := deleteBlob(ctx, ctx.Package.Owner.ID, ctx.Params("image"), d); err != nil {
if err := deleteBlob(ctx, ctx.Package.Owner, ctx.Params("image"), d); err != nil {
apiError(ctx, http.StatusInternalServerError, err)
return
}
@ -550,6 +551,9 @@ func DeleteBlob(ctx *context.Context) {
})
}
// TODO: use clustered lock
var lockManifest = sync.NewExclusivePool()
// https://github.com/opencontainers/distribution-spec/blob/main/spec.md#pushing-manifests
func UploadManifest(ctx *context.Context) {
reference := ctx.Params("reference")
@ -581,6 +585,10 @@ func UploadManifest(ctx *context.Context) {
return
}
imagePath := ctx.Package.Owner.Name + "/" + ctx.Params("image")
lockManifest.CheckIn(imagePath)
defer lockManifest.CheckOut(imagePath)
digest, err := processManifest(ctx, mci, buf)
if err != nil {
var namedError *namedError
@ -679,6 +687,10 @@ func DeleteManifest(ctx *context.Context) {
return
}
imagePath := ctx.Package.Owner.Name + "/" + ctx.Params("image")
lockManifest.CheckIn(imagePath)
defer lockManifest.CheckOut(imagePath)
pvs, err := container_model.GetManifestVersions(ctx, opts)
if err != nil {
apiError(ctx, http.StatusInternalServerError, err)