diff --git a/bin/WorkerState.php b/bin/WorkerState.php index b6abe92..1a5252a 100644 --- a/bin/WorkerState.php +++ b/bin/WorkerState.php @@ -29,4 +29,12 @@ class WorkerState public $lastRequestTime; public $ready = false; + + public int $handledRequests = 0; + + public int $maxRequests = 0; + + public bool $recycleRequested = false; + + public bool $recycleTriggered = false; } diff --git a/bin/swoole-server b/bin/swoole-server index 891bc91..7693b71 100755 --- a/bin/swoole-server +++ b/bin/swoole-server @@ -9,6 +9,7 @@ use Laravel\Octane\Swoole\Handlers\OnWorkerStop; use Laravel\Octane\Swoole\ServerStateFile; use Laravel\Octane\Swoole\SwooleExtension; use Laravel\Octane\Swoole\WorkerState; +use Laravel\Octane\Swoole\Actions\StopWorkerIfMaxRequestsExceeded; use Laravel\Octane\Swoole\Coroutine\Context; use Laravel\Octane\Swoole\Coroutine\Monitor; use Swoole\Coroutine; @@ -195,6 +196,8 @@ $server->on('request', function ($request, $response) use ($server, $workerState Monitor::unregisterRequestCoroutine($cid); + (new StopWorkerIfMaxRequestsExceeded($server, $workerState))(); + // Ensure coroutine context is cleared (Swoole does this automatically, but explicit is safe) Context::clear(); } diff --git a/composer.json b/composer.json index 7d580dc..f38c879 100644 --- a/composer.json +++ b/composer.json @@ -64,6 +64,9 @@ "spiral/roadrunner-http": "<3.3.0" }, "autoload": { + "classmap": [ + "bin/WorkerState.php" + ], "psr-4": { "Laravel\\Octane\\": "src" } diff --git a/src/Commands/StartSwooleCommand.php b/src/Commands/StartSwooleCommand.php index 929f0c1..93a87f5 100644 --- a/src/Commands/StartSwooleCommand.php +++ b/src/Commands/StartSwooleCommand.php @@ -143,7 +143,7 @@ protected function writeServerStateFile( */ protected function defaultServerOptions(SwooleExtension $extension) { - return array_merge([ + $options = array_merge([ // Enable coroutine support for async I/O operations 'enable_coroutine' => true, @@ -156,11 +156,14 @@ protected function defaultServerOptions(SwooleExtension $extension) // Log level: INFO in local, ERROR in production for better performance 'log_level' => app()->environment('local') ? SWOOLE_LOG_INFO : SWOOLE_LOG_ERROR, - // Max requests per worker before restart (prevents memory leaks) - 'max_request' => $this->option('max-requests'), - - // Randomize worker recycling so all busy workers do not restart together. - 'max_request_grace' => $this->maxRequestGrace(), + // Disable Swoole's built-in HTTP worker recycling. In coroutine + // mode it can close an upstream request while Nginx is still + // reading the response. The bin/swoole-server request loop applies + // the same max-requests limit cooperatively after response cleanup. + 'max_request' => 0, + + // Randomization is applied by the cooperative recycler. + 'max_request_grace' => 0, // Max size of request/response package (10MB) 'package_max_length' => 10 * 1024 * 1024, @@ -205,6 +208,13 @@ protected function defaultServerOptions(SwooleExtension $extension) // Number of worker processes 'worker_num' => $this->workerCount($extension), ], config('octane.swoole.options', [])); + + // Keep HTTP worker recycling under the cooperative request-loop guard + // even when applications publish custom low-level Swoole options. + $options['max_request'] = 0; + $options['max_request_grace'] = 0; + + return $options; } /** diff --git a/src/Swoole/Actions/StopWorkerIfMaxRequestsExceeded.php b/src/Swoole/Actions/StopWorkerIfMaxRequestsExceeded.php new file mode 100644 index 0000000..3259908 --- /dev/null +++ b/src/Swoole/Actions/StopWorkerIfMaxRequestsExceeded.php @@ -0,0 +1,60 @@ +workerState->maxRequests <= 0 || $this->workerState->recycleTriggered) { + return false; + } + + $this->workerState->handledRequests++; + + if ($this->workerState->handledRequests >= $this->workerState->maxRequests) { + $this->workerState->recycleRequested = true; + } + + if (! $this->workerState->recycleRequested || Monitor::getActiveRequestCount() > 0) { + return false; + } + + $this->workerState->recycleTriggered = true; + + try { + $stopped = $this->server->stop($this->workerState->workerId); + } catch (Throwable $e) { + $this->workerState->recycleTriggered = false; + + error_log("⚠️ Failed to recycle worker #{$this->workerState->workerId}: {$e->getMessage()}"); + + return false; + } + + if ($stopped === false) { + $this->workerState->recycleTriggered = false; + + error_log("⚠️ Swoole refused to recycle worker #{$this->workerState->workerId}"); + + return false; + } + + error_log( + "♻️ WORKER #{$this->workerState->workerId} recycling after ". + "{$this->workerState->handledRequests}/{$this->workerState->maxRequests} handled requests" + ); + + return true; + } +} diff --git a/src/Swoole/Handlers/OnWorkerStart.php b/src/Swoole/Handlers/OnWorkerStart.php index 156fdd7..38219af 100644 --- a/src/Swoole/Handlers/OnWorkerStart.php +++ b/src/Swoole/Handlers/OnWorkerStart.php @@ -57,6 +57,7 @@ public function __invoke($server, int $workerId) $this->workerState->server = $server; $this->workerState->workerId = $workerId; $this->workerState->workerPid = posix_getpid(); + $this->configureCooperativeMaxRequests($server, $workerId); $this->workerState->worker = $this->bootWorker($server, $workerId); $this->dispatchServerTickTaskEverySecond($server); @@ -74,6 +75,27 @@ public function __invoke($server, int $workerId) error_log("✅ {$workerType} #{$workerId} (PID: {$this->workerState->workerPid}) initialized and ready!"); } + protected function configureCooperativeMaxRequests($server, int $workerId): void + { + $workerNum = $server->setting['worker_num'] ?? 1; + + if ($workerId >= $workerNum) { + return; + } + + $maxRequests = max(0, (int) ($this->serverState['maxRequests'] ?? 0)); + $maxRequestGrace = max(0, (int) ($this->serverState['maxRequestGrace'] ?? 0)); + + if ($maxRequests > 0 && $maxRequestGrace > 0) { + $maxRequests += random_int(0, $maxRequestGrace); + } + + $this->workerState->handledRequests = 0; + $this->workerState->maxRequests = $maxRequests; + $this->workerState->recycleRequested = false; + $this->workerState->recycleTriggered = false; + } + /** * Boot the Octane worker and application. * diff --git a/tests/Feature/SwooleCooperativeRecycleIntegrationTest.php b/tests/Feature/SwooleCooperativeRecycleIntegrationTest.php new file mode 100644 index 0000000..6291078 --- /dev/null +++ b/tests/Feature/SwooleCooperativeRecycleIntegrationTest.php @@ -0,0 +1,344 @@ +markTestSkipped('Swoole extension is required for the integration recycle probe.'); + } + + if (! function_exists('proc_open')) { + $this->markTestSkipped('proc_open is required for the integration recycle probe.'); + } + } + + protected function tearDown(): void + { + foreach (array_reverse($this->servers) as $server) { + $this->stopServer($server); + } + + parent::tearDown(); + } + + public function test_serial_requests_keep_returning_200_across_cooperative_worker_recycle(): void + { + $server = $this->startServer(maxRequests: 3); + $responses = []; + + for ($i = 1; $i <= 6; $i++) { + $responses[] = $this->getJson($server['port'], "/serial-{$i}"); + } + + $this->assertCount(6, $responses); + $this->assertSame(['ok'], array_values(array_unique(array_column($responses, 'status')))); + + $pids = array_values(array_unique(array_column($responses, 'pid'))); + + $this->assertGreaterThanOrEqual(2, count($pids), 'Expected worker PID to change after cooperative recycle.'); + $this->assertStringContainsString('recycled=1', file_get_contents($server['eventLog'])); + $this->assertGreaterThanOrEqual(2, substr_count(file_get_contents($server['eventLog']), 'worker-start:')); + } + + public function test_overlapping_requests_finish_before_worker_recycles(): void + { + if (! function_exists('curl_multi_init')) { + $this->markTestSkipped('curl extension is required for concurrent request probing.'); + } + + $server = $this->startServer(maxRequests: 1); + $responses = $this->getJsonConcurrently($server['port'], [ + '/slow-a?sleep_ms=250', + '/slow-b?sleep_ms=500', + ]); + + $this->assertCount(2, $responses); + $this->assertSame(['ok'], array_values(array_unique(array_column($responses, 'status')))); + $this->assertSame(2, count($responses), 'Both overlapping requests must receive complete JSON responses.'); + + $this->waitForLogCount($server, 'worker-start:', 2); + + $eventLog = file_get_contents($server['eventLog']); + + $this->assertStringContainsString('active=1:requested=1:triggered=0:recycled=0', $eventLog); + $this->assertStringContainsString('active=0:requested=1:triggered=1:recycled=1', $eventLog); + $this->assertGreaterThanOrEqual(2, substr_count($eventLog, 'worker-start:')); + } + + public function test_batched_concurrent_requests_survive_multiple_worker_recycles(): void + { + if (! function_exists('curl_multi_init')) { + $this->markTestSkipped('curl extension is required for concurrent request probing.'); + } + + $server = $this->startServer(maxRequests: 3); + $responses = []; + + for ($batch = 1; $batch <= 4; $batch++) { + $paths = []; + + for ($request = 1; $request <= 8; $request++) { + $paths[] = "/batch-{$batch}-{$request}?sleep_ms=50"; + } + + array_push($responses, ...$this->getJsonConcurrently($server['port'], $paths)); + $this->getJson($server['port'], "/batch-{$batch}-health"); + } + + $this->assertCount(32, $responses); + $this->assertSame(['ok'], array_values(array_unique(array_column($responses, 'status')))); + + $this->waitForLogCount($server, 'recycled=1', 4); + + $eventLog = file_get_contents($server['eventLog']); + + $this->assertGreaterThanOrEqual(4, substr_count($eventLog, 'recycled=1')); + $this->assertStringNotContainsString('active=1:requested=1:triggered=1', $eventLog); + } + + private function startServer(int $maxRequests): array + { + $port = $this->findFreePort(); + $prefix = tempnam(sys_get_temp_dir(), 'octane-recycle-'); + + if ($prefix === false) { + $this->fail('Unable to allocate temporary files for Swoole recycle probe.'); + } + + @unlink($prefix); + + $readyFile = $prefix.'.ready'; + $eventLog = $prefix.'.events.log'; + $stdout = $prefix.'.stdout.log'; + $stderr = $prefix.'.stderr.log'; + $fixture = dirname(__DIR__).'/Fixtures/swoole_cooperative_recycle_server.php'; + $command = sprintf( + '%s %s %d %d %s %s', + escapeshellarg(PHP_BINARY), + escapeshellarg($fixture), + $port, + $maxRequests, + escapeshellarg($readyFile), + escapeshellarg($eventLog), + ); + + $process = proc_open($command, [ + 0 => ['pipe', 'r'], + 1 => ['file', $stdout, 'a'], + 2 => ['file', $stderr, 'a'], + ], $pipes); + + if (! is_resource($process)) { + $this->fail('Unable to start Swoole recycle probe process.'); + } + + fclose($pipes[0]); + + $server = compact('process', 'port', 'readyFile', 'eventLog', 'stdout', 'stderr'); + $this->servers[] = $server; + + $this->waitForServer($server); + + return $server; + } + + private function stopServer(array $server): void + { + if (is_resource($server['process'])) { + $this->getRaw($server['port'], '/shutdown', attempts: 1); + + $deadline = microtime(true) + 2.0; + + do { + $status = proc_get_status($server['process']); + + if (! $status['running']) { + proc_close($server['process']); + $this->cleanupServerFiles($server); + + return; + } + + usleep(50000); + } while (microtime(true) < $deadline); + + proc_terminate($server['process']); + usleep(100000); + + $status = proc_get_status($server['process']); + + if ($status['running']) { + proc_terminate($server['process'], 9); + } + + proc_close($server['process']); + } + + $this->cleanupServerFiles($server); + } + + private function cleanupServerFiles(array $server): void + { + foreach (['readyFile', 'eventLog', 'stdout', 'stderr'] as $key) { + if (isset($server[$key]) && is_string($server[$key]) && is_file($server[$key])) { + @unlink($server[$key]); + } + } + } + + private function waitForServer(array $server): void + { + $deadline = microtime(true) + 5.0; + + do { + $status = proc_get_status($server['process']); + + if (! $status['running']) { + $stderr = is_file($server['stderr']) ? file_get_contents($server['stderr']) : ''; + $this->fail('Swoole recycle probe exited before becoming ready: '.$stderr); + } + + if ( + is_file($server['readyFile']) + && is_file($server['eventLog']) + && str_contains(file_get_contents($server['eventLog']), 'worker-start:') + ) { + return; + } + + usleep(50000); + } while (microtime(true) < $deadline); + + $stderr = is_file($server['stderr']) ? file_get_contents($server['stderr']) : ''; + $this->fail('Timed out waiting for Swoole recycle probe to start: '.$stderr); + } + + private function waitForLogCount(array $server, string $needle, int $minimumCount): void + { + $deadline = microtime(true) + 5.0; + + do { + $log = is_file($server['eventLog']) ? file_get_contents($server['eventLog']) : ''; + + if (substr_count($log, $needle) >= $minimumCount) { + return; + } + + usleep(50000); + } while (microtime(true) < $deadline); + + $log = is_file($server['eventLog']) ? file_get_contents($server['eventLog']) : ''; + + $this->fail("Timed out waiting for {$minimumCount} occurrences of {$needle}. Current log:\n{$log}"); + } + + private function getJson(int $port, string $path, int $attempts = 20): array + { + $raw = $this->getRaw($port, $path, $attempts); + + $this->assertNotNull($raw, "Request {$path} did not return a response."); + + $decoded = json_decode($raw, true); + + $this->assertIsArray($decoded, "Request {$path} returned non-JSON body: {$raw}"); + + return $decoded; + } + + private function getRaw(int $port, string $path, int $attempts = 20): ?string + { + $url = "http://127.0.0.1:{$port}{$path}"; + + for ($attempt = 1; $attempt <= $attempts; $attempt++) { + $context = stream_context_create([ + 'http' => [ + 'ignore_errors' => true, + 'timeout' => 2, + ], + ]); + $raw = @file_get_contents($url, false, $context); + + if (is_string($raw) && $raw !== '') { + return $raw; + } + + usleep(50000); + } + + return null; + } + + private function getJsonConcurrently(int $port, array $paths): array + { + $multi = curl_multi_init(); + $handles = []; + + foreach ($paths as $path) { + $handle = curl_init("http://127.0.0.1:{$port}{$path}"); + + curl_setopt_array($handle, [ + CURLOPT_RETURNTRANSFER => true, + CURLOPT_CONNECTTIMEOUT_MS => 500, + CURLOPT_TIMEOUT_MS => 3000, + ]); + + curl_multi_add_handle($multi, $handle); + $handles[] = $handle; + } + + do { + $status = curl_multi_exec($multi, $running); + + if ($running > 0) { + curl_multi_select($multi, 0.05); + } + } while ($running > 0 && $status === CURLM_OK); + + $responses = []; + + foreach ($handles as $handle) { + $body = curl_multi_getcontent($handle); + $httpCode = curl_getinfo($handle, CURLINFO_HTTP_CODE); + $error = curl_error($handle); + + curl_multi_remove_handle($multi, $handle); + curl_close($handle); + + $this->assertSame(200, $httpCode, "Concurrent request failed with HTTP {$httpCode}: {$error}"); + + $decoded = json_decode($body, true); + + $this->assertIsArray($decoded, "Concurrent request returned non-JSON body: {$body}"); + + $responses[] = $decoded; + } + + curl_multi_close($multi); + + return $responses; + } + + private function findFreePort(): int + { + $socket = stream_socket_server('tcp://127.0.0.1:0', $errno, $errstr); + + if (! is_resource($socket)) { + $this->fail("Unable to allocate free port: {$errstr} ({$errno})"); + } + + $name = stream_socket_get_name($socket, false); + + fclose($socket); + + return (int) substr(strrchr($name, ':'), 1); + } +} diff --git a/tests/Fixtures/swoole_cooperative_recycle_server.php b/tests/Fixtures/swoole_cooperative_recycle_server.php new file mode 100644 index 0000000..124927d --- /dev/null +++ b/tests/Fixtures/swoole_cooperative_recycle_server.php @@ -0,0 +1,120 @@ + \n"); + exit(2); +} + +$server = new Server('127.0.0.1', $port, SWOOLE_PROCESS); +$workerState = new WorkerState(); + +$log = static function (string $message) use ($eventLog): void { + file_put_contents($eventLog, $message.PHP_EOL, FILE_APPEND); +}; + +$server->set([ + 'worker_num' => 1, + 'task_worker_num' => 0, + 'enable_coroutine' => true, + 'daemonize' => false, + 'log_file' => sys_get_temp_dir().'/octane-coroutine-recycle-swoole.log', + 'log_level' => SWOOLE_LOG_ERROR, + 'max_request' => 0, + 'max_request_grace' => 0, + 'reload_async' => true, + 'max_wait_time' => 5, +]); + +$server->on('start', static function () use ($readyFile, $log): void { + file_put_contents($readyFile, (string) getmypid()); + $log('server-start:'.getmypid()); +}); + +$server->on('workerstart', static function (Server $server, int $workerId) use ($workerState, $maxRequests, $log): void { + $workerState->server = $server; + $workerState->workerId = $workerId; + $workerState->workerPid = posix_getpid(); + $workerState->handledRequests = 0; + $workerState->maxRequests = $maxRequests; + $workerState->recycleRequested = false; + $workerState->recycleTriggered = false; + + Monitor::clearRequestCoroutines(); + + $log("worker-start:{$workerId}:".posix_getpid()); +}); + +$server->on('workerstop', static function (Server $server, int $workerId) use ($log): void { + $log("worker-stop:{$workerId}:".posix_getpid().':active='.Monitor::getActiveRequestCount()); +}); + +$server->on('request', static function (Request $request, Response $response) use ($server, $workerState, $log): void { + $cid = Coroutine::getCid(); + + Monitor::registerRequestCoroutine($cid); + + try { + $path = $request->server['request_uri'] ?? '/'; + + if ($path === '/shutdown') { + $response->status(200); + $response->end('ok'); + Timer::after(50, static fn () => $server->shutdown()); + + return; + } + + $sleepMs = max(0, min(2000, (int) ($request->get['sleep_ms'] ?? 0))); + + if ($sleepMs > 0) { + Coroutine::sleep($sleepMs / 1000); + } + + $body = [ + 'status' => 'ok', + 'path' => $path, + 'pid' => posix_getpid(), + 'worker_id' => $workerState->workerId, + 'handled_before' => $workerState->handledRequests, + 'active_before_end' => Monitor::getActiveRequestCount(), + ]; + + $response->status(200); + $response->header('Content-Type', 'application/json'); + $response->end(json_encode($body, JSON_THROW_ON_ERROR)); + } finally { + Monitor::unregisterRequestCoroutine($cid); + + $didRecycle = (new StopWorkerIfMaxRequestsExceeded($server, $workerState))(); + + $log(sprintf( + 'request-finish:pid=%d:handled=%d:max=%d:active=%d:requested=%d:triggered=%d:recycled=%d', + posix_getpid(), + $workerState->handledRequests, + $workerState->maxRequests, + Monitor::getActiveRequestCount(), + $workerState->recycleRequested ? 1 : 0, + $workerState->recycleTriggered ? 1 : 0, + $didRecycle ? 1 : 0, + )); + } +}); + +$server->start(); diff --git a/tests/Unit/OnWorkerStartPoolFreeTest.php b/tests/Unit/OnWorkerStartPoolFreeTest.php index 2a3e2d7..085c94b 100644 --- a/tests/Unit/OnWorkerStartPoolFreeTest.php +++ b/tests/Unit/OnWorkerStartPoolFreeTest.php @@ -81,6 +81,59 @@ public function test_http_worker_boot_uses_single_worker_and_does_not_create_poo $this->assertSame($this->app, $container->getBaseApplication()); $this->assertSame($container, $handler->preparedSandbox); } + + public function test_http_worker_uses_cooperative_max_request_limit_with_grace(): void + { + $workerState = new \Laravel\Octane\Swoole\WorkerState(); + $handler = $this->makeHandler($workerState, [ + 'maxRequests' => 500, + 'maxRequestGrace' => 1000, + ]); + + $handler->configureCooperativeMaxRequestsForTest((object) ['setting' => ['worker_num' => 4]], 1); + + $this->assertSame(0, $workerState->handledRequests); + $this->assertGreaterThanOrEqual(500, $workerState->maxRequests); + $this->assertLessThanOrEqual(1500, $workerState->maxRequests); + $this->assertFalse($workerState->recycleRequested); + $this->assertFalse($workerState->recycleTriggered); + } + + public function test_task_workers_do_not_use_http_cooperative_max_request_limit(): void + { + $workerState = new \Laravel\Octane\Swoole\WorkerState(); + $handler = $this->makeHandler($workerState, [ + 'maxRequests' => 500, + 'maxRequestGrace' => 1000, + ]); + + $handler->configureCooperativeMaxRequestsForTest((object) ['setting' => ['worker_num' => 4]], 4); + + $this->assertSame(0, $workerState->maxRequests); + } + + private function makeHandler(\Laravel\Octane\Swoole\WorkerState $workerState, array $serverState): TestableOnWorkerStart + { + $fakeWorker = $this->createMock(Worker::class); + $fakeWorker->method('application')->willReturn($this->app); + $fakeWorker->method('getClient')->willReturn(new FakeClient([])); + + return new TestableOnWorkerStart( + new SwooleExtension(), + $this->app->basePath(), + array_merge([ + 'appName' => 'octane-coroutine-test', + 'octaneConfig' => [ + 'swoole' => [ + 'pool' => ['size' => 32], + ], + ], + ], $serverState), + $workerState, + false, + $fakeWorker, + ); + } } class TestableOnWorkerStart extends OnWorkerStart @@ -106,6 +159,11 @@ public function bootHttpWorkerForTest($server, int $workerId): ?Worker return $this->bootHttpWorker($server, $workerId); } + public function configureCooperativeMaxRequestsForTest($server, int $workerId): void + { + $this->configureCooperativeMaxRequests($server, $workerId); + } + public function createPoolWorker($server, int $workerId, int $poolIndex): Worker { $this->createPoolWorkerInvocations[] = [$workerId, $poolIndex]; diff --git a/tests/Unit/StartSwooleCommandOptionsTest.php b/tests/Unit/StartSwooleCommandOptionsTest.php index 3c38f15..96a4b5a 100644 --- a/tests/Unit/StartSwooleCommandOptionsTest.php +++ b/tests/Unit/StartSwooleCommandOptionsTest.php @@ -19,12 +19,13 @@ public function test_swoole_options_include_configured_max_request_grace(): void $options = $command->defaultServerOptionsForTest(new SwooleExtension()); - $this->assertSame(1000, $options['max_request']); - $this->assertSame(250, $options['max_request_grace']); + $this->assertSame(0, $options['max_request']); + $this->assertSame(0, $options['max_request_grace']); + $this->assertSame(1000, $options['task_max_request']); $this->assertSame(250, $options['task_max_request_grace']); } - public function test_swoole_options_default_to_ten_percent_max_request_grace(): void + public function test_swoole_task_options_default_to_ten_percent_max_request_grace(): void { config(['octane.swoole.max_request_grace' => null]); @@ -35,9 +36,35 @@ public function test_swoole_options_default_to_ten_percent_max_request_grace(): $options = $command->defaultServerOptionsForTest(new SwooleExtension()); - $this->assertSame(10, $options['max_request_grace']); + $this->assertSame(0, $options['max_request']); + $this->assertSame(0, $options['max_request_grace']); + $this->assertSame(100, $options['task_max_request']); $this->assertSame(10, $options['task_max_request_grace']); } + + public function test_custom_swoole_options_cannot_reenable_builtin_http_max_request_recycling(): void + { + config([ + 'octane.swoole.options' => [ + 'max_request' => 500, + 'max_request_grace' => 1000, + 'task_max_request' => 250, + 'task_max_request_grace' => 25, + ], + ]); + + $command = new TestableStartSwooleCommandForOptions([ + 'max-requests' => 100, + 'max-request-grace' => null, + ]); + + $options = $command->defaultServerOptionsForTest(new SwooleExtension()); + + $this->assertSame(0, $options['max_request']); + $this->assertSame(0, $options['max_request_grace']); + $this->assertSame(250, $options['task_max_request']); + $this->assertSame(25, $options['task_max_request_grace']); + } } class TestableStartSwooleCommandForOptions extends StartSwooleCommand diff --git a/tests/Unit/StopWorkerIfMaxRequestsExceededTest.php b/tests/Unit/StopWorkerIfMaxRequestsExceededTest.php new file mode 100644 index 0000000..a11bc88 --- /dev/null +++ b/tests/Unit/StopWorkerIfMaxRequestsExceededTest.php @@ -0,0 +1,176 @@ +workerState(maxRequests: 0); + + $this->assertFalse((new StopWorkerIfMaxRequestsExceeded($server, $workerState))()); + $this->assertSame(0, $workerState->handledRequests); + $this->assertSame([], $server->stoppedWorkerIds); + } + + public function test_it_stops_worker_after_max_requests_when_no_requests_are_active(): void + { + $server = new FakeWorkerRecycleServer(); + $workerState = $this->workerState(maxRequests: 3, workerId: 7); + $action = new StopWorkerIfMaxRequestsExceeded($server, $workerState); + + $this->assertFalse($action()); + $this->assertFalse($action()); + $this->assertTrue($action()); + + $this->assertSame(3, $workerState->handledRequests); + $this->assertTrue($workerState->recycleRequested); + $this->assertTrue($workerState->recycleTriggered); + $this->assertSame([7], $server->stoppedWorkerIds); + } + + public function test_it_only_stops_once_after_recycle_has_been_triggered(): void + { + $server = new FakeWorkerRecycleServer(); + $workerState = $this->workerState(maxRequests: 1, workerId: 2); + $action = new StopWorkerIfMaxRequestsExceeded($server, $workerState); + + $this->assertTrue($action()); + $this->assertFalse($action()); + + $this->assertSame(1, $workerState->handledRequests); + $this->assertSame([2], $server->stoppedWorkerIds); + } + + public function test_it_waits_for_other_active_request_coroutines_before_stopping(): void + { + $server = new FakeWorkerRecycleServer(); + $workerState = $this->workerState(maxRequests: 2, workerId: 4); + $action = new StopWorkerIfMaxRequestsExceeded($server, $workerState); + + Monitor::registerRequestCoroutine(101); + + $this->assertFalse($action()); + $this->assertFalse($action()); + + $this->assertTrue($workerState->recycleRequested); + $this->assertFalse($workerState->recycleTriggered); + $this->assertSame([], $server->stoppedWorkerIds); + + Monitor::unregisterRequestCoroutine(101); + + $this->assertTrue($action()); + $this->assertSame([4], $server->stoppedWorkerIds); + $this->assertSame(0, Monitor::getActiveRequestCount()); + } + + public function test_it_recovers_if_swoole_refuses_to_stop_the_worker(): void + { + $server = new FakeWorkerRecycleServer(stopResult: false); + $workerState = $this->workerState(maxRequests: 1, workerId: 5); + $action = new StopWorkerIfMaxRequestsExceeded($server, $workerState); + + $this->assertFalse($action()); + + $this->assertTrue($workerState->recycleRequested); + $this->assertFalse($workerState->recycleTriggered); + $this->assertSame([5], $server->stoppedWorkerIds); + + $server->stopResult = true; + + $this->assertTrue($action()); + $this->assertTrue($workerState->recycleTriggered); + $this->assertSame([5, 5], $server->stoppedWorkerIds); + } + + public function test_it_recovers_if_swoole_stop_throws(): void + { + $server = new FakeWorkerRecycleServer(stopException: new RuntimeException('stop failed')); + $workerState = $this->workerState(maxRequests: 1, workerId: 6); + $action = new StopWorkerIfMaxRequestsExceeded($server, $workerState); + + $this->assertFalse($action()); + + $this->assertTrue($workerState->recycleRequested); + $this->assertFalse($workerState->recycleTriggered); + $this->assertSame([6], $server->stoppedWorkerIds); + + $server->stopException = null; + + $this->assertTrue($action()); + $this->assertTrue($workerState->recycleTriggered); + $this->assertSame([6, 6], $server->stoppedWorkerIds); + } + + public function test_repeated_request_completion_checks_do_not_leak_monitor_state(): void + { + $server = new FakeWorkerRecycleServer(); + $workerState = $this->workerState(maxRequests: 1000, workerId: 8); + $action = new StopWorkerIfMaxRequestsExceeded($server, $workerState); + + for ($cid = 1; $cid <= 1000; $cid++) { + Monitor::registerRequestCoroutine($cid); + Monitor::unregisterRequestCoroutine($cid); + $action(); + } + + $this->assertSame(0, Monitor::getActiveRequestCount()); + $this->assertTrue($workerState->recycleTriggered); + $this->assertSame([8], $server->stoppedWorkerIds); + } + + private function workerState(int $maxRequests, int $workerId = 1): WorkerState + { + $workerState = new WorkerState(); + $workerState->workerId = $workerId; + $workerState->maxRequests = $maxRequests; + + return $workerState; + } +} + +class FakeWorkerRecycleServer +{ + public array $stoppedWorkerIds = []; + + public function __construct( + public bool $stopResult = true, + public ?Throwable $stopException = null, + ) { + } + + public function stop(int $workerId): bool + { + $this->stoppedWorkerIds[] = $workerId; + + if ($this->stopException) { + throw $this->stopException; + } + + return $this->stopResult; + } +}