Net-Tofu
|
Net-Tofu is a simple networked multiplayer game. The game does not use a network communication library and the server is compatible with both Windows and Linux. All network functionality is achieved using only the native Winsock(Windows) and socket(Linux) APIs and the UDP network protocol. Players control small white squares that must avoid falling off descending platforms while firing tiny white squares at their opponents. This gameplay video shows players from four different computers playing on the same server. Some of the network functions built for this game include guaranteed messages, message bundling, throttling, dead-reckoning and timeouts. |
View Code
#include "CommonTypes.h"
#include "SimpleException.h"
#include "Util.h"
namespace Lucid
{
const unsigned MAX_PACKET_SIZE = 512;
const unsigned UDP_HEADER_RESERVED = 16;
const unsigned MAX_SENDABLE_MESSAGE = MAX_PACKET_SIZE – UDP_HEADER_RESERVED – sizeof( uint64 );
const unsigned CONNECTION_TIMEOUT_MILLIS = 5000;
const unsigned KEEPALIVE_MILLIS = 500;
const unsigned INITIAL_ESTIMATED_RTT_MILLIS = 32;
typedef unsigned SessionID;
const SessionID INVALID_SESSION_ID = (unsigned) -1;
//=====================================================================
//=====================================================================
class UDPSocket;
class UDPSocketImpl;
class UDPAddress
{
public:
UDPAddress();
/**
* Constructs an address from (port) and (ip)
* @param port Port
* @param ip SZ string IP, or nullptr for any IP
**/
UDPAddress( unsigned short port, const char* ip );
bool operator==( const UDPAddress& ) const;
bool operator<( const UDPAddress& ) const;
/**
* Returns true if the port and IP are valid
*/
bool isValid() const;
friend class UDPSocket;
friend class UDPSocketImpl;
private:
unsigned ip;
unsigned short port;
};
//=====================================================================
//=====================================================================
class PortInUseException: public IOException
{ public: const char* what() const throw() { return "Port already bound"; } };
//=====================================================================
//=====================================================================
class InvalidSessionException: public IOException
{ public: const char* what() const throw() { return "Attempt to reference invalid session"; } };
//=====================================================================
//=====================================================================
class NetworkException: public IOException
{ public: const char* what() const throw() { return "Error in network subsystem"; } };
#pragma warning( disable : 4100 ) // unreferenced parameters
//=====================================================================
// Listener class for receiving notifications from the socket
//=====================================================================
class UDPSocketListener
{
public:
virtual void ReceivedMessage(
const void* data, unsigned dataSize,
SessionID id, const UDPAddress& remoteAddress ) { }
virtual void SessionStarted(
SessionID id, const UDPAddress& remoteAddress ) { }
virtual void SessionEnded(
SessionID id, const UDPAddress& remoteAddress,
bool bUndeliveredMessages ) { }
};
#pragma warning( default : 4100 )
//=====================================================================
//=====================================================================
class UDPSocket
{
public:
/*
* Constructs a new UDPSocket.
* @exception NetworkException
*/
UDPSocket();
~UDPSocket();
void addListener( UDPSocketListener* );
void removeListener( UDPSocketListener* );
/*
* Begins listening for messages on (port)
* @param port Port on which to accept incoming messages
* @exception PortInUseException
* @exception NetworkException
*/
void bindPort( unsigned short port );
// Send a datagram to (remoteAddr)
// Send will complete during next update() call
void send( const void* data, unsigned dataSize, const UDPAddress& remoteAddr );
void send( const void* data, unsigned dataSize, SessionID sessionID );
// throws InvalidSessionException()
// Sends a datagram to (remoteAddr) repeatedly until a confirmation
// is received.
void sendGuaranteed( const void* data, unsigned dataSize, const UDPAddress& remoteAddr );
void sendGuaranteed( const void* data, unsigned dataSize, SessionID sessionID );
// throws InvalidSessionException()
// Marks a session for termination.
// Sessions will terminate when all pending guaranteed messages are
// acknowledged or the session times out.
void terminateSession( const UDPAddress& remoteAddr );
// throws InvalidSessionException()
void terminateSession( SessionID );
// throws InvalidSessionException()
void getRemoteAddressByID( SessionID, UDPAddress& outAddress );
// throws InvalidSessionException()
unsigned getRTTMillis( SessionID ) const;
// throws InvalidSessionException()
// Process sending and receiving.
void update();
// throws NetworkException()
// TODO – more descriptive exceptions here
private:
UDPSocketImpl* m_;
PREVENT_COPYING( UDPSocket )
};
}
[/code]
#undef SIMULATE_PACKET_LOSS
#define SIMULATED_PACKET_LOSS_PERCENT 75
#define SIMULATE_NETWORK_LATENCY
#define ADDITIONAL_LATENCY_MILLIS 15
#ifdef WIN32
#include <Winsock2.h>
#pragma comment ( lib, "ws2_32.lib" )
#endif
#ifdef LINUX
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/fcntl.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <math.h>
#define SOCKET int
#define INVALID_SOCKET -1
#define SOCKET_ERROR -1
typedef sockaddr SOCKADDR;
#endif
#include "LucidTime.h"
#include "Statistics.h"
#include <string.h>
#include <vector>
#include <map>
#ifdef SIMULATE_NETWORK_LATENCY
#include <queue>
#endif
namespace Lucid
{
const unsigned RTT_POLL_FREQUENCY_MILLIS = 10000;
const unsigned INITIAL_RTT_POLL_FREQUENCY_MILLIS = 1000;
const unsigned RTT_SAMPLES = 10;
const unsigned MIN_PUBLISH_RATE_MILLIS = 8;
const unsigned PUBLISH_RATE_PERCENT_RTT = 50;
const unsigned MIN_GUARANTEED_SENDRATE_MILLIS = 32;
const unsigned GUARANTEED_SENDRATE_PERCENT_RTT = 200;
const unsigned GUARANTEED_MESSAGE_MAX_RETRIES = 100;
//=====================================================================
//=====================================================================
unsigned IPFromSZ( const char* sz )
{
if ( sz == nullptr ) return 0;
unsigned result = 0;
for ( int curQuad = 0; curQuad < 4; ++ curQuad )
{
unsigned char curByte = 0;
int i;
for ( i = 0; i < 3; ++i )
{
if ( sz[ i ] >= ‘0’ && sz[ i ] <= ‘9’ )
curByte = curByte * 10 + sz[ i ] – ‘0’;
else
break;
}
if ( i == 0 ) return (unsigned) -1;
sz += i;
if ( curQuad < 3 && *sz != ‘.’ ) return (unsigned) -1;
if ( curQuad == 3 && *sz != 0 ) return (unsigned) -1;
++sz;
result = ( result << 8 ) | curByte;
}
return result;
}
//=====================================================================
//=====================================================================
UDPAddress::UDPAddress() : ip( -1 ), port( 0 ) { }
//=====================================================================
//=====================================================================
UDPAddress::UDPAddress( unsigned short port, const char* ip )
: port( port ), ip( IPFromSZ( ip ) ) { }
//=====================================================================
//=====================================================================
bool UDPAddress::operator==( const UDPAddress& other ) const
{
return ip == other.ip && port == other.port;
}
bool UDPAddress::operator<( const UDPAddress& other ) const
{
if ( ip == other.ip )
return port < other.port;
else
return ip < other.ip;
}
//=====================================================================
//=====================================================================
bool UDPAddress::isValid() const
{
return ip != (unsigned) -1 && ip != 0 &&
port != (unsigned short) -1 && port != 0;
}
//=====================================================================
//=====================================================================
enum UDPMessageType : uint64
{
PACKET_NORMAL, PACKET_GUARANTEED, MESSAGE_DELIVERY_ACK,
PACKET_KEEPALIVE, PACKET_PING, PACKET_PING_RESPONSE
};
struct UDPHeader
{
UDPMessageType type;
uint64 sequenceNum;
};
//=====================================================================
//=====================================================================
struct PendingPacket
{
UDPAddress destinationAddr;
unsigned totalSize;
SystemClocks lastSendTimeClocks;
SystemClocks guaranteedResendClocks;
unsigned sendAttempts;
SessionID session;
union
{
char rawData[ MAX_PACKET_SIZE ];
UDPHeader header;
};
};
//=====================================================================
//=====================================================================
struct SessionInfo
{
SessionID id;
UDPAddress address;
uint64 nextLocalSeq;
uint64 nextRemoteSeq;
SystemClocks lastReceiptTimeClocks;
SystemClocks lastSendTimeClocks;
SystemClocks lastPingTimeClocks;
SystemClocks bufferCreationTimeClocks;
PendingPacket packetBuffer;
bool bTerminated;
bool bTimeout;
unsigned pendingGuaranteedCount;
RunningAverage< SystemClocks, RTT_SAMPLES > RTTLog;
void clearBuffer()
{
SystemClocks minResendRate = SecondsToClocks( MIN_GUARANTEED_SENDRATE_MILLIS * 0.001 );
packetBuffer.session = id;
packetBuffer.destinationAddr = address;
packetBuffer.totalSize = UDP_HEADER_RESERVED;
packetBuffer.lastSendTimeClocks = 0;
packetBuffer.guaranteedResendClocks = ( RTTLog.getAverage() * GUARANTEED_SENDRATE_PERCENT_RTT ) / 100;
if ( packetBuffer.guaranteedResendClocks < minResendRate )
packetBuffer.guaranteedResendClocks = minResendRate;
packetBuffer.header.type = PACKET_NORMAL;
packetBuffer.header.sequenceNum = 0;
packetBuffer.sendAttempts = 0;
bufferCreationTimeClocks = (SystemClocks) -1;
}
};
//=====================================================================
//=====================================================================
struct WinsockInitializer
{
bool bInitialized;
WinsockInitializer() : bInitialized( false ) { }
void InitWinsock() {
if ( bInitialized ) return;
#ifdef WIN32
WSADATA data;
if ( WSAStartup( WINSOCK_VERSION, &data ) != 0 || data.wVersion != WINSOCK_VERSION )
throw ResourceException( "Error initializing Winsock" );
#endif
}
#ifdef WIN32
~WinsockInitializer()
{ if ( 0 != WSACleanup() ) { } /* trace( "Error shutting down Winsock" ) */ }
#endif
};
//=====================================================================
//=====================================================================
class UDPSocketImpl
{
public:
UDPSocketImpl()
{
s_winsockInit.InitWinsock();
sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if ( sock == INVALID_SOCKET )
throw NetworkException();
// Set socket to non blocking
#ifdef WIN32
unsigned long setNonBlocking = 1;
if ( 0 != ioctlsocket( sock, FIONBIO, &setNonBlocking ) )
{
if ( 0 != closesocket( sock ) ) trace( "Error shutting down socket" );
throw NetworkException();
}
#endif
#ifdef LINUX
int x;
x = fcntl( sock, F_GETFL, 0 );
fcntl( sock, F_SETFL, x | O_NONBLOCK );
#endif
nextSessionID = 0;
}
//=================================================================
//=================================================================
~UDPSocketImpl()
{
#ifdef WIN32
if ( 0 != closesocket( sock ) ) trace( "Error shutting down socket" );
#endif
#ifdef LINUX
if ( 0 != close( sock ) ) trace( "Error shutting down socket" );
#endif
}
//=================================================================
//=================================================================
void queueMessage( SessionInfo& session, const void* data,
unsigned dataSize, bool bGuaranteed = false )
{
if ( dataSize > MAX_SENDABLE_MESSAGE )
throw std::exception( /* "Invalid argument, message too large to send as one packet" */ );
// No room for the new message in the packet buffer? -publish the buffer
if ( session.packetBuffer.totalSize + dataSize + sizeof( uint64 ) > MAX_PACKET_SIZE )
publishPacket( session );
uint64* ptMessageSize = reinterpret_cast< uint64* >( &session.packetBuffer.rawData[ session.packetBuffer.totalSize ] );
char* ptMessage = reinterpret_cast< char* >( ptMessageSize + 1 );
// Copy the message to the packet
*ptMessageSize = dataSize;
memcpy( ptMessage, data, dataSize );
// Align messages within packet to 8-byte boundary
unsigned paddingAmt = dataSize % sizeof( uint64 );
if ( paddingAmt != 0 )
{
paddingAmt = sizeof( uint64 ) – paddingAmt;
memset( ptMessage + dataSize, 0, paddingAmt );
}
session.packetBuffer.totalSize += dataSize + sizeof( uint64 ) + paddingAmt;
if ( bGuaranteed ) session.packetBuffer.header.type = PACKET_GUARANTEED;
if ( session.bufferCreationTimeClocks == (SystemClocks) -1 )
session.bufferCreationTimeClocks = GetAbsoluteTimeClocks();
}
//=================================================================
//=================================================================
void publishPacket( SessionInfo& session )
{
if ( session.packetBuffer.header.type == PACKET_GUARANTEED )
{
session.packetBuffer.header.sequenceNum = session.nextLocalSeq++;
pendingGuaranteedPackets.push_back( session.packetBuffer );
++session.pendingGuaranteedCount;
}
else
{
pendingPackets.push_back( session.packetBuffer );
}
session.clearBuffer();
session.lastSendTimeClocks = GetAbsoluteTimeClocks();
}
//=================================================================
//=================================================================
void keepAlive( SessionInfo& session )
{
PendingPacket keepAlivePacket;
keepAlivePacket.destinationAddr = session.address;
keepAlivePacket.lastSendTimeClocks = 0;
keepAlivePacket.sendAttempts = 0;
keepAlivePacket.totalSize = UDP_HEADER_RESERVED;
keepAlivePacket.header.type = PACKET_KEEPALIVE;
keepAlivePacket.header.sequenceNum = 0;
keepAlivePacket.session = session.id;
pendingPackets.push_back( keepAlivePacket );
session.lastSendTimeClocks = GetAbsoluteTimeClocks();
}
//=================================================================
//=================================================================
void sendPing ( SessionInfo& session )
{
PendingPacket keepAlivePacket;
keepAlivePacket.destinationAddr = session.address;
keepAlivePacket.lastSendTimeClocks = 0;
keepAlivePacket.sendAttempts = 0;
keepAlivePacket.totalSize = UDP_HEADER_RESERVED;
keepAlivePacket.header.type = PACKET_PING;
keepAlivePacket.header.sequenceNum = (SystemClocks) GetAbsoluteTimeClocks();
keepAlivePacket.session = session.id;
pendingPackets.push_back( keepAlivePacket );
session.lastSendTimeClocks = GetAbsoluteTimeClocks();
}
//=================================================================
//=================================================================
void respondToPing( SessionInfo& session, uint64 timestamp )
{
PendingPacket keepAlivePacket;
keepAlivePacket.destinationAddr = session.address;
keepAlivePacket.lastSendTimeClocks = 0;
keepAlivePacket.sendAttempts = 0;
keepAlivePacket.totalSize = UDP_HEADER_RESERVED;
keepAlivePacket.header.type = PACKET_PING_RESPONSE;
keepAlivePacket.header.sequenceNum = (SystemClocks) timestamp;
keepAlivePacket.session = session.id;
pendingPackets.push_back( keepAlivePacket );
session.lastSendTimeClocks = GetAbsoluteTimeClocks();
}
//=================================================================
//=================================================================
void processAcknowledge( SessionInfo& session, uint64 sequenceNum )
{
for ( int i = (int) pendingGuaranteedPackets.size() – 1; i >= 0; –i )
{
PendingPacket& curPacket = pendingGuaranteedPackets[ i ];
if ( curPacket.session == session.id && curPacket.header.sequenceNum <= sequenceNum )
{
pendingGuaranteedPackets[ i ] = pendingGuaranteedPackets[ pendingGuaranteedPackets.size() – 1 ];
pendingGuaranteedPackets.pop_back();
–session.pendingGuaranteedCount;
}
}
}
//=================================================================
//=================================================================
void sendAcknowledge( SessionInfo& session )
{
if ( session.nextRemoteSeq == 0 ) return;
PendingPacket ackPacket;
ackPacket.destinationAddr = session.address;
ackPacket.lastSendTimeClocks = 0;
ackPacket.sendAttempts = 0;
ackPacket.totalSize = UDP_HEADER_RESERVED;
ackPacket.header.type = MESSAGE_DELIVERY_ACK;
ackPacket.header.sequenceNum = session.nextRemoteSeq – 1;
ackPacket.session = session.id;
pendingPackets.push_back( ackPacket );
session.lastSendTimeClocks = GetAbsoluteTimeClocks();
}
//=================================================================
//=================================================================
SessionInfo& getSession( SessionID id )
{
auto itr = activeSessions.find( id );
if ( itr == activeSessions.end() )
throw InvalidSessionException();
return itr->second;
}
//=================================================================
//=================================================================
SessionInfo& getSession( const UDPAddress& address )
{
// TODO – maybe too algorithmically expensive using O( n )
for ( auto itr = activeSessions.begin(); itr != activeSessions.end(); ++itr )
if ( itr->second.address == address )
return itr->second;
throw InvalidSessionException();
}
//=================================================================
//=================================================================
SessionInfo& openSession( const UDPAddress& address )
{
SessionInfo newSession;
newSession.address = address;
newSession.id = nextSessionID++;
newSession.lastReceiptTimeClocks = GetAbsoluteTimeClocks();
newSession.lastSendTimeClocks = 0;
newSession.lastPingTimeClocks = GetAbsoluteTimeClocks();
newSession.nextLocalSeq = 0;
newSession.nextRemoteSeq = 0;
newSession.bTerminated = false;
newSession.bTimeout = false;
newSession.pendingGuaranteedCount = 0;
newSession.RTTLog.reset( SecondsToClocks( INITIAL_ESTIMATED_RTT_MILLIS * 0.001 ) );
newSession.clearBuffer();
newSessions.push_back( newSession.id );
return ( activeSessions[ newSession.id ] = newSession );
}
//=================================================================
//=================================================================
SessionInfo& getOrOpenSession( const UDPAddress& address )
{
try {
return getSession( address );
}
catch ( InvalidSessionException ) {
return openSession( address );
}
}
//=================================================================
//=================================================================
void notifyNewSessions()
{
try {
for ( int sessionIdx = 0; sessionIdx < newSessions.size(); ++sessionIdx )
for ( int listenerIdx = 0; listenerIdx < listeners.size(); ++listenerIdx )
{
SessionID newID = newSessions[ sessionIdx ];
listeners[ listenerIdx ]->SessionStarted( newID,
getSession( newID ).address );
}
} catch ( InvalidSessionException ) {
PROMISES( false, "Internal error in socket class" );
}
newSessions.clear();
}
//=================================================================
//=================================================================
void notifyReceived( const SessionInfo& session, int bytesRemaining )
{
bytesRemaining -= UDP_HEADER_RESERVED;
UDPHeader* ptPacketHeader = reinterpret_cast< UDPHeader* >( receiveBuffer );
uint64* ptMessageSize = reinterpret_cast< uint64* >( &receiveBuffer[ UDP_HEADER_RESERVED ] );
void* ptMessageBody = ptMessageSize + 1;
while ( bytesRemaining >= sizeof( uint64 ) )
{
uint64 readBytes = *ptMessageSize + sizeof( uint64 );
// Malformed packet?
if ( readBytes > bytesRemaining )
throw NetworkException();
for ( int listenerIdx = 0; listenerIdx < listeners.size(); ++listenerIdx )
{
listeners[ listenerIdx ]->ReceivedMessage( ptMessageBody,
(unsigned) *ptMessageSize, session.id, session.address );
}
ptMessageSize = OffsetByBytes( ptMessageSize, readBytes );
ptMessageBody = ptMessageSize + 1;
bytesRemaining -= (int) readBytes;
}
}
//=================================================================
//=================================================================
bool GetNextPacket( int& outBytesReceived, sockaddr_in& outAddr )
{
unsigned long dataAvail = 0;
#ifdef WIN32
if ( 0 != ioctlsocket( sock, FIONREAD, &dataAvail ) )
throw NetworkException();
#endif
#ifdef LINUX
if ( 0 != ioctl( sock, FIONREAD, &dataAvail ) )
throw NetworkException();
#endif
if ( dataAvail == 0 ) return false;
int addrSize = sizeof( sockaddr_in );
#ifdef WIN32
outBytesReceived = recvfrom( sock, receiveBuffer,
MAX_PACKET_SIZE, 0, (SOCKADDR*) &outAddr, &addrSize );
#endif
#ifdef LINUX
outBytesReceived = recvfrom( sock, receiveBuffer,
MAX_PACKET_SIZE, 0, (SOCKADDR*) &outAddr, (unsigned*) &addrSize );
#endif
#ifdef SIMULATE_PACKET_LOSS
if ( rand() % 100 < SIMULATED_PACKET_LOSS_PERCENT )
return;
#endif
#ifdef WIN32
if ( SOCKET_ERROR == outBytesReceived ) switch ( WSAGetLastError() )
{
case WSAEMSGSIZE: // too big
outBytesReceived = MAX_PACKET_SIZE;
break;
case WSAECONNRESET: // port unreachable from previous send() call
case WSAETIMEDOUT: // took too long
case WSAENETRESET: // time to live expired
outBytesReceived = 0;
return true;
default:
throw NetworkException();
}
#endif
#ifdef LINUX
if ( 0 == outBytesReceived ) switch ( errno )
{
case EMSGSIZE: // too big
outBytesReceived = MAX_PACKET_SIZE;
break;
case ECONNRESET: // port unreachable from previous send() call
case ETIMEDOUT: // took too long
case ENETRESET: // time to live expired
return true;
default:
throw NetworkException();
}
#endif
return true;
}
//=================================================================
//=================================================================
void processReceive()
{
int bytesReceived;
sockaddr_in remoteAddrIn;
UDPAddress remoteAddress;
#ifdef SIMULATE_NETWORK_LATENCY
while ( GetNextPacket( bytesReceived, remoteAddrIn ) )
{
PendingPacket newPacket;
newPacket.destinationAddr.ip = ntohl( remoteAddrIn.sin_addr.s_addr );
newPacket.destinationAddr.port = ntohs( remoteAddrIn.sin_port );
newPacket.lastSendTimeClocks = GetAbsoluteTimeClocks();
newPacket.totalSize = bytesReceived;
memcpy( newPacket.rawData, receiveBuffer, MAX_PACKET_SIZE );
latency_queue.push( newPacket );
}
#endif
// while there are packets on the socket
for ( ; ; )
{
// receive the next packet
#ifdef SIMULATE_NETWORK_LATENCY
if ( latency_queue.size() == 0 ) return;
SystemClocks receiveDelayClocks = SecondsToClocks( ADDITIONAL_LATENCY_MILLIS * 0.001 );
PendingPacket& curPacket = latency_queue.front();
if ( curPacket.lastSendTimeClocks + receiveDelayClocks <= GetAbsoluteTimeClocks() )
{
memcpy( receiveBuffer, curPacket.rawData, MAX_PACKET_SIZE );
remoteAddress = curPacket.destinationAddr;
bytesReceived = curPacket.totalSize;
latency_queue.pop();
}
else
{
return;
}
#else
if ( !GetNextPacket( bytesReceived, remoteAddrIn ) ) return;
remoteAddress.ip = ntohl( remoteAddrIn.sin_addr.s_addr );
remoteAddress.port = ntohs( remoteAddrIn.sin_port );
#endif
// Got a packet, start processing it
// Too small to be valid?
if ( bytesReceived < UDP_HEADER_RESERVED ) continue;
UDPHeader* header = (UDPHeader*) receiveBuffer;
// Determine the packet’s session
SessionInfo* curSession;
try {
curSession = &getSession( remoteAddress );
}
catch ( InvalidSessionException ) {
// No active session for this address
// Do not start new sessions for OOO guaranteed messages,
// ACKs, or keepalives
bool bInvalidPacketForNewSession( false );
switch ( header->type )
{
case PACKET_PING:
case PACKET_PING_RESPONSE:
case PACKET_KEEPALIVE:
case MESSAGE_DELIVERY_ACK:
bInvalidPacketForNewSession = true;
break;
case PACKET_GUARANTEED:
if ( header->sequenceNum != 0 )
bInvalidPacketForNewSession = true;
break;
}
if ( bInvalidPacketForNewSession ) continue;
curSession = &openSession( remoteAddress );
// To avoid listeners getting a packet before a session started
// message we need to inform them if there was a new session here
notifyNewSessions();
}
// Update the last activity for the session
if ( !curSession->bTerminated && !curSession->bTimeout )
curSession->lastReceiptTimeClocks = GetAbsoluteTimeClocks();
bool bNotifyListeners( true );
switch ( header->type )
{
case PACKET_PING:
bNotifyListeners = false;
respondToPing( *curSession, header->sequenceNum );
break;
case PACKET_PING_RESPONSE:
curSession->RTTLog.logSample(
GetAbsoluteTimeClocks() – (SystemClocks) header->sequenceNum );
bNotifyListeners = false;
break;
case PACKET_KEEPALIVE:
bNotifyListeners = false;
break;
case MESSAGE_DELIVERY_ACK:
bNotifyListeners = false;
processAcknowledge( *curSession, header->sequenceNum );
break;
case PACKET_GUARANTEED:
if ( header->sequenceNum == curSession->nextRemoteSeq )
++curSession->nextRemoteSeq;
else
bNotifyListeners = false;
sendAcknowledge( *curSession );
break;
case PACKET_NORMAL:
break;
// Malformed packet
default: throw NetworkException();
}
if ( bNotifyListeners )
notifyReceived( *curSession, bytesReceived );
}
}
//=================================================================
//=================================================================
int winSend( PendingPacket& curPacket )
{
sockaddr_in remoteAddress;
remoteAddress.sin_family = AF_INET;
remoteAddress.sin_port = htons( curPacket.destinationAddr.port );
remoteAddress.sin_addr.s_addr = htonl( curPacket.destinationAddr.ip );
return sendto( sock, curPacket.rawData, curPacket.totalSize,
0, (SOCKADDR*) &remoteAddress, sizeof( sockaddr_in ) );
}
//=================================================================
//=================================================================
void processSend()
{
SystemClocks keepAliveClocks = SecondsToClocks( KEEPALIVE_MILLIS * 0.001 );
SystemClocks initialPingClocks = SecondsToClocks( INITIAL_RTT_POLL_FREQUENCY_MILLIS * 0.001 );
SystemClocks normalPingClocks = SecondsToClocks( RTT_POLL_FREQUENCY_MILLIS * 0.001 );
SystemClocks minPublishClocks = SecondsToClocks( MIN_PUBLISH_RATE_MILLIS * 0.001 );
SystemClocks curTime = GetAbsoluteTimeClocks();
for ( auto sessionItr = activeSessions.begin(); sessionItr != activeSessions.end(); ++sessionItr )
{
SessionInfo& curSession = sessionItr->second;
SystemClocks publishClocks = ( curSession.RTTLog.getAverage() * PUBLISH_RATE_PERCENT_RTT ) / 100;
if ( publishClocks < minPublishClocks ) publishClocks = minPublishClocks;
// Send RTT pings
if ( ( curSession.RTTLog.validSamples == RTT_SAMPLES && curSession.lastPingTimeClocks + normalPingClocks <= curTime ) ||
( curSession.RTTLog.validSamples != RTT_SAMPLES &&curSession.lastPingTimeClocks + initialPingClocks <= curTime ) )
{
sendPing( curSession );
curSession.lastPingTimeClocks = curTime;
}
// check sessions for packets that need publishing
if ( curSession.bufferCreationTimeClocks != -1 &&
( curSession.bTerminated ||
curSession.bufferCreationTimeClocks + publishClocks <= curTime ) )
{
publishPacket( curSession );
}
// check sessions to see if keepalives are required
else if ( !curSession.bTerminated &&
curSession.lastSendTimeClocks + keepAliveClocks <= curTime )
{
if ( curSession.bufferCreationTimeClocks != -1 )
publishPacket( curSession );
else
keepAlive( curSession );
}
}
// send non-guaranteed packets
while ( pendingPackets.size() > 0 )
{
int result = winSend( pendingPackets[ pendingPackets.size() – 1 ] );
pendingPackets.pop_back();
if ( result == SOCKET_ERROR )
throw NetworkException();
}
// send any guaranteed packets that are due
for ( int packetItr = (int) pendingGuaranteedPackets.size() – 1; packetItr >= 0; –packetItr )
{
PendingPacket& curPacket = pendingGuaranteedPackets[ packetItr ];
if ( curPacket.lastSendTimeClocks + curPacket.guaranteedResendClocks <= curTime )
{
curPacket.lastSendTimeClocks = curTime;
if ( ++curPacket.sendAttempts <= GUARANTEED_MESSAGE_MAX_RETRIES )
{
if ( winSend( curPacket ) == SOCKET_ERROR )
throw NetworkException();
}
else
{
// if we are past its max retries, mark the session as terminated
auto sessionItr = activeSessions.find( curPacket.session );
if ( sessionItr != activeSessions.end() )
sessionItr->second.bTimeout = true;
// remove the packet from consideration
pendingGuaranteedPackets[ packetItr ] = pendingGuaranteedPackets[ pendingGuaranteedPackets.size() – 1 ];
pendingGuaranteedPackets.pop_back();
}
}
}
}
//=================================================================
//=================================================================
void postUpdate()
{
SystemClocks timeoutClocks = SecondsToClocks( CONNECTION_TIMEOUT_MILLIS * 0.001 );
SystemClocks curTime = GetAbsoluteTimeClocks();
// check sessions for timeouts and terminations
for ( auto sessionItr = activeSessions.begin(); sessionItr != activeSessions.end(); )
{
SessionInfo& curSession = sessionItr->second;
if ( curSession.lastReceiptTimeClocks + timeoutClocks <= curTime ||
curSession.bTimeout ||
( curSession.bTerminated && curSession.pendingGuaranteedCount == 0 ) )
{
// remove any guaranteed messages posted by a terminated session
processAcknowledge( curSession, (uint64) -1 );
// notify listeners of terminated sessions
for ( int listenItr = 0; listenItr < (int) listeners.size(); ++listenItr )
{
listeners[ listenItr ]->SessionEnded( curSession.id,
curSession.address, curSession.bTimeout );
}
auto temp = sessionItr;
++sessionItr;
activeSessions.erase( temp );
}
else
{
++sessionItr;
}
}
}
//=================================================================
//=================================================================
static WinsockInitializer s_winsockInit;
SOCKET sock;
char receiveBuffer[ MAX_PACKET_SIZE ];
std::vector< PendingPacket > pendingPackets;
std::vector< PendingPacket > pendingGuaranteedPackets;
std::map< SessionID, SessionInfo > activeSessions;
unsigned nextSessionID;
std::vector< UDPSocketListener* > listeners;
std::vector< SessionID > newSessions;
#ifdef SIMULATE_NETWORK_LATENCY
std::queue< PendingPacket > latency_queue;
#endif
};
WinsockInitializer UDPSocketImpl::s_winsockInit;
//=====================================================================
//=====================================================================
UDPSocket::UDPSocket() : m_( new UDPSocketImpl() ) { }
UDPSocket::~UDPSocket() { delete m_; }
//=====================================================================
//=====================================================================
void UDPSocket::addListener( UDPSocketListener* newListener )
{
removeListener( newListener );
m_->listeners.push_back( newListener );
}
//=====================================================================
//=====================================================================
void UDPSocket::removeListener( UDPSocketListener* oldListener )
{
for ( int i = 0; i < m_->listeners.size(); ++i )
if ( m_->listeners[ i ] == oldListener )
{
m_->listeners[ i ] = m_->listeners[ m_->listeners.size() – 1 ];
m_->listeners.pop_back();
–i;
}
}
//=====================================================================
//=====================================================================
void UDPSocket::bindPort( unsigned short port )
{
sockaddr_in bindAddr;
bindAddr.sin_family = AF_INET;
bindAddr.sin_port = htons( port );
bindAddr.sin_addr.s_addr = htonl( INADDR_ANY );
int result = bind( m_->sock, (SOCKADDR *) &bindAddr, sizeof( sockaddr_in ) );
#ifdef WIN32
if ( result == SOCKET_ERROR ) switch ( WSAGetLastError() )
{
case WSAEADDRINUSE: throw PortInUseException();
// TODO – more detail on types of errors
default: throw NetworkException();
}
#endif
#ifdef LINUX
if ( result == SOCKET_ERROR ) switch ( errno )
{
case EADDRINUSE: throw PortInUseException();
// TODO – more detail on types of errors
default: throw NetworkException();
}
#endif
}
//=====================================================================
//=====================================================================
void UDPSocket::send( const void* data, unsigned dataSize, const UDPAddress& remoteAddr )
{
m_->queueMessage( m_->getOrOpenSession( remoteAddr ), data, dataSize );
}
//=====================================================================
//=====================================================================
void UDPSocket::send( const void* data, unsigned dataSize, SessionID sessionID )
{
m_->queueMessage( m_->getSession( sessionID ), data, dataSize );
}
//=====================================================================
//=====================================================================
void UDPSocket::sendGuaranteed( const void* data, unsigned dataSize, const UDPAddress& remoteAddr )
{
m_->queueMessage( m_->getOrOpenSession( remoteAddr ), data, dataSize, true );
}
//=====================================================================
//=====================================================================
void UDPSocket::sendGuaranteed( const void* data, unsigned dataSize, SessionID sessionID )
{
m_->queueMessage( m_->getSession( sessionID ), data, dataSize, true );
}
//=====================================================================
//=====================================================================
void UDPSocket::terminateSession( const UDPAddress& remoteAddr )
{
m_->getSession( remoteAddr ).bTerminated = true;
}
//=====================================================================
//=====================================================================
void UDPSocket::terminateSession( SessionID id )
{
m_->getSession( id ).bTerminated = true;
}
//=====================================================================
//=====================================================================
void UDPSocket::getRemoteAddressByID( SessionID sessionID, UDPAddress& outAddress )
{
outAddress = m_->getSession( sessionID ).address;
}
//=====================================================================
//=====================================================================
unsigned UDPSocket::getRTTMillis( SessionID id ) const
{
return (unsigned) ceil( ClocksToSeconds(
m_->getSession( id ).RTTLog.getAverage() ) * 1000.0 );
}
//=====================================================================
//=====================================================================
void UDPSocket::update()
{
m_->notifyNewSessions();
m_->processReceive();
m_->processSend();
m_->postUpdate();
}
}
[/code]