Net-Tofu

Net-Tofu is a simple networked multiplayer game. The game does not use a network communication library, and the server is compatible with both Windows and Linux. All network functionality is achieved using only the native Winsock (Windows) and socket (Linux) APIs and the UDP network protocol. Players control small white squares that must avoid falling off descending platforms while firing tiny white squares at their opponents. This gameplay video shows players from four different computers playing on the same server.

Some of the network functions built for this game include guaranteed messages, message bundling, throttling, dead-reckoning and timeouts.

View Code

#pragma once

#include "CommonTypes.h"
#include "SimpleException.h"
#include "Util.h"

namespace Lucid
{
	// Size in bytes of the packet buffers used for sending and receiving.
	const unsigned MAX_PACKET_SIZE = 512;
	// Bytes reserved at the front of every packet for the protocol header.
	const unsigned UDP_HEADER_RESERVED = 16;
	// Largest single message payload: one packet minus the header and the
	// uint64 length prefix that precedes each message.
	const unsigned MAX_SENDABLE_MESSAGE = MAX_PACKET_SIZE - UDP_HEADER_RESERVED - sizeof( uint64 );

	// A session is dropped when nothing has been received from it for this long.
	const unsigned CONNECTION_TIMEOUT_MILLIS = 5000;
	// A keepalive is sent when nothing has been sent to a session for this long.
	const unsigned KEEPALIVE_MILLIS = 500;

	// Round-trip estimate used to seed a new session's RTT log.
	const unsigned INITIAL_ESTIMATED_RTT_MILLIS = 32;

	// Identifier handed to listeners and accepted by the session-based calls.
	typedef unsigned SessionID;
	// Reserved value that never names a live session.
	const SessionID INVALID_SESSION_ID = (unsigned) -1;

	//=====================================================================
	//=====================================================================
	class UDPSocket;
	class UDPSocketImpl;

	/**
	* An IPv4 address / port pair identifying a remote (or local) UDP endpoint.
	* Both fields are kept in host byte order; the socket code converts with
	* htonl/htons and ntohl/ntohs at the API boundary.
	**/
	class UDPAddress
	{
	public:
		/**
		* Constructs an invalid address (isValid() returns false).
		**/
		UDPAddress();
		/**
		* Constructs an address from (port) and (ip)
		* @param port Port
		* @param ip NUL-terminated dotted-quad string, or nullptr for any IP.
		*           A string that cannot be parsed yields an invalid address.
		**/
		UDPAddress( unsigned short port, const char* ip );

		/** Two addresses are equal when both IP and port match. */
		bool operator==( const UDPAddress& ) const;
		/** Strict ordering by IP first, then by port. */
		bool operator<( const UDPAddress& ) const;

		/**
		* Returns true if the port and IP are valid
		* (neither is zero and neither is the all-ones sentinel).
		*/
		bool isValid() const;

		// The socket classes read and write ip/port directly.
		friend class UDPSocket;
		friend class UDPSocketImpl;

	private:
		unsigned ip;          // IPv4 address, host byte order
		unsigned short port;  // UDP port, host byte order
	};

	//=====================================================================
	//=====================================================================
	// Raised by bindPort() when the requested port is already bound.
	class PortInUseException: public IOException
	{
	public:
		const char* what() const throw()
		{
			return "Port already bound";
		}
	};
	
	//=====================================================================
	//=====================================================================
	// Raised when a session id or address does not name an active session.
	class InvalidSessionException: public IOException
	{
	public:
		const char* what() const throw()
		{
			return "Attempt to reference invalid session";
		}
	};

	//=====================================================================
	//=====================================================================
	// Raised for socket-level failures and malformed packets.
	class NetworkException: public IOException
	{
	public:
		const char* what() const throw()
		{
			return "Error in network subsystem";
		}
	};


#ifdef _MSC_VER
#pragma warning( push )
#pragma warning( disable : 4100 ) // unreferenced parameters
#endif
	//=====================================================================
	// Listener class for receiving notifications from the socket.
	// Every callback has an empty default body, so implementers override
	// only the events they care about.
	//=====================================================================
	class UDPSocketListener
	{
	public:
		// Polymorphic base: allow safe destruction through a base pointer.
		virtual ~UDPSocketListener() { }

		// One message (not one packet) arrived on session (id).
		// (data) points into the socket's receive buffer and is (dataSize)
		// bytes long.
		virtual void ReceivedMessage(
			const void* data, unsigned dataSize,
			SessionID id, const UDPAddress& remoteAddress ) { }

		// A new session with (remoteAddress) was opened and assigned (id).
		virtual void SessionStarted(
			SessionID id, const UDPAddress& remoteAddress ) { }

		// Session (id) was removed.
		// (bUndeliveredMessages) reports whether guaranteed messages may have
		// been lost.
		virtual void SessionEnded(
			SessionID id, const UDPAddress& remoteAddress,
			bool bUndeliveredMessages ) { }
	};

#ifdef _MSC_VER
#pragma warning( pop ) // restore the caller's setting rather than forcing the default
#endif

	//=====================================================================
	//=====================================================================

	/**
	* Session-oriented UDP socket.
	* Sends are queued and performed by update(); events are reported to
	* registered UDPSocketListener objects. The implementation lives behind
	* the m_ pointer.
	*/
	class UDPSocket
	{
	public:
		/**
		* Constructs a new UDPSocket.
		* @exception NetworkException
		*/
		UDPSocket();

		~UDPSocket();

		/**
		* Registers a listener. Adding the same listener twice leaves a
		* single registration. The socket does not take ownership.
		*/
		void addListener( UDPSocketListener* );
		/** Unregisters a listener; unknown listeners are ignored. */
		void removeListener( UDPSocketListener* );

		/**
		* Begins listening for messages on (port)
		* @param port Port on which to accept incoming messages
		* @exception PortInUseException
		* @exception NetworkException 
		*/
		void bindPort( unsigned short port );

		
		// Send a datagram to (remoteAddr)
		// Send will complete during next update() call
		void send( const void* data, unsigned dataSize, const UDPAddress& remoteAddr );
		// Same, addressed by an existing session.
		void send( const void* data, unsigned dataSize, SessionID sessionID );
			// throws InvalidSessionException()

		// Sends a datagram to (remoteAddr) repeatedly until a confirmation
		// is received.
		void sendGuaranteed( const void* data, unsigned dataSize, const UDPAddress& remoteAddr );
		// Same, addressed by an existing session.
		void sendGuaranteed( const void* data, unsigned dataSize, SessionID sessionID );
			// throws InvalidSessionException()

		// Marks a session for termination.
		// Sessions will terminate when all pending guaranteed messages are
		// acknowledged or the session times out.
		void terminateSession( const UDPAddress& remoteAddr );
			// throws InvalidSessionException()
		void terminateSession( SessionID );
			// throws InvalidSessionException()

		// Writes the remote address of the given session to (outAddress).
		void getRemoteAddressByID( SessionID, UDPAddress& outAddress );
			// throws InvalidSessionException()

		// Round-trip time for the given session, in milliseconds.
		unsigned getRTTMillis( SessionID ) const;
			// throws InvalidSessionException()

		// Process sending and receiving.
		void update();
			// throws NetworkException()
			// TODO - more descriptive exceptions here

	private:
		// Implementation object; created in the constructor and deleted in
		// the destructor.
		UDPSocketImpl* m_;

		// NOTE(review): macro is defined elsewhere; presumably disables
		// copy construction and assignment - confirm.
		PREVENT_COPYING( UDPSocket )
	};



}
#include "UDPSocket.h"

// Debug aid: when SIMULATE_PACKET_LOSS is defined, GetNextPacket() randomly
// discards SIMULATED_PACKET_LOSS_PERCENT percent of received packets.
// It is explicitly turned off here.
#undef SIMULATE_PACKET_LOSS
#define SIMULATED_PACKET_LOSS_PERCENT 75

// Debug aid: when SIMULATE_NETWORK_LATENCY is defined, received packets are
// held in a queue for ADDITIONAL_LATENCY_MILLIS before being processed.
// It is turned on here.
#define SIMULATE_NETWORK_LATENCY
#define ADDITIONAL_LATENCY_MILLIS 15

#ifdef WIN32
	#include <Winsock2.h>
	#pragma comment ( lib, "ws2_32.lib" )
#endif

#ifdef LINUX
	#include <sys/types.h>
	#include <sys/socket.h>
	#include <sys/fcntl.h>
	#include <sys/ioctl.h>
	#include <netinet/in.h>
	#include <arpa/inet.h>
	#include <netdb.h>
	#include <math.h>
	
	#define SOCKET int
	#define INVALID_SOCKET -1
	#define SOCKET_ERROR -1
	typedef sockaddr SOCKADDR;
#endif

#include "LucidTime.h"
#include "Statistics.h"

#include <string.h>
#include <vector>
#include <map>

#ifdef SIMULATE_NETWORK_LATENCY
	#include <queue>
#endif

namespace Lucid
{
	// Interval between RTT pings once a session's RTT log holds RTT_SAMPLES
	// valid samples.
	const unsigned RTT_POLL_FREQUENCY_MILLIS = 10000;
	// Interval between RTT pings while the RTT log is still filling.
	const unsigned INITIAL_RTT_POLL_FREQUENCY_MILLIS = 1000;
	// Number of samples kept by each session's running RTT average.
	const unsigned RTT_SAMPLES = 10;

	// A buffered packet is published after PUBLISH_RATE_PERCENT_RTT percent
	// of the average RTT, but never sooner than MIN_PUBLISH_RATE_MILLIS.
	const unsigned MIN_PUBLISH_RATE_MILLIS = 8;
	const unsigned PUBLISH_RATE_PERCENT_RTT = 50;

	// A guaranteed packet is resent every GUARANTEED_SENDRATE_PERCENT_RTT
	// percent of the average RTT, but never more often than
	// MIN_GUARANTEED_SENDRATE_MILLIS.
	const unsigned MIN_GUARANTEED_SENDRATE_MILLIS = 32;
	const unsigned GUARANTEED_SENDRATE_PERCENT_RTT = 200;
	// Send attempts allowed before the owning session is flagged as timed out.
	const unsigned GUARANTEED_MESSAGE_MAX_RETRIES = 100;



	//=====================================================================
	//=====================================================================
	/**
	* Parses a NUL-terminated dotted-quad IPv4 string ("a.b.c.d").
	* @param sz String to parse, or nullptr for "any address"
	* @return The address in host byte order (first octet in the high byte),
	*         0 when (sz) is nullptr, or (unsigned) -1 when the string is
	*         malformed. Each octet must be 1-3 decimal digits with a value
	*         of at most 255.
	*         Note that "255.255.255.255" is indistinguishable from the
	*         error value.
	**/
	unsigned IPFromSZ( const char* sz )
	{
		const unsigned INVALID_IP = (unsigned) -1;

		if ( sz == nullptr ) return 0;

		unsigned result = 0;
		for ( int curQuad = 0; curQuad < 4; ++curQuad )
		{
			// Accumulate in a full-width integer so that out-of-range
			// octets such as "256" are detected instead of wrapping.
			unsigned curByte = 0;
			int digits = 0;

			while ( digits < 3 && sz[ digits ] >= '0' && sz[ digits ] <= '9' )
			{
				curByte = curByte * 10 + (unsigned) ( sz[ digits ] - '0' );
				++digits;
			}

			if ( digits == 0 || curByte > 255 ) return INVALID_IP;
			sz += digits;

			// The first three octets end in '.', the last ends the string
			const char expectedTerminator = ( curQuad < 3 ) ? '.' : '\0';
			if ( *sz != expectedTerminator ) return INVALID_IP;

			if ( curQuad < 3 ) ++sz;

			result = ( result << 8 ) | curByte;
		}

		return result;
	}

	//=====================================================================
	//=====================================================================
	// Default address is deliberately invalid: all-ones IP, port 0.
	UDPAddress::UDPAddress() : ip( (unsigned) -1 ), port( 0 ) { }

	//=====================================================================
	//=====================================================================
	// Initializers are listed in member declaration order (ip, then port),
	// which is the order in which they actually run.
	UDPAddress::UDPAddress( unsigned short port, const char* ip )
		: ip( IPFromSZ( ip ) ), port( port ) { }

	//=====================================================================
	//=====================================================================
	// Addresses match only when both the IP and the port match.
	bool UDPAddress::operator==( const UDPAddress& other ) const
	{
		if ( ip != other.ip ) return false;
		return port == other.port;
	}

	// Orders by IP; the port breaks ties between equal IPs.
	bool UDPAddress::operator<( const UDPAddress& other ) const
	{
		if ( ip != other.ip )
			return ip < other.ip;

		return port < other.port;
	}

	//=====================================================================
	//=====================================================================
	// Zero and all-ones are rejected for both the IP and the port.
	bool UDPAddress::isValid() const
	{
		if ( ip == (unsigned) -1 || ip == 0 ) return false;
		if ( port == (unsigned short) -1 || port == 0 ) return false;

		return true;
	}

	//=====================================================================
	//=====================================================================
	// Packet kinds carried in the first field of every packet header.
	// Values are assigned implicitly starting at zero; the order is part of
	// the wire format.
	enum UDPMessageType : uint64
	{
		PACKET_NORMAL,         // unreliable bundle of messages
		PACKET_GUARANTEED,     // bundle that is resent until acknowledged
		MESSAGE_DELIVERY_ACK,  // acknowledges guaranteed packets
		PACKET_KEEPALIVE,      // header-only, keeps the session from timing out
		PACKET_PING,           // header-only, carries the sender's timestamp
		PACKET_PING_RESPONSE   // header-only, echoes a ping's timestamp
	};

	// Header found at the start of every packet. The payload begins
	// UDP_HEADER_RESERVED bytes into the packet, so this struct must not
	// grow past that size.
	struct UDPHeader
	{
		// Kind of packet; decides how sequenceNum is interpreted.
		UDPMessageType type;

		// Guaranteed packets: per-session sequence number.
		// Acks: highest sequence number being acknowledged.
		// Pings and ping responses: the pinging side's timestamp.
		// Zero for normal and keepalive packets.
		uint64 sequenceNum;
	};

	//=====================================================================
	//=====================================================================
	// A fully formed packet waiting to be sent (or, when latency simulation
	// is enabled, a received packet waiting to be processed).
	struct PendingPacket
	{
		// Where the packet is going (or, in the latency queue, where it came from).
		UDPAddress destinationAddr;
		// Number of valid bytes in rawData, header included.
		unsigned totalSize;
		
		// Time of the most recent send attempt; 0 means never sent.
		// The latency queue stores the arrival time here instead.
		SystemClocks lastSendTimeClocks;
		// Minimum time between resends of a guaranteed packet.
		SystemClocks guaranteedResendClocks;

		// How many times a guaranteed packet has been sent so far.
		unsigned sendAttempts;

		// Session that queued this packet.
		SessionID session;

		// The bytes that go on the wire; header overlays the first bytes
		// of rawData.
		union
		{
			char rawData[ MAX_PACKET_SIZE ];
			UDPHeader header;
		};
	};

	//=====================================================================
	//=====================================================================
	struct SessionInfo
	{
		SessionID id;
		UDPAddress address;

		uint64 nextLocalSeq;
		uint64 nextRemoteSeq;

		SystemClocks lastReceiptTimeClocks;
		SystemClocks lastSendTimeClocks;
		SystemClocks lastPingTimeClocks;

		SystemClocks bufferCreationTimeClocks;

		PendingPacket packetBuffer;

		bool bTerminated;
		bool bTimeout;

		unsigned pendingGuaranteedCount;

		RunningAverage< SystemClocks, RTT_SAMPLES > RTTLog;

		void clearBuffer()
		{
			SystemClocks minResendRate = SecondsToClocks( MIN_GUARANTEED_SENDRATE_MILLIS * 0.001 );

			packetBuffer.session = id;

			packetBuffer.destinationAddr = address;
			packetBuffer.totalSize = UDP_HEADER_RESERVED;

			packetBuffer.lastSendTimeClocks = 0;
			packetBuffer.guaranteedResendClocks = ( RTTLog.getAverage() * GUARANTEED_SENDRATE_PERCENT_RTT ) / 100;
			if ( packetBuffer.guaranteedResendClocks < minResendRate )
				packetBuffer.guaranteedResendClocks = minResendRate;

			packetBuffer.header.type = PACKET_NORMAL;
			packetBuffer.header.sequenceNum = 0;

			packetBuffer.sendAttempts = 0;
			bufferCreationTimeClocks = (SystemClocks) -1;
		}
	};

	//=====================================================================
	//=====================================================================
	// Starts Winsock on first use and shuts it down on destruction.
	// On platforms other than WIN32 there is nothing to start, so
	// InitWinsock() only records that it has run.
	struct WinsockInitializer
	{

		// True once InitWinsock() has completed successfully.
		bool bInitialized;
		WinsockInitializer() : bInitialized( false ) { }

		// Idempotent: only the first successful call does any work.
		// Throws ResourceException if Winsock cannot be started or does not
		// provide the requested version.
		void InitWinsock() {
			if ( bInitialized ) return;

#ifdef WIN32
			WSADATA data;
			if ( WSAStartup( WINSOCK_VERSION, &data ) != 0 )
				throw ResourceException( "Error initializing Winsock" );

			if ( data.wVersion != WINSOCK_VERSION )
			{
				// Startup succeeded, so it has to be balanced before bailing out
				WSACleanup();
				throw ResourceException( "Error initializing Winsock" );
			}
#endif

			bInitialized = true;
		}

#ifdef WIN32
		// Only balance a startup that actually happened.
		~WinsockInitializer()
		{ if ( bInitialized && 0 != WSACleanup() ) { } /* trace( "Error shutting down Winsock" ) */ }
#endif
	};

	//=====================================================================
	//=====================================================================
	class UDPSocketImpl
	{
	public:
		// Creates a non-blocking UDP socket.
		// Throws NetworkException if the socket cannot be created or cannot
		// be switched to non-blocking mode; in the latter case the socket is
		// closed first, because the destructor does not run when the
		// constructor throws.
		UDPSocketImpl()
		{
			s_winsockInit.InitWinsock();

			sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
			if ( sock == INVALID_SOCKET )
				throw NetworkException();	
			
			// Set socket to non blocking
#ifdef WIN32
			unsigned long setNonBlocking = 1;
			if ( 0 != ioctlsocket( sock, FIONBIO, &setNonBlocking ) )
			{
				if ( 0 != closesocket( sock ) ) trace( "Error shutting down socket" );
				throw NetworkException();
			}
#endif

#ifdef LINUX
			const int currentFlags = fcntl( sock, F_GETFL, 0 );
			if ( currentFlags == -1 ||
				fcntl( sock, F_SETFL, currentFlags | O_NONBLOCK ) == -1 )
			{
				if ( 0 != close( sock ) ) trace( "Error shutting down socket" );
				throw NetworkException();
			}
#endif

			nextSessionID = 0;
		}

		//=================================================================
		//=================================================================
		// Closes the socket; a failure is traced, never thrown.
		~UDPSocketImpl()
		{
#ifdef WIN32
			const int winResult = closesocket( sock );
			if ( winResult != 0 )
				trace( "Error shutting down socket" );
#endif
#ifdef LINUX
			const int linuxResult = close( sock );
			if ( linuxResult != 0 )
				trace( "Error shutting down socket" );
#endif
		}

		//=================================================================
		//=================================================================
		// Appends one message to the session's outgoing packet buffer.
		// Each message is stored as a uint64 length prefix, the payload, and
		// zero padding up to the next 8-byte boundary. If the message does
		// not fit, the current buffer is published first. A guaranteed
		// message turns the whole packet into a guaranteed packet.
		// Throws std::exception when (dataSize) exceeds MAX_SENDABLE_MESSAGE.
		void queueMessage( SessionInfo& session, const void* data,
			unsigned dataSize, bool bGuaranteed = false )
		{
			if ( dataSize > MAX_SENDABLE_MESSAGE )
				throw std::exception( /* "Invalid argument, message too large to send as one packet" */ );

			PendingPacket& buffer = session.packetBuffer;

			// Flush first when the prefix plus payload would overflow the packet
			if ( buffer.totalSize + dataSize + sizeof( uint64 ) > MAX_PACKET_SIZE )
				publishPacket( session );

			// Write the length prefix, then the payload right behind it
			uint64* lengthPrefix = reinterpret_cast< uint64* >( &buffer.rawData[ buffer.totalSize ] );
			char* payload = reinterpret_cast< char* >( lengthPrefix + 1 );

			*lengthPrefix = dataSize;
			memcpy( payload, data, dataSize );

			// Zero-fill up to the next 8-byte boundary so the following
			// length prefix stays aligned
			unsigned padding = 0;
			const unsigned overhang = dataSize % sizeof( uint64 );
			if ( overhang != 0 )
			{
				padding = sizeof( uint64 ) - overhang;
				memset( payload + dataSize, 0, padding );
			}

			buffer.totalSize += dataSize + sizeof( uint64 ) + padding;

			if ( bGuaranteed )
				buffer.header.type = PACKET_GUARANTEED;

			// First message in an empty buffer starts the publish timer
			if ( session.bufferCreationTimeClocks == (SystemClocks) -1 )
				session.bufferCreationTimeClocks = GetAbsoluteTimeClocks();
		}

		//=================================================================
		//=================================================================
		// Moves the session's packet buffer onto the matching send queue and
		// starts a fresh buffer. Guaranteed packets are stamped with the
		// session's next sequence number and counted as pending.
		void publishPacket( SessionInfo& session )
		{
			PendingPacket& outgoing = session.packetBuffer;

			if ( outgoing.header.type != PACKET_GUARANTEED )
			{
				pendingPackets.push_back( outgoing );
			}
			else
			{
				outgoing.header.sequenceNum = session.nextLocalSeq;
				++session.nextLocalSeq;

				pendingGuaranteedPackets.push_back( outgoing );
				++session.pendingGuaranteedCount;
			}

			session.clearBuffer();
			session.lastSendTimeClocks = GetAbsoluteTimeClocks();
		}

		//=================================================================
		//=================================================================
		void keepAlive( SessionInfo& session )
		{
			PendingPacket keepAlivePacket;

			keepAlivePacket.destinationAddr = session.address;
			keepAlivePacket.lastSendTimeClocks = 0;
			keepAlivePacket.sendAttempts = 0;
			keepAlivePacket.totalSize = UDP_HEADER_RESERVED;

			keepAlivePacket.header.type = PACKET_KEEPALIVE;
			keepAlivePacket.header.sequenceNum = 0;

			keepAlivePacket.session = session.id;

			pendingPackets.push_back( keepAlivePacket );

			session.lastSendTimeClocks = GetAbsoluteTimeClocks();
		}

		//=================================================================
		//=================================================================
		void sendPing ( SessionInfo& session )
		{
			PendingPacket keepAlivePacket;

			keepAlivePacket.destinationAddr = session.address;
			keepAlivePacket.lastSendTimeClocks = 0;
			keepAlivePacket.sendAttempts = 0;
			keepAlivePacket.totalSize = UDP_HEADER_RESERVED;

			keepAlivePacket.header.type = PACKET_PING;
			keepAlivePacket.header.sequenceNum = (SystemClocks) GetAbsoluteTimeClocks();

			keepAlivePacket.session = session.id;

			pendingPackets.push_back( keepAlivePacket );

			session.lastSendTimeClocks = GetAbsoluteTimeClocks();
		}

		//=================================================================
		//=================================================================
		void respondToPing( SessionInfo& session, uint64 timestamp )
		{
			PendingPacket keepAlivePacket;

			keepAlivePacket.destinationAddr = session.address;
			keepAlivePacket.lastSendTimeClocks = 0;
			keepAlivePacket.sendAttempts = 0;
			keepAlivePacket.totalSize = UDP_HEADER_RESERVED;

			keepAlivePacket.header.type = PACKET_PING_RESPONSE;
			keepAlivePacket.header.sequenceNum = (SystemClocks) timestamp;

			keepAlivePacket.session = session.id;

			pendingPackets.push_back( keepAlivePacket );

			session.lastSendTimeClocks = GetAbsoluteTimeClocks();
		}

		//=================================================================
		//=================================================================
		// Drops every pending guaranteed packet of (session) whose sequence
		// number is at or below (sequenceNum), i.e. acknowledgement is
		// cumulative. Removal swaps in the last element, so the queue order
		// is not preserved.
		void processAcknowledge( SessionInfo& session, uint64 sequenceNum )
		{
			// Walk from the back: the element swapped into a vacated slot has
			// always been examined already.
			int idx = (int) pendingGuaranteedPackets.size();
			while ( --idx >= 0 )
			{
				const PendingPacket& candidate = pendingGuaranteedPackets[ idx ];

				const bool bAcknowledged =
					candidate.session == session.id &&
					candidate.header.sequenceNum <= sequenceNum;

				if ( !bAcknowledged ) continue;

				pendingGuaranteedPackets[ idx ] = pendingGuaranteedPackets[ pendingGuaranteedPackets.size() - 1 ];
				pendingGuaranteedPackets.pop_back();

				--session.pendingGuaranteedCount;
			}
		}

		//=================================================================
		//=================================================================
		// Queues an ack for the highest guaranteed packet received in order.
		// Does nothing until at least one such packet has arrived.
		void sendAcknowledge( SessionInfo& session )
		{
			const bool bNothingToAcknowledge = ( session.nextRemoteSeq == 0 );
			if ( bNothingToAcknowledge ) return;

			PendingPacket ackPacket;

			ackPacket.session = session.id;
			ackPacket.destinationAddr = session.address;

			ackPacket.header.type = MESSAGE_DELIVERY_ACK;
			ackPacket.header.sequenceNum = session.nextRemoteSeq - 1;

			ackPacket.totalSize = UDP_HEADER_RESERVED;
			ackPacket.sendAttempts = 0;
			ackPacket.lastSendTimeClocks = 0;

			pendingPackets.push_back( ackPacket );

			session.lastSendTimeClocks = GetAbsoluteTimeClocks();
		}

		//=================================================================
		//=================================================================
		// Looks a session up by id.
		// Throws InvalidSessionException when the id is not active.
		SessionInfo& getSession( SessionID id )
		{
			auto found = activeSessions.find( id );

			if ( found != activeSessions.end() )
				return found->second;

			throw InvalidSessionException();
		}

		//=================================================================
		//=================================================================
		// Looks a session up by remote address (linear scan).
		// Throws InvalidSessionException when no active session matches.
		SessionInfo& getSession( const UDPAddress& address )
		{
			// TODO - maybe too algorithmically expensive using O( n )
			auto cursor = activeSessions.begin();
			while ( cursor != activeSessions.end() )
			{
				SessionInfo& candidate = cursor->second;
				if ( candidate.address == address )
					return candidate;

				++cursor;
			}

			throw InvalidSessionException();
		}

		//=================================================================
		//=================================================================
		// Creates a session for (address) with the next free id, registers
		// it as active and queues it for a SessionStarted notification.
		// Returns the stored session.
		SessionInfo& openSession( const UDPAddress& address )
		{
			SessionInfo created;

			// Identity
			created.address = address;
			created.id = nextSessionID++;

			// Activity timestamps
			created.lastReceiptTimeClocks = GetAbsoluteTimeClocks();
			created.lastSendTimeClocks = 0;
			created.lastPingTimeClocks = GetAbsoluteTimeClocks();

			// Sequencing and lifecycle flags
			created.nextLocalSeq = 0;
			created.nextRemoteSeq = 0;
			created.bTerminated = false;
			created.bTimeout = false;
			created.pendingGuaranteedCount = 0;

			// The RTT estimate must be seeded before clearBuffer() reads it
			created.RTTLog.reset( SecondsToClocks( INITIAL_ESTIMATED_RTT_MILLIS * 0.001 ) );
			created.clearBuffer();

			newSessions.push_back( created.id );

			SessionInfo& stored = activeSessions[ created.id ];
			stored = created;
			return stored;
		}

		//=================================================================
		//=================================================================
		// Returns the active session for (address), opening one if none exists.
		SessionInfo& getOrOpenSession( const UDPAddress& address )
		{
			try
			{
				return getSession( address );
			}
			catch ( const InvalidSessionException& )
			{
				// No session yet for this address
			}

			return openSession( address );
		}

		//=================================================================
		//=================================================================
		void notifyNewSessions()
		{
			try {
				for ( int sessionIdx = 0; sessionIdx < newSessions.size(); ++sessionIdx )
					for ( int listenerIdx = 0; listenerIdx < listeners.size(); ++listenerIdx )
					{
						SessionID newID = newSessions[ sessionIdx ];

						listeners[ listenerIdx ]->SessionStarted( newID,
							getSession( newID ).address );
					}
			} catch ( InvalidSessionException ) {
				PROMISES( false, "Internal error in socket class" );
			}
			newSessions.clear();
		}

		//=================================================================
		//=================================================================
		void notifyReceived( const SessionInfo& session, int bytesRemaining )
		{
			bytesRemaining -= UDP_HEADER_RESERVED;

			UDPHeader* ptPacketHeader = reinterpret_cast< UDPHeader* >( receiveBuffer );
			uint64* ptMessageSize = reinterpret_cast< uint64* >( &receiveBuffer[ UDP_HEADER_RESERVED ] );
			void* ptMessageBody = ptMessageSize + 1;

			while ( bytesRemaining >= sizeof( uint64 ) )
			{
				uint64 readBytes = *ptMessageSize + sizeof( uint64 );

				// Malformed packet?
				if ( readBytes > bytesRemaining )
					throw NetworkException();

				for ( int listenerIdx = 0; listenerIdx < listeners.size(); ++listenerIdx )
				{
					listeners[ listenerIdx ]->ReceivedMessage( ptMessageBody,
						(unsigned) *ptMessageSize, session.id, session.address );
				}

				ptMessageSize = OffsetByBytes( ptMessageSize, readBytes );
				ptMessageBody = ptMessageSize + 1;

				bytesRemaining -= (int) readBytes;
			}
		}

		//=================================================================
		//=================================================================
		// Reads the next datagram from the socket into receiveBuffer.
		// @param outBytesReceived Receives the datagram size; 0 when the
		//                         datagram should be ignored
		// @param outAddr          Receives the sender's address
		// @return false when nothing is waiting on the socket, true when a
		//         receive was attempted (check outBytesReceived)
		// Throws NetworkException on unexpected socket errors.
		bool GetNextPacket( int& outBytesReceived, sockaddr_in& outAddr )
		{
			unsigned long dataAvail = 0;

#ifdef WIN32
			if ( 0 != ioctlsocket( sock, FIONREAD, &dataAvail ) )
				throw NetworkException();
#endif
#ifdef LINUX
			// FIONREAD stores an int on Linux, so do not hand it a wider type
			int pendingBytes = 0;
			if ( 0 != ioctl( sock, FIONREAD, &pendingBytes ) )
				throw NetworkException();
			dataAvail = (unsigned long) pendingBytes;
#endif

			if ( dataAvail == 0 ) return false;

			int addrSize = sizeof( sockaddr_in );

#ifdef WIN32
			outBytesReceived = recvfrom( sock, receiveBuffer,
				MAX_PACKET_SIZE, 0, (SOCKADDR*) &outAddr, &addrSize );
#endif
#ifdef LINUX
			outBytesReceived = recvfrom( sock, receiveBuffer,
				MAX_PACKET_SIZE, 0, (SOCKADDR*) &outAddr, (unsigned*) &addrSize );
#endif

#ifdef WIN32
			if ( SOCKET_ERROR == outBytesReceived ) switch ( WSAGetLastError() )
			{
			case WSAEMSGSIZE: // too big
				outBytesReceived = MAX_PACKET_SIZE;
				break;

			case WSAECONNRESET: // port unreachable from previous send() call
			case WSAETIMEDOUT: // took too long
			case WSAENETRESET: // time to live expired
				outBytesReceived = 0;
				return true;
			default:
				throw NetworkException();
			}
#endif

#ifdef LINUX
			// recvfrom reports failure with -1; 0 is a legal empty datagram
			if ( SOCKET_ERROR == outBytesReceived ) switch ( errno )
			{
			case EAGAIN: // non-blocking socket had nothing to deliver after all
				outBytesReceived = 0;
				return false;

			case EMSGSIZE: // too big
				outBytesReceived = MAX_PACKET_SIZE;
				break;

			case ECONNRESET: // port unreachable from previous send() call
			case ETIMEDOUT: // took too long
			case ENETRESET: // time to live expired
				outBytesReceived = 0;
				return true;
			default:
				throw NetworkException();
			}
#endif

#ifdef SIMULATE_PACKET_LOSS
			// Report the dropped datagram as empty so the caller skips it
			// and keeps draining the socket.
			if ( rand() % 100 < SIMULATED_PACKET_LOSS_PERCENT )
			{
				outBytesReceived = 0;
				return true;
			}
#endif
			return true;
		}

		//=================================================================
		//=================================================================
		void processReceive()
		{
			int bytesReceived;
			sockaddr_in remoteAddrIn;
			UDPAddress remoteAddress;

#ifdef SIMULATE_NETWORK_LATENCY
			while ( GetNextPacket( bytesReceived, remoteAddrIn ) )
			{
				PendingPacket newPacket;

				newPacket.destinationAddr.ip = ntohl( remoteAddrIn.sin_addr.s_addr );
				newPacket.destinationAddr.port = ntohs( remoteAddrIn.sin_port );
				newPacket.lastSendTimeClocks = GetAbsoluteTimeClocks();
				newPacket.totalSize = bytesReceived;

				memcpy( newPacket.rawData, receiveBuffer, MAX_PACKET_SIZE );

				latency_queue.push( newPacket );
			}
#endif
			
			// while there are packets on the socket
			
			for ( ; ; )
			{
				// receive the next packet
#ifdef SIMULATE_NETWORK_LATENCY
				if ( latency_queue.size() == 0 ) return;

				SystemClocks receiveDelayClocks = SecondsToClocks( ADDITIONAL_LATENCY_MILLIS * 0.001 );
				PendingPacket& curPacket = latency_queue.front();
				
				if ( curPacket.lastSendTimeClocks + receiveDelayClocks <= GetAbsoluteTimeClocks() )
				{
					memcpy( receiveBuffer, curPacket.rawData, MAX_PACKET_SIZE );
					remoteAddress = curPacket.destinationAddr;

					bytesReceived = curPacket.totalSize;
					latency_queue.pop();
				}
				else
				{
					return;
				}
#else
				if ( !GetNextPacket( bytesReceived, remoteAddrIn ) ) return;

				remoteAddress.ip = ntohl( remoteAddrIn.sin_addr.s_addr );
				remoteAddress.port = ntohs( remoteAddrIn.sin_port );
#endif
				
				// Got a packet, start processing it

				// Too small to be valid?
				if ( bytesReceived < UDP_HEADER_RESERVED ) continue;

				UDPHeader* header = (UDPHeader*) receiveBuffer;
				
				// Determine the packet's session
				SessionInfo* curSession;

				try {
					curSession = &getSession( remoteAddress );
				}
				catch ( InvalidSessionException ) {
					// No active session for this address

					// Do not start new sessions for OOO guaranteed messages,
					// ACKs, or keepalives
					bool bInvalidPacketForNewSession( false );

					switch ( header->type )
					{
					case PACKET_PING:
					case PACKET_PING_RESPONSE:
					case PACKET_KEEPALIVE:
					case MESSAGE_DELIVERY_ACK:
						bInvalidPacketForNewSession = true;
						break;

					case PACKET_GUARANTEED:
						if ( header->sequenceNum != 0 )
							bInvalidPacketForNewSession = true;
						break;
					}

					if ( bInvalidPacketForNewSession ) continue;

					curSession = &openSession( remoteAddress );

					// To avoid listeners getting a packet before a session started
					// message we need to inform them if there was a new session here
					notifyNewSessions();
				}

				// Update the last activity for the session
				if ( !curSession->bTerminated && !curSession->bTimeout )
					curSession->lastReceiptTimeClocks = GetAbsoluteTimeClocks();

				bool bNotifyListeners( true );

				switch ( header->type )
				{
				case PACKET_PING:
					bNotifyListeners = false;
					respondToPing( *curSession, header->sequenceNum );
					break;

				case PACKET_PING_RESPONSE:
					curSession->RTTLog.logSample(
						GetAbsoluteTimeClocks() - (SystemClocks) header->sequenceNum );
					bNotifyListeners = false;
					break;

				case PACKET_KEEPALIVE:
					bNotifyListeners = false;
					break;

				case MESSAGE_DELIVERY_ACK:
					bNotifyListeners = false;
					processAcknowledge( *curSession, header->sequenceNum );
					break;

				case PACKET_GUARANTEED:
					if ( header->sequenceNum == curSession->nextRemoteSeq )
						++curSession->nextRemoteSeq;
					else
						bNotifyListeners = false;

					sendAcknowledge( *curSession );

					break;

				case PACKET_NORMAL:
					break;

				// Malformed packet
				default: throw NetworkException();
				}
				
				if ( bNotifyListeners )
					notifyReceived( *curSession, bytesReceived );
			}
		}

		//=================================================================
		//=================================================================
		// Transmits (curPacket) to its destination address.
		// Returns the result of sendto(): bytes sent, or SOCKET_ERROR.
		// Despite the name this is used on both platforms.
		int winSend( PendingPacket& curPacket )
		{
			// Value-initialize so the padding member (sin_zero) is zeroed
			sockaddr_in remoteAddress = sockaddr_in();
			remoteAddress.sin_family = AF_INET;
			remoteAddress.sin_port = htons( curPacket.destinationAddr.port );
			remoteAddress.sin_addr.s_addr = htonl( curPacket.destinationAddr.ip );

			return sendto( sock, curPacket.rawData, curPacket.totalSize,
				0, (SOCKADDR*) &remoteAddress, sizeof( sockaddr_in ) );
		}

		//=================================================================
		//=================================================================
		// Performs all outgoing work for one update:
		//  1. per session: queue RTT pings, publish packet buffers that are
		//     due, and queue keepalives for idle sessions;
		//  2. transmit every queued non-guaranteed packet once;
		//  3. (re)transmit guaranteed packets whose resend interval elapsed,
		//     flagging the session as timed out once a packet runs out of
		//     retries.
		// Throws NetworkException when a send fails.
		void processSend()
		{
			SystemClocks keepAliveClocks = SecondsToClocks( KEEPALIVE_MILLIS * 0.001 );
			SystemClocks initialPingClocks = SecondsToClocks( INITIAL_RTT_POLL_FREQUENCY_MILLIS * 0.001 );
			SystemClocks normalPingClocks = SecondsToClocks( RTT_POLL_FREQUENCY_MILLIS * 0.001 );
			SystemClocks minPublishClocks = SecondsToClocks( MIN_PUBLISH_RATE_MILLIS * 0.001 );

			// One timestamp is used for every decision in this pass
			SystemClocks curTime = GetAbsoluteTimeClocks();

			for ( auto sessionItr = activeSessions.begin(); sessionItr != activeSessions.end(); ++sessionItr )
			{
				SessionInfo& curSession = sessionItr->second;

				// How long messages may sit in the buffer: a fraction of the
				// RTT, with a lower bound
				SystemClocks publishClocks = ( curSession.RTTLog.getAverage() * PUBLISH_RATE_PERCENT_RTT ) / 100;
				if ( publishClocks < minPublishClocks ) publishClocks = minPublishClocks;

				// Send RTT pings
				// (faster while the RTT log is still filling, slower afterwards)
				if ( ( curSession.RTTLog.validSamples == RTT_SAMPLES && curSession.lastPingTimeClocks + normalPingClocks <= curTime ) ||
					( curSession.RTTLog.validSamples != RTT_SAMPLES &&curSession.lastPingTimeClocks + initialPingClocks <= curTime ) )
				{
					sendPing( curSession );
					curSession.lastPingTimeClocks = curTime;
				}

				// check sessions for packets that need publishing
				// (a buffer creation time of -1 means the buffer is empty;
				// terminated sessions flush immediately)
				if ( curSession.bufferCreationTimeClocks != -1 && 
					( curSession.bTerminated ||
					curSession.bufferCreationTimeClocks + publishClocks <= curTime ) )
				{
					publishPacket( curSession );
				}

				// check sessions to see if keepalives are required
				else if ( !curSession.bTerminated &&
					curSession.lastSendTimeClocks + keepAliveClocks <= curTime )
				{
					// Real data doubles as a keepalive when there is any
					if ( curSession.bufferCreationTimeClocks != -1 )
						publishPacket( curSession );
					else
						keepAlive( curSession );
				}

			}

			// send non-guaranteed packets
			// (taken from the back of the queue, so most recently queued first)
			while ( pendingPackets.size() > 0 )
			{
				int result = winSend( pendingPackets[ pendingPackets.size() - 1 ] );

				// The packet is discarded whether or not the send succeeded
				pendingPackets.pop_back();

				if ( result == SOCKET_ERROR )
					throw NetworkException();
			}
			
			// send any guaranteed packets that are due
			for ( int packetItr = (int) pendingGuaranteedPackets.size() - 1; packetItr >= 0; --packetItr )
			{
				PendingPacket& curPacket = pendingGuaranteedPackets[ packetItr ];

				if ( curPacket.lastSendTimeClocks + curPacket.guaranteedResendClocks <= curTime )
				{
					curPacket.lastSendTimeClocks = curTime;

					if ( ++curPacket.sendAttempts <= GUARANTEED_MESSAGE_MAX_RETRIES )
					{
						if ( winSend( curPacket ) == SOCKET_ERROR )
							throw NetworkException();
					}
					else
					{
						// if we are past its max retries, mark the session as terminated
						auto sessionItr = activeSessions.find( curPacket.session );
						if ( sessionItr != activeSessions.end() )
							sessionItr->second.bTimeout = true;

						// remove the packet from consideration
						// (swap-with-last; the last element was already
						// visited because the loop runs back to front)
						pendingGuaranteedPackets[ packetItr ] = pendingGuaranteedPackets[ pendingGuaranteedPackets.size() - 1 ];
						pendingGuaranteedPackets.pop_back();
					}

				}
			}
		}

		//=================================================================
		//=================================================================
		// Removes sessions that have ended and notifies listeners.
		// A session ends when nothing has been received from it for
		// CONNECTION_TIMEOUT_MILLIS, when one of its guaranteed packets ran
		// out of retries, or when it was terminated and has no guaranteed
		// packets left in flight.
		void postUpdate()
		{
			SystemClocks timeoutClocks = SecondsToClocks( CONNECTION_TIMEOUT_MILLIS * 0.001 );
			SystemClocks curTime = GetAbsoluteTimeClocks();

			// check sessions for timeouts and terminations
			for ( auto sessionItr = activeSessions.begin(); sessionItr != activeSessions.end(); )
			{
				SessionInfo& curSession = sessionItr->second;

				const bool bReceiptTimeout =
					curSession.lastReceiptTimeClocks + timeoutClocks <= curTime;
				const bool bCleanTermination =
					curSession.bTerminated && curSession.pendingGuaranteedCount == 0;

				if ( !bReceiptTimeout && !curSession.bTimeout && !bCleanTermination )
				{
					++sessionItr;
					continue;
				}

				// Capture this before the pending packets are discarded below:
				// a session that goes silent with unacknowledged guaranteed
				// packets has undelivered messages even though bTimeout was
				// never set.
				const bool bUndeliveredMessages =
					curSession.bTimeout || curSession.pendingGuaranteedCount != 0;

				// remove any guaranteed messages posted by a terminated session
				processAcknowledge( curSession, (uint64) -1 );

				// notify listeners of terminated sessions
				for ( int listenItr = 0; listenItr < (int) listeners.size(); ++listenItr )
				{
					listeners[ listenItr ]->SessionEnded( curSession.id,
						curSession.address, bUndeliveredMessages );
				}

				// Advance before erasing so the loop iterator stays valid
				activeSessions.erase( sessionItr++ );
			}

		}

		//=================================================================
		//=================================================================
		// Shared by all sockets; starts Winsock on first use.
		static WinsockInitializer s_winsockInit;
		// The underlying non-blocking UDP socket.
		SOCKET sock;
		
		// Holds the packet currently being processed by processReceive().
		char receiveBuffer[ MAX_PACKET_SIZE ];
		
		// Packets sent once by the next processSend() and then discarded.
		std::vector< PendingPacket > pendingPackets;
		// Packets resent until acknowledged or out of retries.
		std::vector< PendingPacket > pendingGuaranteedPackets;

		// All live sessions, keyed by id.
		std::map< SessionID, SessionInfo > activeSessions;
		// Id that the next opened session will receive.
		unsigned nextSessionID;

		// Registered listeners; not owned by the socket.
		std::vector< UDPSocketListener* > listeners;

		// Sessions opened since listeners were last told about new sessions.
		std::vector< SessionID > newSessions;

#ifdef SIMULATE_NETWORK_LATENCY
		// Received packets waiting out the simulated delay, oldest first.
		std::queue< PendingPacket > latency_queue;
#endif
	};

	// Out-of-class definition of the static member declared inside UDPSocketImpl
	WinsockInitializer UDPSocketImpl::s_winsockInit;

	//=====================================================================
	//=====================================================================
	/** Creates the socket together with its heap-allocated implementation object. **/
	UDPSocket::UDPSocket()
		: m_( new UDPSocketImpl() )
	{
	}
	/** Releases the implementation object owned through m_. **/
	UDPSocket::~UDPSocket()
	{
		delete m_;
	}

	//=====================================================================
	//=====================================================================
	/**
	* Registers (newListener) with this socket.
	* Any earlier registration of the same pointer is removed first, so the
	* pointer is stored at most once.
	* @param newListener Listener to register
	**/
	void UDPSocket::addListener( UDPSocketListener* newListener )
	{
		// clear out a previous registration before appending
		this->removeListener( newListener );

		std::vector< UDPSocketListener* >& registered = m_->listeners;
		registered.push_back( newListener );
	}

	//=====================================================================
	//=====================================================================
	/**
	* Unregisters (oldListener) from this socket.
	* Every occurrence of the pointer is removed; a pointer that was never
	* registered is ignored. The order of the remaining listeners is not preserved.
	* @param oldListener Listener to unregister
	**/
	void UDPSocket::removeListener( UDPSocketListener* oldListener )
	{
		std::vector< UDPSocketListener* >& registered = m_->listeners;

		// unsigned index matches size() and never has to step below zero
		std::size_t i = 0;
		while ( i < registered.size() )
		{
			if ( registered[ i ] == oldListener )
			{
				// swap-and-pop: slot i now holds an entry that has not been
				// checked yet, so the index must not advance
				registered[ i ] = registered.back();
				registered.pop_back();
			}
			else
			{
				++i;
			}
		}
	}

	//=====================================================================
	//=====================================================================
	void UDPSocket::bindPort( unsigned short port )
	{
		sockaddr_in bindAddr;
		bindAddr.sin_family = AF_INET;
		bindAddr.sin_port = htons( port );
		bindAddr.sin_addr.s_addr = htonl( INADDR_ANY );
			
		int result = bind( m_->sock, (SOCKADDR *) &bindAddr, sizeof( sockaddr_in ) );
		
#ifdef WIN32
		if ( result == SOCKET_ERROR ) switch ( WSAGetLastError() )
		{
		case WSAEADDRINUSE: throw PortInUseException();
		// TODO - more detail on types of errors
		default: throw NetworkException();
		}
#endif
#ifdef LINUX
		if ( result == SOCKET_ERROR ) switch ( errno )
		{
		case EADDRINUSE: throw PortInUseException();
			// TODO - more detail on types of errors
		default: throw NetworkException();
		}
#endif
	}

	//=====================================================================
	//=====================================================================
	/**
	* Hands a message for (remoteAddr) to queueMessage() without the extra
	* flag that sendGuaranteed() passes.
	* The session comes from getOrOpenSession(), which presumably opens one
	* when none exists yet - confirm.
	* @param data Message bytes
	* @param dataSize Size of (data) in bytes
	* @param remoteAddr Destination address
	**/
	void UDPSocket::send( const void* data, unsigned dataSize, const UDPAddress& remoteAddr )
	{
		auto&& session = m_->getOrOpenSession( remoteAddr );
		m_->queueMessage( session, data, dataSize );
	}

	//=====================================================================
	//=====================================================================
	/**
	* Hands a message for the session (sessionID) to queueMessage() without
	* the extra flag that sendGuaranteed() passes.
	* @param data Message bytes
	* @param dataSize Size of (data) in bytes
	* @param sessionID Session to send to, resolved through getSession()
	**/
	void UDPSocket::send( const void* data, unsigned dataSize, SessionID sessionID )
	{
		auto&& session = m_->getSession( sessionID );
		m_->queueMessage( session, data, dataSize );
	}

	//=====================================================================
	//=====================================================================
	/**
	* Hands a message for (remoteAddr) to queueMessage() with its fourth
	* argument set to true (presumably the "guaranteed" flag - confirm).
	* The session comes from getOrOpenSession().
	* @param data Message bytes
	* @param dataSize Size of (data) in bytes
	* @param remoteAddr Destination address
	**/
	void UDPSocket::sendGuaranteed( const void* data, unsigned dataSize, const UDPAddress& remoteAddr )
	{
		const bool bGuaranteed = true;

		auto&& session = m_->getOrOpenSession( remoteAddr );
		m_->queueMessage( session, data, dataSize, bGuaranteed );
	}

	//=====================================================================
	//=====================================================================
	/**
	* Hands a message for the session (sessionID) to queueMessage() with its
	* fourth argument set to true (presumably the "guaranteed" flag - confirm).
	* @param data Message bytes
	* @param dataSize Size of (data) in bytes
	* @param sessionID Session to send to, resolved through getSession()
	**/
	void UDPSocket::sendGuaranteed( const void* data, unsigned dataSize, SessionID sessionID )
	{
		const bool bGuaranteed = true;

		auto&& session = m_->getSession( sessionID );
		m_->queueMessage( session, data, dataSize, bGuaranteed );
	}

	//=====================================================================
	//=====================================================================
	/**
	* Flags the session for (remoteAddr) as terminated.
	* The session itself is erased later by postUpdate(), once it has no
	* guaranteed packets pending.
	* @param remoteAddr Address of the session, resolved through getSession()
	**/
	void UDPSocket::terminateSession( const UDPAddress& remoteAddr )
	{
		auto& session = m_->getSession( remoteAddr );
		session.bTerminated = true;
	}

	//=====================================================================
	//=====================================================================
	/**
	* Flags the session (id) as terminated.
	* The session itself is erased later by postUpdate(), once it has no
	* guaranteed packets pending.
	* @param id Session to terminate, resolved through getSession()
	**/
	void UDPSocket::terminateSession( SessionID id )
	{
		auto& session = m_->getSession( id );
		session.bTerminated = true;
	}

	//=====================================================================
	//=====================================================================
	/**
	* Copies the remote address stored for the session (sessionID).
	* @param sessionID Session to look up through getSession()
	* @param outAddress Receives the session's address
	**/
	void UDPSocket::getRemoteAddressByID( SessionID sessionID, UDPAddress& outAddress )
	{
		const auto& session = m_->getSession( sessionID );
		outAddress = session.address;
	}


	//=====================================================================
	//=====================================================================
	/**
	* Returns the average of the session's RTT log, rounded up to whole milliseconds.
	* @param id Session to query, resolved through getSession()
	* @return Average round-trip time in milliseconds
	**/
	unsigned UDPSocket::getRTTMillis( SessionID id ) const
	{
		auto& session = m_->getSession( id );

		// the log average is in clocks: convert to seconds, then scale to milliseconds
		const auto averageClocks = session.RTTLog.getAverage();
		const auto averageMillis = ClocksToSeconds( averageClocks ) * 1000.0;

		return static_cast< unsigned >( ceil( averageMillis ) );
	}

	//=====================================================================
	//=====================================================================
	void UDPSocket::update()
	{
		m_->notifyNewSessions();

		m_->processReceive();
		m_->processSend();

		m_->postUpdate();
	}
}