Driver: Shaper: Rework packets processing, Part 2

This commit is contained in:
Nodir Temirkhodjaev 2024-02-17 14:19:42 +03:00
parent e880ea9ae5
commit 1bef33928d
8 changed files with 77 additions and 64 deletions

View File

@ -98,7 +98,11 @@ inline static ULONG fort_packet_data_length(PCFORT_CALLOUT_ARG ca)
PNET_BUFFER netBuf = NET_BUFFER_LIST_FIRST_NB(ca->netBufList); PNET_BUFFER netBuf = NET_BUFFER_LIST_FIRST_NB(ca->netBufList);
const ULONG data_length = NET_BUFFER_DATA_LENGTH(netBuf); const ULONG data_length = NET_BUFFER_DATA_LENGTH(netBuf);
return data_length; const ULONG header_size = ca->inbound
? 0
: ca->inMetaValues->transportHeaderSize + ca->inMetaValues->ipHeaderSize;
return header_size + data_length;
} }
static PFORT_FLOW_PACKET fort_shaper_packet_get_locked(PFORT_SHAPER shaper) static PFORT_FLOW_PACKET fort_shaper_packet_get_locked(PFORT_SHAPER shaper)
@ -512,11 +516,6 @@ static void fort_shaper_packet_list_add_chain(
pkt_list->packet_tail = pkt_tail; pkt_list->packet_tail = pkt_tail;
} }
inline static void fort_shaper_packet_list_add(PFORT_PACKET_LIST pkt_list, PFORT_FLOW_PACKET pkt)
{
fort_shaper_packet_list_add_chain(pkt_list, pkt, pkt);
}
static PFORT_FLOW_PACKET fort_shaper_packet_list_get( static PFORT_FLOW_PACKET fort_shaper_packet_list_get(
PFORT_PACKET_LIST pkt_list, PFORT_FLOW_PACKET pkt) PFORT_PACKET_LIST pkt_list, PFORT_FLOW_PACKET pkt)
{ {
@ -578,22 +577,41 @@ static PFORT_FLOW_PACKET fort_shaper_packet_list_get_flow_packets(
return pkt_chain; return pkt_chain;
} }
static void fort_shaper_queue_process_bandwidth( inline static void fort_shaper_queue_advance_available(
PFORT_SHAPER shaper, PFORT_PACKET_QUEUE queue, const LARGE_INTEGER now) PFORT_SHAPER shaper, PFORT_PACKET_QUEUE queue, const LARGE_INTEGER now)
{ {
const UINT64 bps = queue->limit.bps;
if (fort_shaper_packet_list_is_empty(&queue->bandwidth_list) if (fort_shaper_packet_list_is_empty(&queue->bandwidth_list)
&& queue->available_bytes > FORT_QUEUE_INITIAL_TOKEN_COUNT) { && queue->available_bytes > FORT_QUEUE_INITIAL_TOKEN_COUNT) {
queue->available_bytes = FORT_QUEUE_INITIAL_TOKEN_COUNT; queue->available_bytes = FORT_QUEUE_INITIAL_TOKEN_COUNT;
} else { return;
/* Advance the available bytes */
const UINT64 accumulated =
((now.QuadPart - queue->last_tick.QuadPart) * bps) / shaper->qpcFrequency.QuadPart;
queue->available_bytes += accumulated;
} }
const UINT64 bps = queue->limit.bps;
/* Advance the available bytes */
const UINT64 accumulated =
((now.QuadPart - queue->last_tick.QuadPart) * bps) / shaper->qpcFrequency.QuadPart;
queue->available_bytes += accumulated;
const UINT64 max_available = bps * 8;
if (queue->available_bytes > max_available) {
queue->available_bytes = max_available;
}
}
static void fort_shaper_queue_process_bandwidth(
PFORT_SHAPER shaper, PFORT_PACKET_QUEUE queue, const LARGE_INTEGER now)
{
fort_shaper_queue_advance_available(shaper, queue, now);
/*
LOG("Shaper: BAND: npkt=%d queued=%d avail=%d ms=%d\n", queue->queued_packets,
(UINT32) queue->queued_bytes, (UINT32) queue->available_bytes,
(UINT32) (((now.QuadPart - queue->last_tick.QuadPart) * 1000)
/ shaper->qpcFrequency.QuadPart));
*/
queue->last_tick = now; queue->last_tick = now;
/* Move packets to the latency queue as the accumulated available bytes will allow */ /* Move packets to the latency queue as the accumulated available bytes will allow */
@ -606,7 +624,7 @@ static void fort_shaper_queue_process_bandwidth(
do { do {
const UINT64 pkt_length = pkt->data_length; const UINT64 pkt_length = pkt->data_length;
if (bps != 0LL && queue->available_bytes < pkt_length) if (queue->available_bytes < pkt_length)
break; break;
queue->available_bytes -= pkt_length; queue->available_bytes -= pkt_length;
@ -674,6 +692,8 @@ static PFORT_FLOW_PACKET fort_shaper_queue_get_packets(
KLOCK_QUEUE_HANDLE lock_queue; KLOCK_QUEUE_HANDLE lock_queue;
KeAcquireInStackQueuedSpinLock(&queue->lock, &lock_queue); KeAcquireInStackQueuedSpinLock(&queue->lock, &lock_queue);
queue->queued_bytes = 0;
pkt = fort_shaper_packet_list_get(&queue->latency_list, pkt); pkt = fort_shaper_packet_list_get(&queue->latency_list, pkt);
pkt = fort_shaper_packet_list_get(&queue->bandwidth_list, pkt); pkt = fort_shaper_packet_list_get(&queue->bandwidth_list, pkt);
@ -750,6 +770,8 @@ inline static PFORT_PACKET_QUEUE fort_shaper_create_queue(PFORT_SHAPER shaper, i
static void fort_shaper_create_queues( static void fort_shaper_create_queues(
PFORT_SHAPER shaper, PFORT_SPEED_LIMIT limits, UINT32 limit_io_bits) PFORT_SHAPER shaper, PFORT_SPEED_LIMIT limits, UINT32 limit_io_bits)
{ {
const LARGE_INTEGER now = KeQueryPerformanceCounter(NULL);
for (int i = 0; limit_io_bits != 0; ++i) { for (int i = 0; limit_io_bits != 0; ++i) {
const BOOL queue_exists = (limit_io_bits & 1) != 0; const BOOL queue_exists = (limit_io_bits & 1) != 0;
limit_io_bits >>= 1; limit_io_bits >>= 1;
@ -762,28 +784,7 @@ static void fort_shaper_create_queues(
continue; continue;
queue->limit = limits[i]; queue->limit = limits[i];
}
}
static void fort_shaper_init_queues(PFORT_SHAPER shaper, UINT32 group_io_bits)
{
if (group_io_bits == 0)
return;
const LARGE_INTEGER now = KeQueryPerformanceCounter(NULL);
for (int i = 0; group_io_bits != 0; ++i) {
const BOOL queue_exists = (group_io_bits & 1) != 0;
group_io_bits >>= 1;
if (!queue_exists)
continue;
PFORT_PACKET_QUEUE queue = shaper->queues[i];
if (queue == NULL)
continue;
queue->queued_bytes = 0;
queue->available_bytes = FORT_QUEUE_INITIAL_TOKEN_COUNT; queue->available_bytes = FORT_QUEUE_INITIAL_TOKEN_COUNT;
queue->last_tick = now; queue->last_tick = now;
} }
@ -854,8 +855,9 @@ static void fort_shaper_thread_loop(PVOID context)
PFORT_SHAPER shaper = context; PFORT_SHAPER shaper = context;
PKEVENT thread_event = &shaper->thread_event; PKEVENT thread_event = &shaper->thread_event;
LARGE_INTEGER due; LARGE_INTEGER delay = {
due.QuadPart = 1LL * -10000LL /* ms -> us */; .QuadPart = -2 * 1000 * 10 /* sleep 2000us (2ms) */
};
PLARGE_INTEGER timeout = NULL; PLARGE_INTEGER timeout = NULL;
@ -864,7 +866,7 @@ static void fort_shaper_thread_loop(PVOID context)
const BOOL is_active = fort_shaper_thread_process(shaper); const BOOL is_active = fort_shaper_thread_process(shaper);
timeout = is_active ? &due : NULL; timeout = is_active ? &delay : NULL;
} while ((fort_shaper_flags(shaper) & FORT_SHAPER_CLOSED) == 0); } while ((fort_shaper_flags(shaper) & FORT_SHAPER_CLOSED) == 0);
} }
@ -925,7 +927,7 @@ FORT_API void fort_shaper_open(PFORT_SHAPER shaper)
KeInitializeEvent(&shaper->thread_event, SynchronizationEvent, FALSE); KeInitializeEvent(&shaper->thread_event, SynchronizationEvent, FALSE);
fort_thread_run(&shaper->thread, &fort_shaper_thread_loop, shaper); fort_thread_run(&shaper->thread, &fort_shaper_thread_loop, shaper, /*priorityIncrement=*/1);
} }
FORT_API void fort_shaper_close(PFORT_SHAPER shaper) FORT_API void fort_shaper_close(PFORT_SHAPER shaper)
@ -956,10 +958,7 @@ FORT_API void fort_shaper_conf_update(PFORT_SHAPER shaper, const PFORT_CONF_IO c
{ {
flush_io_bits = (limit_io_bits ^ shaper->group_io_bits); flush_io_bits = (limit_io_bits ^ shaper->group_io_bits);
const UINT32 new_limit_io_bits = (flush_io_bits & limit_io_bits);
fort_shaper_create_queues(shaper, conf_group->limits, limit_io_bits); fort_shaper_create_queues(shaper, conf_group->limits, limit_io_bits);
fort_shaper_init_queues(shaper, new_limit_io_bits);
shaper->limit_io_bits = limit_io_bits; shaper->limit_io_bits = limit_io_bits;
@ -981,10 +980,6 @@ void fort_shaper_conf_flags_update(PFORT_SHAPER shaper, const PFORT_CONF_FLAGS c
{ {
flush_io_bits = (group_io_bits ^ shaper->group_io_bits); flush_io_bits = (group_io_bits ^ shaper->group_io_bits);
const UINT32 new_group_io_bits = (flush_io_bits & group_io_bits);
fort_shaper_init_queues(shaper, new_group_io_bits);
fort_shaper_io_bits_exchange( fort_shaper_io_bits_exchange(
&shaper->group_io_bits, (shaper->limit_io_bits & group_io_bits)); &shaper->group_io_bits, (shaper->limit_io_bits & group_io_bits));
} }
@ -993,14 +988,17 @@ void fort_shaper_conf_flags_update(PFORT_SHAPER shaper, const PFORT_CONF_FLAGS c
fort_shaper_flush(shaper, flush_io_bits, /*drop=*/FALSE); fort_shaper_flush(shaper, flush_io_bits, /*drop=*/FALSE);
} }
static void fort_shaper_packet_queue_add_packet(PFORT_PACKET_QUEUE queue, PFORT_FLOW_PACKET pkt) static void fort_shaper_packet_queue_add_packet(
PFORT_SHAPER shaper, PFORT_PACKET_QUEUE queue, PFORT_FLOW_PACKET pkt, UINT32 queue_bit)
{ {
KLOCK_QUEUE_HANDLE lock_queue; KLOCK_QUEUE_HANDLE lock_queue;
KeAcquireInStackQueuedSpinLock(&queue->lock, &lock_queue); KeAcquireInStackQueuedSpinLock(&queue->lock, &lock_queue);
{ {
queue->queued_bytes += pkt->data_length; queue->queued_bytes += pkt->data_length;
fort_shaper_packet_list_add(&queue->bandwidth_list, pkt); fort_shaper_packet_list_add_chain(&queue->bandwidth_list, pkt, pkt);
fort_shaper_io_bits_set(&shaper->active_io_bits, queue_bit, TRUE);
} }
KeReleaseInStackQueuedSpinLock(&lock_queue); KeReleaseInStackQueuedSpinLock(&lock_queue);
} }
@ -1081,12 +1079,9 @@ inline static NTSTATUS fort_shaper_packet_queue(
pkt->data_length = data_length; pkt->data_length = data_length;
/* Add the Packet to Queue */ /* Add the Packet to Queue */
fort_shaper_packet_queue_add_packet(queue, pkt); fort_shaper_packet_queue_add_packet(shaper, queue, pkt, queue_bit);
/* Packets in transport layer must be re-injected in DCP due to locking */ /* Packets in transport layer must be re-injected in DCP/thread due to locking */
fort_shaper_io_bits_set(&shaper->active_io_bits, queue_bit, TRUE);
/* Process the queued packets in thread */
fort_shaper_thread_set_event(shaper); fort_shaper_thread_set_event(shaper);
return STATUS_SUCCESS; return STATUS_SUCCESS;

View File

@ -415,8 +415,9 @@ FORT_API void fort_stat_close_flows(PFORT_STAT stat)
KeReleaseInStackQueuedSpinLock(&lock_queue); KeReleaseInStackQueuedSpinLock(&lock_queue);
/* Wait for asynchronously deleting flows */ /* Wait for asynchronously deleting flows */
LARGE_INTEGER delay; LARGE_INTEGER delay = {
delay.QuadPart = -50 * 1000 * 10; /* sleep 50000us (50ms) */ .QuadPart = -50 * 1000 * 10 /* sleep 50000us (50ms) */
};
KeDelayExecutionThread(KernelMode, FALSE, &delay); KeDelayExecutionThread(KernelMode, FALSE, &delay);
} }

View File

@ -5,7 +5,8 @@
#include "fortcb.h" #include "fortcb.h"
#include "fortdbg.h" #include "fortdbg.h"
FORT_API NTSTATUS fort_thread_run(PFORT_THREAD thread, PKSTART_ROUTINE routine, PVOID context) FORT_API NTSTATUS fort_thread_run(
PFORT_THREAD thread, PKSTART_ROUTINE routine, PVOID context, int priorityIncrement)
{ {
NTSTATUS status; NTSTATUS status;
@ -21,6 +22,10 @@ FORT_API NTSTATUS fort_thread_run(PFORT_THREAD thread, PKSTART_ROUTINE routine,
ZwClose(hThread); ZwClose(hThread);
if (NT_SUCCESS(status)) {
KeSetBasePriorityThread(thread->thread_obj, priorityIncrement);
}
return status; return status;
} }

View File

@ -12,7 +12,8 @@ typedef struct fort_thread
extern "C" { extern "C" {
#endif #endif
FORT_API NTSTATUS fort_thread_run(PFORT_THREAD thread, PKSTART_ROUTINE routine, PVOID context); FORT_API NTSTATUS fort_thread_run(
PFORT_THREAD thread, PKSTART_ROUTINE routine, PVOID context, int priorityIncrement);
FORT_API void fort_thread_wait(PFORT_THREAD thread); FORT_API void fort_thread_wait(PFORT_THREAD thread);

View File

@ -80,8 +80,9 @@ void fort_timer_set_running(PFORT_TIMER timer, BOOL run)
const ULONG interval = (flags & FORT_TIMER_ONESHOT) != 0 ? 0 : period; const ULONG interval = (flags & FORT_TIMER_ONESHOT) != 0 ? 0 : period;
const ULONG delay = (flags & FORT_TIMER_COALESCABLE) != 0 ? 500 : 0; const ULONG delay = (flags & FORT_TIMER_COALESCABLE) != 0 ? 500 : 0;
LARGE_INTEGER due; LARGE_INTEGER due = {
due.QuadPart = (INT64) period * -10000LL /* ms -> us */; .QuadPart = (INT64) period * -10000LL /* ms -> us */
};
KeSetCoalescableTimer(&timer->id, due, interval, delay, &timer->dpc); KeSetCoalescableTimer(&timer->id, due, interval, delay, &timer->dpc);
} else { } else {

View File

@ -46,10 +46,11 @@ static void fort_worker_wait(PFORT_WORKER worker)
for (;;) { for (;;) {
const SHORT queue_size = InterlockedOr16(&worker->queue_size, 0); const SHORT queue_size = InterlockedOr16(&worker->queue_size, 0);
LARGE_INTEGER timeout; LARGE_INTEGER delay = {
timeout.QuadPart = -50 * 1000 * 10; /* 50 msecs */ .QuadPart = -50 * 1000 * 10 /* 50 msecs */
};
KeDelayExecutionThread(KernelMode, FALSE, &timeout); KeDelayExecutionThread(KernelMode, FALSE, &delay);
if (queue_size == 0) if (queue_size == 0)
break; /* Check the extra one time to ensure thread's exit from callback function */ break; /* Check the extra one time to ensure thread's exit from callback function */

View File

@ -22,3 +22,10 @@ NTSTATUS KeExpandKernelStackAndCallout(PEXPAND_STACK_CALLOUT callout, PVOID para
UNUSED(size); UNUSED(size);
return STATUS_SUCCESS; return STATUS_SUCCESS;
} }
LONG KeSetBasePriorityThread(PVOID threadObject, LONG increment)
{
UNUSED(threadObject);
UNUSED(increment);
return 0;
}

View File

@ -45,6 +45,8 @@ typedef EXPAND_STACK_CALLOUT *PEXPAND_STACK_CALLOUT;
FORT_API NTSTATUS KeExpandKernelStackAndCallout( FORT_API NTSTATUS KeExpandKernelStackAndCallout(
PEXPAND_STACK_CALLOUT callout, PVOID parameter, SIZE_T size); PEXPAND_STACK_CALLOUT callout, PVOID parameter, SIZE_T size);
FORT_API LONG KeSetBasePriorityThread(PVOID threadObject, LONG increment);
#ifdef __cplusplus #ifdef __cplusplus
} // extern "C" } // extern "C"
#endif #endif