perf: remove all cgo handles (#1073)
Some checks failed
Lint Code Base / Lint Code Base (push) Has been cancelled
Sanitizers / ${{ matrix.sanitizer }} (asan) (push) Has been cancelled
Sanitizers / ${{ matrix.sanitizer }} (msan) (push) Has been cancelled
Tests / tests (8.2) (push) Has been cancelled
Tests / tests (8.3) (push) Has been cancelled
Tests / tests (8.4) (push) Has been cancelled

* Removes Cgo handles and adds phpThreads.

* Changes variable name.

* Changes variable name.

* Fixes merge error.

* Removes unnecessary 'workers are done' check.

* Removes panic.

* Replaces int with uint32_t.

* Changes index type to uintptr_t.

* simplify

---------

Co-authored-by: a.stecher <a.stecher@sportradar.com>
Co-authored-by: Kévin Dunglas <kevin@dunglas.fr>
This commit is contained in:
Alexander Stecher 2024-10-09 07:31:09 +02:00 committed by GitHub
parent e9c075a4a5
commit d99b16a158
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 335 additions and 384 deletions

View File

@ -70,14 +70,15 @@ frankenphp_config frankenphp_get_config() {
}
typedef struct frankenphp_server_context {
uintptr_t current_request;
uintptr_t main_request;
bool has_main_request;
bool has_active_request;
bool worker_ready;
char *cookie_data;
bool finished;
} frankenphp_server_context;
__thread frankenphp_server_context *local_ctx = NULL;
__thread uintptr_t thread_index;
static void frankenphp_free_request_context() {
frankenphp_server_context *ctx = SG(server_context);
@ -231,9 +232,8 @@ PHP_FUNCTION(frankenphp_finish_request) { /* {{{ */
php_output_end_all();
php_header();
if (ctx->current_request != 0) {
go_frankenphp_finish_request(ctx->main_request, ctx->current_request,
false);
if (ctx->has_active_request) {
go_frankenphp_finish_request(thread_index, false);
}
ctx->finished = true;
@ -249,7 +249,7 @@ PHP_FUNCTION(frankenphp_request_headers) {
frankenphp_server_context *ctx = SG(server_context);
struct go_apache_request_headers_return headers =
go_apache_request_headers(ctx->current_request, ctx->main_request);
go_apache_request_headers(thread_index, ctx->has_active_request);
array_init_size(return_value, headers.r1);
@ -260,7 +260,7 @@ PHP_FUNCTION(frankenphp_request_headers) {
add_assoc_stringl_ex(return_value, key.data, key.len, val.data, val.len);
}
go_apache_request_cleanup(headers.r2);
go_apache_request_cleanup(thread_index);
}
/* }}} */
@ -327,7 +327,7 @@ PHP_FUNCTION(frankenphp_handle_request) {
frankenphp_server_context *ctx = SG(server_context);
if (ctx->main_request == 0) {
if (!ctx->has_main_request) {
/* not a worker, throw an error */
zend_throw_exception(
spl_ce_RuntimeException,
@ -340,9 +340,6 @@ PHP_FUNCTION(frankenphp_handle_request) {
frankenphp_worker_request_shutdown();
ctx->worker_ready = true;
/* Mark the worker as ready to handle requests */
go_frankenphp_worker_ready(ctx->main_request);
}
#ifdef ZEND_MAX_EXECUTION_TIMERS
@ -350,8 +347,7 @@ PHP_FUNCTION(frankenphp_handle_request) {
zend_unset_timeout();
#endif
uintptr_t request =
go_frankenphp_worker_handle_request_start(ctx->main_request);
bool request = go_frankenphp_worker_handle_request_start(thread_index);
if (frankenphp_worker_request_startup() == FAILURE
/* Shutting down */
|| !request) {
@ -384,8 +380,8 @@ PHP_FUNCTION(frankenphp_handle_request) {
}
frankenphp_worker_request_shutdown();
ctx->current_request = 0;
go_frankenphp_finish_request(ctx->main_request, request, true);
ctx->has_active_request = false;
go_frankenphp_finish_request(thread_index, true);
RETURN_TRUE;
}
@ -426,7 +422,7 @@ static zend_module_entry frankenphp_module = {
static void frankenphp_request_shutdown() {
frankenphp_server_context *ctx = SG(server_context);
if (ctx->main_request && ctx->current_request) {
if (ctx->has_main_request && ctx->has_active_request) {
frankenphp_destroy_super_globals();
}
@ -437,7 +433,7 @@ static void frankenphp_request_shutdown() {
}
int frankenphp_update_server_context(
bool create, uintptr_t current_request, uintptr_t main_request,
bool create, bool has_main_request, bool has_active_request,
const char *request_method, char *query_string, zend_long content_length,
char *path_translated, char *request_uri, const char *content_type,
@ -459,8 +455,8 @@ int frankenphp_update_server_context(
// It is not reset by zend engine, set it to 200.
SG(sapi_headers).http_response_code = 200;
ctx->main_request = main_request;
ctx->current_request = current_request;
ctx->has_main_request = has_main_request;
ctx->has_active_request = has_active_request;
SG(request_info).auth_password = auth_password;
SG(request_info).auth_user = auth_user;
@ -493,9 +489,8 @@ static size_t frankenphp_ub_write(const char *str, size_t str_length) {
return 0;
}
struct go_ub_write_return result = go_ub_write(
ctx->current_request ? ctx->current_request : ctx->main_request,
(char *)str, str_length);
struct go_ub_write_return result =
go_ub_write(thread_index, (char *)str, str_length);
if (result.r1) {
php_handle_aborted_connection();
@ -512,7 +507,7 @@ static int frankenphp_send_headers(sapi_headers_struct *sapi_headers) {
int status;
frankenphp_server_context *ctx = SG(server_context);
if (ctx->current_request == 0) {
if (!ctx->has_active_request) {
return SAPI_HEADER_SEND_FAILED;
}
@ -526,7 +521,7 @@ static int frankenphp_send_headers(sapi_headers_struct *sapi_headers) {
}
}
go_write_headers(ctx->current_request, status, &sapi_headers->headers);
go_write_headers(thread_index, status, &sapi_headers->headers);
return SAPI_HEADER_SENT_SUCCESSFULLY;
}
@ -534,7 +529,7 @@ static int frankenphp_send_headers(sapi_headers_struct *sapi_headers) {
static void frankenphp_sapi_flush(void *server_context) {
frankenphp_server_context *ctx = (frankenphp_server_context *)server_context;
if (ctx && ctx->current_request != 0 && go_sapi_flush(ctx->current_request)) {
if (ctx && ctx->has_active_request && go_sapi_flush(thread_index)) {
php_handle_aborted_connection();
}
}
@ -542,19 +537,19 @@ static void frankenphp_sapi_flush(void *server_context) {
static size_t frankenphp_read_post(char *buffer, size_t count_bytes) {
frankenphp_server_context *ctx = SG(server_context);
return ctx->current_request
? go_read_post(ctx->current_request, buffer, count_bytes)
return ctx->has_active_request
? go_read_post(thread_index, buffer, count_bytes)
: 0;
}
static char *frankenphp_read_cookies(void) {
frankenphp_server_context *ctx = SG(server_context);
if (ctx->current_request == 0) {
if (!ctx->has_active_request) {
return "";
}
ctx->cookie_data = go_read_cookies(ctx->current_request);
ctx->cookie_data = go_read_cookies(thread_index);
return ctx->cookie_data;
}
@ -664,16 +659,13 @@ void frankenphp_register_bulk_variables(go_string known_variables[27],
static void frankenphp_register_variables(zval *track_vars_array) {
/* https://www.php.net/manual/en/reserved.variables.server.php */
frankenphp_server_context *ctx = SG(server_context);
/* In CGI mode, we consider the environment to be a part of the server
* variables
*/
php_import_environment_variables(track_vars_array);
go_register_variables(ctx->current_request ? ctx->current_request
: ctx->main_request,
track_vars_array);
go_register_variables(thread_index, track_vars_array);
}
static void frankenphp_log_message(const char *message, int syslog_type_int) {
@ -732,6 +724,7 @@ static void set_thread_name(char *thread_name) {
static void *php_thread(void *arg) {
char thread_name[16] = {0};
snprintf(thread_name, 16, "php-%" PRIxPTR, (uintptr_t)arg);
thread_index = (uintptr_t)arg;
set_thread_name(thread_name);
#ifdef ZTS
@ -744,7 +737,7 @@ static void *php_thread(void *arg) {
local_ctx = malloc(sizeof(frankenphp_server_context));
while (go_handle_request()) {
while (go_handle_request(thread_index)) {
}
#ifdef ZTS

View File

@ -35,7 +35,6 @@ import (
"net/http"
"os"
"runtime"
"runtime/cgo"
"strconv"
"strings"
"sync"
@ -50,10 +49,8 @@ import (
)
type contextKeyStruct struct{}
type handleKeyStruct struct{}
var contextKey = contextKeyStruct{}
var handleKey = handleKeyStruct{}
var (
InvalidRequestError = errors.New("not a FrankenPHP request")
@ -125,15 +122,11 @@ type FrankenPHPContext struct {
// Whether the request is already closed by us
closed sync.Once
// whether the context is ready to receive requests
ready bool
responseWriter http.ResponseWriter
exitStatus C.int
done chan interface{}
currentWorkerRequest cgo.Handle
startedAt time.Time
done chan interface{}
startedAt time.Time
}
func clientHasClosed(r *http.Request) bool {
@ -197,7 +190,6 @@ func NewRequestWithContext(r *http.Request, opts ...RequestOption) (*http.Reques
fc.scriptFilename = sanitizedPathJoin(fc.documentRoot, fc.scriptName)
c := context.WithValue(r.Context(), contextKey, fc)
c = context.WithValue(c, handleKey, Handles())
return r.WithContext(c), nil
}
@ -338,6 +330,7 @@ func Init(options ...Option) error {
shutdownWG.Add(1)
done = make(chan struct{})
requestChan = make(chan *http.Request)
initPHPThreads(opt.numThreads)
if C.frankenphp_init(C.int(opt.numThreads)) != 0 {
return MainThreadCreationError
@ -386,6 +379,7 @@ func go_shutdown() {
func drainThreads() {
close(done)
shutdownWG.Wait()
phpThreads = nil
}
func getLogger() *zap.Logger {
@ -395,7 +389,7 @@ func getLogger() *zap.Logger {
return logger
}
func updateServerContext(request *http.Request, create bool, mrh C.uintptr_t) error {
func updateServerContext(request *http.Request, create bool, isWorkerRequest bool) error {
fc, ok := FromContext(request.Context())
if !ok {
return InvalidRequestError
@ -437,21 +431,12 @@ func updateServerContext(request *http.Request, create bool, mrh C.uintptr_t) er
}
cRequestUri := C.CString(request.URL.RequestURI())
var rh cgo.Handle
if fc.responseWriter == nil {
h := cgo.NewHandle(request)
request.Context().Value(handleKey).(*handleList).AddHandle(h)
mrh = C.uintptr_t(h)
} else {
rh = cgo.NewHandle(request)
request.Context().Value(handleKey).(*handleList).AddHandle(rh)
}
isBootingAWorkerScript := fc.responseWriter == nil
ret := C.frankenphp_update_server_context(
C.bool(create),
C.uintptr_t(rh),
mrh,
C.bool(isWorkerRequest || isBootingAWorkerScript),
C.bool(!isBootingAWorkerScript),
cMethod,
cQueryString,
@ -490,10 +475,10 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error
rc := requestChan
// Detect if a worker is available to handle this request
if !isWorker {
if v, ok := workersRequestChans.Load(fc.scriptFilename); ok {
if worker, ok := workers[fc.scriptFilename]; ok {
isWorkerRequest = true
metrics.StartWorkerRequest(fc.scriptFilename)
rc = v.(chan *http.Request)
rc = worker.requestChan
} else {
metrics.StartRequest()
}
@ -517,14 +502,14 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error
}
//export go_handle_request
func go_handle_request() bool {
func go_handle_request(threadIndex C.uintptr_t) bool {
select {
case <-done:
return false
case r := <-requestChan:
h := cgo.NewHandle(r)
r.Context().Value(handleKey).(*handleList).AddHandle(h)
thread := phpThreads[threadIndex]
thread.mainRequest = r
fc, ok := FromContext(r.Context())
if !ok {
@ -532,10 +517,10 @@ func go_handle_request() bool {
}
defer func() {
maybeCloseContext(fc)
r.Context().Value(handleKey).(*handleList).FreeAll()
thread.mainRequest = nil
}()
if err := updateServerContext(r, true, 0); err != nil {
if err := updateServerContext(r, true, false); err != nil {
panic(err)
}
@ -556,8 +541,8 @@ func maybeCloseContext(fc *FrankenPHPContext) {
}
//export go_ub_write
func go_ub_write(rh C.uintptr_t, cBuf *C.char, length C.int) (C.size_t, C.bool) {
r := cgo.Handle(rh).Value().(*http.Request)
func go_ub_write(threadIndex C.uintptr_t, cBuf *C.char, length C.int) (C.size_t, C.bool) {
r := phpThreads[threadIndex].getActiveRequest()
fc, _ := FromContext(r.Context())
var writer io.Writer
@ -595,12 +580,11 @@ var headerKeyCache = func() otter.Cache[string, string] {
}()
//export go_register_variables
func go_register_variables(rh C.uintptr_t, trackVarsArray *C.zval) {
r := cgo.Handle(rh).Value().(*http.Request)
func go_register_variables(threadIndex C.uintptr_t, trackVarsArray *C.zval) {
thread := phpThreads[threadIndex]
r := thread.getActiveRequest()
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
p := &runtime.Pinner{}
dynamicVariables := make([]C.php_variable, len(fc.env)+len(r.Header))
var l int
@ -622,8 +606,8 @@ func go_register_variables(rh C.uintptr_t, trackVarsArray *C.zval) {
kData := unsafe.StringData(k)
vData := unsafe.StringData(v)
p.Pin(kData)
p.Pin(vData)
thread.Pin(kData)
thread.Pin(vData)
dynamicVariables[l]._var = (*C.char)(unsafe.Pointer(kData))
dynamicVariables[l].data_len = C.size_t(len(v))
@ -640,8 +624,8 @@ func go_register_variables(rh C.uintptr_t, trackVarsArray *C.zval) {
kData := unsafe.StringData(k)
vData := unsafe.Pointer(unsafe.StringData(v))
p.Pin(kData)
p.Pin(vData)
thread.Pin(kData)
thread.Pin(vData)
dynamicVariables[l]._var = (*C.char)(unsafe.Pointer(kData))
dynamicVariables[l].data_len = C.size_t(len(v))
@ -650,45 +634,43 @@ func go_register_variables(rh C.uintptr_t, trackVarsArray *C.zval) {
l++
}
knownVariables := computeKnownVariables(r, p)
knownVariables := computeKnownVariables(r, &thread.Pinner)
dvsd := unsafe.SliceData(dynamicVariables)
p.Pin(dvsd)
thread.Pin(dvsd)
C.frankenphp_register_bulk_variables(&knownVariables[0], dvsd, C.size_t(l), trackVarsArray)
p.Unpin()
thread.Unpin()
fc.env = nil
}
//export go_apache_request_headers
func go_apache_request_headers(rh, mrh C.uintptr_t) (*C.go_string, C.size_t, C.uintptr_t) {
if rh == 0 {
func go_apache_request_headers(threadIndex C.uintptr_t, hasActiveRequest bool) (*C.go_string, C.size_t) {
thread := phpThreads[threadIndex]
if !hasActiveRequest {
// worker mode, not handling a request
mr := cgo.Handle(mrh).Value().(*http.Request)
mfc := mr.Context().Value(contextKey).(*FrankenPHPContext)
mfc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
if c := mfc.logger.Check(zapcore.DebugLevel, "apache_request_headers() called in non-HTTP context"); c != nil {
c.Write(zap.String("worker", mfc.scriptFilename))
}
return nil, 0, 0
return nil, 0
}
r := cgo.Handle(rh).Value().(*http.Request)
pinner := &runtime.Pinner{}
pinnerHandle := C.uintptr_t(cgo.NewHandle(pinner))
r := thread.getActiveRequest()
headers := make([]C.go_string, 0, len(r.Header)*2)
for field, val := range r.Header {
fd := unsafe.StringData(field)
pinner.Pin(fd)
thread.Pin(fd)
cv := strings.Join(val, ", ")
vd := unsafe.StringData(cv)
pinner.Pin(vd)
thread.Pin(vd)
headers = append(
headers,
@ -698,21 +680,14 @@ func go_apache_request_headers(rh, mrh C.uintptr_t) (*C.go_string, C.size_t, C.u
}
sd := unsafe.SliceData(headers)
pinner.Pin(sd)
thread.Pin(sd)
return sd, C.size_t(len(r.Header)), pinnerHandle
return sd, C.size_t(len(r.Header))
}
//export go_apache_request_cleanup
func go_apache_request_cleanup(rh C.uintptr_t) {
if rh == 0 {
return
}
h := cgo.Handle(rh)
p := h.Value().(*runtime.Pinner)
p.Unpin()
h.Delete()
func go_apache_request_cleanup(threadIndex C.uintptr_t) {
phpThreads[threadIndex].Unpin()
}
func addHeader(fc *FrankenPHPContext, cString *C.char, length C.int) {
@ -729,8 +704,8 @@ func addHeader(fc *FrankenPHPContext, cString *C.char, length C.int) {
}
//export go_write_headers
func go_write_headers(rh C.uintptr_t, status C.int, headers *C.zend_llist) {
r := cgo.Handle(rh).Value().(*http.Request)
func go_write_headers(threadIndex C.uintptr_t, status C.int, headers *C.zend_llist) {
r := phpThreads[threadIndex].getActiveRequest()
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
if fc.responseWriter == nil {
@ -757,8 +732,8 @@ func go_write_headers(rh C.uintptr_t, status C.int, headers *C.zend_llist) {
}
//export go_sapi_flush
func go_sapi_flush(rh C.uintptr_t) bool {
r := cgo.Handle(rh).Value().(*http.Request)
func go_sapi_flush(threadIndex C.uintptr_t) bool {
r := phpThreads[threadIndex].getActiveRequest()
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
if fc.responseWriter == nil || clientHasClosed(r) {
@ -775,8 +750,8 @@ func go_sapi_flush(rh C.uintptr_t) bool {
}
//export go_read_post
func go_read_post(rh C.uintptr_t, cBuf *C.char, countBytes C.size_t) (readBytes C.size_t) {
r := cgo.Handle(rh).Value().(*http.Request)
func go_read_post(threadIndex C.uintptr_t, cBuf *C.char, countBytes C.size_t) (readBytes C.size_t) {
r := phpThreads[threadIndex].getActiveRequest()
p := unsafe.Slice((*byte)(unsafe.Pointer(cBuf)), countBytes)
var err error
@ -790,8 +765,8 @@ func go_read_post(rh C.uintptr_t, cBuf *C.char, countBytes C.size_t) (readBytes
}
//export go_read_cookies
func go_read_cookies(rh C.uintptr_t) *C.char {
r := cgo.Handle(rh).Value().(*http.Request)
func go_read_cookies(threadIndex C.uintptr_t) *C.char {
r := phpThreads[threadIndex].getActiveRequest()
cookies := r.Cookies()
if len(cookies) == 0 {

View File

@ -43,7 +43,7 @@ frankenphp_config frankenphp_get_config();
int frankenphp_init(int num_threads);
int frankenphp_update_server_context(
bool create, uintptr_t current_request, uintptr_t main_request,
bool create, bool has_main_request, bool has_active_request,
const char *request_method, char *query_string, zend_long content_length,
char *path_translated, char *request_uri, const char *content_type,

33
php_thread.go Normal file
View File

@ -0,0 +1,33 @@
package frankenphp
// #include <stdint.h>
import "C"
import (
"net/http"
"runtime"
)
var phpThreads []*phpThread
type phpThread struct {
runtime.Pinner
mainRequest *http.Request
workerRequest *http.Request
worker *worker
}
func initPHPThreads(numThreads int) {
phpThreads = make([]*phpThread, 0, numThreads)
for i := 0; i < numThreads; i++ {
phpThreads = append(phpThreads, &phpThread{})
}
}
func (thread phpThread) getActiveRequest() *http.Request {
if thread.workerRequest != nil {
return thread.workerRequest
}
return thread.mainRequest
}

40
php_thread_test.go Normal file
View File

@ -0,0 +1,40 @@
package frankenphp
import (
"net/http"
"testing"
"github.com/stretchr/testify/assert"
)
func TestInitializeTwoPhpThreadsWithoutRequests(t *testing.T) {
initPHPThreads(2)
assert.Len(t, phpThreads, 2)
assert.NotNil(t, phpThreads[0])
assert.NotNil(t, phpThreads[1])
assert.Nil(t, phpThreads[0].mainRequest)
assert.Nil(t, phpThreads[0].workerRequest)
}
func TestMainRequestIsActiveRequest(t *testing.T) {
mainRequest := &http.Request{}
initPHPThreads(1)
thread := phpThreads[0]
thread.mainRequest = mainRequest
assert.Equal(t, mainRequest, thread.getActiveRequest())
}
func TestWorkerRequestIsActiveRequest(t *testing.T) {
mainRequest := &http.Request{}
workerRequest := &http.Request{}
initPHPThreads(1)
thread := phpThreads[0]
thread.mainRequest = mainRequest
thread.workerRequest = workerRequest
assert.Equal(t, workerRequest, thread.getActiveRequest())
}

View File

@ -1,64 +0,0 @@
package frankenphp
// #include <stdlib.h>
import "C"
import (
"runtime/cgo"
"unsafe"
)
/*
FrankenPHP is fairly complex because it shuffles handles/requests/contexts
between C and Go. This simplifies the lifecycle management of per-request
structures by allowing us to hold references until the end of the request
and ensure they are always cleaned up.
*/
// PointerList A list of pointers that can be freed at a later time
type pointerList struct {
Pointers []unsafe.Pointer
}
// HandleList A list of pointers that can be freed at a later time
type handleList struct {
Handles []cgo.Handle
}
// AddHandle Call when registering a handle for the very first time
func (h *handleList) AddHandle(handle cgo.Handle) {
h.Handles = append(h.Handles, handle)
}
// AddPointer Call when creating a request-level C pointer for the very first time
func (p *pointerList) AddPointer(ptr unsafe.Pointer) {
p.Pointers = append(p.Pointers, ptr)
}
// FreeAll frees all C pointers
func (p *pointerList) FreeAll() {
for _, ptr := range p.Pointers {
C.free(ptr)
}
p.Pointers = nil // To avoid dangling pointers
}
// FreeAll frees all handles
func (h *handleList) FreeAll() {
for _, p := range h.Handles {
p.Delete()
}
}
// Pointers Get a new list of pointers
func Pointers() *pointerList {
return &pointerList{
Pointers: make([]unsafe.Pointer, 0),
}
}
// Handles Get a new list of handles
func Handles() *handleList {
return &handleList{
Handles: make([]cgo.Handle, 0, 8),
}
}

398
worker.go
View File

@ -4,11 +4,9 @@ package frankenphp
// #include "frankenphp.h"
import "C"
import (
"errors"
"fmt"
"net/http"
"path/filepath"
"runtime/cgo"
"sync"
"sync/atomic"
"time"
@ -18,194 +16,174 @@ import (
"go.uber.org/zap/zapcore"
)
type worker struct {
fileName string
num int
env PreparedEnv
requestChan chan *http.Request
}
const maxWorkerErrorBackoff = 1 * time.Second
const minWorkerErrorBackoff = 100 * time.Millisecond
const maxWorkerConsecutiveFailures = 60
var (
workersRequestChans sync.Map // map[fileName]chan *http.Request
workersReadyWG sync.WaitGroup
workerShutdownWG sync.WaitGroup
workersAreReady atomic.Bool
workersAreDone atomic.Bool
workersDone chan interface{}
workersReadyWG sync.WaitGroup
workerShutdownWG sync.WaitGroup
workersAreReady atomic.Bool
workersAreDone atomic.Bool
workersDone chan interface{}
workers map[string]*worker = make(map[string]*worker)
)
// TODO: start all the worker in parallel to reduce the boot time
func initWorkers(opt []workerOpt) error {
workersDone = make(chan interface{})
workersAreReady.Store(false)
workersAreDone.Store(false)
for _, w := range opt {
if err := startWorkers(w.fileName, w.num, w.env); err != nil {
for _, o := range opt {
worker, err := newWorker(o)
if err != nil {
return err
}
}
return nil
}
func startWorkers(fileName string, nbWorkers int, env PreparedEnv) error {
absFileName, err := filepath.Abs(fileName)
if err != nil {
return fmt.Errorf("workers %q: %w", fileName, err)
}
if _, ok := workersRequestChans.Load(absFileName); !ok {
workersRequestChans.Store(absFileName, make(chan *http.Request))
}
shutdownWG.Add(nbWorkers)
workerShutdownWG.Add(nbWorkers)
workersReadyWG.Add(nbWorkers)
var (
m sync.RWMutex
errs []error
)
if env == nil {
env = make(PreparedEnv, 1)
}
env["FRANKENPHP_WORKER\x00"] = "1"
l := getLogger()
const maxBackoff = 1 * time.Second
const minBackoff = 10 * time.Millisecond
const maxConsecutiveFailures = 60
for i := 0; i < nbWorkers; i++ {
go func() {
defer shutdownWG.Done()
defer workerShutdownWG.Done()
backoff := minBackoff
failureCount := 0
backingOffLock := sync.RWMutex{}
for {
// if the worker can stay up longer than backoff*2, it is probably an application error
upFunc := sync.Once{}
go func() {
backingOffLock.RLock()
wait := backoff * 2
backingOffLock.RUnlock()
time.Sleep(wait)
upFunc.Do(func() {
backingOffLock.Lock()
defer backingOffLock.Unlock()
// if we come back to a stable state, reset the failure count
if backoff == minBackoff {
failureCount = 0
}
// earn back the backoff over time
if failureCount > 0 {
backoff = max(backoff/2, 100*time.Millisecond)
}
})
}()
// Create main dummy request
r, err := http.NewRequest(http.MethodGet, filepath.Base(absFileName), nil)
metrics.StartWorker(absFileName)
if err != nil {
panic(err)
}
r, err = NewRequestWithContext(
r,
WithRequestDocumentRoot(filepath.Dir(absFileName), false),
WithRequestPreparedEnv(env),
)
if err != nil {
panic(err)
}
if c := l.Check(zapcore.DebugLevel, "starting"); c != nil {
c.Write(zap.String("worker", absFileName), zap.Int("num", nbWorkers))
}
if err := ServeHTTP(nil, r); err != nil {
panic(err)
}
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
if fc.currentWorkerRequest != 0 {
// Terminate the pending HTTP request handled by the worker
maybeCloseContext(fc.currentWorkerRequest.Value().(*http.Request).Context().Value(contextKey).(*FrankenPHPContext))
fc.currentWorkerRequest.Delete()
fc.currentWorkerRequest = 0
}
// TODO: make the max restart configurable
if !workersAreDone.Load() {
if fc.ready {
fc.ready = false
}
if fc.exitStatus == 0 {
if c := l.Check(zapcore.InfoLevel, "restarting"); c != nil {
c.Write(zap.String("worker", absFileName))
}
// a normal restart resets the backoff and failure count
backingOffLock.Lock()
backoff = minBackoff
failureCount = 0
backingOffLock.Unlock()
metrics.StopWorker(absFileName, StopReasonRestart)
} else {
// we will wait a few milliseconds to not overwhelm the logger in case of repeated unexpected terminations
if c := l.Check(zapcore.ErrorLevel, "unexpected termination, restarting"); c != nil {
backingOffLock.RLock()
c.Write(zap.String("worker", absFileName), zap.Int("failure_count", failureCount), zap.Int("exit_status", int(fc.exitStatus)), zap.Duration("waiting", backoff))
backingOffLock.RUnlock()
}
upFunc.Do(func() {
backingOffLock.Lock()
defer backingOffLock.Unlock()
// if we end up here, the worker has not been up for backoff*2
// this is probably due to a syntax error or another fatal error
if failureCount >= maxConsecutiveFailures {
panic(fmt.Errorf("workers %q: too many consecutive failures", absFileName))
} else {
failureCount += 1
}
})
backingOffLock.RLock()
wait := backoff
backingOffLock.RUnlock()
time.Sleep(wait)
backingOffLock.Lock()
backoff *= 2
backoff = min(backoff, maxBackoff)
backingOffLock.Unlock()
metrics.StopWorker(absFileName, StopReasonCrash)
}
} else {
break
}
}
metrics.StopWorker(absFileName, StopReasonShutdown)
// TODO: check if the termination is expected
if c := l.Check(zapcore.DebugLevel, "terminated"); c != nil {
c.Write(zap.String("worker", absFileName))
}
}()
workersReadyWG.Add(worker.num)
for i := 0; i < worker.num; i++ {
go worker.startNewWorkerThread()
}
}
workersReadyWG.Wait()
workersAreReady.Store(true)
m.Lock()
defer m.Unlock()
if len(errs) == 0 {
return nil
return nil
}
func newWorker(o workerOpt) (*worker, error) {
absFileName, err := filepath.Abs(o.fileName)
if err != nil {
return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
}
return fmt.Errorf("workers %q: error while starting: %w", fileName, errors.Join(errs...))
// if the worker already exists, return it
// it's necessary since we don't want to destroy the channels when restarting on file changes
if w, ok := workers[absFileName]; ok {
return w, nil
}
if o.env == nil {
o.env = make(PreparedEnv, 1)
}
o.env["FRANKENPHP_WORKER\x00"] = "1"
w := &worker{fileName: absFileName, num: o.num, env: o.env, requestChan: make(chan *http.Request)}
workers[absFileName] = w
return w, nil
}
func (worker *worker) startNewWorkerThread() {
workerShutdownWG.Add(1)
defer workerShutdownWG.Done()
backoff := minWorkerErrorBackoff
failureCount := 0
backingOffLock := sync.RWMutex{}
for {
// if the worker can stay up longer than backoff*2, it is probably an application error
upFunc := sync.Once{}
go func() {
backingOffLock.RLock()
wait := backoff * 2
backingOffLock.RUnlock()
time.Sleep(wait)
upFunc.Do(func() {
backingOffLock.Lock()
defer backingOffLock.Unlock()
// if we come back to a stable state, reset the failure count
if backoff == minWorkerErrorBackoff {
failureCount = 0
}
// earn back the backoff over time
if failureCount > 0 {
backoff = max(backoff/2, 100*time.Millisecond)
}
})
}()
metrics.StartWorker(worker.fileName)
// Create main dummy request
r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil)
if err != nil {
panic(err)
}
r, err = NewRequestWithContext(
r,
WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
WithRequestPreparedEnv(worker.env),
)
if err != nil {
panic(err)
}
if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
c.Write(zap.String("worker", worker.fileName), zap.Int("num", worker.num))
}
if err := ServeHTTP(nil, r); err != nil {
panic(err)
}
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
// if we are done, exit the loop that restarts the worker script
if workersAreDone.Load() {
break
}
// on exit status 0 we just run the worker script again
if fc.exitStatus == 0 {
// TODO: make the max restart configurable
if c := logger.Check(zapcore.InfoLevel, "restarting"); c != nil {
c.Write(zap.String("worker", worker.fileName))
}
metrics.StopWorker(worker.fileName, StopReasonRestart)
continue
}
// on exit status 1 we log the error and apply an exponential backoff when restarting
upFunc.Do(func() {
backingOffLock.Lock()
defer backingOffLock.Unlock()
// if we end up here, the worker has not been up for backoff*2
// this is probably due to a syntax error or another fatal error
if failureCount >= maxWorkerConsecutiveFailures {
panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName))
} else {
failureCount += 1
}
})
backingOffLock.RLock()
wait := backoff
backingOffLock.RUnlock()
time.Sleep(wait)
backingOffLock.Lock()
backoff *= 2
backoff = min(backoff, maxWorkerErrorBackoff)
backingOffLock.Unlock()
metrics.StopWorker(worker.fileName, StopReasonCrash)
}
metrics.StopWorker(worker.fileName, StopReasonShutdown)
// TODO: check if the termination is expected
if c := logger.Check(zapcore.DebugLevel, "terminated"); c != nil {
c.Write(zap.String("worker", worker.fileName))
}
}
func stopWorkers() {
@ -217,7 +195,7 @@ func drainWorkers() {
watcher.DrainWatcher()
stopWorkers()
workerShutdownWG.Wait()
workersRequestChans = sync.Map{}
workers = make(map[string]*worker)
}
func restartWorkersOnFileChanges(workerOpts []workerOpt) error {
@ -249,82 +227,78 @@ func restartWorkers(workerOpts []workerOpt) {
logger.Info("workers restarted successfully")
}
//export go_frankenphp_worker_ready
func go_frankenphp_worker_ready(mrh C.uintptr_t) {
mainRequest := cgo.Handle(mrh).Value().(*http.Request)
fc := mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
fc.ready = true
func assignThreadToWorker(thread *phpThread) {
fc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
metrics.ReadyWorker(fc.scriptFilename)
worker, ok := workers[fc.scriptFilename]
if !ok {
panic("worker not found for script: " + fc.scriptFilename)
}
thread.worker = worker
if !workersAreReady.Load() {
workersReadyWG.Done()
}
// TODO: we can also store all threads assigned to the worker if needed
}
//export go_frankenphp_worker_handle_request_start
func go_frankenphp_worker_handle_request_start(mrh C.uintptr_t) C.uintptr_t {
mainRequest := cgo.Handle(mrh).Value().(*http.Request)
fc := mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
thread := phpThreads[threadIndex]
v, ok := workersRequestChans.Load(fc.scriptFilename)
if !ok {
// Probably shutting down
return 0
// we assign a worker to the thread if it doesn't have one already
if thread.worker == nil {
assignThreadToWorker(thread)
}
rc := v.(chan *http.Request)
l := getLogger()
if c := l.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
c.Write(zap.String("worker", fc.scriptFilename))
if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
c.Write(zap.String("worker", thread.worker.fileName))
}
var r *http.Request
select {
case <-workersDone:
if c := l.Check(zapcore.DebugLevel, "shutting down"); c != nil {
c.Write(zap.String("worker", fc.scriptFilename))
if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
c.Write(zap.String("worker", thread.worker.fileName))
}
thread.worker = nil
executePHPFunction("opcache_reset")
return 0
case r = <-rc:
return C.bool(false)
case r = <-thread.worker.requestChan:
}
fc.currentWorkerRequest = cgo.NewHandle(r)
r.Context().Value(handleKey).(*handleList).AddHandle(fc.currentWorkerRequest)
thread.workerRequest = r
if c := l.Check(zapcore.DebugLevel, "request handling started"); c != nil {
c.Write(zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI))
}
if err := updateServerContext(r, false, mrh); err != nil {
if err := updateServerContext(r, false, true); err != nil {
// Unexpected error
if c := l.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
c.Write(zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI), zap.Error(err))
if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err))
}
return 0
return C.bool(false)
}
return C.uintptr_t(fc.currentWorkerRequest)
return C.bool(true)
}
//export go_frankenphp_finish_request
func go_frankenphp_finish_request(mrh, rh C.uintptr_t, deleteHandle bool) {
rHandle := cgo.Handle(rh)
r := rHandle.Value().(*http.Request)
func go_frankenphp_finish_request(threadIndex C.uintptr_t, isWorkerRequest bool) {
thread := phpThreads[threadIndex]
r := thread.getActiveRequest()
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
if deleteHandle {
r.Context().Value(handleKey).(*handleList).FreeAll()
cgo.Handle(mrh).Value().(*http.Request).Context().Value(contextKey).(*FrankenPHPContext).currentWorkerRequest = 0
if isWorkerRequest {
thread.workerRequest = nil
}
maybeCloseContext(fc)
if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
var fields []zap.Field
if mrh == 0 {
if isWorkerRequest {
fields = append(fields, zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
} else {
fields = append(fields, zap.String("url", r.RequestURI))