{{$.locale.Tr "admin.monitor.process.cancel_notices" `` | Safe}}
-{{$.locale.Tr "admin.monitor.process.cancel_desc"}}
diff --git a/modules/queue/base_channel.go b/modules/queue/base_channel.go
index 27055faf4b..d03c72bdae 100644
--- a/modules/queue/base_channel.go
+++ b/modules/queue/base_channel.go
@@ -87,7 +87,9 @@ func (q *baseChannel) PopItem(ctx context.Context) ([]byte, error) {
 func (q *baseChannel) HasItem(ctx context.Context, data []byte) (bool, error) {
 	q.mu.Lock()
 	defer q.mu.Unlock()
-
+	if !q.isUnique {
+		return false, nil
+	}
 	return q.set.Contains(string(data)), nil
 }
 
@@ -107,7 +109,9 @@ func (q *baseChannel) Close() error {
 	defer q.mu.Unlock()
 
 	close(q.c)
-	q.set = container.Set[string]{}
+	if q.isUnique {
+		q.set = container.Set[string]{}
+	}
 
 	return nil
 }
@@ -119,5 +123,9 @@ func (q *baseChannel) RemoveAll(ctx context.Context) error {
 	for q.c != nil && len(q.c) > 0 {
 		<-q.c
 	}
+
+	if q.isUnique {
+		q.set = container.Set[string]{}
+	}
 	return nil
 }
diff --git a/modules/queue/base_levelqueue_unique.go b/modules/queue/base_levelqueue_unique.go
index 7546221631..1acd504e32 100644
--- a/modules/queue/base_levelqueue_unique.go
+++ b/modules/queue/base_levelqueue_unique.go
@@ -77,6 +77,14 @@ func (q *baseLevelQueueUnique) RemoveAll(ctx context.Context) error {
 	}
 	lq := (*levelUniqueQueue)(unsafe.Pointer(q.internal))
+	for lq.q.Len() > 0 {
+		if _, err := lq.q.LPop(); err != nil {
+			return err
+		}
+	}
+
+	// the "set" must be cleared after the "list" because there is no transaction.
+	// it's better to have duplicate items than to lose items.
 	members, err := lq.set.Members()
 	if err != nil {
 		return err // seriously corrupted
@@ -84,10 +92,5 @@
 	for _, v := range members {
 		_, _ = lq.set.Remove(v)
 	}
-	for lq.q.Len() > 0 {
-		if _, err = lq.q.LPop(); err != nil {
-			return err
-		}
-	}
 	return nil
 }
diff --git a/modules/queue/base_redis.go b/modules/queue/base_redis.go
index a294077cc6..a1e234943d 100644
--- a/modules/queue/base_redis.go
+++ b/modules/queue/base_redis.go
@@ -123,7 +123,10 @@ func (q *baseRedis) Close() error {
 func (q *baseRedis) RemoveAll(ctx context.Context) error {
 	q.mu.Lock()
 	defer q.mu.Unlock()
+
 	c1 := q.client.Del(ctx, q.cfg.QueueFullName)
+	// the "set" must be cleared after the "list" because there is no transaction.
+	// it's better to have duplicate items than to lose items.
 	c2 := q.client.Del(ctx, q.cfg.SetFullName)
 	if c1.Err() != nil {
 		return c1.Err()
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index 03dbc72da4..95b3bad57b 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -33,6 +33,9 @@ type ManagedWorkerPoolQueue interface {
 	// FlushWithContext tries to make the handler process all items in the queue synchronously.
 	// It is for testing purposes only. It's not designed to be used in a cluster.
 	FlushWithContext(ctx context.Context, timeout time.Duration) error
+
+	// RemoveAllItems removes all items in the base queue (on-the-fly items are not affected)
+	RemoveAllItems(ctx context.Context) error
 }
 
 var manager *Manager
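A note on the ordering comment repeated in these `RemoveAll` implementations ("the set must be cleared after the list"): the underlying invariant is that a stale set entry silently rejects every later push of the same value, losing items for good, while a stale list entry at worst yields a duplicate delivery. A minimal sketch of that trade-off, using toy types rather than Gitea's actual queue structs:

```go
package main

import "fmt"

// uniqueQueue is a toy stand-in for a unique queue: a "list" of pending
// items plus a "set" used to reject duplicates on push.
type uniqueQueue struct {
	list []string
	set  map[string]bool
}

func (q *uniqueQueue) push(v string) {
	if q.set[v] {
		return // deduplicated: silently dropped if the set entry is stale
	}
	q.set[v] = true
	q.list = append(q.list, v)
}

func main() {
	// State after a crash where the set outlived the list:
	q := &uniqueQueue{set: map[string]bool{"a": true}}
	q.push("a")
	fmt.Println(q.list) // [] -> the item is lost for good

	// State after a crash where the list outlived the set:
	q2 := &uniqueQueue{set: map[string]bool{}, list: []string{"a"}}
	q2.push("a")
	fmt.Println(q2.list) // [a a] -> a duplicate, but nothing is lost
}
```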
diff --git a/modules/queue/workerqueue.go b/modules/queue/workerqueue.go
index 493bea17aa..de4485fa51 100644
--- a/modules/queue/workerqueue.go
+++ b/modules/queue/workerqueue.go
@@ -130,6 +130,11 @@ func (q *WorkerPoolQueue[T]) FlushWithContext(ctx context.Context, timeout time.Duration) error {
 	}
 }
 
+// RemoveAllItems removes all items in the base queue
+func (q *WorkerPoolQueue[T]) RemoveAllItems(ctx context.Context) error {
+	return q.baseQueue.RemoveAll(ctx)
+}
+
 func (q *WorkerPoolQueue[T]) marshal(data T) []byte {
 	bs, err := json.Marshal(data)
 	if err != nil {
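A hypothetical caller sketch for the new method, mirroring what the admin handler added later in this diff does; the queue ID is a made-up example value:

```go
package main

import (
	"context"

	"code.gitea.io/gitea/modules/queue"
)

// removeAll wipes the backing store of a managed queue by ID. Per the
// interface comment in manager.go, only persisted items are removed;
// items already handed to workers ("on-the-fly") are unaffected.
func removeAll(ctx context.Context, qid int64) error {
	mq := queue.GetManager().GetManagedQueue(qid)
	if mq == nil {
		return nil // unknown queue ID; the web handler responds 404 instead
	}
	return mq.RemoveAllItems(ctx)
}

func main() {
	_ = removeAll(context.Background(), 1) // qid 1 is a made-up value
}
```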
diff --git a/options/locale/locale_en-US.ini b/options/locale/locale_en-US.ini
index f825220839..e5963e34c6 100644
--- a/options/locale/locale_en-US.ini
+++ b/options/locale/locale_en-US.ini
@@ -3040,8 +3040,9 @@
 monitor.next = Next Time
 monitor.previous = Previous Time
 monitor.execute_times = Executions
 monitor.process = Running Processes
-monitor.stacktrace = Stacktraces
-monitor.goroutines = %d Goroutines
+monitor.stacktrace = Stacktrace
+monitor.processes_count = %d Processes
+monitor.download_diagnosis_report = Download diagnosis report
 monitor.desc = Description
 monitor.start = Start Time
 monitor.execute_time = Execution Time
@@ -3050,6 +3051,7 @@
 monitor.process.cancel = Cancel process
 monitor.process.cancel_desc = Cancelling a process may cause data loss
 monitor.process.cancel_notices = Cancel: %s?
 monitor.process.children = Children
+
 monitor.queues = Queues
 monitor.queue = Queue: %s
 monitor.queue.name = Name
@@ -3060,56 +3062,15 @@
 monitor.queue.maxnumberworkers = Max Number of Workers
 monitor.queue.numberinqueue = Number in Queue
 monitor.queue.review = Review Config
 monitor.queue.review_add = Review/Add Workers
-monitor.queue.configuration = Initial Configuration
-monitor.queue.nopool.title = No Worker Pool
-monitor.queue.nopool.desc = This queue wraps other queues and does not itself have a worker pool.
-monitor.queue.wrapped.desc = A wrapped queue wraps a slow starting queue, buffering queued requests in a channel. It does not have a worker pool itself.
-monitor.queue.persistable-channel.desc = A persistable-channel wraps two queues, a channel queue that has its own worker pool and a level queue for persisted requests from previous shutdowns. It does not have a worker pool itself.
-monitor.queue.flush = Flush worker
-monitor.queue.pool.timeout = Timeout
-monitor.queue.pool.addworkers.title = Add Workers
-monitor.queue.pool.addworkers.submit = Add Workers
-monitor.queue.pool.addworkers.desc = Add Workers to this pool with or without a timeout. If you set a timeout these workers will be removed from the pool after the timeout has lapsed.
-monitor.queue.pool.addworkers.numberworkers.placeholder = Number of Workers
-monitor.queue.pool.addworkers.timeout.placeholder = Set to 0 for no timeout
-monitor.queue.pool.addworkers.mustnumbergreaterzero = Number of Workers to add must be greater than zero
-monitor.queue.pool.addworkers.musttimeoutduration = Timeout must be a golang duration eg. 5m or be 0
-monitor.queue.pool.flush.title = Flush Queue
-monitor.queue.pool.flush.desc = Flush will add a worker that will terminate once the queue is empty, or it times out.
-monitor.queue.pool.flush.submit = Add Flush Worker
-monitor.queue.pool.flush.added = Flush Worker added for %[1]s
-monitor.queue.pool.pause.title = Pause Queue
-monitor.queue.pool.pause.desc = Pausing a Queue will prevent it from processing data
-monitor.queue.pool.pause.submit = Pause Queue
-monitor.queue.pool.resume.title = Resume Queue
-monitor.queue.pool.resume.desc = Set this queue to resume work
-monitor.queue.pool.resume.submit = Resume Queue
-
 monitor.queue.settings.title = Pool Settings
-monitor.queue.settings.desc = Pools dynamically grow with a boost in response to their worker queue blocking. These changes will not affect current worker groups.
-monitor.queue.settings.timeout = Boost Timeout
-monitor.queue.settings.timeout.placeholder = Currently %[1]v
-monitor.queue.settings.timeout.error = Timeout must be a golang duration eg. 5m or be 0
-monitor.queue.settings.numberworkers = Boost Number of Workers
-monitor.queue.settings.numberworkers.placeholder = Currently %[1]d
-monitor.queue.settings.numberworkers.error = Number of Workers to add must be greater than or equal to zero
+monitor.queue.settings.desc = Pools dynamically grow in response to their worker queue blocking.
 monitor.queue.settings.maxnumberworkers = Max Number of workers
 monitor.queue.settings.maxnumberworkers.placeholder = Currently %[1]d
 monitor.queue.settings.maxnumberworkers.error = Max number of workers must be a number
 monitor.queue.settings.submit = Update Settings
 monitor.queue.settings.changed = Settings Updated
-monitor.queue.settings.blocktimeout = Current Block Timeout
-monitor.queue.settings.blocktimeout.value = %[1]v
-
-monitor.queue.pool.none = This queue does not have a Pool
-monitor.queue.pool.added = Worker Group Added
-monitor.queue.pool.max_changed = Maximum number of workers changed
-monitor.queue.pool.workers.title = Active Worker Groups
-monitor.queue.pool.workers.none = No worker groups.
-monitor.queue.pool.cancel = Shutdown Worker Group
-monitor.queue.pool.cancelling = Worker Group shutting down
-monitor.queue.pool.cancel_notices = Shutdown this group of %s workers?
-monitor.queue.pool.cancel_desc = Leaving a queue without any worker groups may cause requests to block indefinitely.
+monitor.queue.settings.remove_all_items = Remove all
+monitor.queue.settings.remove_all_items_done = All items in the queue have been removed.
 
 notices.system_notice_list = System Notices
 notices.view_detail_header = View Notice Details
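The new `monitor.queue.settings.desc` string condenses the old boost-pool text into one sentence: when pushes start blocking, the pool grows up to the configured maximum number of workers. A toy illustration of that growth pattern (not Gitea's worker-pool implementation, just the behavior the setting describes):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	const maxWorkers = 3
	jobs := make(chan int, 2)
	var workers atomic.Int32

	spawn := func() {
		workers.Add(1)
		go func() {
			for j := range jobs {
				_ = j
				time.Sleep(10 * time.Millisecond) // pretend to handle the item
			}
		}()
	}
	spawn() // every pool starts with at least one worker

	for i := 0; i < 20; i++ {
		select {
		case jobs <- i: // fast path: the queue still has room
		default: // the queue is blocking: grow the pool if allowed
			if workers.Load() < maxWorkers {
				spawn()
			}
			jobs <- i // then wait for a worker to free up room
		}
	}
	close(jobs) // remaining items drain in the background
	fmt.Println("workers spawned:", workers.Load())
}
```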
ctx.Tr("admin.monitor") - ctx.Data["PageIsAdminMonitor"] = true - - processStacks, processCount, goroutineCount, err := process.GetManager().ProcessStacktraces(false, false) - if err != nil { - ctx.ServerError("GoroutineStacktrace", err) - return - } - - ctx.Data["ProcessStacks"] = processStacks - - ctx.Data["GoroutineCount"] = goroutineCount - ctx.Data["ProcessCount"] = processCount - - ctx.HTML(http.StatusOK, tplStacktrace) -} - -// MonitorCancel cancels a process -func MonitorCancel(ctx *context.Context) { - pid := ctx.Params("pid") - process.GetManager().Cancel(process.IDType(pid)) - ctx.JSON(http.StatusOK, map[string]interface{}{ - "redirect": setting.AppSubURL + "/admin/monitor", - }) + ctx.HTML(http.StatusOK, tplCron) } diff --git a/routers/web/admin/diagnosis.go b/routers/web/admin/diagnosis.go new file mode 100644 index 0000000000..5637894e6d --- /dev/null +++ b/routers/web/admin/diagnosis.go @@ -0,0 +1,61 @@ +// Copyright 2023 The Gitea Authors. +// SPDX-License-Identifier: MIT + +package admin + +import ( + "archive/zip" + "fmt" + "runtime/pprof" + "time" + + "code.gitea.io/gitea/modules/context" + "code.gitea.io/gitea/modules/httplib" +) + +func MonitorDiagnosis(ctx *context.Context) { + seconds := ctx.FormInt64("seconds") + if seconds <= 5 { + seconds = 5 + } + if seconds > 300 { + seconds = 300 + } + + httplib.ServeSetHeaders(ctx.Resp, &httplib.ServeHeaderOptions{ + ContentType: "application/zip", + Disposition: "attachment", + Filename: fmt.Sprintf("gitea-diagnosis-%s.zip", time.Now().Format("20060102-150405")), + }) + + zipWriter := zip.NewWriter(ctx.Resp) + defer zipWriter.Close() + + f, err := zipWriter.CreateHeader(&zip.FileHeader{Name: "goroutine-before.txt", Method: zip.Deflate, Modified: time.Now()}) + if err != nil { + ctx.ServerError("Failed to create zip file", err) + return + } + _ = pprof.Lookup("goroutine").WriteTo(f, 1) + + f, err = zipWriter.CreateHeader(&zip.FileHeader{Name: "cpu-profile.dat", Method: zip.Deflate, Modified: time.Now()}) + if err != nil { + ctx.ServerError("Failed to create zip file", err) + return + } + + err = pprof.StartCPUProfile(f) + if err == nil { + time.Sleep(time.Duration(seconds) * time.Second) + pprof.StopCPUProfile() + } else { + _, _ = f.Write([]byte(err.Error())) + } + + f, err = zipWriter.CreateHeader(&zip.FileHeader{Name: "goroutine-after.txt", Method: zip.Deflate, Modified: time.Now()}) + if err != nil { + ctx.ServerError("Failed to create zip file", err) + return + } + _ = pprof.Lookup("goroutine").WriteTo(f, 1) +} diff --git a/routers/web/admin/queue.go b/routers/web/admin/queue.go index 1d57bc54c9..4e01846ba8 100644 --- a/routers/web/admin/queue.go +++ b/routers/web/admin/queue.go @@ -12,8 +12,18 @@ import ( "code.gitea.io/gitea/modules/setting" ) -// Queue shows details for a specific queue -func Queue(ctx *context.Context) { +func Queues(ctx *context.Context) { + if !setting.IsProd { + initTestQueueOnce() + } + ctx.Data["Title"] = ctx.Tr("admin.monitor.queue") + ctx.Data["PageIsAdminMonitorQueue"] = true + ctx.Data["Queues"] = queue.GetManager().ManagedQueues() + ctx.HTML(http.StatusOK, tplQueue) +} + +// QueueManage shows details for a specific queue +func QueueManage(ctx *context.Context) { qid := ctx.ParamsInt64("qid") mq := queue.GetManager().GetManagedQueue(qid) if mq == nil { @@ -57,3 +67,23 @@ func QueueSet(ctx *context.Context) { ctx.Flash.Success(ctx.Tr("admin.monitor.queue.settings.changed")) ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10)) } + +func QueueRemoveAllItems(ctx 
diff --git a/routers/web/admin/queue.go b/routers/web/admin/queue.go
index 1d57bc54c9..4e01846ba8 100644
--- a/routers/web/admin/queue.go
+++ b/routers/web/admin/queue.go
@@ -12,8 +12,18 @@ import (
 	"code.gitea.io/gitea/modules/setting"
 )
 
-// Queue shows details for a specific queue
-func Queue(ctx *context.Context) {
+func Queues(ctx *context.Context) {
+	if !setting.IsProd {
+		initTestQueueOnce()
+	}
+	ctx.Data["Title"] = ctx.Tr("admin.monitor.queue")
+	ctx.Data["PageIsAdminMonitorQueue"] = true
+	ctx.Data["Queues"] = queue.GetManager().ManagedQueues()
+	ctx.HTML(http.StatusOK, tplQueue)
+}
+
+// QueueManage shows details for a specific queue
+func QueueManage(ctx *context.Context) {
 	qid := ctx.ParamsInt64("qid")
 	mq := queue.GetManager().GetManagedQueue(qid)
 	if mq == nil {
@@ -57,3 +67,23 @@ func QueueSet(ctx *context.Context) {
 	ctx.Flash.Success(ctx.Tr("admin.monitor.queue.settings.changed"))
 	ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
 }
+
+func QueueRemoveAllItems(ctx *context.Context) {
+	// Gitea's queue doesn't have transaction support,
+	// so in rare cases the queue could become corrupted or out of sync.
+	// The site admin can remove all items from the queue to make it work again.
+	qid := ctx.ParamsInt64("qid")
+	mq := queue.GetManager().GetManagedQueue(qid)
+	if mq == nil {
+		ctx.Status(http.StatusNotFound)
+		return
+	}
+
+	if err := mq.RemoveAllItems(ctx); err != nil {
+		ctx.ServerError("RemoveAllItems", err)
+		return
+	}
+
+	ctx.Flash.Success(ctx.Tr("admin.monitor.queue.settings.remove_all_items_done"))
+	ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
+}
diff --git a/routers/web/admin/queue_tester.go b/routers/web/admin/queue_tester.go
new file mode 100644
index 0000000000..96373c4d5f
--- /dev/null
+++ b/routers/web/admin/queue_tester.go
@@ -0,0 +1,72 @@
+// Copyright 2023 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package admin
+
+import (
+	gocontext "context"
+	"sync"
+	"time"
+
+	"code.gitea.io/gitea/modules/graceful"
+	"code.gitea.io/gitea/modules/log"
+	"code.gitea.io/gitea/modules/queue"
+	"code.gitea.io/gitea/modules/setting"
+)
+
+var testQueueOnce sync.Once
+
+// initTestQueueOnce initializes the test queue for dev mode.
+// The test queue will also be shown in the queue list;
+// developers can see the queue length, worker number and item count on the admin page and try removing the items.
+func initTestQueueOnce() {
+	testQueueOnce.Do(func() {
+		qs := setting.QueueSettings{
+			Name:        "test-queue",
+			Type:        "channel",
+			Length:      20,
+			BatchLength: 2,
+			MaxWorkers:  3,
+		}
+		testQueue, err := queue.NewWorkerPoolQueueBySetting("test-queue", qs, func(t ...int64) (unhandled []int64) {
+			for range t {
+				select {
+				case <-graceful.GetManager().ShutdownContext().Done():
+				case <-time.After(5 * time.Second):
+				}
+			}
+			return nil
+		}, true)
+		if err != nil {
+			log.Error("unable to create test queue: %v", err)
+			return
+		}
+
+		queue.GetManager().AddManagedQueue(testQueue)
+		testQueue.SetWorkerMaxNumber(5)
+		go graceful.GetManager().RunWithShutdownFns(testQueue.Run)
+		go graceful.GetManager().RunWithShutdownContext(func(ctx gocontext.Context) {
+			cnt := int64(0)
+			adding := true
+			for {
+				select {
+				case <-ctx.Done():
+				case <-time.After(500 * time.Millisecond):
+					if adding {
+						if testQueue.GetQueueItemNumber() == qs.Length {
+							adding = false
+						}
+					} else {
+						if testQueue.GetQueueItemNumber() == 0 {
+							adding = true
+						}
+					}
+					if adding {
+						_ = testQueue.Push(cnt)
+						cnt++
+					}
+				}
+			}
+		})
+	})
}
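The tester's handler signature, `func(t ...int64) (unhandled []int64)`, is the general worker contract in this queue package: a handler receives a batch and returns the items it failed to process. As I read the package, returned items are pushed back for a later retry, and returning nil (as the tester does) marks the whole batch as handled. A standalone sketch of that contract (`handleBatch` is hypothetical):

```go
package main

import "fmt"

// handleBatch mirrors the handler signature used above: it receives a batch
// and returns the items it could not process. Returning nil marks the whole
// batch as done; anything returned is (per my reading) requeued for retry.
func handleBatch(items ...int64) (unhandled []int64) {
	for _, it := range items {
		if it%2 != 0 {
			unhandled = append(unhandled, it) // pretend the odd items failed
		}
	}
	return unhandled
}

func main() {
	fmt.Println(handleBatch(1, 2, 3, 4)) // [1 3]
}
```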
+ if show != "" { + showNoSystem := show == "process" + processStacks, processCount, _, err := process.GetManager().ProcessStacktraces(false, showNoSystem) + if err != nil { + ctx.ServerError("GoroutineStacktrace", err) + return + } + + ctx.Data["ProcessStacks"] = processStacks + ctx.Data["ProcessCount"] = processCount + } + + ctx.HTML(http.StatusOK, tplStacktrace) +} + +// StacktraceCancel cancels a process +func StacktraceCancel(ctx *context.Context) { + pid := ctx.Params("pid") + process.GetManager().Cancel(process.IDType(pid)) + ctx.JSON(http.StatusOK, map[string]interface{}{ + "redirect": setting.AppSubURL + "/admin/monitor/stacktrace", + }) +} diff --git a/routers/web/web.go b/routers/web/web.go index b0db8892ea..d0deb845fc 100644 --- a/routers/web/web.go +++ b/routers/web/web.go @@ -546,13 +546,16 @@ func registerRoutes(m *web.Route) { }) m.Group("/monitor", func() { - m.Get("", admin.Monitor) - m.Get("/stacktrace", admin.GoroutineStacktrace) - m.Post("/cancel/{pid}", admin.MonitorCancel) + m.Get("/cron", admin.CronTasks) + m.Get("/stacktrace", admin.Stacktrace) + m.Post("/stacktrace/cancel/{pid}", admin.StacktraceCancel) + m.Get("/queue", admin.Queues) m.Group("/queue/{qid}", func() { - m.Get("", admin.Queue) + m.Get("", admin.QueueManage) m.Post("/set", admin.QueueSet) + m.Post("/remove-all-items", admin.QueueRemoveAllItems) }) + m.Get("/diagnosis", admin.MonitorDiagnosis) }) m.Group("/users", func() { diff --git a/templates/admin/cron.tmpl b/templates/admin/cron.tmpl index a7f4405620..fe3c88aec1 100644 --- a/templates/admin/cron.tmpl +++ b/templates/admin/cron.tmpl @@ -1,35 +1,39 @@ -
{{$.locale.Tr "admin.monitor.process.cancel_notices" `` | Safe}}
-{{$.locale.Tr "admin.monitor.process.cancel_desc"}}
-{{.locale.Tr "admin.monitor.queue.name"}} | -{{.locale.Tr "admin.monitor.queue.type"}} | -{{.locale.Tr "admin.monitor.queue.exemplar"}} | -{{.locale.Tr "admin.monitor.queue.numberworkers"}} | -{{.locale.Tr "admin.monitor.queue.numberinqueue"}} | -- |
---|---|---|---|---|---|
{{$q.GetName}} | -{{$q.GetType}} | -{{$q.GetItemTypeName}} | -{{$sum := $q.GetWorkerNumber}}{{if lt $sum 0}}-{{else}}{{$sum}}{{end}} | -{{$sum = $q.GetQueueItemNumber}}{{if lt $sum 0}}-{{else}}{{$sum}}{{end}} | -{{if lt $sum 0}}{{$.locale.Tr "admin.monitor.queue.review"}}{{else}}{{$.locale.Tr "admin.monitor.queue.review_add"}}{{end}} - |
{{.locale.Tr "admin.monitor.queue.name"}} | +{{.locale.Tr "admin.monitor.queue.type"}} | +{{.locale.Tr "admin.monitor.queue.exemplar"}} | +{{.locale.Tr "admin.monitor.queue.numberworkers"}} | +{{.locale.Tr "admin.monitor.queue.numberinqueue"}} | ++ |
---|---|---|---|---|---|
{{$q.GetName}} | +{{$q.GetType}} | +{{$q.GetItemTypeName}} | +{{$sum := $q.GetWorkerNumber}}{{if lt $sum 0}}-{{else}}{{$sum}}{{end}} | +{{$sum = $q.GetQueueItemNumber}}{{if lt $sum 0}}-{{else}}{{$sum}}{{end}} | +{{if lt $sum 0}}{{$.locale.Tr "admin.monitor.queue.review"}}{{else}}{{$.locale.Tr "admin.monitor.queue.review_add"}}{{end}} + |