Files
PortMuxer/main.py
2025-06-10 17:20:53 +08:00

237 lines
9.1 KiB
Python

import socket
import threading
import time
class PortMuxer:
    """Multiplex one listening TCP port onto several backends.

    The first ``muxer.rules["header_length"]`` bytes of every accepted
    connection are looked up in ``muxer.rules``; the matching value is the
    ``(host, port)`` the connection is proxied to (the ``None`` key, when
    present, is the default target).  The header bytes are forwarded to the
    backend before any other traffic.

    Config keys read by this class: ``listener.address``, ``listener.port``,
    ``muxer.rules``, ``thread.client.rx.recvlength``,
    ``thread.server.rx.recvlength`` and ``main.wait.listener.sleeptime``.

    Per-connection state lives in ``self.connects[sid]`` (``sid`` is the
    client address tuple) as a list with this layout:

        0   handler threads [client rx, client tx, server rx, server tx]
        1   header bytes read from the client
        2   (host, port) of the selected backend
        3   client socket
        4   server socket
        5   close flag
        6   client -> server buffer (list of bytes)
        7   server -> client buffer (list of bytes)
        8   client -> server "data or close" event
        9   server -> client "data or close" event
        10  close event the daemon thread waits on
        11  client address
    """

    debug = False

    def __init__(self, config):
        """Store *config* and derive the listener address from it."""
        self.printlock = threading.Lock()
        self.config = config
        self.listener_addrport = (
            self.config["listener.address"],
            self.config["listener.port"],
        )
        self.print("PortMuxer initing for %s:%s..." % self.listener_addrport)
        self.rules = self.config["muxer.rules"]
        self.listener_addr = self.config["listener.address"]
        self.listener_port = self.config["listener.port"]
        self.connects = {}
        self.print("Initing done.")

    def print(self, *args, **kwds):
        """Print *args* prefixed with the native thread id, one line at a time."""
        # ``with`` guarantees the lock is released even if print() raises
        # (bad keyword, closed stdout, ...); otherwise every later log call
        # from any thread would block forever.
        with self.printlock:
            print(threading.get_native_id(), end=" ")
            print(*args, **kwds)

    def printd(self, *args, **kwds):
        """Like :meth:`print`, but only when ``debug`` is true."""
        if self.debug:
            self.print(*args, **kwds)

    @staticmethod
    def _close_socket(sock):
        """Shut down and close *sock*, ignoring errors from dead sockets."""
        try:
            # shutdown() first so a thread blocked in recv()/send() on this
            # socket is woken; close() alone is not guaranteed to do that.
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # already closed, reset by the peer, or never connected
        sock.close()

    @staticmethod
    def _signal_close(inf):
        """Mark connection record *inf* as closing and wake all its threads."""
        inf[5] = True  # flag first, so woken tx threads observe it
        inf[8].set()
        inf[9].set()
        inf[10].set()

    def _rx_loop(self, inf, sock, buffer, event, recvlength, side):
        """Read from *sock* into *buffer* until EOF/error, then signal close."""
        while True:
            try:
                buf = sock.recv(recvlength)
            except OSError:
                buf = b""  # treat a broken socket like an orderly close
            if not buf:
                self.print("%s close connect" % side.capitalize())
                break
            self.printd("Recv %s packet length" % side, len(buf))
            buffer.append(buf)
            event.set()
        self._signal_close(inf)

    def _tx_loop(self, inf, sock, buffer, event, side):
        """Send queued chunks from *buffer* to *sock* until the link closes."""
        while True:
            event.wait()
            event.clear()
            # Sample the flag BEFORE draining: the rx side sets it only after
            # its last append, so everything it queued is flushed below.
            closing = inf[5]
            try:
                while buffer:
                    buf = buffer.pop(0)
                    self.printd("Send client packet length", len(buf))
                    # sendall(): plain send() may transmit only part of buf.
                    sock.sendall(buf)
            except OSError as err:
                self.print("%s send failed:" % side.capitalize(), err)
                break
            if closing:
                self.print("%s connect close." % side.capitalize())
                break
        # Full signal (not just the close event) so the peer tx thread and
        # the daemon cannot be left waiting after a send failure.
        self._signal_close(inf)

    def handler_daemon(self, sid):
        """Wait for connection *sid* to close, then tear it down."""
        self.print("Daemon thread for %s:%s" % sid)
        inf = self.connects[sid]
        self.print("Start daemon")
        inf[10].wait()
        self.print("Connection close...")
        # Let both tx threads flush what is still queued before the sockets
        # go away; they were woken by whoever signalled the close.
        inf[0][1].join()
        inf[0][3].join()
        self._close_socket(inf[3])
        self._close_socket(inf[4])
        # The shutdown above unblocks any rx thread still inside recv().
        inf[0][0].join()
        inf[0][2].join()
        self.print("Connect done for %s:%s" % sid)
        self.connects.pop(sid, None)
        self.print("Daemon thread done.")

    def handler_client_rx(self, sid):
        """Thread body: receive from the client into the client->server buffer."""
        self.print("Client Rx thread for %s:%s" % sid)
        inf = self.connects[sid]
        try:
            recvlength = self.config["thread.client.rx.recvlength"]
        except KeyError:
            # Older configs only carried the server key.
            recvlength = self.config["thread.server.rx.recvlength"]
        self.print("Start client rx.")
        self._rx_loop(inf, inf[3], inf[6], inf[8], recvlength, "client")
        self.print("Client Rx thread done.")

    def handler_server_rx(self, sid):
        """Thread body: receive from the server into the server->client buffer."""
        self.print("Server Rx thread for %s:%s" % sid)
        inf = self.connects[sid]
        recvlength = self.config["thread.server.rx.recvlength"]
        self.print("Start server rx.")
        self._rx_loop(inf, inf[4], inf[7], inf[9], recvlength, "server")
        self.print("Server Rx thread done.")

    def handler_server_tx(self, sid):
        """Thread body: send the client->server buffer to the server."""
        self.print("Server Tx thread for %s:%s" % sid)
        inf = self.connects[sid]
        self.print("Start server tx.")
        self._tx_loop(inf, inf[4], inf[6], inf[8], "server")
        self.print("Server Tx thread done.")

    def handler_client_tx(self, sid):
        """Thread body: send the server->client buffer to the client."""
        self.print("Client Tx thread for %s:%s" % sid)
        inf = self.connects[sid]
        self.print("Start client tx.")
        self._tx_loop(inf, inf[3], inf[7], inf[9], "client")
        self.print("Client Tx thread done.")

    def _dispatch(self, conn, addr):
        """Read the header from *conn*, pick a backend and start the proxy threads.

        Every failure path closes the sockets it owns and returns, so a bad
        client or an unreachable backend never takes the listener down.
        """
        # NOTE(review): this recv blocks the accept loop until the client
        # sends something or disconnects; there is no timeout here.
        try:
            spec = conn.recv(self.rules["header_length"])
        except OSError:
            spec = b""
        self.print("Header", spec)
        if not spec:
            self.print("No header.Close connect....")
            conn.close()
            return
        if spec in self.rules:
            targetap = self.rules[spec]
            self.print("Match one target")
        else:
            self.print("No match header.Choose default...")
            if None not in self.rules:
                self.print("No default target.Close connect...")
                conn.close()
                return
            targetap = self.rules[None]
        self.print("Target %s:%s" % targetap)
        serversocket = socket.socket()
        try:
            serversocket.connect(targetap)
            # The header was consumed from the client; replay it first.
            serversocket.sendall(spec)
        except OSError as err:
            if isinstance(err, ConnectionRefusedError):
                self.print("Cannot connect to target,Connection refused.Close connect")
            else:
                self.print("Cannot connect to target,%s.Close connect" % err)
            serversocket.close()
            conn.close()
            return
        handlers = (
            self.handler_client_rx,
            self.handler_client_tx,
            self.handler_server_rx,
            self.handler_server_tx,
        )
        threads = [threading.Thread(target=h, args=(addr,)) for h in handlers]
        threaddm = threading.Thread(target=self.handler_daemon, args=(addr,))
        # Register the record before starting: every handler looks it up first.
        self.connects[addr] = [
            threads, spec, targetap, conn, serversocket, False, [], [],
            threading.Event(), threading.Event(), threading.Event(), addr,
        ]
        self.print("Start process threads...")
        for thread in threads:
            thread.start()
        threaddm.start()
        self.print("Threads start done.")

    def handler_listener(self):
        """Thread body: accept connections until ``self.run`` is cleared."""
        self.print("Listener thread for %s:%s..." % self.listener_addrport)
        self.listener_socket = socket.socket()
        try:
            # bind/listen inside the try so a failure still closes the socket.
            self.listener_socket.bind(self.listener_addrport)
            self.listener_socket.listen()
            self.run = True
            self.print("Start accept.")
            while self.run:
                conn, addr = self.listener_socket.accept()
                if not self.run:
                    # Wake-up connection made by __call__ during shutdown.
                    self.print("Accept close connect.Stop thread...")
                    conn.close()
                    break
                self.print("Accept connect from %s:%s.Checking header..." % addr)
                self._dispatch(conn, addr)
        except Exception:
            self.print("Error PortMuxer!!!")
            raise
        finally:
            self.print("Close PortMuxer sockets...")
            self.listener_socket.close()
            # Snapshot: daemon threads remove entries while we iterate.
            for connect in list(self.connects.values()):
                self.print("Close for", connect[11])
                self._signal_close(connect)
                self._close_socket(connect[3])
                self._close_socket(connect[4])
            self.print("Sockets close done.")
        self.print("Listener thread done.")

    def _wake_listener(self):
        """Make one throw-away connection so a blocked accept() returns."""
        host, port = self.listener_addrport
        if host in ("", "0.0.0.0"):
            # The wildcard bind address is not a valid connect target on
            # every platform; loopback always reaches a wildcard listener.
            host = "127.0.0.1"
        try:
            with socket.socket() as wake:
                wake.connect((host, port))
        except OSError:
            pass  # listener already gone; nothing to wake

    def __call__(self, wait=True):
        """Start the listener thread; with *wait*, block until it stops.

        While waiting, Ctrl-C (KeyboardInterrupt) requests an orderly
        shutdown of the listener and all proxied connections.
        """
        self.print("Starting PortMuxer for %s:%s..." % self.listener_addrport)
        self.listener_thread = threading.Thread(target=self.handler_listener)
        self.listener_thread.start()
        if not wait:
            return
        poll = self.config["main.wait.listener.sleeptime"]
        try:
            # is_alive() ends the wait when the listener dies on its own;
            # join(timeout) on a dead thread returns at once and would spin.
            while self.listener_thread.is_alive():
                self.listener_thread.join(timeout=poll)
        except KeyboardInterrupt:
            self.run = False
            self._wake_listener()
            self.listener_thread.join()
        listener_socket = getattr(self, "listener_socket", None)
        if listener_socket is not None:
            listener_socket.close()
        self.print("PortMuxer for %s:%s exited." % self.listener_addrport)
# Routing table passed as "muxer.rules".  "header_length" is the number of
# leading bytes read from each new connection; every bytes key maps such a
# header to the (host, port) the connection is forwarded to.
# NOTE(review): "GE"/"PO"/"PU"/"OP" are presumably the first two bytes of
# HTTP GET/POST/PUT/OPTIONS requests -- confirm.
_muxer_rules = {
    "header_length": 2,
    b"\x10\x00": ("192.168.8.198", 25565),
    b"GE": ("127.0.0.1", 80),
    b"PO": ("127.0.0.1", 80),
    b"PU": ("127.0.0.1", 80),
    b"OP": ("127.0.0.1", 80),
}

# Full configuration dict handed to the PortMuxer constructor.
_muxer_config = {
    "global.pause.sleeptime": 0.6,
    "thread.client.rx.recvlength": 256 * 1024,
    "thread.server.rx.recvlength": 256 * 1024,
    "main.wait.listener.sleeptime": 0.5,
    "listener.address": "0.0.0.0",
    "listener.port": 25567,
    "muxer.rules": _muxer_rules,
}

# Build the muxer and run it; the call uses the default wait=True.
portmuxer = PortMuxer(_muxer_config)
portmuxer()