From ec729980bb4dae371905ce8a3b938bb433445965 Mon Sep 17 00:00:00 2001 From: Nodir Temirkhodjaev Date: Wed, 21 Feb 2024 13:52:49 +0300 Subject: [PATCH] Driver: Shaper: Skip not yet processed packets in the worker thread --- src/driver/fortpkt.c | 44 +++++++++++++++++++++++++------------------- src/driver/fortthr.c | 2 +- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/src/driver/fortpkt.c b/src/driver/fortpkt.c index 20a71e87..f74e71a4 100644 --- a/src/driver/fortpkt.c +++ b/src/driver/fortpkt.c @@ -580,6 +580,9 @@ static PFORT_FLOW_PACKET fort_shaper_packet_list_get_flow_packets( inline static void fort_shaper_queue_advance_available( PFORT_SHAPER shaper, PFORT_PACKET_QUEUE queue, const LARGE_INTEGER now) { + const LARGE_INTEGER last_tick = queue->last_tick; + queue->last_tick = now; + if (fort_shaper_packet_list_is_empty(&queue->bandwidth_list) && queue->available_bytes > FORT_QUEUE_INITIAL_TOKEN_COUNT) { queue->available_bytes = FORT_QUEUE_INITIAL_TOKEN_COUNT; @@ -590,7 +593,7 @@ inline static void fort_shaper_queue_advance_available( /* Advance the available bytes */ const UINT64 accumulated = - ((now.QuadPart - queue->last_tick.QuadPart) * bps) / shaper->qpcFrequency.QuadPart; + ((now.QuadPart - last_tick.QuadPart) * bps) / shaper->qpcFrequency.QuadPart; queue->available_bytes += accumulated; @@ -598,6 +601,13 @@ inline static void fort_shaper_queue_advance_available( if (queue->available_bytes > max_available) { queue->available_bytes = max_available; } + + /* + LOG("Shaper: BAND: queued=%d avail=%d ms=%d\n", (UINT32) queue->queued_bytes, + (UINT32) queue->available_bytes, + (UINT32) (((now.QuadPart - last_tick.QuadPart) * 1000) + / shaper->qpcFrequency.QuadPart)); + */ } static void fort_shaper_queue_process_bandwidth( @@ -605,15 +615,6 @@ static void fort_shaper_queue_process_bandwidth( { fort_shaper_queue_advance_available(shaper, queue, now); - /* - LOG("Shaper: BAND: queued=%d avail=%d ms=%d\n", (UINT32) queue->queued_bytes, - (UINT32) queue->available_bytes, - (UINT32) (((now.QuadPart - queue->last_tick.QuadPart) * 1000) - / shaper->qpcFrequency.QuadPart)); - */ - - queue->last_tick = now; - /* Move packets to the latency queue as the accumulated available bytes will allow */ PFORT_FLOW_PACKET pkt_chain = queue->bandwidth_list.packet_head; if (pkt_chain == NULL) @@ -627,11 +628,14 @@ static void fort_shaper_queue_process_bandwidth( if (queue->available_bytes < pkt_length) break; - queue->available_bytes -= pkt_length; - queue->queued_bytes -= pkt_length; + if (pkt->latency_start.LowPart >= now.LowPart) + break; /* skip not yet processed packets */ pkt->latency_start = now; + queue->available_bytes -= pkt_length; + queue->queued_bytes -= pkt_length; + pkt_tail = pkt; pkt = pkt->next; } while (pkt != NULL); @@ -806,12 +810,11 @@ inline static void fort_shaper_thread_set_event(PFORT_SHAPER shaper) KeSetEvent(&shaper->thread_event, IO_NO_INCREMENT, FALSE); } -inline static ULONG fort_shaper_thread_process_queues(PFORT_SHAPER shaper, ULONG active_io_bits) +inline static ULONG fort_shaper_thread_process_queues( + PFORT_SHAPER shaper, ULONG active_io_bits, const LARGE_INTEGER now) { ULONG new_active_io_bits = 0; - const LARGE_INTEGER now = KeQueryPerformanceCounter(NULL); - for (int i = 0; active_io_bits != 0; ++i) { const BOOL queue_exists = (active_io_bits & 1) != 0; active_io_bits >>= 1; @@ -831,7 +834,7 @@ inline static ULONG fort_shaper_thread_process_queues(PFORT_SHAPER shaper, ULONG return new_active_io_bits; } -inline static BOOL fort_shaper_thread_process(PFORT_SHAPER shaper) +inline static BOOL fort_shaper_thread_process(PFORT_SHAPER shaper, const LARGE_INTEGER now) { ULONG active_io_bits = fort_shaper_io_bits_set(&shaper->active_io_bits, FORT_PACKET_FLUSH_ALL, FALSE); @@ -839,7 +842,7 @@ inline static BOOL fort_shaper_thread_process(PFORT_SHAPER shaper) if (active_io_bits == 0) return FALSE; - active_io_bits = fort_shaper_thread_process_queues(shaper, active_io_bits); + active_io_bits = fort_shaper_thread_process_queues(shaper, active_io_bits, now); if (active_io_bits != 0) { fort_shaper_io_bits_set(&shaper->active_io_bits, active_io_bits, TRUE); @@ -864,7 +867,9 @@ static void fort_shaper_thread_loop(PVOID context) do { KeWaitForSingleObject(thread_event, Executive, KernelMode, FALSE, timeout); - const BOOL is_active = fort_shaper_thread_process(shaper); + const LARGE_INTEGER now = KeQueryPerformanceCounter(NULL); /* get current time ASAP */ + + const BOOL is_active = fort_shaper_thread_process(shaper, now); timeout = is_active ? &delay : NULL; @@ -927,7 +932,7 @@ FORT_API void fort_shaper_open(PFORT_SHAPER shaper) KeInitializeEvent(&shaper->thread_event, SynchronizationEvent, FALSE); - fort_thread_run(&shaper->thread, &fort_shaper_thread_loop, shaper, /*priorityIncrement=*/1); + fort_thread_run(&shaper->thread, &fort_shaper_thread_loop, shaper, /*priorityIncrement=*/0); } FORT_API void fort_shaper_close(PFORT_SHAPER shaper) @@ -1076,6 +1081,7 @@ inline static NTSTATUS fort_shaper_packet_queue( } pkt->flow = flow; + pkt->latency_start = KeQueryPerformanceCounter(NULL); pkt->data_length = data_length; /* Add the Packet to Queue */ diff --git a/src/driver/fortthr.c b/src/driver/fortthr.c index 60eee5ab..523792fe 100644 --- a/src/driver/fortthr.c +++ b/src/driver/fortthr.c @@ -22,7 +22,7 @@ FORT_API NTSTATUS fort_thread_run( ZwClose(hThread); - if (NT_SUCCESS(status)) { + if (NT_SUCCESS(status) && priorityIncrement != 0) { KeSetBasePriorityThread(thread->thread_obj, priorityIncrement); }