diff --git a/src/Cedar/Proto.c b/src/Cedar/Proto.c index b15f8e52..d4299da0 100644 --- a/src/Cedar/Proto.c +++ b/src/Cedar/Proto.c @@ -19,6 +19,102 @@ int ProtoImplCompare(void *p1, void *p2) return false; } +int ProtoSessionCompare(void *p1, void *p2) +{ + int ret; + PROTO_SESSION *session_1, *session_2; + + if (p1 == NULL || p2 == NULL) + { + return 0; + } + + session_1 = *(PROTO_SESSION **)p1; + session_2 = *(PROTO_SESSION **)p2; + + // The source port must match + ret = COMPARE_RET(session_1->SrcPort, session_2->SrcPort); + if (ret != 0) + { + return ret; + } + + // The destination port must match + ret = COMPARE_RET(session_1->DstPort, session_2->DstPort); + if (ret != 0) + { + return ret; + } + + // The source IP address must match + ret = CmpIpAddr(&session_1->SrcIp, &session_2->SrcIp); + if (ret != 0) + { + return ret; + } + + // The destination IP address must match + return CmpIpAddr(&session_1->DstIp, &session_2->DstIp); +} + +UINT ProtoSessionHash(void *p) +{ + IP *ip; + UINT ret = 0; + PROTO_SESSION *session = p; + + if (session == NULL) + { + return 0; + } + + ip = &session->SrcIp; + if (IsIP6(ip)) + { + UINT i; + for (i = 0; i < sizeof(ip->ipv6_addr); ++i) + { + ret += ip->ipv6_addr[i]; + } + + ret += ip->ipv6_scope_id; + } + else + { + UINT i; + for (i = 0; i < sizeof(ip->addr); ++i) + { + ret += ip->addr[i]; + } + } + + ret += session->SrcPort; + + ip = &session->DstIp; + if (IsIP6(ip)) + { + UINT i; + for (i = 0; i < sizeof(ip->ipv6_addr); ++i) + { + ret += ip->ipv6_addr[i]; + } + + ret += ip->ipv6_scope_id; + } + else + { + UINT i; + for (i = 0; i < sizeof(ip->addr); ++i) + { + ret += ip->addr[i]; + } + } + + ret += session->DstPort; + + return ret; +} + PROTO *ProtoNew(CEDAR *cedar) { PROTO *proto; @@ -31,22 +127,36 @@ PROTO *ProtoNew(CEDAR *cedar) proto = Malloc(sizeof(PROTO)); proto->Cedar = cedar; proto->Impls = NewList(ProtoImplCompare); + proto->Sessions = NewHashList(ProtoSessionHash, ProtoSessionCompare, 0, true); AddRef(cedar->ref); // OpenVPN ProtoImplAdd(proto, OvsGetProtoImpl()); + proto->UdpListener = NewUdpListener(ProtoHandleDatagrams, proto, &cedar->Server->ListenIP); + return proto; } void ProtoDelete(PROTO *proto) { + UINT i = 0; + if (proto == NULL) { return; } + StopUdpListener(proto->UdpListener); + + for (i = 0; i < HASH_LIST_NUM(proto->Sessions); ++i) + { + ProtoDeleteSession(LIST_DATA(proto->Sessions->AllList, i)); + } + + FreeUdpListener(proto->UdpListener); + ReleaseHashList(proto->Sessions); ReleaseList(proto->Impls); ReleaseCedar(proto->Cedar); Free(proto); @@ -96,6 +206,109 @@ PROTO_IMPL *ProtoImplDetect(PROTO *proto, const PROTO_MODE mode, const UCHAR *da return NULL; } +PROTO_SESSION *ProtoNewSession(PROTO *proto, PROTO_IMPL *impl, const IP *src_ip, const USHORT src_port, const IP *dst_ip, const USHORT dst_port) +{ + PROTO_SESSION *session; + + if (impl == NULL || src_ip == NULL || src_port == 0 || dst_ip == NULL || dst_port == 0) + { + return NULL; + } + + session = ZeroMalloc(sizeof(PROTO_SESSION)); + + session->SockEvent = NewSockEvent(); + session->InterruptManager = NewInterruptManager(); + + if (impl->Init != NULL && impl->Init(&session->Param, proto->Cedar, session->InterruptManager, session->SockEvent) == false) + { + Debug("ProtoNewSession(): failed to initialize %s\n", impl->Name()); + + ReleaseSockEvent(session->SockEvent); + FreeInterruptManager(session->InterruptManager); + Free(session); + + return NULL; + } + + session->Proto = proto; + session->Impl = impl; + + CopyIP(&session->SrcIp, src_ip); + session->SrcPort = src_port; + CopyIP(&session->DstIp, dst_ip); + session->DstPort = dst_port; + + session->DatagramsIn = NewListFast(NULL); + session->DatagramsOut = NewListFast(NULL); + + session->Lock = NewLock(); + session->Thread = NewThread(ProtoSessionThread, session); + + return session; +} + +void ProtoDeleteSession(PROTO_SESSION *session) +{ + if (session == NULL) + { + return; + } + + session->Halt = true; + SetSockEvent(session->SockEvent); + + WaitThread(session->Thread, INFINITE); + ReleaseThread(session->Thread); + + session->Impl->Free(session->Param); + + ReleaseSockEvent(session->SockEvent); + FreeInterruptManager(session->InterruptManager); + + ReleaseList(session->DatagramsIn); + ReleaseList(session->DatagramsOut); + + DeleteLock(session->Lock); + + Free(session); +} + +bool ProtoSetListenIP(PROTO *proto, const IP *ip) +{ + if (proto == NULL || ip == NULL) + { + return false; + } + + Copy(&proto->UdpListener->ListenIP, ip, sizeof(proto->UdpListener->ListenIP)); + + return true; +} + +bool ProtoSetUdpPorts(PROTO *proto, const LIST *ports) +{ + UINT i = 0; + + if (proto == NULL || ports == NULL) + { + return false; + } + + DeleteAllPortFromUdpListener(proto->UdpListener); + + for (i = 0; i < LIST_NUM(ports); ++i) + { + UINT port = *((UINT *)LIST_DATA(ports, i)); + if (port >= 1 && port <= 65535) + { + AddPortToUdpListener(proto->UdpListener, port); + } + } + + return true; +} + bool ProtoHandleConnection(PROTO *proto, SOCK *sock) { void *impl_data = NULL; @@ -228,3 +441,120 @@ bool ProtoHandleConnection(PROTO *proto, SOCK *sock) return true; } + +void ProtoHandleDatagrams(UDPLISTENER *listener, LIST *datagrams) +{ + UINT i; + HASH_LIST *sessions; + PROTO *proto = listener->Param; + + if (proto == NULL || listener == NULL || datagrams == NULL) + { + return; + } + + sessions = proto->Sessions; + + for (i = 0; i < LIST_NUM(datagrams); ++i) + { + UDPPACKET *datagram = LIST_DATA(datagrams, i); + PROTO_SESSION *session, tmp; + + CopyIP(&tmp.SrcIp, &datagram->SrcIP); + tmp.SrcPort = datagram->SrcPort; + CopyIP(&tmp.DstIp, &datagram->DstIP); + tmp.DstPort = datagram->DestPort; + + session = SearchHash(sessions, &tmp); + if (session == NULL) + { + tmp.Impl = ProtoImplDetect(proto, PROTO_MODE_UDP, datagram->Data, datagram->Size); + if (tmp.Impl == NULL) + { + continue; + } + + session = ProtoNewSession(proto, tmp.Impl, &tmp.SrcIp, tmp.SrcPort, &tmp.DstIp, tmp.DstPort); + if (session == NULL) + { + continue; + } + + AddHash(proto->Sessions, session); + } + + if (session->Halt) + { + DeleteHash(sessions, session); + ProtoDeleteSession(session); + continue; + } + + Lock(session->Lock); + { + void *data = Clone(datagram->Data, datagram->Size); + UDPPACKET *packet = NewUdpPacket(&datagram->SrcIP, datagram->SrcPort, &datagram->DstIP, datagram->DestPort, data, datagram->Size); + Add(session->DatagramsIn, packet); + } + Unlock(session->Lock); + } + + for (i = 0; i < LIST_NUM(sessions->AllList); ++i) + { + PROTO_SESSION *session = LIST_DATA(sessions->AllList, i); + if (LIST_NUM(session->DatagramsIn) > 0) + { + SetSockEvent(session->SockEvent); + } + } +} + +void ProtoSessionThread(THREAD *thread, void *param) +{ + PROTO_SESSION *session = param; + + if (thread == NULL || session == NULL) + { + return; + } + + while (session->Halt == false) + { + bool ok; + UINT interval; + void *param = session->Param; + PROTO_IMPL *impl = session->Impl; + LIST *received = session->DatagramsIn; + LIST *to_send = session->DatagramsOut; + + Lock(session->Lock); + { + UINT i; + + ok = impl->ProcessDatagrams(param, received, to_send); + + UdpListenerSendPackets(session->Proto->UdpListener, to_send); + + for (i = 0; i < LIST_NUM(received); ++i) + { + FreeUdpPacket(LIST_DATA(received, i)); + } + + DeleteAll(received); + DeleteAll(to_send); + } + Unlock(session->Lock); + + if (ok == false) + { + Debug("ProtoSessionThread(): breaking main loop\n"); + session->Halt = true; + break; + } + + // Wait until the next event occurs + interval = GetNextIntervalForInterrupt(session->InterruptManager); + interval = MIN(interval, UDPLISTENER_WAIT_INTERVAL); + WaitSockEvent(session->SockEvent, interval); + } +} diff --git a/src/Cedar/Proto.h b/src/Cedar/Proto.h index 0fcc5e69..b9b946f2 100644 --- a/src/Cedar/Proto.h +++ b/src/Cedar/Proto.h @@ -18,6 +18,8 @@ typedef struct PROTO { CEDAR *Cedar; LIST *Impls; + HASH_LIST *Sessions; + UDPLISTENER *UdpListener; } PROTO; typedef struct PROTO_IMPL @@ -26,11 +28,33 @@ typedef struct PROTO_IMPL void (*Free)(void *param); char *(*Name)(); bool (*IsPacketForMe)(const PROTO_MODE mode, const UCHAR *data, const UINT size); - bool (*ProcessData)(void *param, TCP_RAW_DATA *received_data, FIFO *data_to_send); + bool (*ProcessData)(void *param, TCP_RAW_DATA *in, FIFO *out); + bool (*ProcessDatagrams)(void *param, LIST *in, LIST *out); void (*BufferLimit)(void *param, const bool reached); } PROTO_IMPL; +typedef struct PROTO_SESSION +{ + void *Param; + PROTO *Proto; + PROTO_IMPL *Impl; + IP SrcIp; + USHORT SrcPort; + IP DstIp; + USHORT DstPort; + LIST *DatagramsIn; + LIST *DatagramsOut; + SOCK_EVENT *SockEvent; + INTERRUPT_MANAGER *InterruptManager; + THREAD *Thread; + LOCK *Lock; + volatile bool Halt; +} PROTO_SESSION; + int ProtoImplCompare(void *p1, void *p2); +int ProtoSessionCompare(void *p1, void *p2); + +UINT ProtoSessionHash(void *p); PROTO *ProtoNew(CEDAR *cedar); void ProtoDelete(PROTO *proto); @@ -38,6 +62,14 @@ void ProtoDelete(PROTO *proto); bool ProtoImplAdd(PROTO *proto, PROTO_IMPL *impl); PROTO_IMPL *ProtoImplDetect(PROTO *proto, const PROTO_MODE mode, const UCHAR *data, const UINT size); +PROTO_SESSION *ProtoNewSession(PROTO *proto, PROTO_IMPL *impl, const IP *src_ip, const USHORT src_port, const IP *dst_ip, const USHORT dst_port); +void ProtoDeleteSession(PROTO_SESSION *session); + +bool ProtoSetListenIP(PROTO *proto, const IP *ip); +bool ProtoSetUdpPorts(PROTO *proto, const LIST *ports); + bool ProtoHandleConnection(PROTO *proto, SOCK *sock); +void ProtoHandleDatagrams(UDPLISTENER *listener, LIST *datagrams); +void ProtoSessionThread(THREAD *thread, void *param); #endif