70 lines
1.9 KiB
Python
70 lines
1.9 KiB
Python
|
|
import struct
|
|
|
|
class NMBError(Exception):
    """Base class for the NMB-related errors raised by this module."""
|
|
|
|
|
|
class NotConnectedError(NMBError):
    """
    Raised when the underlying NMB connection has been disconnected or has not been connected yet.
    """
|
|
|
|
|
|
class NMBSessionMessage:
    """
    One NMB session message: a fixed-size header followed by a payload.

    The header is unpacked big-endian as ``type`` (1 byte), ``flags`` (1 byte)
    and ``length`` (2 bytes). After a successful :meth:`decode`, ``type`` and
    ``flags`` hold the header values and ``data`` holds the payload.
    """

    # Big-endian: unsigned byte (type), unsigned byte (flags), unsigned short (length)
    HEADER_STRUCT_FORMAT = '>BBH'
    HEADER_STRUCT_SIZE = struct.calcsize(HEADER_STRUCT_FORMAT)

    def __init__(self):
        self.reset()

    def reset(self):
        """Restore ``type``, ``flags`` and ``data`` to their empty defaults."""
        self.type = 0
        self.flags = 0
        # Use a bytes literal so the empty payload has the same type as the
        # buffer slice stored by decode(). On Python 2 this is identical to ''.
        self.data = b''

    def decode(self, data, offset):
        """
        Try to decode one message from *data* starting at *offset*.

        Returns 0 when *data* does not yet contain a complete message (either
        the header or the payload is truncated). Otherwise returns the total
        number of bytes consumed, i.e. header size plus payload length.

        If the header is complete but the payload is not, the message has
        already been reset and ``type``/``flags`` overwritten from the header,
        while ``data`` is left empty.
        """
        header_size = self.HEADER_STRUCT_SIZE
        data_len = len(data)

        if data_len < offset + header_size:
            # Not enough data for decoding
            return 0

        self.reset()
        # unpack_from reads the header in place rather than slicing a copy first.
        self.type, self.flags, length = struct.unpack_from(self.HEADER_STRUCT_FORMAT, data, offset)

        # The lowest flag bit supplies bit 16 of the payload length, extending
        # the 16-bit length field to 17 bits.
        if self.flags & 0x01:
            length |= 0x010000

        payload_start = offset + header_size
        payload_end = payload_start + length
        if data_len < payload_end:
            # Header is complete but the payload has not fully arrived yet
            return 0

        self.data = data[payload_start:payload_end]
        return header_size + length
|
|
|
|
|
|
class DirectTCPSessionMessage(NMBSessionMessage):
    """
    Session message variant used for Direct TCP framing.

    The header is a single big-endian unsigned 32-bit integer. Its top 8 bits
    must be zero; the remaining value is the payload length in bytes.
    """

    # Big-endian unsigned 32-bit integer
    HEADER_STRUCT_FORMAT = '>I'
    HEADER_STRUCT_SIZE = struct.calcsize(HEADER_STRUCT_FORMAT)

    def decode(self, data, offset):
        """
        Try to decode one Direct TCP message from *data* starting at *offset*.

        Returns 0 when the header or payload is not fully present in *data*;
        otherwise returns header size plus payload length. Raises NMBError
        when the top byte of the 32-bit header is non-zero.
        """
        header_size = self.HEADER_STRUCT_SIZE
        available = len(data)
        payload_start = offset + header_size

        if available < payload_start:
            # Header has not fully arrived yet
            return 0

        self.reset()
        (length,) = struct.unpack(self.HEADER_STRUCT_FORMAT, data[offset:payload_start])

        # Any bit set in the most significant byte means this is not a valid header
        if length >> 24 != 0:
            raise NMBError("Invalid protocol header for Direct TCP session message")

        payload_end = payload_start + length
        if available < payload_end:
            # Payload has not fully arrived yet
            return 0

        self.data = data[payload_start:payload_end]
        return header_size + length
|