Driver: Shaper: Skip not yet processed packets in the worker thread

This commit is contained in:
Nodir Temirkhodjaev 2024-02-21 13:52:49 +03:00
parent a5eff94edb
commit ec729980bb
2 changed files with 26 additions and 20 deletions

View File

@ -580,6 +580,9 @@ static PFORT_FLOW_PACKET fort_shaper_packet_list_get_flow_packets(
inline static void fort_shaper_queue_advance_available( inline static void fort_shaper_queue_advance_available(
PFORT_SHAPER shaper, PFORT_PACKET_QUEUE queue, const LARGE_INTEGER now) PFORT_SHAPER shaper, PFORT_PACKET_QUEUE queue, const LARGE_INTEGER now)
{ {
const LARGE_INTEGER last_tick = queue->last_tick;
queue->last_tick = now;
if (fort_shaper_packet_list_is_empty(&queue->bandwidth_list) if (fort_shaper_packet_list_is_empty(&queue->bandwidth_list)
&& queue->available_bytes > FORT_QUEUE_INITIAL_TOKEN_COUNT) { && queue->available_bytes > FORT_QUEUE_INITIAL_TOKEN_COUNT) {
queue->available_bytes = FORT_QUEUE_INITIAL_TOKEN_COUNT; queue->available_bytes = FORT_QUEUE_INITIAL_TOKEN_COUNT;
@ -590,7 +593,7 @@ inline static void fort_shaper_queue_advance_available(
/* Advance the available bytes */ /* Advance the available bytes */
const UINT64 accumulated = const UINT64 accumulated =
((now.QuadPart - queue->last_tick.QuadPart) * bps) / shaper->qpcFrequency.QuadPart; ((now.QuadPart - last_tick.QuadPart) * bps) / shaper->qpcFrequency.QuadPart;
queue->available_bytes += accumulated; queue->available_bytes += accumulated;
@ -598,6 +601,13 @@ inline static void fort_shaper_queue_advance_available(
if (queue->available_bytes > max_available) { if (queue->available_bytes > max_available) {
queue->available_bytes = max_available; queue->available_bytes = max_available;
} }
/*
LOG("Shaper: BAND: queued=%d avail=%d ms=%d\n", (UINT32) queue->queued_bytes,
(UINT32) queue->available_bytes,
(UINT32) (((now.QuadPart - last_tick.QuadPart) * 1000)
/ shaper->qpcFrequency.QuadPart));
*/
} }
static void fort_shaper_queue_process_bandwidth( static void fort_shaper_queue_process_bandwidth(
@ -605,15 +615,6 @@ static void fort_shaper_queue_process_bandwidth(
{ {
fort_shaper_queue_advance_available(shaper, queue, now); fort_shaper_queue_advance_available(shaper, queue, now);
/*
LOG("Shaper: BAND: queued=%d avail=%d ms=%d\n", (UINT32) queue->queued_bytes,
(UINT32) queue->available_bytes,
(UINT32) (((now.QuadPart - queue->last_tick.QuadPart) * 1000)
/ shaper->qpcFrequency.QuadPart));
*/
queue->last_tick = now;
/* Move packets to the latency queue as the accumulated available bytes will allow */ /* Move packets to the latency queue as the accumulated available bytes will allow */
PFORT_FLOW_PACKET pkt_chain = queue->bandwidth_list.packet_head; PFORT_FLOW_PACKET pkt_chain = queue->bandwidth_list.packet_head;
if (pkt_chain == NULL) if (pkt_chain == NULL)
@ -627,11 +628,14 @@ static void fort_shaper_queue_process_bandwidth(
if (queue->available_bytes < pkt_length) if (queue->available_bytes < pkt_length)
break; break;
queue->available_bytes -= pkt_length; if (pkt->latency_start.LowPart >= now.LowPart)
queue->queued_bytes -= pkt_length; break; /* skip not yet processed packets */
pkt->latency_start = now; pkt->latency_start = now;
queue->available_bytes -= pkt_length;
queue->queued_bytes -= pkt_length;
pkt_tail = pkt; pkt_tail = pkt;
pkt = pkt->next; pkt = pkt->next;
} while (pkt != NULL); } while (pkt != NULL);
@ -806,12 +810,11 @@ inline static void fort_shaper_thread_set_event(PFORT_SHAPER shaper)
KeSetEvent(&shaper->thread_event, IO_NO_INCREMENT, FALSE); KeSetEvent(&shaper->thread_event, IO_NO_INCREMENT, FALSE);
} }
inline static ULONG fort_shaper_thread_process_queues(PFORT_SHAPER shaper, ULONG active_io_bits) inline static ULONG fort_shaper_thread_process_queues(
PFORT_SHAPER shaper, ULONG active_io_bits, const LARGE_INTEGER now)
{ {
ULONG new_active_io_bits = 0; ULONG new_active_io_bits = 0;
const LARGE_INTEGER now = KeQueryPerformanceCounter(NULL);
for (int i = 0; active_io_bits != 0; ++i) { for (int i = 0; active_io_bits != 0; ++i) {
const BOOL queue_exists = (active_io_bits & 1) != 0; const BOOL queue_exists = (active_io_bits & 1) != 0;
active_io_bits >>= 1; active_io_bits >>= 1;
@ -831,7 +834,7 @@ inline static ULONG fort_shaper_thread_process_queues(PFORT_SHAPER shaper, ULONG
return new_active_io_bits; return new_active_io_bits;
} }
inline static BOOL fort_shaper_thread_process(PFORT_SHAPER shaper) inline static BOOL fort_shaper_thread_process(PFORT_SHAPER shaper, const LARGE_INTEGER now)
{ {
ULONG active_io_bits = ULONG active_io_bits =
fort_shaper_io_bits_set(&shaper->active_io_bits, FORT_PACKET_FLUSH_ALL, FALSE); fort_shaper_io_bits_set(&shaper->active_io_bits, FORT_PACKET_FLUSH_ALL, FALSE);
@ -839,7 +842,7 @@ inline static BOOL fort_shaper_thread_process(PFORT_SHAPER shaper)
if (active_io_bits == 0) if (active_io_bits == 0)
return FALSE; return FALSE;
active_io_bits = fort_shaper_thread_process_queues(shaper, active_io_bits); active_io_bits = fort_shaper_thread_process_queues(shaper, active_io_bits, now);
if (active_io_bits != 0) { if (active_io_bits != 0) {
fort_shaper_io_bits_set(&shaper->active_io_bits, active_io_bits, TRUE); fort_shaper_io_bits_set(&shaper->active_io_bits, active_io_bits, TRUE);
@ -864,7 +867,9 @@ static void fort_shaper_thread_loop(PVOID context)
do { do {
KeWaitForSingleObject(thread_event, Executive, KernelMode, FALSE, timeout); KeWaitForSingleObject(thread_event, Executive, KernelMode, FALSE, timeout);
const BOOL is_active = fort_shaper_thread_process(shaper); const LARGE_INTEGER now = KeQueryPerformanceCounter(NULL); /* get current time ASAP */
const BOOL is_active = fort_shaper_thread_process(shaper, now);
timeout = is_active ? &delay : NULL; timeout = is_active ? &delay : NULL;
@ -927,7 +932,7 @@ FORT_API void fort_shaper_open(PFORT_SHAPER shaper)
KeInitializeEvent(&shaper->thread_event, SynchronizationEvent, FALSE); KeInitializeEvent(&shaper->thread_event, SynchronizationEvent, FALSE);
fort_thread_run(&shaper->thread, &fort_shaper_thread_loop, shaper, /*priorityIncrement=*/1); fort_thread_run(&shaper->thread, &fort_shaper_thread_loop, shaper, /*priorityIncrement=*/0);
} }
FORT_API void fort_shaper_close(PFORT_SHAPER shaper) FORT_API void fort_shaper_close(PFORT_SHAPER shaper)
@ -1076,6 +1081,7 @@ inline static NTSTATUS fort_shaper_packet_queue(
} }
pkt->flow = flow; pkt->flow = flow;
pkt->latency_start = KeQueryPerformanceCounter(NULL);
pkt->data_length = data_length; pkt->data_length = data_length;
/* Add the Packet to Queue */ /* Add the Packet to Queue */

View File

@ -22,7 +22,7 @@ FORT_API NTSTATUS fort_thread_run(
ZwClose(hThread); ZwClose(hThread);
if (NT_SUCCESS(status)) { if (NT_SUCCESS(status) && priorityIncrement != 0) {
KeSetBasePriorityThread(thread->thread_obj, priorityIncrement); KeSetBasePriorityThread(thread->thread_obj, priorityIncrement);
} }