mirror of
https://github.com/SoftEtherVPN/SoftEtherVPN.git
synced 2024-11-22 09:29:52 +03:00
Cedar/Proto: implement UDP system
When a datagram is received, the matching session is looked up in a hash list; if it's not found, a new session is created. This method allows to use a single UDP port for multiple protocols, as we do with TCP. Also, each session has its own dedicated thread, used to process the received datagrams and generate the ones that are then sent through the UDP listener. In addition to guaranteeing constant performance, separate threads also prevent a single one from blocking all sessions.
This commit is contained in:
parent
0570f7d31c
commit
a3aea00820
@ -19,6 +19,102 @@ int ProtoImplCompare(void *p1, void *p2)
|
||||
return false;
|
||||
}
|
||||
|
||||
int ProtoSessionCompare(void *p1, void *p2)
|
||||
{
|
||||
int ret;
|
||||
PROTO_SESSION *session_1, *session_2;
|
||||
|
||||
if (p1 == NULL || p2 == NULL)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
session_1 = *(PROTO_SESSION **)p1;
|
||||
session_2 = *(PROTO_SESSION **)p2;
|
||||
|
||||
// The source port must match
|
||||
ret = COMPARE_RET(session_1->SrcPort, session_2->SrcPort);
|
||||
if (ret != 0)
|
||||
{
|
||||
return ret;
|
||||
}
|
||||
|
||||
// The destination port must match
|
||||
ret = COMPARE_RET(session_1->DstPort, session_2->DstPort);
|
||||
if (ret != 0)
|
||||
{
|
||||
return ret;
|
||||
}
|
||||
|
||||
// The source IP address must match
|
||||
ret = CmpIpAddr(&session_1->SrcIp, &session_2->SrcIp);
|
||||
if (ret != 0)
|
||||
{
|
||||
return ret;
|
||||
}
|
||||
|
||||
// The destination IP address must match
|
||||
return CmpIpAddr(&session_1->DstIp, &session_2->DstIp);
|
||||
}
|
||||
|
||||
UINT ProtoSessionHash(void *p)
|
||||
{
|
||||
IP *ip;
|
||||
UINT ret = 0;
|
||||
PROTO_SESSION *session = p;
|
||||
|
||||
if (session == NULL)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
ip = &session->SrcIp;
|
||||
if (IsIP6(ip))
|
||||
{
|
||||
UINT i;
|
||||
for (i = 0; i < sizeof(ip->ipv6_addr); ++i)
|
||||
{
|
||||
ret += ip->ipv6_addr[i];
|
||||
}
|
||||
|
||||
ret += ip->ipv6_scope_id;
|
||||
}
|
||||
else
|
||||
{
|
||||
UINT i;
|
||||
for (i = 0; i < sizeof(ip->addr); ++i)
|
||||
{
|
||||
ret += ip->addr[i];
|
||||
}
|
||||
}
|
||||
|
||||
ret += session->SrcPort;
|
||||
|
||||
ip = &session->DstIp;
|
||||
if (IsIP6(ip))
|
||||
{
|
||||
UINT i;
|
||||
for (i = 0; i < sizeof(ip->ipv6_addr); ++i)
|
||||
{
|
||||
ret += ip->ipv6_addr[i];
|
||||
}
|
||||
|
||||
ret += ip->ipv6_scope_id;
|
||||
}
|
||||
else
|
||||
{
|
||||
UINT i;
|
||||
for (i = 0; i < sizeof(ip->addr); ++i)
|
||||
{
|
||||
ret += ip->addr[i];
|
||||
}
|
||||
}
|
||||
|
||||
ret += session->DstPort;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
PROTO *ProtoNew(CEDAR *cedar)
|
||||
{
|
||||
PROTO *proto;
|
||||
@ -31,22 +127,36 @@ PROTO *ProtoNew(CEDAR *cedar)
|
||||
proto = Malloc(sizeof(PROTO));
|
||||
proto->Cedar = cedar;
|
||||
proto->Impls = NewList(ProtoImplCompare);
|
||||
proto->Sessions = NewHashList(ProtoSessionHash, ProtoSessionCompare, 0, true);
|
||||
|
||||
AddRef(cedar->ref);
|
||||
|
||||
// OpenVPN
|
||||
ProtoImplAdd(proto, OvsGetProtoImpl());
|
||||
|
||||
proto->UdpListener = NewUdpListener(ProtoHandleDatagrams, proto, &cedar->Server->ListenIP);
|
||||
|
||||
return proto;
|
||||
}
|
||||
|
||||
void ProtoDelete(PROTO *proto)
|
||||
{
|
||||
UINT i = 0;
|
||||
|
||||
if (proto == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
StopUdpListener(proto->UdpListener);
|
||||
|
||||
for (i = 0; i < HASH_LIST_NUM(proto->Sessions); ++i)
|
||||
{
|
||||
ProtoDeleteSession(LIST_DATA(proto->Sessions->AllList, i));
|
||||
}
|
||||
|
||||
FreeUdpListener(proto->UdpListener);
|
||||
ReleaseHashList(proto->Sessions);
|
||||
ReleaseList(proto->Impls);
|
||||
ReleaseCedar(proto->Cedar);
|
||||
Free(proto);
|
||||
@ -96,6 +206,109 @@ PROTO_IMPL *ProtoImplDetect(PROTO *proto, const PROTO_MODE mode, const UCHAR *da
|
||||
return NULL;
|
||||
}
|
||||
|
||||
PROTO_SESSION *ProtoNewSession(PROTO *proto, PROTO_IMPL *impl, const IP *src_ip, const USHORT src_port, const IP *dst_ip, const USHORT dst_port)
|
||||
{
|
||||
PROTO_SESSION *session;
|
||||
|
||||
if (impl == NULL || src_ip == NULL || src_port == 0 || dst_ip == NULL || dst_port == 0)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
session = ZeroMalloc(sizeof(PROTO_SESSION));
|
||||
|
||||
session->SockEvent = NewSockEvent();
|
||||
session->InterruptManager = NewInterruptManager();
|
||||
|
||||
if (impl->Init != NULL && impl->Init(&session->Param, proto->Cedar, session->InterruptManager, session->SockEvent) == false)
|
||||
{
|
||||
Debug("ProtoNewSession(): failed to initialize %s\n", impl->Name());
|
||||
|
||||
ReleaseSockEvent(session->SockEvent);
|
||||
FreeInterruptManager(session->InterruptManager);
|
||||
Free(session);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
session->Proto = proto;
|
||||
session->Impl = impl;
|
||||
|
||||
CopyIP(&session->SrcIp, src_ip);
|
||||
session->SrcPort = src_port;
|
||||
CopyIP(&session->DstIp, dst_ip);
|
||||
session->DstPort = dst_port;
|
||||
|
||||
session->DatagramsIn = NewListFast(NULL);
|
||||
session->DatagramsOut = NewListFast(NULL);
|
||||
|
||||
session->Lock = NewLock();
|
||||
session->Thread = NewThread(ProtoSessionThread, session);
|
||||
|
||||
return session;
|
||||
}
|
||||
|
||||
void ProtoDeleteSession(PROTO_SESSION *session)
|
||||
{
|
||||
if (session == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
session->Halt = true;
|
||||
SetSockEvent(session->SockEvent);
|
||||
|
||||
WaitThread(session->Thread, INFINITE);
|
||||
ReleaseThread(session->Thread);
|
||||
|
||||
session->Impl->Free(session->Param);
|
||||
|
||||
ReleaseSockEvent(session->SockEvent);
|
||||
FreeInterruptManager(session->InterruptManager);
|
||||
|
||||
ReleaseList(session->DatagramsIn);
|
||||
ReleaseList(session->DatagramsOut);
|
||||
|
||||
DeleteLock(session->Lock);
|
||||
|
||||
Free(session);
|
||||
}
|
||||
|
||||
bool ProtoSetListenIP(PROTO *proto, const IP *ip)
|
||||
{
|
||||
if (proto == NULL || ip == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
Copy(&proto->UdpListener->ListenIP, ip, sizeof(proto->UdpListener->ListenIP));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ProtoSetUdpPorts(PROTO *proto, const LIST *ports)
|
||||
{
|
||||
UINT i = 0;
|
||||
|
||||
if (proto == NULL || ports == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
DeleteAllPortFromUdpListener(proto->UdpListener);
|
||||
|
||||
for (i = 0; i < LIST_NUM(ports); ++i)
|
||||
{
|
||||
UINT port = *((UINT *)LIST_DATA(ports, i));
|
||||
if (port >= 1 && port <= 65535)
|
||||
{
|
||||
AddPortToUdpListener(proto->UdpListener, port);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ProtoHandleConnection(PROTO *proto, SOCK *sock)
|
||||
{
|
||||
void *impl_data = NULL;
|
||||
@ -228,3 +441,120 @@ bool ProtoHandleConnection(PROTO *proto, SOCK *sock)
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void ProtoHandleDatagrams(UDPLISTENER *listener, LIST *datagrams)
|
||||
{
|
||||
UINT i;
|
||||
HASH_LIST *sessions;
|
||||
PROTO *proto = listener->Param;
|
||||
|
||||
if (proto == NULL || listener == NULL || datagrams == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
sessions = proto->Sessions;
|
||||
|
||||
for (i = 0; i < LIST_NUM(datagrams); ++i)
|
||||
{
|
||||
UDPPACKET *datagram = LIST_DATA(datagrams, i);
|
||||
PROTO_SESSION *session, tmp;
|
||||
|
||||
CopyIP(&tmp.SrcIp, &datagram->SrcIP);
|
||||
tmp.SrcPort = datagram->SrcPort;
|
||||
CopyIP(&tmp.DstIp, &datagram->DstIP);
|
||||
tmp.DstPort = datagram->DestPort;
|
||||
|
||||
session = SearchHash(sessions, &tmp);
|
||||
if (session == NULL)
|
||||
{
|
||||
tmp.Impl = ProtoImplDetect(proto, PROTO_MODE_UDP, datagram->Data, datagram->Size);
|
||||
if (tmp.Impl == NULL)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
session = ProtoNewSession(proto, tmp.Impl, &tmp.SrcIp, tmp.SrcPort, &tmp.DstIp, tmp.DstPort);
|
||||
if (session == NULL)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
AddHash(proto->Sessions, session);
|
||||
}
|
||||
|
||||
if (session->Halt)
|
||||
{
|
||||
DeleteHash(sessions, session);
|
||||
ProtoDeleteSession(session);
|
||||
continue;
|
||||
}
|
||||
|
||||
Lock(session->Lock);
|
||||
{
|
||||
void *data = Clone(datagram->Data, datagram->Size);
|
||||
UDPPACKET *packet = NewUdpPacket(&datagram->SrcIP, datagram->SrcPort, &datagram->DstIP, datagram->DestPort, data, datagram->Size);
|
||||
Add(session->DatagramsIn, packet);
|
||||
}
|
||||
Unlock(session->Lock);
|
||||
}
|
||||
|
||||
for (i = 0; i < LIST_NUM(sessions->AllList); ++i)
|
||||
{
|
||||
PROTO_SESSION *session = LIST_DATA(sessions->AllList, i);
|
||||
if (LIST_NUM(session->DatagramsIn) > 0)
|
||||
{
|
||||
SetSockEvent(session->SockEvent);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ProtoSessionThread(THREAD *thread, void *param)
|
||||
{
|
||||
PROTO_SESSION *session = param;
|
||||
|
||||
if (thread == NULL || session == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
while (session->Halt == false)
|
||||
{
|
||||
bool ok;
|
||||
UINT interval;
|
||||
void *param = session->Param;
|
||||
PROTO_IMPL *impl = session->Impl;
|
||||
LIST *received = session->DatagramsIn;
|
||||
LIST *to_send = session->DatagramsOut;
|
||||
|
||||
Lock(session->Lock);
|
||||
{
|
||||
UINT i;
|
||||
|
||||
ok = impl->ProcessDatagrams(param, received, to_send);
|
||||
|
||||
UdpListenerSendPackets(session->Proto->UdpListener, to_send);
|
||||
|
||||
for (i = 0; i < LIST_NUM(received); ++i)
|
||||
{
|
||||
FreeUdpPacket(LIST_DATA(received, i));
|
||||
}
|
||||
|
||||
DeleteAll(received);
|
||||
DeleteAll(to_send);
|
||||
}
|
||||
Unlock(session->Lock);
|
||||
|
||||
if (ok == false)
|
||||
{
|
||||
Debug("ProtoSessionThread(): breaking main loop\n");
|
||||
session->Halt = true;
|
||||
break;
|
||||
}
|
||||
|
||||
// Wait until the next event occurs
|
||||
interval = GetNextIntervalForInterrupt(session->InterruptManager);
|
||||
interval = MIN(interval, UDPLISTENER_WAIT_INTERVAL);
|
||||
WaitSockEvent(session->SockEvent, interval);
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,8 @@ typedef struct PROTO
|
||||
{
|
||||
CEDAR *Cedar;
|
||||
LIST *Impls;
|
||||
HASH_LIST *Sessions;
|
||||
UDPLISTENER *UdpListener;
|
||||
} PROTO;
|
||||
|
||||
typedef struct PROTO_IMPL
|
||||
@ -26,11 +28,33 @@ typedef struct PROTO_IMPL
|
||||
void (*Free)(void *param);
|
||||
char *(*Name)();
|
||||
bool (*IsPacketForMe)(const PROTO_MODE mode, const UCHAR *data, const UINT size);
|
||||
bool (*ProcessData)(void *param, TCP_RAW_DATA *received_data, FIFO *data_to_send);
|
||||
bool (*ProcessData)(void *param, TCP_RAW_DATA *in, FIFO *out);
|
||||
bool (*ProcessDatagrams)(void *param, LIST *in, LIST *out);
|
||||
void (*BufferLimit)(void *param, const bool reached);
|
||||
} PROTO_IMPL;
|
||||
|
||||
typedef struct PROTO_SESSION
|
||||
{
|
||||
void *Param;
|
||||
PROTO *Proto;
|
||||
PROTO_IMPL *Impl;
|
||||
IP SrcIp;
|
||||
USHORT SrcPort;
|
||||
IP DstIp;
|
||||
USHORT DstPort;
|
||||
LIST *DatagramsIn;
|
||||
LIST *DatagramsOut;
|
||||
SOCK_EVENT *SockEvent;
|
||||
INTERRUPT_MANAGER *InterruptManager;
|
||||
THREAD *Thread;
|
||||
LOCK *Lock;
|
||||
volatile bool Halt;
|
||||
} PROTO_SESSION;
|
||||
|
||||
int ProtoImplCompare(void *p1, void *p2);
|
||||
int ProtoSessionCompare(void *p1, void *p2);
|
||||
|
||||
UINT ProtoSessionHash(void *p);
|
||||
|
||||
PROTO *ProtoNew(CEDAR *cedar);
|
||||
void ProtoDelete(PROTO *proto);
|
||||
@ -38,6 +62,14 @@ void ProtoDelete(PROTO *proto);
|
||||
bool ProtoImplAdd(PROTO *proto, PROTO_IMPL *impl);
|
||||
PROTO_IMPL *ProtoImplDetect(PROTO *proto, const PROTO_MODE mode, const UCHAR *data, const UINT size);
|
||||
|
||||
PROTO_SESSION *ProtoNewSession(PROTO *proto, PROTO_IMPL *impl, const IP *src_ip, const USHORT src_port, const IP *dst_ip, const USHORT dst_port);
|
||||
void ProtoDeleteSession(PROTO_SESSION *session);
|
||||
|
||||
bool ProtoSetListenIP(PROTO *proto, const IP *ip);
|
||||
bool ProtoSetUdpPorts(PROTO *proto, const LIST *ports);
|
||||
|
||||
bool ProtoHandleConnection(PROTO *proto, SOCK *sock);
|
||||
void ProtoHandleDatagrams(UDPLISTENER *listener, LIST *datagrams);
|
||||
void ProtoSessionThread(THREAD *thread, void *param);
|
||||
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user