commit ae5b670df4a255faf67296bd0fac0b9dcf095209 Author: UnknownMp Date: Tue Jun 10 17:20:53 2025 +0800 第一次提交 diff --git a/main.py b/main.py new file mode 100644 index 0000000..0a8b52a --- /dev/null +++ b/main.py @@ -0,0 +1,236 @@ +import socket +import threading +import time +class PortMuxer: + debug=False + def print(self,*args,**kwds): + self.printlock.acquire() + print(threading.get_native_id(),end=" ") + print(*args,**kwds) + self.printlock.release() + def printd(self,*args,**kwds): + if self.debug:self.print(*args,**kwds) + def __init__(self,config): + self.printlock=threading.Lock() + self.config=config + self.listener_addrport=(self.config["listener.address"],self.config["listener.port"]) + self.print("PortMuxer initing for %s:%s..."%self.listener_addrport) + self.rules=self.config["muxer.rules"] + self.listener_addr=self.config["listener.address"] + self.listener_port=self.config["listener.port"] + self.connects={} + self.print("Initing done.") + def handler_daemon(self,sid): + self.print("Daemon thread for %s:%s"%sid) + inf=self.connects[sid] + self.print("Start daemon") + inf[10].wait() + self.print("Connection close...") + inf[3].close() + inf[4].close() + inf[0][0].join() + inf[0][1].join() + inf[0][2].join() + inf[0][3].join() + self.print("Connect done for %s:%s"%sid) + del self.connects[sid] + self.print("Daemon thread done.") + def handler_client_rx(self,sid): + self.print("Client Rx thread for %s:%s"%sid) + inf=self.connects[sid] + connect=inf[3] + buffer=inf[6] + event=inf[8] + run=True + self.print("Start client rx.") + while run: + try:buf=connect.recv(self.config["thread.server.rx.recvlength"]) + except OSError: + buf=None + run=False + if buf: + self.printd("Recv client packet length",len(buf)) + buffer.append(buf) + event.set() + else: + self.print("Client close connect") + run=False + break + inf[5]=True + event.set() + inf[9].set() + inf[10].set() + self.print("Client Rx thread done.") + def handler_server_rx(self,sid): + self.print("Server Rx thread for %s:%s"%sid) + inf=self.connects[sid] + connect=inf[4] + buffer=inf[7] + event=inf[9] + run=True + self.print("Start server rx.") + while run: + try:buf=connect.recv(self.config["thread.server.rx.recvlength"]) + except OSError: + buf=None + run=False + if buf: + self.printd("Recv server packet length",len(buf)) + buffer.append(buf) + event.set() + else: + self.print("Server close connect") + run=False + break + inf[5]=True + event.set() + inf[8].set() + inf[10].set() + self.print("Client Tx thread done.") + def handler_server_tx(self,sid): + self.print("Server Tx thread for %s:%s"%sid) + inf=self.connects[sid] + connect=inf[4] + buffer=inf[6] + event=inf[8] + run=True + self.print("Start server tx.") + while run: + event.wait() + event.clear() + if inf[5]: + self.print("Server onnect close.") + run=False + break + else: + while buffer: + buf=buffer[0] + del buffer[0] + ssize=connect.send(buf) + self.printd("Send client packet length",len(buf)) + inf[10].set() + self.print("Server Rx thread done.") + def handler_client_tx(self,sid): + self.print("Client Tx thread for %s:%s"%sid) + inf=self.connects[sid] + connect=inf[3] + buffer=inf[7] + event=inf[9] + run=True + self.print("Start client tx.") + while run: + event.wait() + event.clear() + if inf[5]: + self.print("Client connect close.") + run=False + break + else: + while buffer: + buf=buffer[0] + del buffer[0] + self.printd("Send client packet length",len(buf)) + ssize=connect.send(buf) + inf[10].set() + self.print("Client Tx thread done.") + def handler_listener(self): + self.print("Listener thread for %s:%s..."%self.listener_addrport) + self.listener_socket=socket.socket() + self.listener_socket.bind(self.listener_addrport) + self.listener_socket.listen() + try: + self.run=True + self.print("Start accept.") + while self.run: + conn,addr=self.listener_socket.accept() + if self.run: + self.print("Accept connect from %s:%s.Checking header..."%addr) + spec=conn.recv(self.rules["header_length"]) + self.print("Header",spec) + if spec: + if spec in self.rules: + targetap=self.rules[spec] + self.print("Match one target") + else: + self.print("No match header.Choose default...") + if None in self.rules: + targetap=self.rules[None] + else: + self.print("No default target.Close connect...") + continue + self.print("Target %s:%s"%targetap) + serversocket=socket.socket() + try:serversocket.connect(targetap) + except ConnectionRefusedError: + self.print("Cannot connect to target,Connection refused.Close connect") + conn.close() + continue + threadcr=threading.Thread(target=self.handler_client_rx,args=(addr,)) + threadct=threading.Thread(target=self.handler_client_tx,args=(addr,)) + threadsr=threading.Thread(target=self.handler_server_rx,args=(addr,)) + threadst=threading.Thread(target=self.handler_server_tx,args=(addr,)) + threaddm=threading.Thread(target=self.handler_daemon,args=(addr,)) + self.connects[addr]=[[threadcr,threadct,threadsr,threadst],spec,targetap,conn,serversocket,False,[],[],threading.Event(),threading.Event(),threading.Event(),addr] + #[handlethreads/0,header/1,serveraddrport/2,clientsocket/3,serversocket/4,closeflag/5,csbuffer/6,scbuffer/7,csevent/8,scevent/9,closeevent/10,addr/11,cssize/12,scsize/13] + self.print("Start process threads...") + serversocket.send(spec) + threadcr.start() + threadct.start() + threadsr.start() + threadst.start() + threaddm.start() + self.print("Threads start done.") + else: + self.print("Accept close connect.Stop thread...") + break + else: + self.print("No header.Close connect....") + conn.close() + except Exception as err: + self.print("Error PortMuxer!!!") + raise err + finally: + self.print("Close PortMuxer sockets...") + self.listener_socket.close() + for addr in self.connects: + connect=self.connects[addr] + self.print("Close for",connect[11]) + connect[3].close() + connect[4].close() + connect[5]=True + connect[8].set() + connect[9].set() + connect[10].set() + self.print("Sockets close done.") + self.print("Listener thread done.") + def __call__(self,wait=True): + self.print("Starting PortMuxer for %s:%s..."%self.listener_addrport) + self.listener_thread=threading.Thread(target=self.handler_listener) + self.listener_thread.start() + if wait: + try: + while True:self.listener_thread.join(timeout=self.config["main.wait.listener.sleeptime"]) + except KeyboardInterrupt: + self.run=False + closesocket=socket.socket() + closesocket.connect(self.listener_addrport) + closesocket.close() + self.listener_socket.close() + self.print("PortMuxer for %s:%s exited."%self.listener_addrport) +portmuxer=PortMuxer({ + "global.pause.sleeptime":0.6, + "thread.client.rx.recvlength":256*1024, + "thread.server.rx.recvlength":256*1024, + "main.wait.listener.sleeptime":0.5, + "listener.address":"0.0.0.0", + "listener.port":25567, + "muxer.rules":{ + "header_length":2, + b"\x10\x00":("192.168.8.198",25565), + b"GE":("127.0.0.1",80), + b"PO":("127.0.0.1",80), + b"PU":("127.0.0.1",80), + b"OP":("127.0.0.1",80), + } + }) +portmuxer()