Skip to content

Commit b594cec

Browse files
Fix artifact v4 upload above 8MB (#31664)
Multiple chunks are uploaded with type "block" without using "appendBlock" and eventually out of order for bigger uploads. 8MB seems to be the chunk size This change parses the blockList uploaded after all blocks to get the final artifact size and order them correctly before calculating the sha256 checksum over all blocks Fixes #31354
1 parent 74f2ee3 commit b594cec

File tree

3 files changed

+286
-40
lines changed

3 files changed

+286
-40
lines changed

routers/api/actions/artifacts_chunks.go

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,54 @@ func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chun
123123
return chunksMap, nil
124124
}
125125

126+
func listChunksByRunIDV4(st storage.ObjectStorage, runID, artifactID int64, blist *BlockList) ([]*chunkFileItem, error) {
127+
storageDir := fmt.Sprintf("tmpv4%d", runID)
128+
var chunks []*chunkFileItem
129+
chunkMap := map[string]*chunkFileItem{}
130+
dummy := &chunkFileItem{}
131+
for _, name := range blist.Latest {
132+
chunkMap[name] = dummy
133+
}
134+
if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
135+
baseName := filepath.Base(fpath)
136+
if !strings.HasPrefix(baseName, "block-") {
137+
return nil
138+
}
139+
// when read chunks from storage, it only contains storage dir and basename,
140+
// no matter the subdirectory setting in storage config
141+
item := chunkFileItem{Path: storageDir + "/" + baseName, ArtifactID: artifactID}
142+
var size int64
143+
var b64chunkName string
144+
if _, err := fmt.Sscanf(baseName, "block-%d-%d-%s", &item.RunID, &size, &b64chunkName); err != nil {
145+
return fmt.Errorf("parse content range error: %v", err)
146+
}
147+
rchunkName, err := base64.URLEncoding.DecodeString(b64chunkName)
148+
if err != nil {
149+
return fmt.Errorf("failed to parse chunkName: %v", err)
150+
}
151+
chunkName := string(rchunkName)
152+
item.End = item.Start + size - 1
153+
if _, ok := chunkMap[chunkName]; ok {
154+
chunkMap[chunkName] = &item
155+
}
156+
return nil
157+
}); err != nil {
158+
return nil, err
159+
}
160+
for i, name := range blist.Latest {
161+
chunk, ok := chunkMap[name]
162+
if !ok || chunk.Path == "" {
163+
return nil, fmt.Errorf("missing Chunk (%d/%d): %s", i, len(blist.Latest), name)
164+
}
165+
chunks = append(chunks, chunk)
166+
if i > 0 {
167+
chunk.Start = chunkMap[blist.Latest[i-1]].End + 1
168+
chunk.End += chunk.Start
169+
}
170+
}
171+
return chunks, nil
172+
}
173+
126174
func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error {
127175
// read all db artifacts by name
128176
artifacts, err := db.Find[actions.ActionArtifact](ctx, actions.FindArtifactsOptions{
@@ -230,7 +278,7 @@ func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st st
230278
rawChecksum := hash.Sum(nil)
231279
actualChecksum := hex.EncodeToString(rawChecksum)
232280
if !strings.HasSuffix(checksum, actualChecksum) {
233-
return fmt.Errorf("update artifact error checksum is invalid")
281+
return fmt.Errorf("update artifact error checksum is invalid %v vs %v", checksum, actualChecksum)
234282
}
235283
}
236284

routers/api/actions/artifactsv4.go

Lines changed: 107 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,15 @@ package actions
2424
// PUT: http://localhost:3000/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact?sig=mO7y35r4GyjN7fwg0DTv3-Fv1NDXD84KLEgLpoPOtDI=&expires=2024-01-23+21%3A48%3A37.20833956+%2B0100+CET&artifactName=test&taskID=75&comp=block
2525
// 1.3. Continue Upload Zip Content to Blobstorage (unauthenticated request), repeat until everything is uploaded
2626
// PUT: http://localhost:3000/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact?sig=mO7y35r4GyjN7fwg0DTv3-Fv1NDXD84KLEgLpoPOtDI=&expires=2024-01-23+21%3A48%3A37.20833956+%2B0100+CET&artifactName=test&taskID=75&comp=appendBlock
27-
// 1.4. Unknown xml payload to Blobstorage (unauthenticated request), ignored for now
27+
// 1.4. BlockList xml payload to Blobstorage (unauthenticated request)
28+
// Files of about 800MB are parallel in parallel and / or out of order, this file is needed to enshure the correct order
2829
// PUT: http://localhost:3000/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact?sig=mO7y35r4GyjN7fwg0DTv3-Fv1NDXD84KLEgLpoPOtDI=&expires=2024-01-23+21%3A48%3A37.20833956+%2B0100+CET&artifactName=test&taskID=75&comp=blockList
30+
// Request
31+
// <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
32+
// <BlockList>
33+
// <Latest>blockId1</Latest>
34+
// <Latest>blockId2</Latest>
35+
// </BlockList>
2936
// 1.5. FinalizeArtifact
3037
// Post: /twirp/github.actions.results.api.v1.ArtifactService/FinalizeArtifact
3138
// Request
@@ -82,6 +89,7 @@ import (
8289
"crypto/hmac"
8390
"crypto/sha256"
8491
"encoding/base64"
92+
"encoding/xml"
8593
"fmt"
8694
"io"
8795
"net/http"
@@ -152,31 +160,34 @@ func ArtifactsV4Routes(prefix string) *web.Router {
152160
return m
153161
}
154162

155-
func (r artifactV4Routes) buildSignature(endp, expires, artifactName string, taskID int64) []byte {
163+
func (r artifactV4Routes) buildSignature(endp, expires, artifactName string, taskID, artifactID int64) []byte {
156164
mac := hmac.New(sha256.New, setting.GetGeneralTokenSigningSecret())
157165
mac.Write([]byte(endp))
158166
mac.Write([]byte(expires))
159167
mac.Write([]byte(artifactName))
160168
mac.Write([]byte(fmt.Sprint(taskID)))
169+
mac.Write([]byte(fmt.Sprint(artifactID)))
161170
return mac.Sum(nil)
162171
}
163172

164-
func (r artifactV4Routes) buildArtifactURL(ctx *ArtifactContext, endp, artifactName string, taskID int64) string {
173+
func (r artifactV4Routes) buildArtifactURL(ctx *ArtifactContext, endp, artifactName string, taskID, artifactID int64) string {
165174
expires := time.Now().Add(60 * time.Minute).Format("2006-01-02 15:04:05.999999999 -0700 MST")
166175
uploadURL := strings.TrimSuffix(httplib.GuessCurrentAppURL(ctx), "/") + strings.TrimSuffix(r.prefix, "/") +
167-
"/" + endp + "?sig=" + base64.URLEncoding.EncodeToString(r.buildSignature(endp, expires, artifactName, taskID)) + "&expires=" + url.QueryEscape(expires) + "&artifactName=" + url.QueryEscape(artifactName) + "&taskID=" + fmt.Sprint(taskID)
176+
"/" + endp + "?sig=" + base64.URLEncoding.EncodeToString(r.buildSignature(endp, expires, artifactName, taskID, artifactID)) + "&expires=" + url.QueryEscape(expires) + "&artifactName=" + url.QueryEscape(artifactName) + "&taskID=" + fmt.Sprint(taskID) + "&artifactID=" + fmt.Sprint(artifactID)
168177
return uploadURL
169178
}
170179

171180
func (r artifactV4Routes) verifySignature(ctx *ArtifactContext, endp string) (*actions.ActionTask, string, bool) {
172181
rawTaskID := ctx.Req.URL.Query().Get("taskID")
182+
rawArtifactID := ctx.Req.URL.Query().Get("artifactID")
173183
sig := ctx.Req.URL.Query().Get("sig")
174184
expires := ctx.Req.URL.Query().Get("expires")
175185
artifactName := ctx.Req.URL.Query().Get("artifactName")
176186
dsig, _ := base64.URLEncoding.DecodeString(sig)
177187
taskID, _ := strconv.ParseInt(rawTaskID, 10, 64)
188+
artifactID, _ := strconv.ParseInt(rawArtifactID, 10, 64)
178189

179-
expecedsig := r.buildSignature(endp, expires, artifactName, taskID)
190+
expecedsig := r.buildSignature(endp, expires, artifactName, taskID, artifactID)
180191
if !hmac.Equal(dsig, expecedsig) {
181192
log.Error("Error unauthorized")
182193
ctx.Error(http.StatusUnauthorized, "Error unauthorized")
@@ -271,6 +282,8 @@ func (r *artifactV4Routes) createArtifact(ctx *ArtifactContext) {
271282
return
272283
}
273284
artifact.ContentEncoding = ArtifactV4ContentEncoding
285+
artifact.FileSize = 0
286+
artifact.FileCompressedSize = 0
274287
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
275288
log.Error("Error UpdateArtifactByID: %v", err)
276289
ctx.Error(http.StatusInternalServerError, "Error UpdateArtifactByID")
@@ -279,7 +292,7 @@ func (r *artifactV4Routes) createArtifact(ctx *ArtifactContext) {
279292

280293
respData := CreateArtifactResponse{
281294
Ok: true,
282-
SignedUploadUrl: r.buildArtifactURL(ctx, "UploadArtifact", artifactName, ctx.ActionTask.ID),
295+
SignedUploadUrl: r.buildArtifactURL(ctx, "UploadArtifact", artifactName, ctx.ActionTask.ID, artifact.ID),
283296
}
284297
r.sendProtbufBody(ctx, &respData)
285298
}
@@ -293,38 +306,77 @@ func (r *artifactV4Routes) uploadArtifact(ctx *ArtifactContext) {
293306
comp := ctx.Req.URL.Query().Get("comp")
294307
switch comp {
295308
case "block", "appendBlock":
296-
// get artifact by name
297-
artifact, err := r.getArtifactByName(ctx, task.Job.RunID, artifactName)
298-
if err != nil {
299-
log.Error("Error artifact not found: %v", err)
300-
ctx.Error(http.StatusNotFound, "Error artifact not found")
301-
return
309+
blockid := ctx.Req.URL.Query().Get("blockid")
310+
if blockid == "" {
311+
// get artifact by name
312+
artifact, err := r.getArtifactByName(ctx, task.Job.RunID, artifactName)
313+
if err != nil {
314+
log.Error("Error artifact not found: %v", err)
315+
ctx.Error(http.StatusNotFound, "Error artifact not found")
316+
return
317+
}
318+
319+
_, err = appendUploadChunk(r.fs, ctx, artifact, artifact.FileSize, ctx.Req.ContentLength, artifact.RunID)
320+
if err != nil {
321+
log.Error("Error runner api getting task: task is not running")
322+
ctx.Error(http.StatusInternalServerError, "Error runner api getting task: task is not running")
323+
return
324+
}
325+
artifact.FileCompressedSize += ctx.Req.ContentLength
326+
artifact.FileSize += ctx.Req.ContentLength
327+
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
328+
log.Error("Error UpdateArtifactByID: %v", err)
329+
ctx.Error(http.StatusInternalServerError, "Error UpdateArtifactByID")
330+
return
331+
}
332+
} else {
333+
_, err := r.fs.Save(fmt.Sprintf("tmpv4%d/block-%d-%d-%s", task.Job.RunID, task.Job.RunID, ctx.Req.ContentLength, base64.URLEncoding.EncodeToString([]byte(blockid))), ctx.Req.Body, -1)
334+
if err != nil {
335+
log.Error("Error runner api getting task: task is not running")
336+
ctx.Error(http.StatusInternalServerError, "Error runner api getting task: task is not running")
337+
return
338+
}
302339
}
303-
304-
if comp == "block" {
305-
artifact.FileSize = 0
306-
artifact.FileCompressedSize = 0
307-
}
308-
309-
_, err = appendUploadChunk(r.fs, ctx, artifact, artifact.FileSize, ctx.Req.ContentLength, artifact.RunID)
340+
ctx.JSON(http.StatusCreated, "appended")
341+
case "blocklist":
342+
rawArtifactID := ctx.Req.URL.Query().Get("artifactID")
343+
artifactID, _ := strconv.ParseInt(rawArtifactID, 10, 64)
344+
_, err := r.fs.Save(fmt.Sprintf("tmpv4%d/%d-%d-blocklist", task.Job.RunID, task.Job.RunID, artifactID), ctx.Req.Body, -1)
310345
if err != nil {
311346
log.Error("Error runner api getting task: task is not running")
312347
ctx.Error(http.StatusInternalServerError, "Error runner api getting task: task is not running")
313348
return
314349
}
315-
artifact.FileCompressedSize += ctx.Req.ContentLength
316-
artifact.FileSize += ctx.Req.ContentLength
317-
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
318-
log.Error("Error UpdateArtifactByID: %v", err)
319-
ctx.Error(http.StatusInternalServerError, "Error UpdateArtifactByID")
320-
return
321-
}
322-
ctx.JSON(http.StatusCreated, "appended")
323-
case "blocklist":
324350
ctx.JSON(http.StatusCreated, "created")
325351
}
326352
}
327353

354+
type BlockList struct {
355+
Latest []string `xml:"Latest"`
356+
}
357+
358+
type Latest struct {
359+
Value string `xml:",chardata"`
360+
}
361+
362+
func (r *artifactV4Routes) readBlockList(runID, artifactID int64) (*BlockList, error) {
363+
blockListName := fmt.Sprintf("tmpv4%d/%d-%d-blocklist", runID, runID, artifactID)
364+
s, err := r.fs.Open(blockListName)
365+
if err != nil {
366+
return nil, err
367+
}
368+
369+
xdec := xml.NewDecoder(s)
370+
blockList := &BlockList{}
371+
err = xdec.Decode(blockList)
372+
373+
delerr := r.fs.Delete(blockListName)
374+
if delerr != nil {
375+
log.Warn("Failed to delete blockList %s: %v", blockListName, delerr)
376+
}
377+
return blockList, err
378+
}
379+
328380
func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) {
329381
var req FinalizeArtifactRequest
330382

@@ -343,18 +395,34 @@ func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) {
343395
ctx.Error(http.StatusNotFound, "Error artifact not found")
344396
return
345397
}
346-
chunkMap, err := listChunksByRunID(r.fs, runID)
398+
399+
var chunks []*chunkFileItem
400+
blockList, err := r.readBlockList(runID, artifact.ID)
347401
if err != nil {
348-
log.Error("Error merge chunks: %v", err)
349-
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
350-
return
351-
}
352-
chunks, ok := chunkMap[artifact.ID]
353-
if !ok {
354-
log.Error("Error merge chunks")
355-
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
356-
return
402+
log.Warn("Failed to read BlockList, fallback to old behavior: %v", err)
403+
chunkMap, err := listChunksByRunID(r.fs, runID)
404+
if err != nil {
405+
log.Error("Error merge chunks: %v", err)
406+
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
407+
return
408+
}
409+
chunks, ok = chunkMap[artifact.ID]
410+
if !ok {
411+
log.Error("Error merge chunks")
412+
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
413+
return
414+
}
415+
} else {
416+
chunks, err = listChunksByRunIDV4(r.fs, runID, artifact.ID, blockList)
417+
if err != nil {
418+
log.Error("Error merge chunks: %v", err)
419+
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
420+
return
421+
}
422+
artifact.FileSize = chunks[len(chunks)-1].End + 1
423+
artifact.FileCompressedSize = chunks[len(chunks)-1].End + 1
357424
}
425+
358426
checksum := ""
359427
if req.Hash != nil {
360428
checksum = req.Hash.Value
@@ -455,7 +523,7 @@ func (r *artifactV4Routes) getSignedArtifactURL(ctx *ArtifactContext) {
455523
}
456524
}
457525
if respData.SignedUrl == "" {
458-
respData.SignedUrl = r.buildArtifactURL(ctx, "DownloadArtifact", artifactName, ctx.ActionTask.ID)
526+
respData.SignedUrl = r.buildArtifactURL(ctx, "DownloadArtifact", artifactName, ctx.ActionTask.ID, artifact.ID)
459527
}
460528
r.sendProtbufBody(ctx, &respData)
461529
}

0 commit comments

Comments
 (0)