# jsonsocket.py v1.7
import json
import socket
import traceback
import logging as log
log.basicConfig(level=log.INFO)
logging = log.getLogger(__name__)
try:
from Cryptodome.Cipher import AES
import hashlib
except (SystemError, ImportError):
AES = None
logging.warning(
'CryptodomeX or hashlib not installed! Install with "pip3 install pycryptodomex"')
HOST_WHITELIST = ['127.0.0.1', 'localhost']
[docs]class NoPasswordProtectionError(Exception):
"""
Raised when a password was provided, but server has no protectionAttributes.
Attributes:
expression -- input expression in which the error occurred
message -- explanation of the error
"""
def __init__(self, expression, message):
self.expression = expression
self.message = message
[docs]class WrongPasswordError(Exception):
"""
Raised when the password is wrong.
Attributes:
expression -- input expression in which the error occurred
message -- explanation of the error
"""
def __init__(self, expression, message):
self.expression = expression
self.message = message
[docs]class PasswordProtectedError(Exception):
"""
Raised when the server may be password protected.
Attributes:
expression -- input expression in which the error occurred
message -- explanation of the error
"""
def __init__(self, expression, message):
self.expression = expression
self.message = message
# class ValueTooLargeError(Exception):
# """
# Raised when the input value is too large.
#
# Attributes:
# expression -- input expression in which the error occurred
# message -- explanation of the error
# """
# def __init__(self, expression, message):
# self.expression = expression
# self.message = message
BACKLOG = 5
[docs]class Server(object):
"""
A JSON socket server used to communicate with a JSON socket client. All the
data is serialized in JSON.
Args:
host (str): Hostname (e.g. 0.0.0.0)
port (int): Port to bind tcp-socket (e.g. 5050)
keyword (str or None): Set a keyword for encrypted communication. Leaf blank for unsecure connection. Note: This will not encrypt host-intern-communication. (default: None)
reuse_port (bool): Enable/disable reuse_port (default: True)
"""
def __init__(self, host, port, keyword=None, reuse_port=True, timeout=5):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client = None
if reuse_port:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# if hasattr(socket, "TCP_KEEPIDLE") and hasattr(socket, "TCP_KEEPINTVL") and hasattr(socket, "TCP_KEEPCNT"):
# self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 1 * 60)
# self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 5 * 60)
# self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 10)
# self.socket.setblocking(0)
self.socket.settimeout(timeout)
self.socket.bind((host, port))
self.socket.listen(BACKLOG)
self.keyword = keyword
[docs] def setKeyword(self, keyword=None):
"""
Set a keyword to protect the tcp communication.
Args:
keyword (str or None): A save passcode or None to disable protection
"""
self.keyword = keyword
# def __del__(self):
# self.close()
[docs] def accept(self):
"""
Accept a new client connection
Returns:
client
"""
# if a client is already connected, disconnect it
if self.client:
self.client.close()
self.client, self.client_addr = self.socket.accept()
return self.client
[docs] def send(self, data):
"""
Send a dict to the connected client
Args:
data (dict): The dict you want to transmit.
"""
if not self.client:
raise Exception('Cannot send data, no client is connected')
if self.client_addr[0] not in HOST_WHITELIST:
_send(self.client, data, self.keyword)
else:
_send(self.client, data, "")
return self
[docs] def recv(self):
"""
Receives a dict from client
Returns:
data (dict): The dict sent by the connected client
"""
if not self.client:
raise Exception('Cannot receive data, no client is connected')
if self.client_addr[0] not in HOST_WHITELIST:
return _recv(self.client, self.keyword)
else:
return _recv(self.client, "")
[docs] def close(self):
"""
Close the server-socket.
Do this at the very end of your communication!
"""
if self.client:
self.client.close()
self.client = None
if self.socket:
self.socket.close()
self.socket = None
self.keyword = None
[docs]class Client(object):
"""
A JSON socket client used to communicate with a JSON socket server. All the
data is serialized in JSON. How to use it:
Args:
keyword (str or None): Set a keyword for encrypted communication. Leaf blank for unsecure connection. (default: None)
"""
def __init__(self):
self.socket = None
self.keyword = None
self.host = None
[docs] def setKeyword(self, keyword=None):
"""
Set a keyword to protect the tcp communication.
Args:
keyword (str or None): A save passcode or None to disable protection
"""
self.keyword = keyword
# def __del__(self):
# self.close()
[docs] def connect(self, host, port, keyword=None, reuse_port=True, timeout=5):
"""
Establish a connection to a host (server)
Args:
host (str): Hostname (e.g. 0.0.0.0)
port (int): Port to bind tcp-socket (e.g. 5050)
keyword (str or None): Set a keyword for encrypted communication. Leaf blank for unsecure connection. (default: None)
reuse_port (bool): Enable/disable reuse_port (default: True)
"""
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if reuse_port:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# self.socket.setblocking(0)
self.socket.settimeout(timeout)
self.socket.connect((host, port))
self.keyword = keyword
self.host = host
return self
[docs] def send(self, data):
"""
Send a dict to the connected server
Args:
data (dict): The dict you want to transmit.
"""
if not self.socket:
raise Exception('You have to connect first before sending data')
if self.host not in HOST_WHITELIST:
_send(self.socket, data, self.keyword)
else:
_send(self.socket, data, "")
return self
[docs] def recv(self):
"""
Receives a dict from server
Returns:
data (dict): The dict sent by the connected server
"""
if not self.socket:
raise Exception('You have to connect first before receiving data')
if self.host not in HOST_WHITELIST:
return _recv(self.socket, self.keyword)
else:
return _recv(self.socket, "")
[docs] def recv_and_close(self):
"""
Receives a dict from server and closes connection.
Returns:
data (dict): The dict sent by the connected server
"""
data = self.recv()
self.close()
return data
[docs] def close(self):
"""
Close the client-socket.
Do this at the end of every single communication!
"""
if self.socket:
self.socket.close()
self.socket = None
self.host = None
# helper functions ##
def _send(socket, data, key=None):
try:
serialized = json.dumps(data)
# serialized=pickle.dumps(list(data))
except (TypeError, ValueError):
logging.debug(traceback.format_exc())
print(traceback.format_exc())
raise Exception('You can only send JSON-serializable data')
# send the length of the serialized data first
nonce = b''
tag = b''
if key is not None and key != '':
if AES is None:
raise SystemError
if type(key) != str:
raise TypeError
# des = DES.new(key.encode(), DES.MODE_ECB)
# padded_text = pad(serialized)
# serialized = des.encrypt(padded_text.encode('utf-8'))
# while len(key)<16:
# key=key+key
hash_object = hashlib.sha256(key.encode('utf-8'))
cipher = AES.new(hash_object.digest(), AES.MODE_EAX)
padded_text = pad(serialized)
serialized, tag = cipher.encrypt_and_digest(padded_text.encode('utf-8'))
nonce = cipher.nonce
else:
serialized = serialized.encode()
b = '%d,%d,%d\n' % (len(serialized), len(tag), len(nonce))
# socket.send(b.encode())
# send the serialized data
socket.sendall(b.encode()+tag+nonce+serialized)
def _recv(socket, key=None):
# read the length of the data, letter by letter until we reach EOL
length_str = b''
try:
char = socket.recv(1) # .decode()
while char != b'\n':
length_str += char
char = socket.recv(1) # .decode()
except Exception:
print(traceback.format_exc())
return False
try:
length_str = length_str.decode()
except Exception:
raise EnvironmentError(traceback.format_exc())
lens = length_str.split(',')
total = 0
tagTotal = 0
nonceTotal = 0
if len(lens) == 1:
total = int(lens[0])
# not encrypted
elif len(lens) == 3:
total = int(lens[0])
tagTotal = int(lens[1])
nonceTotal = int(lens[2])
# encrypted
else:
raise EnvironmentError
if tagTotal > 0:
tagView = readBlock(socket, tagTotal)
else:
tagView = b''
if nonceTotal > 0:
nonceView = readBlock(socket, nonceTotal)
else:
nonceView = b''
view = readBlock(socket, total)
if key is not None and key != '' and type(key) == str:
if len(tagView) != 0 and len(nonceView) != 0 and AES is not None:
try:
# des = DES.new(key.encode(), DES.MODE_ECB)
# decrypted = des.decrypt(view.tobytes())
hash_object = hashlib.sha256(key.encode('utf-8'))
cipher = AES.new(hash_object.digest(), AES.MODE_EAX, nonceView)
decrypted = cipher.decrypt_and_verify(view, tagView)
return deserializeJSON(decrypted)
except Exception:
tb = traceback.format_exc()
logging.error(tb)
raise WrongPasswordError("SOCKET PASSWORD ERROR, The provided password is wrong!")
else:
raise NoPasswordProtectionError(
'SOCKET PASSWORD ERROR, No password provided!\nCannot receive data')
else:
if len(tagView) == 0 and len(nonceView) == 0:
try:
return deserializeJSON(view)
except (TypeError, ValueError):
tb = traceback.format_exc()
logging.debug(tb)
raise PasswordProtectedError(
'JSON SOCKET ERROR, Data received was not in JSON format. Maybe the RTOC-Server is password-protected')
else:
raise PasswordProtectedError('SOCKET ERROR, The server is password protected')
[docs]def pad(text, padding=16):
while len(text) % padding != 0:
text += ' '
return text
[docs]def readBlock(socket, blockLength):
# use a memoryview to receive the data chunk by chunk efficiently
view = memoryview(bytearray(blockLength))
next_offset = 0
while blockLength - next_offset > 0:
recv_size = socket.recv_into(view[next_offset:], blockLength - next_offset)
next_offset += recv_size
return view.tobytes()
[docs]def deserializeJSON(json_bytes):
return json.loads(json_bytes.decode('utf-8'))