Skip to content

Implement systemd-notify protocol #21151

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion contrib/systemd/gitea.service
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ After=network.target
# Uncomment the next line if you have repos with lots of files and get an HTTP 500 error because of that
# LimitNOFILE=524288:524288
RestartSec=2s
Type=simple
Type=notify
User=git
Group=git
WorkingDirectory=/var/lib/gitea/
Expand All @@ -62,6 +62,7 @@ WorkingDirectory=/var/lib/gitea/
ExecStart=/usr/local/bin/gitea web --config /etc/gitea/app.ini
Restart=always
Environment=USER=git HOME=/home/git GITEA_WORK_DIR=/var/lib/gitea
WatchdogSec=30s
# If you install Git to a directory prefix other than the default PATH (which happens
# for example if you install other versions of Git side-by-side with the
# distribution version), uncomment the line below and add that prefix to PATH
Expand Down
67 changes: 65 additions & 2 deletions modules/graceful/manager_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"os/signal"
"runtime/pprof"
"strconv"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -45,14 +46,49 @@ type Manager struct {

// newGracefulManager builds a Manager, adds numberOfServersToCreate to its
// createServerWaitGroup, starts it with ctx and returns it.
func newGracefulManager(ctx context.Context) *Manager {
manager := &Manager{
// isChild is true when the listen-FDs environment variable is non-empty
// and the parent PID is greater than 1.
isChild: len(os.Getenv(listenFDs)) > 0 && os.Getppid() > 1,
isChild: len(os.Getenv(listenFDsEnv)) > 0 && os.Getppid() > 1,
lock: &sync.RWMutex{},
}
manager.createServerWaitGroup.Add(numberOfServersToCreate)
manager.start(ctx)
return manager
}

// systemdNotifyMsg is the payload of a single datagram written to the systemd
// notify socket.
type systemdNotifyMsg string

// Fixed notify messages sent by the manager.
const (
	readyMsg     = systemdNotifyMsg("READY=1")
	stoppingMsg  = systemdNotifyMsg("STOPPING=1")
	reloadingMsg = systemdNotifyMsg("RELOADING=1")
	watchdogMsg  = systemdNotifyMsg("WATCHDOG=1")
)

// statusMsg builds a "STATUS=<msg>" notify message carrying free-form text.
func statusMsg(msg string) systemdNotifyMsg {
	const prefix = "STATUS="
	return systemdNotifyMsg(prefix + msg)
}

// pidMsg builds a "MAINPID=<pid>" notify message for the current process.
func pidMsg() systemdNotifyMsg {
	pid := strconv.Itoa(os.Getpid())
	return systemdNotifyMsg("MAINPID=" + pid)
}

// notify sends msg to systemd over the notify socket.
//
// It is best-effort: when getNotifySocket yields no connection (either
// because it failed, which it logs itself, or because it returned nil with no
// error) the message is silently dropped. A failed write is logged as a
// warning and otherwise ignored.
func (g *Manager) notify(msg systemdNotifyMsg) {
	sock, err := getNotifySocket()
	if err != nil || sock == nil {
		// Nothing to write to; any dial error was already logged.
		return
	}
	defer sock.Close()

	if _, writeErr := sock.Write([]byte(msg)); writeErr != nil {
		log.Warn("Failed to notify NOTIFY_SOCKET: %v", writeErr)
	}
}

func (g *Manager) start(ctx context.Context) {
// Make contexts
g.terminateCtx, g.terminateCtxCancel = context.WithCancel(ctx)
Expand All @@ -72,6 +108,8 @@ func (g *Manager) start(ctx context.Context) {

// Set the running state & handle signals
g.setState(stateRunning)
g.notify(statusMsg("Starting Gitea"))
g.notify(pidMsg())
go g.handleSignals(g.managerCtx)

// Handle clean up of unused provided listeners and delayed start-up
Expand All @@ -84,6 +122,7 @@ func (g *Manager) start(ctx context.Context) {
// Ignore the error here there's not much we can do with it
// They're logged in the CloseProvidedListeners function
_ = CloseProvidedListeners()
g.notify(readyMsg)
}()
if setting.StartupTimeout > 0 {
go func() {
Expand All @@ -104,6 +143,8 @@ func (g *Manager) start(ctx context.Context) {
return
case <-time.After(setting.StartupTimeout):
log.Error("Startup took too long! Shutting down")
g.notify(statusMsg("Startup took too long! Shutting down"))
g.notify(stoppingMsg)
g.doShutdown()
}
}()
Expand All @@ -126,6 +167,13 @@ func (g *Manager) handleSignals(ctx context.Context) {
syscall.SIGTSTP,
)

watchdogTimeout := getWatchdogTimeout()
t := &time.Ticker{}
if watchdogTimeout != 0 {
g.notify(watchdogMsg)
t = time.NewTicker(watchdogTimeout / 2)
}

pid := syscall.Getpid()
for {
select {
Expand All @@ -136,6 +184,7 @@ func (g *Manager) handleSignals(ctx context.Context) {
g.DoGracefulRestart()
case syscall.SIGUSR1:
log.Warn("PID %d. Received SIGUSR1. Releasing and reopening logs", pid)
g.notify(statusMsg("Releasing and reopening logs"))
if err := log.ReleaseReopen(); err != nil {
log.Error("Error whilst releasing and reopening logs: %v", err)
}
Expand All @@ -153,6 +202,8 @@ func (g *Manager) handleSignals(ctx context.Context) {
default:
log.Info("PID %d. Received %v.", pid, sig)
}
case <-t.C:
g.notify(watchdogMsg)
case <-ctx.Done():
log.Warn("PID: %d. Background context for manager closed - %v - Shutting down...", pid, ctx.Err())
g.DoGracefulShutdown()
Expand All @@ -169,6 +220,9 @@ func (g *Manager) doFork() error {
}
g.forked = true
g.lock.Unlock()

g.notify(reloadingMsg)

// We need to move the file logs to append pids
setting.RestartLogsWithPIDSuffix()

Expand All @@ -191,18 +245,27 @@ func (g *Manager) DoGracefulRestart() {
}
} else {
log.Info("PID: %d. Not set restartable. Shutting down...", os.Getpid())

g.notify(stoppingMsg)
g.doShutdown()
}
}

// DoImmediateHammer reports the hammer via a systemd status message and then
// triggers hammer time with a zero delay.
func (g *Manager) DoImmediateHammer() {
	const noDelay time.Duration = 0

	g.notify(statusMsg("Sending immediate hammer"))
	g.doHammerTime(noDelay)
}

// DoGracefulShutdown notifies systemd and then starts the shutdown.
//
// If this process has not forked it reports STOPPING=1; if it has forked it
// only sends a status message. The forked flag is read under the manager
// lock, and the lock is released before the notification is sent.
func (g *Manager) DoGracefulShutdown() {
	g.lock.Lock()
	alreadyForked := g.forked
	g.lock.Unlock()

	if alreadyForked {
		g.notify(statusMsg("Shutting down after fork"))
	} else {
		g.notify(stoppingMsg)
	}
	g.doShutdown()
}

Expand Down
88 changes: 81 additions & 7 deletions modules/graceful/net_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,20 @@ import (
"strconv"
"strings"
"sync"
"time"

"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"
)

// Environment variable names and fd numbering used when inheriting listeners
// and systemd notify settings.
const (
listenFDs = "LISTEN_FDS"
startFD = 3
unlinkFDs = "GITEA_UNLINK_FDS"
// listenFDsEnv names the variable holding the number of provided listener fds.
listenFDsEnv = "LISTEN_FDS"
// NOTE(review): presumably the number of the first provided fd - confirm against its use.
startFD = 3
// unlinkFDsEnv names the variable holding a comma-separated list of fd indexes to unlink.
unlinkFDsEnv = "GITEA_UNLINK_FDS"

// notifySocketEnv names the variable holding the systemd notify socket address.
notifySocketEnv = "NOTIFY_SOCKET"
// watchdogTimeoutEnv names the variable holding the watchdog timeout in microseconds.
watchdogTimeoutEnv = "WATCHDOG_USEC"
)

// In order to keep the working directory the same as when we started we record
Expand All @@ -38,25 +42,62 @@ var (
activeListenersToUnlink = []bool{}
providedListeners = []net.Listener{}
activeListeners = []net.Listener{}

notifySocketAddr string
watchdogTimeout time.Duration
)

func getProvidedFDs() (savedErr error) {
// Only inherit the provided FDS once but we will save the error so that repeated calls to this function will return the same error
once.Do(func() {
mutex.Lock()
defer mutex.Unlock()
// now handle some additional systemd provided things
notifySocketAddr = os.Getenv(notifySocketEnv)
if notifySocketAddr != "" {
log.Debug("Systemd Notify Socket provided: %s", notifySocketAddr)
savedErr = os.Unsetenv(notifySocketEnv)
if savedErr != nil {
log.Warn("Unable to Unset the NOTIFY_SOCKET environment variable: %v", savedErr)
return
}
// FIXME: We don't handle WATCHDOG_PID
timeoutStr := os.Getenv(watchdogTimeoutEnv)
if timeoutStr != "" {
savedErr = os.Unsetenv(watchdogTimeoutEnv)
if savedErr != nil {
log.Warn("Unable to Unset the WATCHDOG_USEC environment variable: %v", savedErr)
return
}

numFDs := os.Getenv(listenFDs)
s, err := strconv.ParseInt(timeoutStr, 10, 64)
if err != nil {
log.Error("Unable to parse the provided WATCHDOG_USEC: %v", err)
savedErr = fmt.Errorf("unable to parse the provided WATCHDOG_USEC: %w", err)
return
}
if s <= 0 {
log.Error("Unable to parse the provided WATCHDOG_USEC: %s should be a positive number", timeoutStr)
savedErr = fmt.Errorf("unable to parse the provided WATCHDOG_USEC: %s should be a positive number", timeoutStr)
return
}
watchdogTimeout = time.Duration(s) * time.Microsecond
}
} else {
log.Trace("No Systemd Notify Socket provided")
}

numFDs := os.Getenv(listenFDsEnv)
if numFDs == "" {
return
}
n, err := strconv.Atoi(numFDs)
if err != nil {
savedErr = fmt.Errorf("%s is not a number: %s. Err: %w", listenFDs, numFDs, err)
savedErr = fmt.Errorf("%s is not a number: %s. Err: %w", listenFDsEnv, numFDs, err)
return
}

fdsToUnlinkStr := strings.Split(os.Getenv(unlinkFDs), ",")
fdsToUnlinkStr := strings.Split(os.Getenv(unlinkFDsEnv), ",")
providedListenersToUnlink = make([]bool, n)
for _, fdStr := range fdsToUnlinkStr {
i, err := strconv.Atoi(fdStr)
Expand All @@ -73,7 +114,7 @@ func getProvidedFDs() (savedErr error) {
if err == nil {
// Close the inherited file if it's a listener
if err = file.Close(); err != nil {
savedErr = fmt.Errorf("error closing provided socket fd %d: %s", i, err)
savedErr = fmt.Errorf("error closing provided socket fd %d: %w", i, err)
return
}
providedListeners = append(providedListeners, l)
Expand Down Expand Up @@ -255,3 +296,36 @@ func getActiveListenersToUnlink() []bool {
copy(listenersToUnlink, activeListenersToUnlink)
return listenersToUnlink
}

// getNotifySocket dials the systemd notify socket as a unixgram connection.
//
// It returns (nil, nil) when getProvidedFDs reports an error (that error is
// not reported from here) or when no notify socket address was recorded.
// A dial failure is logged as a warning and returned to the caller.
// The caller owns the returned connection and must close it.
func getNotifySocket() (*net.UnixConn, error) {
	if getProvidedFDs() != nil || notifySocketAddr == "" {
		return nil, nil
	}

	addr := &net.UnixAddr{Net: "unixgram", Name: notifySocketAddr}
	conn, err := net.DialUnix(addr.Net, nil, addr)
	if err != nil {
		log.Warn("failed to dial NOTIFY_SOCKET %s: %v", addr, err)
		return nil, err
	}
	return conn, nil
}

// getWatchdogTimeout returns the recorded watchdog timeout, or 0 when
// getProvidedFDs reports an error (that error is not reported from here).
func getWatchdogTimeout() time.Duration {
	if getProvidedFDs() == nil {
		return watchdogTimeout
	}
	return 0
}
16 changes: 13 additions & 3 deletions modules/graceful/restart_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strings"
"sync"
"syscall"
"time"
)

var killParent sync.Once
Expand Down Expand Up @@ -70,11 +71,20 @@ func RestartProcess() (int, error) {
// Pass on the environment and replace the old count key with the new one.
var env []string
for _, v := range os.Environ() {
if !strings.HasPrefix(v, listenFDs+"=") {
if !strings.HasPrefix(v, listenFDsEnv+"=") {
env = append(env, v)
}
}
env = append(env, fmt.Sprintf("%s=%d", listenFDs, len(listeners)))
env = append(env, fmt.Sprintf("%s=%d", listenFDsEnv, len(listeners)))

if notifySocketAddr != "" {
env = append(env, fmt.Sprintf("%s=%s", notifySocketEnv, notifySocketAddr))
}

if watchdogTimeout != 0 {
	// WATCHDOG_USEC is expressed in microseconds and is parsed as such when
	// it is read back, so it must be exported in microseconds too. Dividing
	// by time.Millisecond would hand the child a timeout 1000x too small.
	watchdogStr := strconv.FormatInt(int64(watchdogTimeout/time.Microsecond), 10)
	env = append(env, fmt.Sprintf("%s=%s", watchdogTimeoutEnv, watchdogStr))
}

sb := &strings.Builder{}
for i, unlink := range getActiveListenersToUnlink() {
Expand All @@ -87,7 +97,7 @@ func RestartProcess() (int, error) {
unlinkStr := sb.String()
if len(unlinkStr) > 0 {
unlinkStr = unlinkStr[:len(unlinkStr)-1]
env = append(env, fmt.Sprintf("%s=%s", unlinkFDs, unlinkStr))
env = append(env, fmt.Sprintf("%s=%s", unlinkFDsEnv, unlinkStr))
}

allFiles := append([]*os.File{os.Stdin, os.Stdout, os.Stderr}, files...)
Expand Down