feat: use dbfs to store log

This commit is contained in:
Jason Song
2022-10-18 13:32:52 +08:00
parent f633ec9704
commit d7b99e0028
7 changed files with 206 additions and 122 deletions

View File

@@ -19,19 +19,24 @@ import (
// Task represents a distribution of job
type Task struct {
ID int64
JobID int64
Job *RunJob `xorm:"-"`
Steps []*TaskStep `xorm:"-"`
Attempt int64
RunnerID int64 `xorm:"index"`
LogToFile bool // read log from database or from storage
LogURL string // url of the log file in storage
Result runnerv1.Result
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
ID int64
JobID int64
Job *RunJob `xorm:"-"`
Steps []*TaskStep `xorm:"-"`
Attempt int64
RunnerID int64 `xorm:"index"`
Result runnerv1.Result
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
LogURL string // dbfs:///a/b.log or s3://endpoint.com/a/b.log and etc.
LogLength int64 // lines count
LogSize int64 // blob size
LogIndexes []int64 `xorm:"JSON TEXT"` // line number to offset
LogExpired bool
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
}
func init() {
@@ -80,7 +85,7 @@ func (task *Task) FullSteps() []*TaskStep {
headStep := &TaskStep{
Name: "Set up job",
LogIndex: 0,
LogLength: -1, // no limit
LogLength: task.LogLength,
Started: task.Started,
}
if firstStep != nil {
@@ -93,7 +98,7 @@ func (task *Task) FullSteps() []*TaskStep {
}
if lastStep != nil {
tailStep.LogIndex = lastStep.LogIndex + lastStep.LogLength
tailStep.LogLength = -1 // no limit
tailStep.LogLength = task.LogLength - tailStep.LogIndex
tailStep.Started = lastStep.Stopped
}
steps := make([]*TaskStep, 0, len(task.Steps)+2)
@@ -208,7 +213,16 @@ func CreateTaskForRunner(runner *Runner) (*Task, bool, error) {
return task, true, nil
}
func UpdateTask(state *runnerv1.TaskState) error {
func UpdateTask(ctx context.Context, task *Task, cols ...string) error {
sess := db.GetEngine(ctx).ID(task.ID)
if len(cols) > 0 {
sess.Cols(cols...)
}
_, err := sess.Update(task)
return err
}
func UpdateTaskByState(state *runnerv1.TaskState) error {
stepStates := map[int64]*runnerv1.StepState{}
for _, v := range state.Steps {
stepStates[v.Id] = v

View File

@@ -1,77 +0,0 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package bots
import (
"fmt"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/timeutil"
)
// TaskLog represents a task's log, every task has a standalone table
type TaskLog struct {
ID int64 `xorm:"pk"`
Timestamp timeutil.TimeStamp
Content string `xorm:"LONGTEXT"`
}
func GetTaskLogTableName(taskID int64) string {
return fmt.Sprintf("bots_task_log_%d", taskID)
}
// CreateTaskLog table for a task
func CreateTaskLog(taskID int64) error {
return db.GetEngine(db.DefaultContext).
Table(GetTaskLogTableName(taskID)).
Sync(new(TaskLog))
}
func GetTaskLogs(taskID, index, length int64) (logs []*TaskLog, err error) {
sess := db.GetEngine(db.DefaultContext).Table(GetTaskLogTableName(taskID)).
Where("id>=?", index).OrderBy("id")
if length >= 0 {
sess.Limit(int(length))
}
err = sess.Find(&logs)
return
}
func InsertTaskLogs(taskID int64, logs []*TaskLog) (int64, error) {
if err := CreateTaskLog(taskID); err != nil {
return 0, err
}
table := GetTaskLogTableName(taskID)
// TODO: A more complete way to insert logs
// Be careful:
// - the id of a log can be 0
// - some logs may already exist in db
// - if use exec, consider different databases
// - the input should be ordered by id
// - the ids should be continuously increasing
// - the min id of input should be 1 + (the max id in db)
if len(logs) == 0 {
return 0, fmt.Errorf("no logs")
}
ack := logs[0].ID
sess := db.GetEngine(db.DefaultContext)
for _, v := range logs {
_, err := sess.Exec(fmt.Sprintf("INSERT IGNORE INTO %s (id, timestamp, content) VALUES (?,?,?)", table), v.ID, v.Timestamp, []byte(v.Content))
if err != nil {
log.Error("insert log %d of task %d: %v", v.ID, taskID, err)
break
}
ack = v.ID + 1
}
return ack, nil
}