Driver: Shaper: Rework packets processing

This commit is contained in:
Nodir Temirkhodjaev 2024-02-16 19:46:41 +03:00
parent 6cec5d5029
commit e880ea9ae5
10 changed files with 169 additions and 44 deletions

View File

@ -19,6 +19,7 @@ SOURCES += \
fortscb.c \
fortstat.c \
forttds.c \
fortthr.c \
forttlsf.c \
forttmr.c \
forttrace.c \
@ -57,6 +58,7 @@ HEADERS += \
fortscb.h \
fortstat.h \
forttds.h \
fortthr.h \
forttlsf.h \
forttmr.h \
forttrace.h \

View File

@ -26,7 +26,6 @@ typedef enum FORT_FUNC_ID {
FORT_DEVICE_LOAD,
FORT_DEVICE_UNLOAD,
FORT_PACKET_INJECT_COMPLETE,
FORT_SHAPER_TIMER_PROCESS,
FORT_SYSCB_POWER,
FORT_SYSCB_TIME,
FORT_TIMER_CALLBACK,

View File

@ -29,6 +29,7 @@
#include "fortps.c"
#include "fortstat.c"
#include "fortscb.c"
#include "fortthr.c"
#include "forttmr.c"
#include "forttrace.c"
#include "fortutl.c"

View File

@ -17,6 +17,16 @@
typedef void FORT_SHAPER_PACKET_FOREACH_FUNC(PFORT_SHAPER, PFORT_FLOW_PACKET);
static UCHAR fort_shaper_flags_set(PFORT_SHAPER shaper, UCHAR flags, BOOL on)
{
return on ? InterlockedOr8(&shaper->flags, flags) : InterlockedAnd8(&shaper->flags, ~flags);
}
static UCHAR fort_shaper_flags(PFORT_SHAPER shaper)
{
return fort_shaper_flags_set(shaper, 0, TRUE);
}
static LONG fort_shaper_io_bits_exchange(volatile LONG *io_bits, LONG v)
{
return InterlockedExchange(io_bits, v);
@ -83,15 +93,10 @@ inline static BOOL fort_packet_is_ipsec_tunneled(PCFORT_CALLOUT_ARG ca)
return info.isTunnelMode && !info.isDeTunneled;
}
static ULONG fort_packet_data_length(const PNET_BUFFER_LIST netBufList)
inline static ULONG fort_packet_data_length(PCFORT_CALLOUT_ARG ca)
{
ULONG data_length = 0;
PNET_BUFFER netBuf = NET_BUFFER_LIST_FIRST_NB(netBufList);
while (netBuf != NULL) {
data_length += NET_BUFFER_DATA_LENGTH(netBuf);
netBuf = NET_BUFFER_NEXT_NB(netBuf);
}
PNET_BUFFER netBuf = NET_BUFFER_LIST_FIRST_NB(ca->netBufList);
const ULONG data_length = NET_BUFFER_DATA_LENGTH(netBuf);
return data_length;
}
@ -578,14 +583,15 @@ static void fort_shaper_queue_process_bandwidth(
{
const UINT64 bps = queue->limit.bps;
/* Advance the available bytes */
const UINT64 accumulated =
((now.QuadPart - queue->last_tick.QuadPart) * bps) / shaper->qpcFrequency.QuadPart;
queue->available_bytes += accumulated;
if (fort_shaper_packet_list_is_empty(&queue->bandwidth_list)
&& queue->available_bytes > FORT_QUEUE_INITIAL_TOKEN_COUNT) {
queue->available_bytes = FORT_QUEUE_INITIAL_TOKEN_COUNT;
} else {
/* Advance the available bytes */
const UINT64 accumulated =
((now.QuadPart - queue->last_tick.QuadPart) * bps) / shaper->qpcFrequency.QuadPart;
queue->available_bytes += accumulated;
}
queue->last_tick = now;
@ -598,11 +604,13 @@ static void fort_shaper_queue_process_bandwidth(
PFORT_FLOW_PACKET pkt_tail = NULL;
PFORT_FLOW_PACKET pkt = pkt_chain;
do {
if (bps != 0LL && queue->available_bytes < pkt->data_length)
const UINT64 pkt_length = pkt->data_length;
if (bps != 0LL && queue->available_bytes < pkt_length)
break;
queue->available_bytes -= pkt->data_length;
queue->queued_bytes -= pkt->data_length;
queue->available_bytes -= pkt_length;
queue->queued_bytes -= pkt_length;
pkt->latency_start = now;
@ -626,6 +634,12 @@ static PFORT_FLOW_PACKET fort_shaper_queue_process_latency(
const UINT32 latency_ms = queue->limit.latency_ms;
if (latency_ms == 0) {
fort_shaper_packet_list_cut_chain(&queue->latency_list, queue->latency_list.packet_tail);
return pkt_chain;
}
const UINT64 qpcFrequency = shaper->qpcFrequency.QuadPart;
const UINT64 qpcFrequencyHalfMs = qpcFrequency / 2000LL;
@ -786,17 +800,12 @@ static void fort_shaper_free_queues(PFORT_SHAPER shaper)
}
}
static void fort_shaper_timer_start(PFORT_SHAPER shaper)
inline static void fort_shaper_thread_set_event(PFORT_SHAPER shaper)
{
const ULONG active_io_bits = fort_shaper_io_bits(&shaper->active_io_bits);
if (active_io_bits == 0)
return;
fort_timer_set_running(&shaper->timer, /*run=*/TRUE);
KeSetEvent(&shaper->thread_event, IO_NO_INCREMENT, FALSE);
}
inline static ULONG fort_shaper_timer_process_queues(PFORT_SHAPER shaper, ULONG active_io_bits)
inline static ULONG fort_shaper_thread_process_queues(PFORT_SHAPER shaper, ULONG active_io_bits)
{
ULONG new_active_io_bits = 0;
@ -821,25 +830,50 @@ inline static ULONG fort_shaper_timer_process_queues(PFORT_SHAPER shaper, ULONG
return new_active_io_bits;
}
static void fort_shaper_timer_process(void)
inline static BOOL fort_shaper_thread_process(PFORT_SHAPER shaper)
{
FORT_CHECK_STACK(FORT_SHAPER_TIMER_PROCESS);
PFORT_SHAPER shaper = &fort_device()->shaper;
ULONG active_io_bits =
fort_shaper_io_bits_set(&shaper->active_io_bits, FORT_PACKET_FLUSH_ALL, FALSE);
if (active_io_bits == 0)
return;
return FALSE;
active_io_bits = fort_shaper_timer_process_queues(shaper, active_io_bits);
active_io_bits = fort_shaper_thread_process_queues(shaper, active_io_bits);
if (active_io_bits != 0) {
fort_shaper_io_bits_set(&shaper->active_io_bits, active_io_bits, TRUE);
fort_shaper_timer_start(shaper);
return TRUE;
}
return FALSE;
}
static void fort_shaper_thread_loop(PVOID context)
{
PFORT_SHAPER shaper = context;
PKEVENT thread_event = &shaper->thread_event;
LARGE_INTEGER due;
due.QuadPart = 1LL * -10000LL /* ms -> us */;
PLARGE_INTEGER timeout = NULL;
do {
KeWaitForSingleObject(thread_event, Executive, KernelMode, FALSE, timeout);
const BOOL is_active = fort_shaper_thread_process(shaper);
timeout = is_active ? &due : NULL;
} while ((fort_shaper_flags(shaper) & FORT_SHAPER_CLOSED) == 0);
}
static void fort_shaper_thread_close(PFORT_SHAPER shaper)
{
fort_shaper_thread_set_event(shaper);
fort_thread_wait(&shaper->thread);
}
inline static PFORT_FLOW_PACKET fort_shaper_flush_queues(PFORT_SHAPER shaper, UINT32 group_io_bits)
@ -889,13 +923,16 @@ FORT_API void fort_shaper_open(PFORT_SHAPER shaper)
KeInitializeSpinLock(&shaper->lock);
fort_timer_open(
&shaper->timer, /*period(ms)=*/1, FORT_TIMER_ONESHOT, &fort_shaper_timer_process);
KeInitializeEvent(&shaper->thread_event, SynchronizationEvent, FALSE);
fort_thread_run(&shaper->thread, &fort_shaper_thread_loop, shaper);
}
FORT_API void fort_shaper_close(PFORT_SHAPER shaper)
{
fort_timer_close(&shaper->timer);
fort_shaper_flags_set(shaper, FORT_SHAPER_CLOSED, TRUE);
fort_shaper_thread_close(shaper);
fort_shaper_drop_packets(shaper);
fort_shaper_free_queues(shaper);
@ -1020,11 +1057,12 @@ inline static NTSTATUS fort_shaper_packet_queue(
return STATUS_NO_SUCH_GROUP;
/* Calculate the Packets' Data Length */
const ULONG data_length = fort_packet_data_length(ca->netBufList);
const ULONG data_length = fort_packet_data_length(ca);
/* Check the Queue for new Packet */
if (!fort_shaper_packet_queue_check_packet(queue, data_length))
if (!fort_shaper_packet_queue_check_packet(queue, data_length)) {
return STATUS_SUCCESS; /* drop the packet */
}
/* Create the Packet */
PFORT_FLOW_PACKET pkt = fort_shaper_packet_get(shaper);
@ -1048,8 +1086,8 @@ inline static NTSTATUS fort_shaper_packet_queue(
/* Packets in transport layer must be re-injected in DCP due to locking */
fort_shaper_io_bits_set(&shaper->active_io_bits, queue_bit, TRUE);
/* Start the Timer */
fort_shaper_timer_start(shaper);
/* Process the queued packets in thread */
fort_shaper_thread_set_event(shaper);
return STATUS_SUCCESS;
}

View File

@ -6,7 +6,7 @@
#include "common/fortconf.h"
#include "fortcoutarg.h"
#include "forttds.h"
#include "forttmr.h"
#include "fortthr.h"
#define FORT_PACKET_QUEUE_BAD_INDEX ((UINT16) -1)
@ -129,8 +129,12 @@ typedef struct fort_pending
KSPIN_LOCK lock;
} FORT_PENDING, *PFORT_PENDING;
#define FORT_SHAPER_CLOSED 0x01
typedef struct fort_shaper
{
UCHAR volatile flags;
UINT32 limit_io_bits;
LONG volatile group_io_bits;
@ -139,7 +143,8 @@ typedef struct fort_shaper
ULONG randomSeed;
LARGE_INTEGER qpcFrequency;
FORT_TIMER timer;
KEVENT thread_event;
FORT_THREAD thread;
PFORT_FLOW_PACKET packet_free;
tommy_arrayof packets;

36
src/driver/fortthr.c Normal file
View File

@ -0,0 +1,36 @@
/* Fort Firewall Thread */
#include "fortthr.h"
#include "fortcb.h"
#include "fortdbg.h"
FORT_API NTSTATUS fort_thread_run(PFORT_THREAD thread, PKSTART_ROUTINE routine, PVOID context)
{
NTSTATUS status;
thread->thread_obj = NULL;
HANDLE hThread;
status = PsCreateSystemThread(&hThread, 0, NULL, NULL, NULL, routine, context);
if (!NT_SUCCESS(status))
return status;
status = ObReferenceObjectByHandle(
hThread, THREAD_ALL_ACCESS, NULL, KernelMode, &thread->thread_obj, NULL);
ZwClose(hThread);
return status;
}
FORT_API void fort_thread_wait(PFORT_THREAD thread)
{
if (thread->thread_obj == NULL)
return;
KeWaitForSingleObject(thread->thread_obj, Executive, KernelMode, FALSE, NULL);
ObDereferenceObject(thread->thread_obj);
thread->thread_obj = NULL;
}

23
src/driver/fortthr.h Normal file
View File

@ -0,0 +1,23 @@
#ifndef FORTTHR_H
#define FORTTHR_H
#include "fortdrv.h"
typedef struct fort_thread
{
PVOID thread_obj;
} FORT_THREAD, *PFORT_THREAD;
#if defined(__cplusplus)
extern "C" {
#endif
FORT_API NTSTATUS fort_thread_run(PFORT_THREAD thread, PKSTART_ROUTINE routine, PVOID context);
FORT_API void fort_thread_wait(PFORT_THREAD thread);
#ifdef __cplusplus
} // extern "C"
#endif
#endif // FORTTHR_H

View File

@ -315,6 +315,20 @@ void IoQueueWorkItemEx(
UNUSED(context);
}
NTSTATUS PsCreateSystemThread(PHANDLE threadHandle, ULONG desiredAccess,
POBJECT_ATTRIBUTES objectAttributes, HANDLE processHandle, PVOID clientId,
PKSTART_ROUTINE startRoutine, PVOID startContext)
{
UNUSED(threadHandle);
UNUSED(desiredAccess);
UNUSED(objectAttributes);
UNUSED(processHandle);
UNUSED(clientId);
UNUSED(startRoutine);
UNUSED(startContext);
return STATUS_SUCCESS;
}
LARGE_INTEGER KeQueryPerformanceCounter(PLARGE_INTEGER performanceFrequency)
{
UNUSED(performanceFrequency);

View File

@ -246,6 +246,9 @@ typedef IO_WORKITEM_ROUTINE *PIO_WORKITEM_ROUTINE;
typedef VOID IO_WORKITEM_ROUTINE_EX(PVOID IoObject, PVOID Context, PIO_WORKITEM IoWorkItem);
typedef IO_WORKITEM_ROUTINE_EX *PIO_WORKITEM_ROUTINE_EX;
typedef VOID KSTART_ROUTINE(PVOID startContext);
typedef KSTART_ROUTINE *PKSTART_ROUTINE;
typedef struct
{
short Year; // range [1601...]
@ -393,6 +396,10 @@ FORT_API void IoQueueWorkItem(
FORT_API void IoQueueWorkItemEx(
PIO_WORKITEM workItem, PIO_WORKITEM_ROUTINE_EX routine, int queueType, PVOID context);
FORT_API NTSTATUS PsCreateSystemThread(PHANDLE threadHandle, ULONG desiredAccess,
POBJECT_ATTRIBUTES objectAttributes, HANDLE processHandle, PVOID clientId,
PKSTART_ROUTINE startRoutine, PVOID startContext);
FORT_API LARGE_INTEGER KeQueryPerformanceCounter(PLARGE_INTEGER performanceFrequency);
FORT_API void KeQuerySystemTime(PLARGE_INTEGER time);

View File

@ -664,7 +664,7 @@ void ConfUtil::writeLimit(fort_speed_limit *limit, quint32 kBits, quint32 buffer
limit->plr = packetLoss;
limit->latency_ms = latencyMsec;
limit->buffer_bytes = bufferSize;
limit->bps = quint64(kBits) * (1024 / 8);
limit->bps = quint64(kBits) * (1024LL / 8); /* to bytes per second */
}
void ConfUtil::writeAddressRanges(char **data, const addrranges_arr_t &addressRanges)