237 lines
9.1 KiB
Python
237 lines
9.1 KiB
Python
import socket
|
|
import threading
|
|
import time
|
|
class PortMuxer:
|
|
debug=False
|
|
def print(self,*args,**kwds):
|
|
self.printlock.acquire()
|
|
print(threading.get_native_id(),end=" ")
|
|
print(*args,**kwds)
|
|
self.printlock.release()
|
|
def printd(self,*args,**kwds):
|
|
if self.debug:self.print(*args,**kwds)
|
|
def __init__(self,config):
|
|
self.printlock=threading.Lock()
|
|
self.config=config
|
|
self.listener_addrport=(self.config["listener.address"],self.config["listener.port"])
|
|
self.print("PortMuxer initing for %s:%s..."%self.listener_addrport)
|
|
self.rules=self.config["muxer.rules"]
|
|
self.listener_addr=self.config["listener.address"]
|
|
self.listener_port=self.config["listener.port"]
|
|
self.connects={}
|
|
self.print("Initing done.")
|
|
def handler_daemon(self,sid):
|
|
self.print("Daemon thread for %s:%s"%sid)
|
|
inf=self.connects[sid]
|
|
self.print("Start daemon")
|
|
inf[10].wait()
|
|
self.print("Connection close...")
|
|
inf[3].close()
|
|
inf[4].close()
|
|
inf[0][0].join()
|
|
inf[0][1].join()
|
|
inf[0][2].join()
|
|
inf[0][3].join()
|
|
self.print("Connect done for %s:%s"%sid)
|
|
del self.connects[sid]
|
|
self.print("Daemon thread done.")
|
|
def handler_client_rx(self,sid):
|
|
self.print("Client Rx thread for %s:%s"%sid)
|
|
inf=self.connects[sid]
|
|
connect=inf[3]
|
|
buffer=inf[6]
|
|
event=inf[8]
|
|
run=True
|
|
self.print("Start client rx.")
|
|
while run:
|
|
try:buf=connect.recv(self.config["thread.server.rx.recvlength"])
|
|
except OSError:
|
|
buf=None
|
|
run=False
|
|
if buf:
|
|
self.printd("Recv client packet length",len(buf))
|
|
buffer.append(buf)
|
|
event.set()
|
|
else:
|
|
self.print("Client close connect")
|
|
run=False
|
|
break
|
|
inf[5]=True
|
|
event.set()
|
|
inf[9].set()
|
|
inf[10].set()
|
|
self.print("Client Rx thread done.")
|
|
def handler_server_rx(self,sid):
|
|
self.print("Server Rx thread for %s:%s"%sid)
|
|
inf=self.connects[sid]
|
|
connect=inf[4]
|
|
buffer=inf[7]
|
|
event=inf[9]
|
|
run=True
|
|
self.print("Start server rx.")
|
|
while run:
|
|
try:buf=connect.recv(self.config["thread.server.rx.recvlength"])
|
|
except OSError:
|
|
buf=None
|
|
run=False
|
|
if buf:
|
|
self.printd("Recv server packet length",len(buf))
|
|
buffer.append(buf)
|
|
event.set()
|
|
else:
|
|
self.print("Server close connect")
|
|
run=False
|
|
break
|
|
inf[5]=True
|
|
event.set()
|
|
inf[8].set()
|
|
inf[10].set()
|
|
self.print("Client Tx thread done.")
|
|
def handler_server_tx(self,sid):
|
|
self.print("Server Tx thread for %s:%s"%sid)
|
|
inf=self.connects[sid]
|
|
connect=inf[4]
|
|
buffer=inf[6]
|
|
event=inf[8]
|
|
run=True
|
|
self.print("Start server tx.")
|
|
while run:
|
|
event.wait()
|
|
event.clear()
|
|
if inf[5]:
|
|
self.print("Server onnect close.")
|
|
run=False
|
|
break
|
|
else:
|
|
while buffer:
|
|
buf=buffer[0]
|
|
del buffer[0]
|
|
ssize=connect.send(buf)
|
|
self.printd("Send client packet length",len(buf))
|
|
inf[10].set()
|
|
self.print("Server Rx thread done.")
|
|
def handler_client_tx(self,sid):
|
|
self.print("Client Tx thread for %s:%s"%sid)
|
|
inf=self.connects[sid]
|
|
connect=inf[3]
|
|
buffer=inf[7]
|
|
event=inf[9]
|
|
run=True
|
|
self.print("Start client tx.")
|
|
while run:
|
|
event.wait()
|
|
event.clear()
|
|
if inf[5]:
|
|
self.print("Client connect close.")
|
|
run=False
|
|
break
|
|
else:
|
|
while buffer:
|
|
buf=buffer[0]
|
|
del buffer[0]
|
|
self.printd("Send client packet length",len(buf))
|
|
ssize=connect.send(buf)
|
|
inf[10].set()
|
|
self.print("Client Tx thread done.")
|
|
def handler_listener(self):
|
|
self.print("Listener thread for %s:%s..."%self.listener_addrport)
|
|
self.listener_socket=socket.socket()
|
|
self.listener_socket.bind(self.listener_addrport)
|
|
self.listener_socket.listen()
|
|
try:
|
|
self.run=True
|
|
self.print("Start accept.")
|
|
while self.run:
|
|
conn,addr=self.listener_socket.accept()
|
|
if self.run:
|
|
self.print("Accept connect from %s:%s.Checking header..."%addr)
|
|
spec=conn.recv(self.rules["header_length"])
|
|
self.print("Header",spec)
|
|
if spec:
|
|
if spec in self.rules:
|
|
targetap=self.rules[spec]
|
|
self.print("Match one target")
|
|
else:
|
|
self.print("No match header.Choose default...")
|
|
if None in self.rules:
|
|
targetap=self.rules[None]
|
|
else:
|
|
self.print("No default target.Close connect...")
|
|
continue
|
|
self.print("Target %s:%s"%targetap)
|
|
serversocket=socket.socket()
|
|
try:serversocket.connect(targetap)
|
|
except ConnectionRefusedError:
|
|
self.print("Cannot connect to target,Connection refused.Close connect")
|
|
conn.close()
|
|
continue
|
|
threadcr=threading.Thread(target=self.handler_client_rx,args=(addr,))
|
|
threadct=threading.Thread(target=self.handler_client_tx,args=(addr,))
|
|
threadsr=threading.Thread(target=self.handler_server_rx,args=(addr,))
|
|
threadst=threading.Thread(target=self.handler_server_tx,args=(addr,))
|
|
threaddm=threading.Thread(target=self.handler_daemon,args=(addr,))
|
|
self.connects[addr]=[[threadcr,threadct,threadsr,threadst],spec,targetap,conn,serversocket,False,[],[],threading.Event(),threading.Event(),threading.Event(),addr]
|
|
#[handlethreads/0,header/1,serveraddrport/2,clientsocket/3,serversocket/4,closeflag/5,csbuffer/6,scbuffer/7,csevent/8,scevent/9,closeevent/10,addr/11,cssize/12,scsize/13]
|
|
self.print("Start process threads...")
|
|
serversocket.send(spec)
|
|
threadcr.start()
|
|
threadct.start()
|
|
threadsr.start()
|
|
threadst.start()
|
|
threaddm.start()
|
|
self.print("Threads start done.")
|
|
else:
|
|
self.print("Accept close connect.Stop thread...")
|
|
break
|
|
else:
|
|
self.print("No header.Close connect....")
|
|
conn.close()
|
|
except Exception as err:
|
|
self.print("Error PortMuxer!!!")
|
|
raise err
|
|
finally:
|
|
self.print("Close PortMuxer sockets...")
|
|
self.listener_socket.close()
|
|
for addr in self.connects:
|
|
connect=self.connects[addr]
|
|
self.print("Close for",connect[11])
|
|
connect[3].close()
|
|
connect[4].close()
|
|
connect[5]=True
|
|
connect[8].set()
|
|
connect[9].set()
|
|
connect[10].set()
|
|
self.print("Sockets close done.")
|
|
self.print("Listener thread done.")
|
|
def __call__(self,wait=True):
|
|
self.print("Starting PortMuxer for %s:%s..."%self.listener_addrport)
|
|
self.listener_thread=threading.Thread(target=self.handler_listener)
|
|
self.listener_thread.start()
|
|
if wait:
|
|
try:
|
|
while True:self.listener_thread.join(timeout=self.config["main.wait.listener.sleeptime"])
|
|
except KeyboardInterrupt:
|
|
self.run=False
|
|
closesocket=socket.socket()
|
|
closesocket.connect(self.listener_addrport)
|
|
closesocket.close()
|
|
self.listener_socket.close()
|
|
self.print("PortMuxer for %s:%s exited."%self.listener_addrport)
|
|
portmuxer=PortMuxer({
|
|
"global.pause.sleeptime":0.6,
|
|
"thread.client.rx.recvlength":256*1024,
|
|
"thread.server.rx.recvlength":256*1024,
|
|
"main.wait.listener.sleeptime":0.5,
|
|
"listener.address":"0.0.0.0",
|
|
"listener.port":25567,
|
|
"muxer.rules":{
|
|
"header_length":2,
|
|
b"\x10\x00":("192.168.8.198",25565),
|
|
b"GE":("127.0.0.1",80),
|
|
b"PO":("127.0.0.1",80),
|
|
b"PU":("127.0.0.1",80),
|
|
b"OP":("127.0.0.1",80),
|
|
}
|
|
})
|
|
portmuxer()
|