/******************************************************************************* * Copyright (c) 2021 Nerian Vision GmbH * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. *******************************************************************************/ #include #include #include #include #include #include "visiontransfer/datablockprotocol.h" #include "visiontransfer/exceptions.h" // Network headers #ifdef _WIN32 #include #undef min #undef max #else #include #endif #define LOG_ERROR(expr) //#define LOG_ERROR(expr) std::cerr << "DataBlockProtocol: " << expr << std::endl using namespace std; using namespace visiontransfer; using namespace visiontransfer::internal; namespace visiontransfer { namespace internal { DataBlockProtocol::DataBlockProtocol(bool server, ProtocolType protType, int maxUdpPacketSize) : isServer(server), protType(protType), transferDone(true), overwrittenTransferData{0}, overwrittenTransferIndex{-1}, overwrittenTransferBlock{-1}, transferHeaderData{nullptr}, transferHeaderSize{0}, totalBytesCompleted{0}, totalTransferSize{0}, waitingForMissingSegments(false), totalReceiveSize(0), connectionConfirmed(false), confirmationMessagePending(false), eofMessagePending(false), clientConnectionPending(false), resendMessagePending(false), lastRemoteHostActivity(), lastSentHeartbeat(), lastReceivedHeartbeat(std::chrono::steady_clock::now()), finishedReception(false), droppedReceptions(0), completedReceptions(0), lostSegmentRate(0.0), lostSegmentBytes(0), unprocessedMsgLength(0), headerReceived(false) { // Determine the maximum allowed payload size if(protType == PROTOCOL_TCP) { maxPayloadSize = MAX_TCP_BYTES_TRANSFER - sizeof(SegmentHeaderTCP); minPayloadSize = 0; } else { maxPayloadSize = maxUdpPacketSize - sizeof(SegmentHeaderUDP); minPayloadSize = maxPayloadSize; } zeroStructures(); resizeReceiveBuffer(); resetReception(false); } void DataBlockProtocol::splitRawOffset(int rawSegmentOffset, int& dataBlockID, int& segmentOffset) { int selector = (rawSegmentOffset >> 28) & 0xf; dataBlockID = selector & 0x7; // Note: 0x8 bit is reserved for now segmentOffset = rawSegmentOffset & 0x0FFFffff; } int DataBlockProtocol::mergeRawOffset(int dataBlockID, int segmentOffset, int reserved_defaults0) { return ((reserved_defaults0 & 1) << 31) | ((dataBlockID & 0x07) << 28) | (segmentOffset & 0x0FFFffff); } void DataBlockProtocol::zeroStructures() { for (int i=0; i= numTransferBlocks) { throw ProtocolException("Request to set data block size - block index too high!"); } transferSize[block] = bytes; HeaderPreamble* hp = reinterpret_cast(transferHeaderData); hp->netTransferSizes[block] = htonl(bytes); } void DataBlockProtocol::setTransferHeader(unsigned char* data, int headerSize, int blocks) { if(!transferDone && numTransferBlocks > 0) { throw ProtocolException("Header data set while transfer is active!"); } else if(headerSize + 9 > static_cast(sizeof(controlMessageBuffer))) { throw ProtocolException("Transfer header is too large!"); } numTransferBlocks = blocks; transferDone = false; for (int i=0; itransferSize[i] = 0; // must be set via setRawTransferBytes() } int headerBaseOffset = sizeof(HeaderPreamble); transferHeaderData = &data[-headerBaseOffset]; HeaderPreamble* ourHeader = reinterpret_cast(transferHeaderData); unsigned short netHeaderSize = htons(static_cast(headerSize)); ourHeader->netHeaderSize = netHeaderSize; ourHeader->netTransferSizeDummy = htonl(-1); // clashes on purpose with old recipients headerSize += headerBaseOffset; if(protType == PROTOCOL_UDP) { // In UDP mode we still need to make this a control message transferHeaderData[headerSize++] = HEADER_MESSAGE; transferHeaderData[headerSize++] = 0xFF; transferHeaderData[headerSize++] = 0xFF; transferHeaderData[headerSize++] = 0xFF; transferHeaderData[headerSize++] = 0xFF; } transferHeaderSize = headerSize; } void DataBlockProtocol::setTransferData(int block, unsigned char* data, int validBytes) { if(transferHeaderSize == 0 || transferHeaderData == nullptr) { throw ProtocolException("The transfer header has not yet been set!"); } transferDone = false; rawDataArr[block] = data; transferOffset[block] = 0; overwrittenTransferIndex = -1; overwrittenTransferBlock = -1; rawValidBytes[block] = min(transferSize[block], validBytes); totalBytesCompleted = 0; } void DataBlockProtocol::setTransferValidBytes(int block, int validBytes) { if(validBytes >= transferSize[block]) { rawValidBytes[block] = transferSize[block]; } else if(validBytes < static_cast(sizeof(int))) { rawValidBytes[block] = 0; } else { rawValidBytes[block] = validBytes; } } std::string DataBlockProtocol::statusReport() { std::stringstream ss; ss << "DataBlockProtocol, blocks=" << numTransferBlocks << ": "; for (int i=0; i(&rawDataArr[block][offset + length]); std::memcpy(overwrittenTransferData, segmentHeader, sizeof(SegmentHeaderUDP)); segmentHeader->segmentOffset = static_cast(htonl(mergeRawOffset(block, offset))); length += sizeof(SegmentHeaderUDP); lastTransmittedBlock = block; return &rawDataArr[block][offset]; } else { // For tcp, we *PRE*pend the header consisting of segment offset plus the packet payload size int headerOffset = offset - sizeof(SegmentHeaderTCP); overwrittenTransferBlock = block; overwrittenTransferIndex = headerOffset; SegmentHeaderTCP* segmentHeader = reinterpret_cast(&rawDataArr[block][headerOffset]); std::memcpy(overwrittenTransferData, segmentHeader, sizeof(SegmentHeaderTCP)); segmentHeader->fragmentSize = htons(length); segmentHeader->segmentOffset = static_cast(htonl(mergeRawOffset(block, offset))); length += sizeof(SegmentHeaderTCP); lastTransmittedBlock = block; return &rawDataArr[block][headerOffset]; } } void DataBlockProtocol::getNextTransferSegment(int& block, int& offset, int& length) { if(missingTransferSegments.size() == 0) { // Select from block with the most unsent data int sendBlock = 0, amount = 0; for (int i=0; i amount) { amount = avail; sendBlock = i; } } length = std::min(maxPayloadSize, amount); if(length == 0 || (length < minPayloadSize && rawValidBytes[sendBlock] != transferSize[sendBlock])) { length = 0; return; } block = sendBlock; offset = transferOffset[sendBlock]; transferOffset[sendBlock] += length; // for next transfer if (protType == PROTOCOL_UDP) { bool complete = true; for (int i=0; i= 0) { if(protType == PROTOCOL_UDP) { std::memcpy(&rawDataArr[overwrittenTransferBlock][overwrittenTransferIndex], overwrittenTransferData, sizeof(SegmentHeaderUDP)); } else { std::memcpy(&rawDataArr[overwrittenTransferBlock][overwrittenTransferIndex], overwrittenTransferData, sizeof(SegmentHeaderTCP)); } } overwrittenTransferIndex = -1; overwrittenTransferBlock = -1; } bool DataBlockProtocol::transferComplete() { for (int i=0; i receiveBuffer.size()) { receiveBuffer.resize(receiveOffset + maxLength); } return &receiveBuffer[receiveOffset]; } void DataBlockProtocol::processReceivedMessage(int length, bool& transferComplete) { transferComplete = false; if(length <= 0) { return; // Nothing received } if(finishedReception) { // First reset for next frame resetReception(false); } if(protType == PROTOCOL_UDP) { processReceivedUdpMessage(length, transferComplete); } else { processReceivedTcpMessage(length, transferComplete); } transferComplete = finishedReception; } void DataBlockProtocol::processReceivedUdpMessage(int length, bool& transferComplete) { if(length < static_cast(sizeof(int)) || 0 + length > static_cast(receiveBuffer.size())) { throw ProtocolException("Received message size is invalid!"); } // Extract the sequence number int rawSegmentOffset = ntohl(*reinterpret_cast( &receiveBuffer[0 + length - sizeof(int)])); // for holding the offset with blanked-out channel index int dataBlockID, segmentOffset; splitRawOffset(rawSegmentOffset, dataBlockID, segmentOffset); if(rawSegmentOffset == static_cast(0xFFFFFFFF)) { // This is a control packet processControlMessage(length); } else if(headerReceived) { // Correct the length by subtracting the size of the segment offset int realPayloadOffset = 0; int payloadLength = length - sizeof(int); if(segmentOffset != blockReceiveOffsets[dataBlockID]) { // The segment offset doesn't match what we expected. Probably // a packet was dropped if(!waitingForMissingSegments && //receiveOffset > 0 && segmentOffset > blockReceiveOffsets[dataBlockID] && segmentOffset + payloadLength < (int)blockReceiveBuffers[dataBlockID].size()) { // We can just ask for a retransmission of this packet LOG_ERROR("Missing segment: " << blockReceiveOffsets[dataBlockID] << " - " << segmentOffset << " (" << missingReceiveSegments.size() << ")"); MissingReceiveSegment missingSeg; missingSeg.offset = mergeRawOffset(dataBlockID, blockReceiveOffsets[dataBlockID]); missingSeg.length = segmentOffset - blockReceiveOffsets[dataBlockID]; missingSeg.isEof = false; lostSegmentBytes += missingSeg.length; missingReceiveSegments.push_back(missingSeg); // Move the received data to the right place in the buffer memcpy(&blockReceiveBuffers[dataBlockID][segmentOffset], &receiveBuffer[0 + realPayloadOffset], payloadLength); // Advance block receive offset blockReceiveOffsets[dataBlockID] = segmentOffset + payloadLength; } else { // In this case we cannot recover from the packet loss or // we just didn't get the EOF packet and everything is // actually fine resetReception(blockReceiveOffsets[0] > 0); if(segmentOffset > 0 ) { if(blockReceiveOffsets[dataBlockID] > 0) { LOG_ERROR("Resend failed!"); } return; } else { LOG_ERROR("Missed EOF message!"); } } } else { if ((realPayloadOffset+payloadLength) > (int)receiveBuffer.size()) { throw ProtocolException("Received out-of-bound data."); } // append to correct block buffer memcpy(&blockReceiveBuffers[dataBlockID][segmentOffset], &receiveBuffer[0 + realPayloadOffset], payloadLength); // advance the expected next data offset for this block blockReceiveOffsets[dataBlockID] = segmentOffset + payloadLength; if (waitingForMissingSegments) { // segment extends the currently valid region (suspended once we missed out first segment) if ((missingReceiveSegments.size() == 1) && (missingReceiveSegments.front().length <= payloadLength)) { // last gap closed by this segment blockValidSize[dataBlockID] = blockReceiveSize[dataBlockID]; } else { blockValidSize[dataBlockID] = segmentOffset + payloadLength; } } else if (missingReceiveSegments.size() == 0) { blockValidSize[dataBlockID] = segmentOffset + payloadLength; } } if(segmentOffset == 0 && dataBlockID == 0) { // This is the beginning of a new frame lastRemoteHostActivity = std::chrono::steady_clock::now(); } // Try to fill missing regions integrateMissingUdpSegments(dataBlockID, segmentOffset, payloadLength); } } void DataBlockProtocol::integrateMissingUdpSegments(int block, int lastSegmentOffset, int lastSegmentSize) { if(waitingForMissingSegments) { // Things get more complicated when re-transmitting dropped packets int checkBlock, checkOffset; MissingReceiveSegment& firstSeg = missingReceiveSegments.front(); splitRawOffset(firstSeg.offset, checkBlock, checkOffset); if(lastSegmentOffset != checkOffset) { LOG_ERROR("Received invalid resend: " << lastSegmentOffset); resetReception(true); } else { firstSeg.offset += lastSegmentSize; firstSeg.length -= lastSegmentSize; if(firstSeg.length == 0) { missingReceiveSegments.pop_front(); } if(missingReceiveSegments.size() == 0) { waitingForMissingSegments = false; finishedReception = true; } else { blockReceiveOffsets[block] = missingReceiveSegments.front().offset; } } } } void DataBlockProtocol::processReceivedTcpMessage(int length, bool& transferComplete) { // In TCP mode the header must be the first data item to be transmitted if(!headerReceived) { int totalHeaderSize = parseReceivedHeader(length, 0); if(totalHeaderSize == 0) { // Not yet enough data. Keep on buffering. receiveOffset += length; // append in next recv return; } else { // Header successfully parsed // Move the remaining data to the beginning of the buffer length -= totalHeaderSize; // The rest is the first [part of] buffer segment data if(length == 0) { return; // No more data remaining } int movelength = receiveOffset + length; // also move the old stuff ::memmove(&receiveBuffer[0], &receiveBuffer[totalHeaderSize], movelength); receiveOffset = movelength; // append in next recv } } else { receiveOffset += length; // modified below if complete chunks are present } if (legacyTransfer) { // Legacy TCP transfer: no segment headers, just raw data for block 0, up to the expected size int remainingSize = blockReceiveSize[0] - blockValidSize[0]; int availableSize = std::min(receiveOffset, remainingSize); // Update actual target buffer std::memcpy(&blockReceiveBuffers[0][blockReceiveOffsets[0]], &receiveBuffer[0], availableSize); blockReceiveOffsets[0] += availableSize; blockValidSize[0] = blockReceiveOffsets[0]; // Extra data, store at buffer start for next reception to append to if (receiveOffset <= remainingSize) { // Start next reception at recv buffer start receiveOffset = 0; } else { // Mark next reception to append to unhandled data remainder std::memmove(&receiveBuffer[0], &receiveBuffer[remainingSize], availableSize - remainingSize); receiveOffset = availableSize - remainingSize; } } else { // Parse the SegmentHeaderTCP (if present) to see if a full fragment is present int ofs = 0; while ((receiveOffset - ofs) >= (int) sizeof(SegmentHeaderTCP)) { SegmentHeaderTCP* header = reinterpret_cast(&receiveBuffer[ofs]); int fragsize = ntohs(header->fragmentSize); int rawSegmentOffset = ntohl(header->segmentOffset); int block, offset; splitRawOffset(rawSegmentOffset, block, offset); if (block == 7) { // Block 7 is reserved; control message (the next header), stop moving image data break; } if ((receiveOffset - ofs) >= (fragsize + (int) sizeof(SegmentHeaderTCP))) { // Incorporate fragment // assert here that offset==blockReceiveOffsets[block] if (offset != blockReceiveOffsets[block]) { throw ProtocolException("Received invalid header!"); } std::memcpy(&blockReceiveBuffers[block][blockReceiveOffsets[block]], &receiveBuffer[ofs+sizeof(SegmentHeaderTCP)], fragsize); blockReceiveOffsets[block] += fragsize; blockValidSize[block] = blockReceiveOffsets[block]; // Advance to next potential chunk ofs += fragsize + sizeof(SegmentHeaderTCP); } else { // Fragment incomplete, will be appended to in next recv (offset increased above) break; } } if (ofs > 0) { // Move start of next unaccounted-for fragment to start of buffer std::memmove(&receiveBuffer[0], &receiveBuffer[ofs], receiveOffset - ofs); receiveOffset -= ofs; // and shift append position accordingly } } // Determine whether all buffers are filled now bool complete = true; for (int i=0; i(&receiveBuffer[offset])); if (length < (headerExtraBytes + headerSize)) { return 0; } totalReceiveSize = static_cast(ntohl(*reinterpret_cast(&receiveBuffer[offset + 2]))); if (totalReceiveSize >= 0) { // old-style single block transfer legacyTransfer = true; headerExtraBytes = 6; numReceptionBlocks = 1; // ONE interleaved buffer blockReceiveSize[0] = totalReceiveSize; } else { // marked -1 for new-style multi block transfer legacyTransfer = false; headerExtraBytes = static_cast(sizeof(HeaderPreamble)); HeaderPreamble* header = reinterpret_cast(&receiveBuffer[offset]); numReceptionBlocks = 0; totalReceiveSize = 0; for (int i=0; inetTransferSizes[i]); if (s > 0) { blockReceiveSize[i] = s; numReceptionBlocks++; totalReceiveSize += s; } else { // first non-positive payload size signals end of blocks //break; } } } if (numReceptionBlocks==0) throw std::runtime_error("Received a transfer with zero blocks"); if (numReceptionBlocks > MAX_DATA_BLOCKS) throw std::runtime_error("Received a transfer with too many blocks"); if(headerSize + headerExtraBytes > static_cast(receiveBuffer.size()) || totalReceiveSize < 0 || headerSize + headerExtraBytes > length ) { throw ProtocolException("Received invalid header!"); } headerReceived = true; receivedHeader.assign(receiveBuffer.begin() + offset + headerExtraBytes, receiveBuffer.begin() + offset + headerSize + headerExtraBytes); resizeReceiveBuffer(); return headerSize + headerExtraBytes; } void DataBlockProtocol::resetReception(bool dropped) { numReceptionBlocks = 0; headerReceived = false; missingReceiveSegments.clear(); receivedHeader.clear(); waitingForMissingSegments = false; totalReceiveSize = 0; finishedReception = false; lostSegmentBytes = 0; for (int i=0; i 0) { length = min(length, missingReceiveSegments[0].offset); } return &receiveBuffer[0]; } unsigned char* DataBlockProtocol::getReceivedHeader(int& length) { if(receivedHeader.size() > 0) { length = static_cast(receivedHeader.size()); return &receivedHeader[0]; } else { return nullptr; } } bool DataBlockProtocol::processControlMessage(int length) { if(length < static_cast(sizeof(int) + 1)) { return false; } int payloadLength = length - sizeof(int) - 1; switch(receiveBuffer[0 + payloadLength]) { case CONFIRM_MESSAGE: // Our connection request has been accepted connectionConfirmed = true; break; case CONNECTION_MESSAGE: // We establish a new connection connectionConfirmed = true; confirmationMessagePending = true; clientConnectionPending = true; // A connection request is just as good as a heartbeat lastReceivedHeartbeat = std::chrono::steady_clock::now(); break; case HEADER_MESSAGE: { if (anyPayloadReceived()) { if (allBlocksDone()) { LOG_ERROR("No EOF message received!"); } else { LOG_ERROR("Received header too late/early!"); } resetReception(true); } if(parseReceivedHeader(payloadLength, 0) == 0) { throw ProtocolException("Received header is too short!"); } } break; case EOF_MESSAGE: // This is the end of the frame if(anyPayloadReceived()) { parseEofMessage(length); } break; case RESEND_MESSAGE: { // The client requested retransmission of missing packets parseResendMessage(payloadLength); break; } case HEARTBEAT_MESSAGE: // A cyclic heartbeat message lastReceivedHeartbeat = std::chrono::steady_clock::now(); break; default: throw ProtocolException("Received invalid control message!"); break; } return true; } bool DataBlockProtocol::isConnected() const { if(protType == PROTOCOL_TCP) { // Connection is handled by TCP and not by us return true; } else if(connectionConfirmed) { return !isServer || std::chrono::duration_cast( std::chrono::steady_clock::now() - lastReceivedHeartbeat).count() < 2*HEARTBEAT_INTERVAL_MS; } else return false; } const unsigned char* DataBlockProtocol::getNextControlMessage(int& length) { length = 0; if(protType == PROTOCOL_TCP) { // There are no control messages for TCP return nullptr; } if(confirmationMessagePending) { // Send confirmation message confirmationMessagePending = false; controlMessageBuffer[0] = CONFIRM_MESSAGE; length = 1; } else if(!isServer && std::chrono::duration_cast( std::chrono::steady_clock::now() - lastRemoteHostActivity).count() > RECONNECT_TIMEOUT_MS) { // Send a new connection request controlMessageBuffer[0] = CONNECTION_MESSAGE; length = 1; // Also update time stamps lastRemoteHostActivity = lastSentHeartbeat = std::chrono::steady_clock::now(); } else if(transferHeaderData != nullptr && isConnected()) { // We need to send a new protocol header length = transferHeaderSize; const unsigned char* ret = transferHeaderData; transferHeaderData = nullptr; return ret; } else if(eofMessagePending) { // Send end of frame message eofMessagePending = false; unsigned int networkOffset = htonl(mergeRawOffset(lastTransmittedBlock, transferSize[lastTransmittedBlock])); memcpy(&controlMessageBuffer[0], &networkOffset, sizeof(int)); controlMessageBuffer[sizeof(int)] = EOF_MESSAGE; length = 5; } else if(resendMessagePending) { // Send a re-send request for missing messages resendMessagePending = false; if(!generateResendRequest(length)) { length = 0; return nullptr; } } else if(!isServer && std::chrono::duration_cast( std::chrono::steady_clock::now() - lastSentHeartbeat).count() > HEARTBEAT_INTERVAL_MS) { // Send a heartbeat message controlMessageBuffer[0] = HEARTBEAT_MESSAGE; length = 1; lastSentHeartbeat = std::chrono::steady_clock::now(); } else { return nullptr; } // Mark this message as a control message controlMessageBuffer[length++] = 0xff; controlMessageBuffer[length++] = 0xff; controlMessageBuffer[length++] = 0xff; controlMessageBuffer[length++] = 0xff; return controlMessageBuffer; } bool DataBlockProtocol::newClientConnected() { if(clientConnectionPending) { clientConnectionPending = false; return true; } else { return false; } } bool DataBlockProtocol::generateResendRequest(int& length) { length = static_cast(missingReceiveSegments.size() * (sizeof(int) + sizeof(unsigned short))); if(length + sizeof(int) + 1> sizeof(controlMessageBuffer)) { return false; } length = 0; for(MissingReceiveSegment segment: missingReceiveSegments) { unsigned int segOffset = htonl(static_cast(segment.offset)); unsigned int segLen = htonl(static_cast(segment.length)); memcpy(&controlMessageBuffer[length], &segOffset, sizeof(segOffset)); length += sizeof(unsigned int); memcpy(&controlMessageBuffer[length], &segLen, sizeof(segLen)); length += sizeof(unsigned int); } controlMessageBuffer[length++] = RESEND_MESSAGE; return true; } void DataBlockProtocol::parseResendMessage(int length) { missingTransferSegments.clear(); int num = length / (sizeof(unsigned int) + sizeof(unsigned short)); int bufferOffset = 0; for(int i=0; i(&receiveBuffer[bufferOffset]); bufferOffset += sizeof(unsigned int); unsigned int segLenNet = *reinterpret_cast(&receiveBuffer[bufferOffset]); bufferOffset += sizeof(unsigned int); int segmentOffsetRaw = static_cast(ntohl(segOffsetNet)); // with block ID int segmentLength = static_cast(ntohl(segLenNet)); int dataBlockID, segmentOffset; splitRawOffset(segmentOffsetRaw, dataBlockID, segmentOffset); if(segmentOffset >= 0 && segmentLength > 0 && (segmentOffset + segmentLength) <= rawValidBytes[dataBlockID]) { missingTransferSegments.push_back(std::pair( segmentOffsetRaw, segmentLength)); } } } void DataBlockProtocol::parseEofMessage(int length) { completedReceptions++; lostSegmentRate = (lostSegmentRate * (completedReceptions-1) + ((double) lostSegmentBytes) / totalReceiveSize) / completedReceptions; if(length >= 4) { // Find all missing segments at the end of blocks for (int i=0; i 0) { waitingForMissingSegments = true; resendMessagePending = true; // Initialize all missing block start indices with earliest missing address int mblock, moffset; for (int i=0; i(missingReceiveSegments.size()); ++i) { splitRawOffset(missingReceiveSegments[i].offset, mblock, moffset); if (moffset < blockReceiveOffsets[mblock]) { blockReceiveOffsets[mblock] = moffset; } } } else { finishedReception = true; } } else { LOG_ERROR("EOF message too short, length " << length); } } void DataBlockProtocol::resizeReceiveBuffer() { if(totalReceiveSize < 0) { throw ProtocolException("Received invalid transfer size!"); } // We increase the requested size to allow for one // additional network message and the protocol overhead int bufferSize = 2*getMaxReceptionSize() + MAX_OUTSTANDING_BYTES + sizeof(int); // Resize the buffer if(static_cast(receiveBuffer.size()) < bufferSize) { receiveBuffer.resize(bufferSize); } for (int i=0; i(blockReceiveBuffers[i].size()) < blockReceiveSize[i]) { blockReceiveBuffers[i].resize(blockReceiveSize[i]); } } } }} // namespace