diff --git a/frankenphp.c b/frankenphp.c index 9d8f926..e4bee60 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -70,14 +70,15 @@ frankenphp_config frankenphp_get_config() { } typedef struct frankenphp_server_context { - uintptr_t current_request; - uintptr_t main_request; + bool has_main_request; + bool has_active_request; bool worker_ready; char *cookie_data; bool finished; } frankenphp_server_context; __thread frankenphp_server_context *local_ctx = NULL; +__thread uintptr_t thread_index; static void frankenphp_free_request_context() { frankenphp_server_context *ctx = SG(server_context); @@ -231,9 +232,8 @@ PHP_FUNCTION(frankenphp_finish_request) { /* {{{ */ php_output_end_all(); php_header(); - if (ctx->current_request != 0) { - go_frankenphp_finish_request(ctx->main_request, ctx->current_request, - false); + if (ctx->has_active_request) { + go_frankenphp_finish_request(thread_index, false); } ctx->finished = true; @@ -249,7 +249,7 @@ PHP_FUNCTION(frankenphp_request_headers) { frankenphp_server_context *ctx = SG(server_context); struct go_apache_request_headers_return headers = - go_apache_request_headers(ctx->current_request, ctx->main_request); + go_apache_request_headers(thread_index, ctx->has_active_request); array_init_size(return_value, headers.r1); @@ -260,7 +260,7 @@ PHP_FUNCTION(frankenphp_request_headers) { add_assoc_stringl_ex(return_value, key.data, key.len, val.data, val.len); } - go_apache_request_cleanup(headers.r2); + go_apache_request_cleanup(thread_index); } /* }}} */ @@ -327,7 +327,7 @@ PHP_FUNCTION(frankenphp_handle_request) { frankenphp_server_context *ctx = SG(server_context); - if (ctx->main_request == 0) { + if (!ctx->has_main_request) { /* not a worker, throw an error */ zend_throw_exception( spl_ce_RuntimeException, @@ -340,9 +340,6 @@ PHP_FUNCTION(frankenphp_handle_request) { frankenphp_worker_request_shutdown(); ctx->worker_ready = true; - - /* Mark the worker as ready to handle requests */ - go_frankenphp_worker_ready(ctx->main_request); } #ifdef ZEND_MAX_EXECUTION_TIMERS @@ -350,8 +347,7 @@ PHP_FUNCTION(frankenphp_handle_request) { zend_unset_timeout(); #endif - uintptr_t request = - go_frankenphp_worker_handle_request_start(ctx->main_request); + bool request = go_frankenphp_worker_handle_request_start(thread_index); if (frankenphp_worker_request_startup() == FAILURE /* Shutting down */ || !request) { @@ -384,8 +380,8 @@ PHP_FUNCTION(frankenphp_handle_request) { } frankenphp_worker_request_shutdown(); - ctx->current_request = 0; - go_frankenphp_finish_request(ctx->main_request, request, true); + ctx->has_active_request = false; + go_frankenphp_finish_request(thread_index, true); RETURN_TRUE; } @@ -426,7 +422,7 @@ static zend_module_entry frankenphp_module = { static void frankenphp_request_shutdown() { frankenphp_server_context *ctx = SG(server_context); - if (ctx->main_request && ctx->current_request) { + if (ctx->has_main_request && ctx->has_active_request) { frankenphp_destroy_super_globals(); } @@ -437,7 +433,7 @@ static void frankenphp_request_shutdown() { } int frankenphp_update_server_context( - bool create, uintptr_t current_request, uintptr_t main_request, + bool create, bool has_main_request, bool has_active_request, const char *request_method, char *query_string, zend_long content_length, char *path_translated, char *request_uri, const char *content_type, @@ -459,8 +455,8 @@ int frankenphp_update_server_context( // It is not reset by zend engine, set it to 200. SG(sapi_headers).http_response_code = 200; - ctx->main_request = main_request; - ctx->current_request = current_request; + ctx->has_main_request = has_main_request; + ctx->has_active_request = has_active_request; SG(request_info).auth_password = auth_password; SG(request_info).auth_user = auth_user; @@ -493,9 +489,8 @@ static size_t frankenphp_ub_write(const char *str, size_t str_length) { return 0; } - struct go_ub_write_return result = go_ub_write( - ctx->current_request ? ctx->current_request : ctx->main_request, - (char *)str, str_length); + struct go_ub_write_return result = + go_ub_write(thread_index, (char *)str, str_length); if (result.r1) { php_handle_aborted_connection(); @@ -512,7 +507,7 @@ static int frankenphp_send_headers(sapi_headers_struct *sapi_headers) { int status; frankenphp_server_context *ctx = SG(server_context); - if (ctx->current_request == 0) { + if (!ctx->has_active_request) { return SAPI_HEADER_SEND_FAILED; } @@ -526,7 +521,7 @@ static int frankenphp_send_headers(sapi_headers_struct *sapi_headers) { } } - go_write_headers(ctx->current_request, status, &sapi_headers->headers); + go_write_headers(thread_index, status, &sapi_headers->headers); return SAPI_HEADER_SENT_SUCCESSFULLY; } @@ -534,7 +529,7 @@ static int frankenphp_send_headers(sapi_headers_struct *sapi_headers) { static void frankenphp_sapi_flush(void *server_context) { frankenphp_server_context *ctx = (frankenphp_server_context *)server_context; - if (ctx && ctx->current_request != 0 && go_sapi_flush(ctx->current_request)) { + if (ctx && ctx->has_active_request && go_sapi_flush(thread_index)) { php_handle_aborted_connection(); } } @@ -542,19 +537,19 @@ static void frankenphp_sapi_flush(void *server_context) { static size_t frankenphp_read_post(char *buffer, size_t count_bytes) { frankenphp_server_context *ctx = SG(server_context); - return ctx->current_request - ? go_read_post(ctx->current_request, buffer, count_bytes) + return ctx->has_active_request + ? go_read_post(thread_index, buffer, count_bytes) : 0; } static char *frankenphp_read_cookies(void) { frankenphp_server_context *ctx = SG(server_context); - if (ctx->current_request == 0) { + if (!ctx->has_active_request) { return ""; } - ctx->cookie_data = go_read_cookies(ctx->current_request); + ctx->cookie_data = go_read_cookies(thread_index); return ctx->cookie_data; } @@ -664,16 +659,13 @@ void frankenphp_register_bulk_variables(go_string known_variables[27], static void frankenphp_register_variables(zval *track_vars_array) { /* https://www.php.net/manual/en/reserved.variables.server.php */ - frankenphp_server_context *ctx = SG(server_context); /* In CGI mode, we consider the environment to be a part of the server * variables */ php_import_environment_variables(track_vars_array); - go_register_variables(ctx->current_request ? ctx->current_request - : ctx->main_request, - track_vars_array); + go_register_variables(thread_index, track_vars_array); } static void frankenphp_log_message(const char *message, int syslog_type_int) { @@ -732,6 +724,7 @@ static void set_thread_name(char *thread_name) { static void *php_thread(void *arg) { char thread_name[16] = {0}; snprintf(thread_name, 16, "php-%" PRIxPTR, (uintptr_t)arg); + thread_index = (uintptr_t)arg; set_thread_name(thread_name); #ifdef ZTS @@ -744,7 +737,7 @@ static void *php_thread(void *arg) { local_ctx = malloc(sizeof(frankenphp_server_context)); - while (go_handle_request()) { + while (go_handle_request(thread_index)) { } #ifdef ZTS diff --git a/frankenphp.go b/frankenphp.go index 7cd83d0..b24f132 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -35,7 +35,6 @@ import ( "net/http" "os" "runtime" - "runtime/cgo" "strconv" "strings" "sync" @@ -50,10 +49,8 @@ import ( ) type contextKeyStruct struct{} -type handleKeyStruct struct{} var contextKey = contextKeyStruct{} -var handleKey = handleKeyStruct{} var ( InvalidRequestError = errors.New("not a FrankenPHP request") @@ -125,15 +122,11 @@ type FrankenPHPContext struct { // Whether the request is already closed by us closed sync.Once - // whether the context is ready to receive requests - ready bool - responseWriter http.ResponseWriter exitStatus C.int - done chan interface{} - currentWorkerRequest cgo.Handle - startedAt time.Time + done chan interface{} + startedAt time.Time } func clientHasClosed(r *http.Request) bool { @@ -197,7 +190,6 @@ func NewRequestWithContext(r *http.Request, opts ...RequestOption) (*http.Reques fc.scriptFilename = sanitizedPathJoin(fc.documentRoot, fc.scriptName) c := context.WithValue(r.Context(), contextKey, fc) - c = context.WithValue(c, handleKey, Handles()) return r.WithContext(c), nil } @@ -338,6 +330,7 @@ func Init(options ...Option) error { shutdownWG.Add(1) done = make(chan struct{}) requestChan = make(chan *http.Request) + initPHPThreads(opt.numThreads) if C.frankenphp_init(C.int(opt.numThreads)) != 0 { return MainThreadCreationError @@ -386,6 +379,7 @@ func go_shutdown() { func drainThreads() { close(done) shutdownWG.Wait() + phpThreads = nil } func getLogger() *zap.Logger { @@ -395,7 +389,7 @@ func getLogger() *zap.Logger { return logger } -func updateServerContext(request *http.Request, create bool, mrh C.uintptr_t) error { +func updateServerContext(request *http.Request, create bool, isWorkerRequest bool) error { fc, ok := FromContext(request.Context()) if !ok { return InvalidRequestError @@ -437,21 +431,12 @@ func updateServerContext(request *http.Request, create bool, mrh C.uintptr_t) er } cRequestUri := C.CString(request.URL.RequestURI()) - - var rh cgo.Handle - if fc.responseWriter == nil { - h := cgo.NewHandle(request) - request.Context().Value(handleKey).(*handleList).AddHandle(h) - mrh = C.uintptr_t(h) - } else { - rh = cgo.NewHandle(request) - request.Context().Value(handleKey).(*handleList).AddHandle(rh) - } + isBootingAWorkerScript := fc.responseWriter == nil ret := C.frankenphp_update_server_context( C.bool(create), - C.uintptr_t(rh), - mrh, + C.bool(isWorkerRequest || isBootingAWorkerScript), + C.bool(!isBootingAWorkerScript), cMethod, cQueryString, @@ -490,10 +475,10 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error rc := requestChan // Detect if a worker is available to handle this request if !isWorker { - if v, ok := workersRequestChans.Load(fc.scriptFilename); ok { + if worker, ok := workers[fc.scriptFilename]; ok { isWorkerRequest = true metrics.StartWorkerRequest(fc.scriptFilename) - rc = v.(chan *http.Request) + rc = worker.requestChan } else { metrics.StartRequest() } @@ -517,14 +502,14 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error } //export go_handle_request -func go_handle_request() bool { +func go_handle_request(threadIndex C.uintptr_t) bool { select { case <-done: return false case r := <-requestChan: - h := cgo.NewHandle(r) - r.Context().Value(handleKey).(*handleList).AddHandle(h) + thread := phpThreads[threadIndex] + thread.mainRequest = r fc, ok := FromContext(r.Context()) if !ok { @@ -532,10 +517,10 @@ func go_handle_request() bool { } defer func() { maybeCloseContext(fc) - r.Context().Value(handleKey).(*handleList).FreeAll() + thread.mainRequest = nil }() - if err := updateServerContext(r, true, 0); err != nil { + if err := updateServerContext(r, true, false); err != nil { panic(err) } @@ -556,8 +541,8 @@ func maybeCloseContext(fc *FrankenPHPContext) { } //export go_ub_write -func go_ub_write(rh C.uintptr_t, cBuf *C.char, length C.int) (C.size_t, C.bool) { - r := cgo.Handle(rh).Value().(*http.Request) +func go_ub_write(threadIndex C.uintptr_t, cBuf *C.char, length C.int) (C.size_t, C.bool) { + r := phpThreads[threadIndex].getActiveRequest() fc, _ := FromContext(r.Context()) var writer io.Writer @@ -595,12 +580,11 @@ var headerKeyCache = func() otter.Cache[string, string] { }() //export go_register_variables -func go_register_variables(rh C.uintptr_t, trackVarsArray *C.zval) { - r := cgo.Handle(rh).Value().(*http.Request) +func go_register_variables(threadIndex C.uintptr_t, trackVarsArray *C.zval) { + thread := phpThreads[threadIndex] + r := thread.getActiveRequest() fc := r.Context().Value(contextKey).(*FrankenPHPContext) - p := &runtime.Pinner{} - dynamicVariables := make([]C.php_variable, len(fc.env)+len(r.Header)) var l int @@ -622,8 +606,8 @@ func go_register_variables(rh C.uintptr_t, trackVarsArray *C.zval) { kData := unsafe.StringData(k) vData := unsafe.StringData(v) - p.Pin(kData) - p.Pin(vData) + thread.Pin(kData) + thread.Pin(vData) dynamicVariables[l]._var = (*C.char)(unsafe.Pointer(kData)) dynamicVariables[l].data_len = C.size_t(len(v)) @@ -640,8 +624,8 @@ func go_register_variables(rh C.uintptr_t, trackVarsArray *C.zval) { kData := unsafe.StringData(k) vData := unsafe.Pointer(unsafe.StringData(v)) - p.Pin(kData) - p.Pin(vData) + thread.Pin(kData) + thread.Pin(vData) dynamicVariables[l]._var = (*C.char)(unsafe.Pointer(kData)) dynamicVariables[l].data_len = C.size_t(len(v)) @@ -650,45 +634,43 @@ func go_register_variables(rh C.uintptr_t, trackVarsArray *C.zval) { l++ } - knownVariables := computeKnownVariables(r, p) + knownVariables := computeKnownVariables(r, &thread.Pinner) dvsd := unsafe.SliceData(dynamicVariables) - p.Pin(dvsd) + thread.Pin(dvsd) C.frankenphp_register_bulk_variables(&knownVariables[0], dvsd, C.size_t(l), trackVarsArray) - p.Unpin() + thread.Unpin() fc.env = nil } //export go_apache_request_headers -func go_apache_request_headers(rh, mrh C.uintptr_t) (*C.go_string, C.size_t, C.uintptr_t) { - if rh == 0 { +func go_apache_request_headers(threadIndex C.uintptr_t, hasActiveRequest bool) (*C.go_string, C.size_t) { + thread := phpThreads[threadIndex] + + if !hasActiveRequest { // worker mode, not handling a request - mr := cgo.Handle(mrh).Value().(*http.Request) - mfc := mr.Context().Value(contextKey).(*FrankenPHPContext) + mfc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext) if c := mfc.logger.Check(zapcore.DebugLevel, "apache_request_headers() called in non-HTTP context"); c != nil { c.Write(zap.String("worker", mfc.scriptFilename)) } - return nil, 0, 0 + return nil, 0 } - r := cgo.Handle(rh).Value().(*http.Request) - - pinner := &runtime.Pinner{} - pinnerHandle := C.uintptr_t(cgo.NewHandle(pinner)) + r := thread.getActiveRequest() headers := make([]C.go_string, 0, len(r.Header)*2) for field, val := range r.Header { fd := unsafe.StringData(field) - pinner.Pin(fd) + thread.Pin(fd) cv := strings.Join(val, ", ") vd := unsafe.StringData(cv) - pinner.Pin(vd) + thread.Pin(vd) headers = append( headers, @@ -698,21 +680,14 @@ func go_apache_request_headers(rh, mrh C.uintptr_t) (*C.go_string, C.size_t, C.u } sd := unsafe.SliceData(headers) - pinner.Pin(sd) + thread.Pin(sd) - return sd, C.size_t(len(r.Header)), pinnerHandle + return sd, C.size_t(len(r.Header)) } //export go_apache_request_cleanup -func go_apache_request_cleanup(rh C.uintptr_t) { - if rh == 0 { - return - } - - h := cgo.Handle(rh) - p := h.Value().(*runtime.Pinner) - p.Unpin() - h.Delete() +func go_apache_request_cleanup(threadIndex C.uintptr_t) { + phpThreads[threadIndex].Unpin() } func addHeader(fc *FrankenPHPContext, cString *C.char, length C.int) { @@ -729,8 +704,8 @@ func addHeader(fc *FrankenPHPContext, cString *C.char, length C.int) { } //export go_write_headers -func go_write_headers(rh C.uintptr_t, status C.int, headers *C.zend_llist) { - r := cgo.Handle(rh).Value().(*http.Request) +func go_write_headers(threadIndex C.uintptr_t, status C.int, headers *C.zend_llist) { + r := phpThreads[threadIndex].getActiveRequest() fc := r.Context().Value(contextKey).(*FrankenPHPContext) if fc.responseWriter == nil { @@ -757,8 +732,8 @@ func go_write_headers(rh C.uintptr_t, status C.int, headers *C.zend_llist) { } //export go_sapi_flush -func go_sapi_flush(rh C.uintptr_t) bool { - r := cgo.Handle(rh).Value().(*http.Request) +func go_sapi_flush(threadIndex C.uintptr_t) bool { + r := phpThreads[threadIndex].getActiveRequest() fc := r.Context().Value(contextKey).(*FrankenPHPContext) if fc.responseWriter == nil || clientHasClosed(r) { @@ -775,8 +750,8 @@ func go_sapi_flush(rh C.uintptr_t) bool { } //export go_read_post -func go_read_post(rh C.uintptr_t, cBuf *C.char, countBytes C.size_t) (readBytes C.size_t) { - r := cgo.Handle(rh).Value().(*http.Request) +func go_read_post(threadIndex C.uintptr_t, cBuf *C.char, countBytes C.size_t) (readBytes C.size_t) { + r := phpThreads[threadIndex].getActiveRequest() p := unsafe.Slice((*byte)(unsafe.Pointer(cBuf)), countBytes) var err error @@ -790,8 +765,8 @@ func go_read_post(rh C.uintptr_t, cBuf *C.char, countBytes C.size_t) (readBytes } //export go_read_cookies -func go_read_cookies(rh C.uintptr_t) *C.char { - r := cgo.Handle(rh).Value().(*http.Request) +func go_read_cookies(threadIndex C.uintptr_t) *C.char { + r := phpThreads[threadIndex].getActiveRequest() cookies := r.Cookies() if len(cookies) == 0 { diff --git a/frankenphp.h b/frankenphp.h index 5feb71b..a0c5493 100644 --- a/frankenphp.h +++ b/frankenphp.h @@ -43,7 +43,7 @@ frankenphp_config frankenphp_get_config(); int frankenphp_init(int num_threads); int frankenphp_update_server_context( - bool create, uintptr_t current_request, uintptr_t main_request, + bool create, bool has_main_request, bool has_active_request, const char *request_method, char *query_string, zend_long content_length, char *path_translated, char *request_uri, const char *content_type, diff --git a/php_thread.go b/php_thread.go new file mode 100644 index 0000000..5b9c299 --- /dev/null +++ b/php_thread.go @@ -0,0 +1,33 @@ +package frankenphp + +// #include +import "C" +import ( + "net/http" + "runtime" +) + +var phpThreads []*phpThread + +type phpThread struct { + runtime.Pinner + + mainRequest *http.Request + workerRequest *http.Request + worker *worker +} + +func initPHPThreads(numThreads int) { + phpThreads = make([]*phpThread, 0, numThreads) + for i := 0; i < numThreads; i++ { + phpThreads = append(phpThreads, &phpThread{}) + } +} + +func (thread phpThread) getActiveRequest() *http.Request { + if thread.workerRequest != nil { + return thread.workerRequest + } + + return thread.mainRequest +} diff --git a/php_thread_test.go b/php_thread_test.go new file mode 100644 index 0000000..63afe4d --- /dev/null +++ b/php_thread_test.go @@ -0,0 +1,40 @@ +package frankenphp + +import ( + "net/http" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestInitializeTwoPhpThreadsWithoutRequests(t *testing.T) { + initPHPThreads(2) + + assert.Len(t, phpThreads, 2) + assert.NotNil(t, phpThreads[0]) + assert.NotNil(t, phpThreads[1]) + assert.Nil(t, phpThreads[0].mainRequest) + assert.Nil(t, phpThreads[0].workerRequest) +} + +func TestMainRequestIsActiveRequest(t *testing.T) { + mainRequest := &http.Request{} + initPHPThreads(1) + thread := phpThreads[0] + + thread.mainRequest = mainRequest + + assert.Equal(t, mainRequest, thread.getActiveRequest()) +} + +func TestWorkerRequestIsActiveRequest(t *testing.T) { + mainRequest := &http.Request{} + workerRequest := &http.Request{} + initPHPThreads(1) + thread := phpThreads[0] + + thread.mainRequest = mainRequest + thread.workerRequest = workerRequest + + assert.Equal(t, workerRequest, thread.getActiveRequest()) +} diff --git a/smartpointer.go b/smartpointer.go deleted file mode 100644 index 780b47b..0000000 --- a/smartpointer.go +++ /dev/null @@ -1,64 +0,0 @@ -package frankenphp - -// #include -import "C" -import ( - "runtime/cgo" - "unsafe" -) - -/* -FrankenPHP is fairly complex because it shuffles handles/requests/contexts -between C and Go. This simplifies the lifecycle management of per-request -structures by allowing us to hold references until the end of the request -and ensure they are always cleaned up. -*/ - -// PointerList A list of pointers that can be freed at a later time -type pointerList struct { - Pointers []unsafe.Pointer -} - -// HandleList A list of pointers that can be freed at a later time -type handleList struct { - Handles []cgo.Handle -} - -// AddHandle Call when registering a handle for the very first time -func (h *handleList) AddHandle(handle cgo.Handle) { - h.Handles = append(h.Handles, handle) -} - -// AddPointer Call when creating a request-level C pointer for the very first time -func (p *pointerList) AddPointer(ptr unsafe.Pointer) { - p.Pointers = append(p.Pointers, ptr) -} - -// FreeAll frees all C pointers -func (p *pointerList) FreeAll() { - for _, ptr := range p.Pointers { - C.free(ptr) - } - p.Pointers = nil // To avoid dangling pointers -} - -// FreeAll frees all handles -func (h *handleList) FreeAll() { - for _, p := range h.Handles { - p.Delete() - } -} - -// Pointers Get a new list of pointers -func Pointers() *pointerList { - return &pointerList{ - Pointers: make([]unsafe.Pointer, 0), - } -} - -// Handles Get a new list of handles -func Handles() *handleList { - return &handleList{ - Handles: make([]cgo.Handle, 0, 8), - } -} diff --git a/worker.go b/worker.go index 5c91fec..584ae25 100644 --- a/worker.go +++ b/worker.go @@ -4,11 +4,9 @@ package frankenphp // #include "frankenphp.h" import "C" import ( - "errors" "fmt" "net/http" "path/filepath" - "runtime/cgo" "sync" "sync/atomic" "time" @@ -18,194 +16,174 @@ import ( "go.uber.org/zap/zapcore" ) +type worker struct { + fileName string + num int + env PreparedEnv + requestChan chan *http.Request +} + +const maxWorkerErrorBackoff = 1 * time.Second +const minWorkerErrorBackoff = 100 * time.Millisecond +const maxWorkerConsecutiveFailures = 60 + var ( - workersRequestChans sync.Map // map[fileName]chan *http.Request - workersReadyWG sync.WaitGroup - workerShutdownWG sync.WaitGroup - workersAreReady atomic.Bool - workersAreDone atomic.Bool - workersDone chan interface{} + workersReadyWG sync.WaitGroup + workerShutdownWG sync.WaitGroup + workersAreReady atomic.Bool + workersAreDone atomic.Bool + workersDone chan interface{} + workers map[string]*worker = make(map[string]*worker) ) -// TODO: start all the worker in parallel to reduce the boot time func initWorkers(opt []workerOpt) error { workersDone = make(chan interface{}) workersAreReady.Store(false) workersAreDone.Store(false) - for _, w := range opt { - if err := startWorkers(w.fileName, w.num, w.env); err != nil { + for _, o := range opt { + worker, err := newWorker(o) + if err != nil { return err } - } - - return nil -} - -func startWorkers(fileName string, nbWorkers int, env PreparedEnv) error { - absFileName, err := filepath.Abs(fileName) - if err != nil { - return fmt.Errorf("workers %q: %w", fileName, err) - } - - if _, ok := workersRequestChans.Load(absFileName); !ok { - workersRequestChans.Store(absFileName, make(chan *http.Request)) - } - - shutdownWG.Add(nbWorkers) - workerShutdownWG.Add(nbWorkers) - workersReadyWG.Add(nbWorkers) - - var ( - m sync.RWMutex - errs []error - ) - - if env == nil { - env = make(PreparedEnv, 1) - } - - env["FRANKENPHP_WORKER\x00"] = "1" - - l := getLogger() - - const maxBackoff = 1 * time.Second - const minBackoff = 10 * time.Millisecond - const maxConsecutiveFailures = 60 - - for i := 0; i < nbWorkers; i++ { - go func() { - defer shutdownWG.Done() - defer workerShutdownWG.Done() - backoff := minBackoff - failureCount := 0 - backingOffLock := sync.RWMutex{} - for { - // if the worker can stay up longer than backoff*2, it is probably an application error - upFunc := sync.Once{} - go func() { - backingOffLock.RLock() - wait := backoff * 2 - backingOffLock.RUnlock() - time.Sleep(wait) - upFunc.Do(func() { - backingOffLock.Lock() - defer backingOffLock.Unlock() - // if we come back to a stable state, reset the failure count - if backoff == minBackoff { - failureCount = 0 - } - - // earn back the backoff over time - if failureCount > 0 { - backoff = max(backoff/2, 100*time.Millisecond) - } - }) - }() - - // Create main dummy request - r, err := http.NewRequest(http.MethodGet, filepath.Base(absFileName), nil) - - metrics.StartWorker(absFileName) - - if err != nil { - panic(err) - } - r, err = NewRequestWithContext( - r, - WithRequestDocumentRoot(filepath.Dir(absFileName), false), - WithRequestPreparedEnv(env), - ) - if err != nil { - panic(err) - } - - if c := l.Check(zapcore.DebugLevel, "starting"); c != nil { - c.Write(zap.String("worker", absFileName), zap.Int("num", nbWorkers)) - } - - if err := ServeHTTP(nil, r); err != nil { - panic(err) - } - - fc := r.Context().Value(contextKey).(*FrankenPHPContext) - if fc.currentWorkerRequest != 0 { - // Terminate the pending HTTP request handled by the worker - maybeCloseContext(fc.currentWorkerRequest.Value().(*http.Request).Context().Value(contextKey).(*FrankenPHPContext)) - fc.currentWorkerRequest.Delete() - fc.currentWorkerRequest = 0 - } - - // TODO: make the max restart configurable - if !workersAreDone.Load() { - if fc.ready { - fc.ready = false - } - - if fc.exitStatus == 0 { - if c := l.Check(zapcore.InfoLevel, "restarting"); c != nil { - c.Write(zap.String("worker", absFileName)) - } - - // a normal restart resets the backoff and failure count - backingOffLock.Lock() - backoff = minBackoff - failureCount = 0 - backingOffLock.Unlock() - metrics.StopWorker(absFileName, StopReasonRestart) - } else { - // we will wait a few milliseconds to not overwhelm the logger in case of repeated unexpected terminations - if c := l.Check(zapcore.ErrorLevel, "unexpected termination, restarting"); c != nil { - backingOffLock.RLock() - c.Write(zap.String("worker", absFileName), zap.Int("failure_count", failureCount), zap.Int("exit_status", int(fc.exitStatus)), zap.Duration("waiting", backoff)) - backingOffLock.RUnlock() - } - - upFunc.Do(func() { - backingOffLock.Lock() - defer backingOffLock.Unlock() - // if we end up here, the worker has not been up for backoff*2 - // this is probably due to a syntax error or another fatal error - if failureCount >= maxConsecutiveFailures { - panic(fmt.Errorf("workers %q: too many consecutive failures", absFileName)) - } else { - failureCount += 1 - } - }) - backingOffLock.RLock() - wait := backoff - backingOffLock.RUnlock() - time.Sleep(wait) - backingOffLock.Lock() - backoff *= 2 - backoff = min(backoff, maxBackoff) - backingOffLock.Unlock() - metrics.StopWorker(absFileName, StopReasonCrash) - } - } else { - break - } - } - - metrics.StopWorker(absFileName, StopReasonShutdown) - - // TODO: check if the termination is expected - if c := l.Check(zapcore.DebugLevel, "terminated"); c != nil { - c.Write(zap.String("worker", absFileName)) - } - }() + workersReadyWG.Add(worker.num) + for i := 0; i < worker.num; i++ { + go worker.startNewWorkerThread() + } } workersReadyWG.Wait() workersAreReady.Store(true) - m.Lock() - defer m.Unlock() - if len(errs) == 0 { - return nil + return nil +} + +func newWorker(o workerOpt) (*worker, error) { + absFileName, err := filepath.Abs(o.fileName) + if err != nil { + return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err) } - return fmt.Errorf("workers %q: error while starting: %w", fileName, errors.Join(errs...)) + // if the worker already exists, return it + // it's necessary since we don't want to destroy the channels when restarting on file changes + if w, ok := workers[absFileName]; ok { + return w, nil + } + + if o.env == nil { + o.env = make(PreparedEnv, 1) + } + + o.env["FRANKENPHP_WORKER\x00"] = "1" + w := &worker{fileName: absFileName, num: o.num, env: o.env, requestChan: make(chan *http.Request)} + workers[absFileName] = w + + return w, nil +} + +func (worker *worker) startNewWorkerThread() { + workerShutdownWG.Add(1) + defer workerShutdownWG.Done() + + backoff := minWorkerErrorBackoff + failureCount := 0 + backingOffLock := sync.RWMutex{} + + for { + + // if the worker can stay up longer than backoff*2, it is probably an application error + upFunc := sync.Once{} + go func() { + backingOffLock.RLock() + wait := backoff * 2 + backingOffLock.RUnlock() + time.Sleep(wait) + upFunc.Do(func() { + backingOffLock.Lock() + defer backingOffLock.Unlock() + // if we come back to a stable state, reset the failure count + if backoff == minWorkerErrorBackoff { + failureCount = 0 + } + + // earn back the backoff over time + if failureCount > 0 { + backoff = max(backoff/2, 100*time.Millisecond) + } + }) + }() + + metrics.StartWorker(worker.fileName) + + // Create main dummy request + r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil) + if err != nil { + panic(err) + } + + r, err = NewRequestWithContext( + r, + WithRequestDocumentRoot(filepath.Dir(worker.fileName), false), + WithRequestPreparedEnv(worker.env), + ) + if err != nil { + panic(err) + } + + if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil { + c.Write(zap.String("worker", worker.fileName), zap.Int("num", worker.num)) + } + + if err := ServeHTTP(nil, r); err != nil { + panic(err) + } + + fc := r.Context().Value(contextKey).(*FrankenPHPContext) + + // if we are done, exit the loop that restarts the worker script + if workersAreDone.Load() { + break + } + + // on exit status 0 we just run the worker script again + if fc.exitStatus == 0 { + // TODO: make the max restart configurable + if c := logger.Check(zapcore.InfoLevel, "restarting"); c != nil { + c.Write(zap.String("worker", worker.fileName)) + } + metrics.StopWorker(worker.fileName, StopReasonRestart) + continue + } + + // on exit status 1 we log the error and apply an exponential backoff when restarting + upFunc.Do(func() { + backingOffLock.Lock() + defer backingOffLock.Unlock() + // if we end up here, the worker has not been up for backoff*2 + // this is probably due to a syntax error or another fatal error + if failureCount >= maxWorkerConsecutiveFailures { + panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName)) + } else { + failureCount += 1 + } + }) + backingOffLock.RLock() + wait := backoff + backingOffLock.RUnlock() + time.Sleep(wait) + backingOffLock.Lock() + backoff *= 2 + backoff = min(backoff, maxWorkerErrorBackoff) + backingOffLock.Unlock() + metrics.StopWorker(worker.fileName, StopReasonCrash) + } + + metrics.StopWorker(worker.fileName, StopReasonShutdown) + + // TODO: check if the termination is expected + if c := logger.Check(zapcore.DebugLevel, "terminated"); c != nil { + c.Write(zap.String("worker", worker.fileName)) + } } func stopWorkers() { @@ -217,7 +195,7 @@ func drainWorkers() { watcher.DrainWatcher() stopWorkers() workerShutdownWG.Wait() - workersRequestChans = sync.Map{} + workers = make(map[string]*worker) } func restartWorkersOnFileChanges(workerOpts []workerOpt) error { @@ -249,82 +227,78 @@ func restartWorkers(workerOpts []workerOpt) { logger.Info("workers restarted successfully") } -//export go_frankenphp_worker_ready -func go_frankenphp_worker_ready(mrh C.uintptr_t) { - mainRequest := cgo.Handle(mrh).Value().(*http.Request) - fc := mainRequest.Context().Value(contextKey).(*FrankenPHPContext) - fc.ready = true +func assignThreadToWorker(thread *phpThread) { + fc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext) metrics.ReadyWorker(fc.scriptFilename) + worker, ok := workers[fc.scriptFilename] + if !ok { + panic("worker not found for script: " + fc.scriptFilename) + } + thread.worker = worker if !workersAreReady.Load() { workersReadyWG.Done() } + // TODO: we can also store all threads assigned to the worker if needed } //export go_frankenphp_worker_handle_request_start -func go_frankenphp_worker_handle_request_start(mrh C.uintptr_t) C.uintptr_t { - mainRequest := cgo.Handle(mrh).Value().(*http.Request) - fc := mainRequest.Context().Value(contextKey).(*FrankenPHPContext) +func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool { + thread := phpThreads[threadIndex] - v, ok := workersRequestChans.Load(fc.scriptFilename) - if !ok { - // Probably shutting down - return 0 + // we assign a worker to the thread if it doesn't have one already + if thread.worker == nil { + assignThreadToWorker(thread) } - rc := v.(chan *http.Request) - l := getLogger() - - if c := l.Check(zapcore.DebugLevel, "waiting for request"); c != nil { - c.Write(zap.String("worker", fc.scriptFilename)) + if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil { + c.Write(zap.String("worker", thread.worker.fileName)) } var r *http.Request select { case <-workersDone: - if c := l.Check(zapcore.DebugLevel, "shutting down"); c != nil { - c.Write(zap.String("worker", fc.scriptFilename)) + if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil { + c.Write(zap.String("worker", thread.worker.fileName)) } + thread.worker = nil executePHPFunction("opcache_reset") - return 0 - case r = <-rc: + return C.bool(false) + case r = <-thread.worker.requestChan: } - fc.currentWorkerRequest = cgo.NewHandle(r) - r.Context().Value(handleKey).(*handleList).AddHandle(fc.currentWorkerRequest) + thread.workerRequest = r - if c := l.Check(zapcore.DebugLevel, "request handling started"); c != nil { - c.Write(zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI)) + if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil { + c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI)) } - if err := updateServerContext(r, false, mrh); err != nil { + if err := updateServerContext(r, false, true); err != nil { // Unexpected error - if c := l.Check(zapcore.DebugLevel, "unexpected error"); c != nil { - c.Write(zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI), zap.Error(err)) + if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil { + c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err)) } - return 0 + return C.bool(false) } - - return C.uintptr_t(fc.currentWorkerRequest) + return C.bool(true) } //export go_frankenphp_finish_request -func go_frankenphp_finish_request(mrh, rh C.uintptr_t, deleteHandle bool) { - rHandle := cgo.Handle(rh) - r := rHandle.Value().(*http.Request) +func go_frankenphp_finish_request(threadIndex C.uintptr_t, isWorkerRequest bool) { + thread := phpThreads[threadIndex] + r := thread.getActiveRequest() fc := r.Context().Value(contextKey).(*FrankenPHPContext) - if deleteHandle { - r.Context().Value(handleKey).(*handleList).FreeAll() - cgo.Handle(mrh).Value().(*http.Request).Context().Value(contextKey).(*FrankenPHPContext).currentWorkerRequest = 0 + if isWorkerRequest { + thread.workerRequest = nil } maybeCloseContext(fc) if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil { var fields []zap.Field - if mrh == 0 { + if isWorkerRequest { fields = append(fields, zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI)) } else { fields = append(fields, zap.String("url", r.RequestURI))