Mirror of https://github.com/dunglas/frankenphp, synced 2024-11-21 15:22:16 +00:00
handle worker failures gracefully (#1038)
Some checks failed
Lint Code Base / Lint Code Base (push) Has been cancelled
Sanitizers / ${{ matrix.sanitizer }} (asan) (push) Has been cancelled
Sanitizers / ${{ matrix.sanitizer }} (msan) (push) Has been cancelled
Tests / tests (8.2) (push) Has been cancelled
Tests / tests (8.3) (push) Has been cancelled
Tests / tests (8.4) (push) Has been cancelled
* handle failures gracefully
* fix super-subtle race condition
* address feedback: panic instead of fatal log and make vars into consts
* pass the frankenphp context to worker-ready function
* reset backoff and failures on normal restart
* update docs
* add test and fix race condition
* fail sometimes but do not be pathological about it
* Use title case (Co-authored-by: Kévin Dunglas <kevin@dunglas.fr>)
* fix code style in php
* define lifecycle metrics
* ensure we update unregister the metrics and fix tests
* update caddy tests and fix typo
* update docs
* no need for this

Co-authored-by: Kévin Dunglas <kevin@dunglas.fr>
parent b8e5ad16cd
commit aa585f7da0
@@ -445,6 +445,18 @@ func TestWorkerMetrics(t *testing.T) {
 # HELP frankenphp_testdata_index_php_worker_request_count
 # TYPE frankenphp_testdata_index_php_worker_request_count counter
 frankenphp_testdata_index_php_worker_request_count 10
+
+# HELP frankenphp_testdata_index_php_ready_workers Running workers that have successfully called frankenphp_handle_request at least once
+# TYPE frankenphp_testdata_index_php_ready_workers gauge
+frankenphp_testdata_index_php_ready_workers 2
+
+# HELP frankenphp_testdata_index_php_worker_crashes Number of PHP worker crashes for this worker
+# TYPE frankenphp_testdata_index_php_worker_crashes counter
+frankenphp_testdata_index_php_worker_crashes 0
+
+# HELP frankenphp_testdata_index_php_worker_restarts Number of PHP worker restarts for this worker
+# TYPE frankenphp_testdata_index_php_worker_restarts counter
+frankenphp_testdata_index_php_worker_restarts 0
 `

     require.NoError(t,
@@ -456,6 +468,9 @@ func TestWorkerMetrics(t *testing.T) {
         "frankenphp_testdata_index_php_busy_workers",
         "frankenphp_testdata_index_php_total_workers",
         "frankenphp_testdata_index_php_worker_request_count",
+        "frankenphp_testdata_index_php_worker_crashes",
+        "frankenphp_testdata_index_php_worker_restarts",
+        "frankenphp_testdata_index_php_ready_workers",
     ))
 }

@@ -531,6 +546,18 @@ func TestAutoWorkerConfig(t *testing.T) {
 # HELP frankenphp_testdata_index_php_worker_request_count
 # TYPE frankenphp_testdata_index_php_worker_request_count counter
 frankenphp_testdata_index_php_worker_request_count 10
+
+# HELP frankenphp_testdata_index_php_ready_workers Running workers that have successfully called frankenphp_handle_request at least once
+# TYPE frankenphp_testdata_index_php_ready_workers gauge
+frankenphp_testdata_index_php_ready_workers ` + workers + `
+
+# HELP frankenphp_testdata_index_php_worker_crashes Number of PHP worker crashes for this worker
+# TYPE frankenphp_testdata_index_php_worker_crashes counter
+frankenphp_testdata_index_php_worker_crashes 0
+
+# HELP frankenphp_testdata_index_php_worker_restarts Number of PHP worker restarts for this worker
+# TYPE frankenphp_testdata_index_php_worker_restarts counter
+frankenphp_testdata_index_php_worker_restarts 0
 `

     require.NoError(t,
@@ -542,5 +569,8 @@ func TestAutoWorkerConfig(t *testing.T) {
         "frankenphp_testdata_index_php_busy_workers",
         "frankenphp_testdata_index_php_total_workers",
         "frankenphp_testdata_index_php_worker_request_count",
+        "frankenphp_testdata_index_php_worker_crashes",
+        "frankenphp_testdata_index_php_worker_restarts",
+        "frankenphp_testdata_index_php_ready_workers",
     ))
 }
@@ -6,6 +6,9 @@ When [Caddy metrics](https://caddyserver.com/docs/metrics) are enabled, FrankenPHP
 * `frankenphp_[worker]_busy_workers`: The number of workers currently processing a request.
 * `frankenphp_[worker]_worker_request_time`: The time spent processing requests by all workers.
 * `frankenphp_[worker]_worker_request_count`: The number of requests processed by all workers.
+* `frankenphp_[worker]_ready_workers`: The number of workers that have called `frankenphp_handle_request` at least once.
+* `frankenphp_[worker]_worker_crashes`: The number of times a worker has unexpectedly terminated.
+* `frankenphp_[worker]_worker_restarts`: The number of times a worker has been deliberately restarted.
 * `frankenphp_total_threads`: The total number of PHP threads.
 * `frankenphp_busy_threads`: The number of PHP threads currently processing a request (running workers always consume a thread).

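For context on the `[worker]` placeholder above: the worker part of each metric name is derived from the worker script path. The sketch below is illustrative only (the actual getIdentity helper is not part of this diff); it reuses the two regular expressions visible in metrics.go further down to show why a worker at testdata/index.php produces names such as frankenphp_testdata_index_php_ready_workers.

package main

import (
	"fmt"
	"regexp"
)

// The same two expressions that appear in metrics.go below.
var (
	metricsNameRegex    = regexp.MustCompile(`\W+`)
	metricsNameFixRegex = regexp.MustCompile(`^_+|_+$`)
)

// workerSubsystem approximates how the [worker] part of a metric name is
// derived from the worker script path; the real helper is not shown in this
// diff, so treat this as an assumption.
func workerSubsystem(scriptPath string) string {
	s := metricsNameRegex.ReplaceAllString(scriptPath, "_")
	return metricsNameFixRegex.ReplaceAllString(s, "")
}

func main() {
	// Prints: frankenphp_testdata_index_php_ready_workers
	fmt.Println("frankenphp_" + workerSubsystem("testdata/index.php") + "_ready_workers")
}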
@@ -121,6 +121,14 @@ A workaround to using this type of code in worker mode is to restart the worker

 The previous worker snippet allows configuring a maximum number of request to handle by setting an environment variable named `MAX_REQUESTS`.

+### Worker Failures
+
+If a worker script crashes with a non-zero exit code, FrankenPHP will restart it with an exponential backoff strategy.
+If the worker script stays up longer than the last backoff * 2,
+it will not penalize the worker script and restart it again.
+However, if the worker script continues to fail with a non-zero exit code in a short period of time
+(for example, having a typo in a script), FrankenPHP will crash with the error: `too many consecutive failures`.
+
 ## Superglobals Behavior

 [PHP superglobals](https://www.php.net/manual/en/language.variables.superglobals.php) (`$_SERVER`, `$_ENV`, `$_GET`...)
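For illustration, here is a minimal, self-contained Go sketch of the restart policy described in the new documentation. It is not the FrankenPHP implementation (that follows in worker.go below); runWorker is a hypothetical stand-in for executing the worker script once.

package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	minBackoff             = 100 * time.Millisecond
	maxBackoff             = 16 * time.Second
	maxConsecutiveFailures = 3
)

// runWorker is a hypothetical stand-in for running the worker script once;
// it returns the script's exit status.
func runWorker() int { return 1 }

// superviseWorker restarts the worker forever, backing off exponentially on
// fast non-zero exits and giving up after too many consecutive quick failures.
func superviseWorker() error {
	backoff := minBackoff
	failures := 0

	for {
		started := time.Now()
		exitStatus := runWorker()

		if exitStatus == 0 {
			// a deliberate restart resets the penalty
			backoff, failures = minBackoff, 0
			continue
		}

		if time.Since(started) > backoff*2 {
			// the script stayed up for a while: an application error,
			// not a boot failure, so do not escalate
			failures = 0
		} else {
			failures++
			if failures >= maxConsecutiveFailures {
				return errors.New("too many consecutive failures")
			}
		}

		time.Sleep(backoff)
		backoff = min(backoff*2, maxBackoff)
	}
}

func main() {
	if err := superviseWorker(); err != nil {
		fmt.Println(err)
	}
}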
@@ -342,7 +342,7 @@ PHP_FUNCTION(frankenphp_handle_request) {
     ctx->worker_ready = true;

     /* Mark the worker as ready to handle requests */
-    go_frankenphp_worker_ready();
+    go_frankenphp_worker_ready(ctx->main_request);
   }

 #ifdef ZEND_MAX_EXECUTION_TIMERS
@@ -125,6 +125,9 @@ type FrankenPHPContext struct {
     // Whether the request is already closed by us
     closed sync.Once

+    // whether the context is ready to receive requests
+    ready bool
+
     responseWriter http.ResponseWriter
     exitStatus     C.int

@@ -609,6 +609,18 @@ func testRequestHeaders(t *testing.T, opts *testOptions) {
     }, opts)
 }

+func TestFailingWorker(t *testing.T) {
+    runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, i int) {
+        req := httptest.NewRequest("GET", "http://example.com/failing-worker.php", nil)
+        w := httptest.NewRecorder()
+        handler(w, req)
+
+        resp := w.Result()
+        body, _ := io.ReadAll(resp.Body)
+        assert.Contains(t, string(body), "ok")
+    }, &testOptions{workerScript: "failing-worker.php"})
+}
+
 func TestFileUpload_module(t *testing.T) { testFileUpload(t, &testOptions{}) }
 func TestFileUpload_worker(t *testing.T) {
     testFileUpload(t, &testOptions{workerScript: "file-upload.php"})
metrics.go (87 lines changed)
@@ -11,11 +11,21 @@ import (
 var metricsNameRegex = regexp.MustCompile(`\W+`)
 var metricsNameFixRegex = regexp.MustCompile(`^_+|_+$`)

+const (
+    StopReasonCrash = iota
+    StopReasonRestart
+    StopReasonShutdown
+)
+
+type StopReason int
+
 type Metrics interface {
     // StartWorker collects started workers
     StartWorker(name string)
+    // ReadyWorker collects ready workers
+    ReadyWorker(name string)
     // StopWorker collects stopped workers
-    StopWorker(name string)
+    StopWorker(name string, reason StopReason)
     // TotalWorkers collects expected workers
     TotalWorkers(name string, num int)
     // TotalThreads collects total threads
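To make the new lifecycle API concrete, here is a hedged usage sketch. The StopReason and Metrics declarations below are local stand-ins mirroring the interface changes above so the snippet compiles on its own; the actual call sites are in worker.go further down.

package main

import "fmt"

// Local stand-ins, not imported from FrankenPHP.
type StopReason int

const (
	StopReasonCrash StopReason = iota
	StopReasonRestart
	StopReasonShutdown
)

type Metrics interface {
	StartWorker(name string)
	ReadyWorker(name string)
	StopWorker(name string, reason StopReason)
}

// logMetrics is a trivial Metrics implementation used only for this demo.
type logMetrics struct{}

func (logMetrics) StartWorker(name string)              { fmt.Println("start:", name) }
func (logMetrics) ReadyWorker(name string)              { fmt.Println("ready:", name) }
func (logMetrics) StopWorker(name string, r StopReason) { fmt.Println("stop:", name, r) }

func main() {
	var m Metrics = logMetrics{}
	script := "testdata/index.php"

	// One worker lifecycle as the metrics layer sees it:
	m.StartWorker(script)                    // worker thread booted
	m.ReadyWorker(script)                    // first frankenphp_handle_request call
	m.StopWorker(script, StopReasonRestart)  // script returned normally, will be restarted
	m.StopWorker(script, StopReasonShutdown) // server shutting down
}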
@@ -36,7 +46,10 @@ type nullMetrics struct{}
 func (n nullMetrics) StartWorker(name string) {
 }

-func (n nullMetrics) StopWorker(name string) {
+func (n nullMetrics) ReadyWorker(name string) {
+}
+
+func (n nullMetrics) StopWorker(name string, reason StopReason) {
 }

 func (n nullMetrics) TotalWorkers(name string, num int) {
@@ -66,6 +79,9 @@ type PrometheusMetrics struct {
     busyThreads prometheus.Gauge
     totalWorkers map[string]prometheus.Gauge
     busyWorkers map[string]prometheus.Gauge
+    readyWorkers map[string]prometheus.Gauge
+    workerCrashes map[string]prometheus.Counter
+    workerRestarts map[string]prometheus.Counter
     workerRequestTime map[string]prometheus.Counter
     workerRequestCount map[string]prometheus.Counter
     mu sync.Mutex
@@ -81,7 +97,15 @@ func (m *PrometheusMetrics) StartWorker(name string) {
     m.totalWorkers[name].Inc()
 }

-func (m *PrometheusMetrics) StopWorker(name string) {
+func (m *PrometheusMetrics) ReadyWorker(name string) {
+    if _, ok := m.totalWorkers[name]; !ok {
+        return
+    }
+
+    m.readyWorkers[name].Inc()
+}
+
+func (m *PrometheusMetrics) StopWorker(name string, reason StopReason) {
     m.busyThreads.Dec()

     // tests do not register workers before starting them
@@ -89,6 +113,15 @@ func (m *PrometheusMetrics) StopWorker(name string) {
         return
     }
     m.totalWorkers[name].Dec()
+    m.readyWorkers[name].Dec()
+
+    if reason == StopReasonCrash {
+        m.workerCrashes[name].Inc()
+    } else if reason == StopReasonRestart {
+        m.workerRestarts[name].Inc()
+    } else if reason == StopReasonShutdown {
+        m.totalWorkers[name].Dec()
+    }
 }

 func (m *PrometheusMetrics) getIdentity(name string) (string, error) {
@@ -122,6 +155,36 @@ func (m *PrometheusMetrics) TotalWorkers(name string, num int) {
         m.registry.MustRegister(m.totalWorkers[identity])
     }

+    if _, ok := m.workerCrashes[identity]; !ok {
+        m.workerCrashes[identity] = prometheus.NewCounter(prometheus.CounterOpts{
+            Namespace: "frankenphp",
+            Subsystem: subsystem,
+            Name:      "worker_crashes",
+            Help:      "Number of PHP worker crashes for this worker",
+        })
+        m.registry.MustRegister(m.workerCrashes[identity])
+    }
+
+    if _, ok := m.workerRestarts[identity]; !ok {
+        m.workerRestarts[identity] = prometheus.NewCounter(prometheus.CounterOpts{
+            Namespace: "frankenphp",
+            Subsystem: subsystem,
+            Name:      "worker_restarts",
+            Help:      "Number of PHP worker restarts for this worker",
+        })
+        m.registry.MustRegister(m.workerRestarts[identity])
+    }
+
+    if _, ok := m.readyWorkers[identity]; !ok {
+        m.readyWorkers[identity] = prometheus.NewGauge(prometheus.GaugeOpts{
+            Namespace: "frankenphp",
+            Subsystem: subsystem,
+            Name:      "ready_workers",
+            Help:      "Running workers that have successfully called frankenphp_handle_request at least once",
+        })
+        m.registry.MustRegister(m.readyWorkers[identity])
+    }
+
     if _, ok := m.busyWorkers[identity]; !ok {
         m.busyWorkers[identity] = prometheus.NewGauge(prometheus.GaugeOpts{
             Namespace: "frankenphp",
@@ -200,6 +263,18 @@ func (m *PrometheusMetrics) Shutdown() {
         m.registry.Unregister(c)
     }

+    for _, c := range m.workerCrashes {
+        m.registry.Unregister(c)
+    }
+
+    for _, c := range m.workerRestarts {
+        m.registry.Unregister(c)
+    }
+
+    for _, g := range m.readyWorkers {
+        m.registry.Unregister(g)
+    }
+
     m.totalThreads = prometheus.NewCounter(prometheus.CounterOpts{
         Name: "frankenphp_total_threads",
         Help: "Total number of PHP threads",
@@ -212,6 +287,9 @@ func (m *PrometheusMetrics) Shutdown() {
     m.busyWorkers = map[string]prometheus.Gauge{}
     m.workerRequestTime = map[string]prometheus.Counter{}
     m.workerRequestCount = map[string]prometheus.Counter{}
+    m.workerRestarts = map[string]prometheus.Counter{}
+    m.workerCrashes = map[string]prometheus.Counter{}
+    m.readyWorkers = map[string]prometheus.Gauge{}

     m.registry.MustRegister(m.totalThreads)
     m.registry.MustRegister(m.busyThreads)
@@ -243,6 +321,9 @@ func NewPrometheusMetrics(registry prometheus.Registerer) *PrometheusMetrics {
         busyWorkers:        map[string]prometheus.Gauge{},
         workerRequestTime:  map[string]prometheus.Counter{},
         workerRequestCount: map[string]prometheus.Counter{},
+        workerRestarts:     map[string]prometheus.Counter{},
+        workerCrashes:      map[string]prometheus.Counter{},
+        readyWorkers:       map[string]prometheus.Gauge{},
     }

     m.registry.MustRegister(m.totalThreads)
@@ -38,6 +38,9 @@ func createPrometheusMetrics() *PrometheusMetrics {
         busyWorkers:        make(map[string]prometheus.Gauge),
         workerRequestTime:  make(map[string]prometheus.Counter),
         workerRequestCount: make(map[string]prometheus.Counter),
+        workerCrashes:      make(map[string]prometheus.Counter),
+        workerRestarts:     make(map[string]prometheus.Counter),
+        readyWorkers:       make(map[string]prometheus.Gauge),
         mu:                 sync.Mutex{},
     }
 }
testdata/failing-worker.php (new file, 18 lines, vendored)
@@ -0,0 +1,18 @@
+<?php
+
+$fail = random_int(1, 100) < 1;
+$wait = random_int(1, 5);
+
+sleep($wait);
+if ($fail) {
+    exit(1);
+}
+
+while (frankenphp_handle_request(function () {
+    echo "ok";
+})) {
+    $fail = random_int(1, 100) < 10;
+    if ($fail) {
+        exit(1);
+    }
+}
worker.go (79 lines changed)
@@ -10,6 +10,7 @@ import (
     "path/filepath"
     "runtime/cgo"
     "sync"
+    "time"

     "go.uber.org/zap"
     "go.uber.org/zap/zapcore"
@@ -20,7 +21,7 @@ var (
     workersReadyWG sync.WaitGroup
 )

-// TODO: start all the worker in parallell to reduce the boot time
+// TODO: start all the worker in parallel to reduce the boot time
 func initWorkers(opt []workerOpt) error {
     for _, w := range opt {
         if err := startWorkers(w.fileName, w.num, w.env); err != nil {
@@ -57,10 +58,40 @@ func startWorkers(fileName string, nbWorkers int, env PreparedEnv) error {
     env["FRANKENPHP_WORKER\x00"] = "1"

     l := getLogger()
+
+    const maxBackoff = 16 * time.Second
+    const minBackoff = 100 * time.Millisecond
+    const maxConsecutiveFailures = 3
+
     for i := 0; i < nbWorkers; i++ {
         go func() {
             defer shutdownWG.Done()
+            backoff := minBackoff
+            failureCount := 0
+            backingOffLock := sync.RWMutex{}
             for {
+                // if the worker can stay up longer than backoff*2, it is probably an application error
+                upFunc := sync.Once{}
+                go func() {
+                    backingOffLock.RLock()
+                    wait := backoff * 2
+                    backingOffLock.RUnlock()
+                    time.Sleep(wait)
+                    upFunc.Do(func() {
+                        backingOffLock.Lock()
+                        defer backingOffLock.Unlock()
+                        // if we come back to a stable state, reset the failure count
+                        if backoff == minBackoff {
+                            failureCount = 0
+                        }
+
+                        // earn back the backoff over time
+                        if failureCount > 0 {
+                            backoff = max(backoff/2, 100*time.Millisecond)
+                        }
+                    })
+                }()
+
                 // Create main dummy request
                 r, err := http.NewRequest(http.MethodGet, filepath.Base(absFileName), nil)

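The watchdog goroutine above relies on a sync.Once shared with the restart path: whichever side fires first decides whether the crash counts as a quick failure or as a return to stability. A standalone sketch of that pattern, with assumed helper names and simulated timings (not FrankenPHP code):

package main

import (
	"fmt"
	"sync"
	"time"
)

// classifyRun races a watchdog goroutine against the restart path on a
// sync.Once; only one of them gets to record the verdict for this run.
func classifyRun(workerLifetime, backoff time.Duration) string {
	var once sync.Once
	verdict := make(chan string, 1)

	// watchdog: if the worker stays up longer than backoff*2, call it stable
	go func() {
		time.Sleep(backoff * 2)
		once.Do(func() { verdict <- "stable: reset the failure count" })
	}()

	// simulate the worker running, then exiting with a non-zero status
	time.Sleep(workerLifetime)

	// restart path: only counts as a failure if the watchdog has not fired yet
	once.Do(func() { verdict <- "quick failure: escalate the backoff" })
	return <-verdict
}

func main() {
	fmt.Println(classifyRun(50*time.Millisecond, 200*time.Millisecond))  // quick failure
	fmt.Println(classifyRun(900*time.Millisecond, 200*time.Millisecond)) // stable
}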
@@ -96,22 +127,58 @@ func startWorkers(fileName string, nbWorkers int, env PreparedEnv) error {

                // TODO: make the max restart configurable
                if _, ok := workersRequestChans.Load(absFileName); ok {
-                    metrics.StopWorker(absFileName)
+                    if fc.ready {
+                        fc.ready = false
+                        workersReadyWG.Add(1)
+                    }
+
                    workersReadyWG.Add(1)
                    if fc.exitStatus == 0 {
                        if c := l.Check(zapcore.InfoLevel, "restarting"); c != nil {
                            c.Write(zap.String("worker", absFileName))
                        }
+
+                        // a normal restart resets the backoff and failure count
+                        backingOffLock.Lock()
+                        backoff = minBackoff
+                        failureCount = 0
+                        backingOffLock.Unlock()
+                        metrics.StopWorker(absFileName, StopReasonRestart)
                    } else {
                        if c := l.Check(zapcore.ErrorLevel, "unexpected termination, restarting"); c != nil {
-                            c.Write(zap.String("worker", absFileName), zap.Int("exit_status", int(fc.exitStatus)))
+                            backingOffLock.RLock()
+                            c.Write(zap.String("worker", absFileName), zap.Int("failure_count", failureCount), zap.Int("exit_status", int(fc.exitStatus)), zap.Duration("waiting", backoff))
+                            backingOffLock.RUnlock()
                        }
+
+                        upFunc.Do(func() {
+                            backingOffLock.Lock()
+                            defer backingOffLock.Unlock()
+                            // if we end up here, the worker has not been up for backoff*2
+                            // this is probably due to a syntax error or another fatal error
+                            if failureCount >= maxConsecutiveFailures {
+                                panic(fmt.Errorf("workers %q: too many consecutive failures", absFileName))
+                            } else {
+                                failureCount += 1
+                            }
+                        })
+                        backingOffLock.RLock()
+                        wait := backoff
+                        backingOffLock.RUnlock()
+                        time.Sleep(wait)
+                        backingOffLock.Lock()
+                        backoff *= 2
+                        backoff = min(backoff, maxBackoff)
+                        backingOffLock.Unlock()
+                        metrics.StopWorker(absFileName, StopReasonCrash)
                    }
                } else {
                    break
                }
            }

+            metrics.StopWorker(absFileName, StopReasonShutdown)
+
            // TODO: check if the termination is expected
            if c := l.Check(zapcore.DebugLevel, "terminated"); c != nil {
                c.Write(zap.String("worker", absFileName))
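Note that `backoff = max(backoff/2, 100*time.Millisecond)` and `backoff = min(backoff, maxBackoff)` use the generic min and max built-ins introduced in Go 1.21, which accept time.Duration directly. A tiny standalone example of the floor and ceiling behavior:

package main

import (
	"fmt"
	"time"
)

func main() {
	backoff := 150 * time.Millisecond

	// earning back: halve the backoff but never go below the 100ms floor
	backoff = max(backoff/2, 100*time.Millisecond)
	fmt.Println(backoff) // 100ms

	// escalating: keep doubling but never exceed the 16s ceiling
	for i := 0; i < 10; i++ {
		backoff = min(backoff*2, 16*time.Second)
	}
	fmt.Println(backoff) // 16s
}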
@@ -139,7 +206,11 @@ func stopWorkers() {
 }

 //export go_frankenphp_worker_ready
-func go_frankenphp_worker_ready() {
+func go_frankenphp_worker_ready(mrh C.uintptr_t) {
+    mainRequest := cgo.Handle(mrh).Value().(*http.Request)
+    fc := mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
+    fc.ready = true
+    metrics.ReadyWorker(fc.scriptFilename)
     workersReadyWG.Done()
 }

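The C side now passes ctx->main_request, a cgo handle to the worker's main dummy request, back into Go as a uintptr_t. A hedged, pure-Go sketch of the runtime/cgo.Handle round trip used here (without the actual C boundary):

package main

import (
	"fmt"
	"net/http"
	"runtime/cgo"
)

func main() {
	// the main dummy request created for the worker
	req, _ := http.NewRequest(http.MethodGet, "index.php", nil)

	// wrap it in a handle; the handle's numeric value is what crosses into C
	h := cgo.NewHandle(req)
	raw := uintptr(h)

	// later, as in go_frankenphp_worker_ready, resolve the handle back
	// into the original *http.Request
	recovered := cgo.Handle(raw).Value().(*http.Request)
	fmt.Println(recovered.URL.Path) // index.php

	h.Delete()
}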