import socket
import threading
import time

# Index layout of one per-connection entry stored in ``PortMuxer.connects``.
# The entry stays a plain list so existing code indexing it keeps working.
# (The original layout comment also listed cssize/12 and scsize/13, but those
# slots were never created.)
_THREADS = 0       # [client_rx, client_tx, server_rx, server_tx] threads
_HEADER = 1        # header bytes read from the client
_TARGET = 2        # (host, port) of the chosen backend
_CLIENT_SOCK = 3   # accepted client socket
_SERVER_SOCK = 4   # socket connected to the backend
_CLOSED = 5        # close flag shared by all handler threads
_CS_BUFFER = 6     # queued client -> server chunks
_SC_BUFFER = 7     # queued server -> client chunks
_CS_EVENT = 8      # wakes the server Tx thread
_SC_EVENT = 9      # wakes the client Tx thread
_CLOSE_EVENT = 10  # wakes the daemon thread to tear the connection down
_ADDR = 11         # client (host, port); also the key in ``connects``


class PortMuxer:
    """Forward TCP connections to a backend chosen by their first bytes.

    The listener reads ``muxer.rules["header_length"]`` bytes from each new
    client and looks them up in ``muxer.rules``; the ``None`` key, when
    present, is the default target. Each accepted connection gets four pump
    threads (client/server x Rx/Tx) plus one daemon thread that tears the
    connection down.
    """

    debug = False
    # Seconds the daemon waits for Tx threads to flush queued data before it
    # closes the sockets.
    drain_timeout = 5.0

    def print(self, *args, **kwds):
        """Print *args* prefixed with the native thread id, under a lock."""
        with self.printlock:
            print(threading.get_native_id(), end=" ")
            print(*args, **kwds)

    def printd(self, *args, **kwds):
        """Like :meth:`print`, but only when ``debug`` is true."""
        if self.debug:
            self.print(*args, **kwds)

    def __init__(self, config):
        """Store *config* and derive the listener address and rule table.

        Reads the keys ``listener.address``, ``listener.port`` and
        ``muxer.rules`` from *config*.
        """
        self.printlock = threading.Lock()
        self.config = config
        self.listener_addrport = (
            self.config["listener.address"],
            self.config["listener.port"],
        )
        self.print("PortMuxer initing for %s:%s..." % self.listener_addrport)
        self.rules = self.config["muxer.rules"]
        self.listener_addr = self.config["listener.address"]
        self.listener_port = self.config["listener.port"]
        self.connects = {}
        self.print("Initing done.")

    @staticmethod
    def _close_socket(sock):
        """Shut down and close *sock*, tolerating an already-closed socket."""
        try:
            # shutdown() wakes a thread blocked in recv() on this socket;
            # close() alone is not guaranteed to.
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # not connected or already closed: nothing to shut down
        sock.close()

    @staticmethod
    def _signal_close(inf):
        """Mark entry *inf* closed and wake every thread waiting on it."""
        inf[_CLOSED] = True
        inf[_CS_EVENT].set()
        inf[_SC_EVENT].set()
        inf[_CLOSE_EVENT].set()

    def handler_daemon(self, sid):
        """Wait for the close event of connection *sid*, then tear it down."""
        self.print("Daemon thread for %s:%s" % sid)
        inf = self.connects[sid]
        self.print("Start daemon")
        inf[_CLOSE_EVENT].wait()
        self.print("Connection close...")
        # Make sure both Tx threads wake up and see the close flag.
        inf[_CLOSED] = True
        inf[_CS_EVENT].set()
        inf[_SC_EVENT].set()
        threads = inf[_THREADS]
        # Give the Tx threads (indices 1 and 3) a bounded chance to flush
        # queued data before the sockets go away.
        for tx_thread in (threads[1], threads[3]):
            tx_thread.join(timeout=self.drain_timeout)
        self._close_socket(inf[_CLIENT_SOCK])
        self._close_socket(inf[_SERVER_SOCK])
        for thread in threads:
            thread.join()
        self.print("Connect done for %s:%s" % sid)
        # pop() instead of del: tolerate the entry having been removed already.
        self.connects.pop(sid, None)
        self.print("Daemon thread done.")

    def _pump_rx(self, sid, label, sock_index, buffer_index, event_index,
                 recvlength):
        """Read from one socket of *sid* into its queue until EOF or error."""
        self.print("%s Rx thread for %s:%s" % (label, *sid))
        inf = self.connects[sid]
        connect = inf[sock_index]
        buffer = inf[buffer_index]
        event = inf[event_index]
        lower = label.lower()
        self.print("Start %s rx." % lower)
        while True:
            try:
                buf = connect.recv(recvlength)
            except OSError:
                buf = None
            if not buf:  # EOF (b"") or a socket error
                self.print("%s close connect" % label)
                break
            self.printd("Recv %s packet length" % lower, len(buf))
            buffer.append(buf)
            event.set()
        self._signal_close(inf)
        self.print("%s Rx thread done." % label)

    def _pump_tx(self, sid, label, sock_index, buffer_index, event_index):
        """Send queued chunks of *sid* to one socket until the close flag."""
        self.print("%s Tx thread for %s:%s" % (label, *sid))
        inf = self.connects[sid]
        connect = inf[sock_index]
        buffer = inf[buffer_index]
        event = inf[event_index]
        self.print("Start %s tx." % label.lower())
        try:
            while True:
                event.wait()
                event.clear()
                # Read the flag BEFORE draining: everything the Rx side
                # queued before setting it is then guaranteed to be flushed.
                closing = inf[_CLOSED]
                while buffer:
                    buf = buffer.pop(0)
                    self.printd("Send client packet length", len(buf))
                    # sendall(): send() may write only part of the chunk.
                    connect.sendall(buf)
                if closing:
                    self.print("%s connect close." % label)
                    break
        except OSError:
            # Peer vanished or the socket was closed under us: stop the
            # whole connection instead of dying silently.
            inf[_CLOSED] = True
            inf[_CS_EVENT].set()
            inf[_SC_EVENT].set()
        finally:
            inf[_CLOSE_EVENT].set()
        self.print("%s Tx thread done." % label)

    def handler_client_rx(self, sid):
        """Pump bytes received from the client into the client->server queue."""
        recvlength = self.config.get("thread.client.rx.recvlength")
        if recvlength is None:
            # Older configs only carried the server key, which the original
            # code used for both directions.
            recvlength = self.config["thread.server.rx.recvlength"]
        self._pump_rx(sid, "Client", _CLIENT_SOCK, _CS_BUFFER, _CS_EVENT,
                      recvlength)

    def handler_server_rx(self, sid):
        """Pump bytes received from the server into the server->client queue."""
        self._pump_rx(sid, "Server", _SERVER_SOCK, _SC_BUFFER, _SC_EVENT,
                      self.config["thread.server.rx.recvlength"])

    def handler_server_tx(self, sid):
        """Send queued client->server chunks to the server socket."""
        self._pump_tx(sid, "Server", _SERVER_SOCK, _CS_BUFFER, _CS_EVENT)

    def handler_client_tx(self, sid):
        """Send queued server->client chunks to the client socket."""
        self._pump_tx(sid, "Client", _CLIENT_SOCK, _SC_BUFFER, _SC_EVENT)

    def _accept_connection(self, conn, addr):
        """Route one accepted client socket; close it if it cannot be served."""
        self.print("Accept connect from %s:%s.Checking header..." % addr)
        try:
            spec = conn.recv(self.rules["header_length"])
        except OSError:
            spec = b""  # a reset during the header read counts as "no header"
        self.print("Header", spec)
        if not spec:
            self.print("No header.Close connect....")
            conn.close()
            return
        if spec in self.rules:
            targetap = self.rules[spec]
            self.print("Match one target")
        else:
            self.print("No match header.Choose default...")
            if None not in self.rules:
                self.print("No default target.Close connect...")
                conn.close()
                return
            targetap = self.rules[None]
        self.print("Target %s:%s" % targetap)
        serversocket = socket.socket()
        try:
            serversocket.connect(targetap)
            # The header was consumed from the client; replay it first.
            serversocket.sendall(spec)
        except ConnectionRefusedError:
            self.print("Cannot connect to target,Connection refused.Close connect")
            serversocket.close()
            conn.close()
            return
        except OSError as err:
            self.print("Cannot connect to target,%s.Close connect" % err)
            serversocket.close()
            conn.close()
            return
        threadcr = threading.Thread(target=self.handler_client_rx, args=(addr,))
        threadct = threading.Thread(target=self.handler_client_tx, args=(addr,))
        threadsr = threading.Thread(target=self.handler_server_rx, args=(addr,))
        threadst = threading.Thread(target=self.handler_server_tx, args=(addr,))
        threaddm = threading.Thread(target=self.handler_daemon, args=(addr,))
        # Entry layout: see the _THREADS ... _ADDR constants at module top.
        self.connects[addr] = [
            [threadcr, threadct, threadsr, threadst],
            spec,
            targetap,
            conn,
            serversocket,
            False,
            [],
            [],
            threading.Event(),
            threading.Event(),
            threading.Event(),
            addr,
        ]
        self.print("Start process threads...")
        # The daemon joins the four pump threads, so it is started last.
        for thread in (threadcr, threadct, threadsr, threadst, threaddm):
            thread.start()
        self.print("Threads start done.")

    def handler_listener(self):
        """Accept clients until ``self.run`` is cleared, then close everything."""
        self.print("Listener thread for %s:%s..." % self.listener_addrport)
        self.listener_socket = socket.socket()
        self.listener_socket.bind(self.listener_addrport)
        self.listener_socket.listen()
        try:
            self.run = True
            self.print("Start accept.")
            while self.run:
                conn, addr = self.listener_socket.accept()
                if not self.run:
                    # This is the wake-up connection made by __call__.
                    self.print("Accept close connect.Stop thread...")
                    conn.close()
                    break
                self._accept_connection(conn, addr)
        except Exception:
            self.print("Error PortMuxer!!!")
            raise
        finally:
            self.print("Close PortMuxer sockets...")
            self.listener_socket.close()
            # Snapshot: daemon threads remove entries concurrently.
            for connect in list(self.connects.values()):
                self.print("Close for", connect[_ADDR])
                self._close_socket(connect[_CLIENT_SOCK])
                self._close_socket(connect[_SERVER_SOCK])
                self._signal_close(connect)
            self.print("Sockets close done.")
        self.print("Listener thread done.")

    def _wake_listener(self):
        """Make one throwaway connection so a blocked accept() returns."""
        host, port = self.listener_addrport
        if host in ("", "0.0.0.0"):
            # A wildcard bind address is not a portable connect target.
            host = "127.0.0.1"
        try:
            with socket.socket() as closesocket:
                closesocket.connect((host, port))
        except OSError:
            pass  # listener already gone; nothing to wake

    def __call__(self, wait=True):
        """Start the listener thread.

        With ``wait`` true, block until the listener ends; Ctrl-C requests a
        shutdown and waits for the listener to finish. With ``wait`` false,
        return right after the thread has been started.
        """
        self.print("Starting PortMuxer for %s:%s..." % self.listener_addrport)
        self.listener_thread = threading.Thread(target=self.handler_listener)
        self.listener_thread.start()
        if not wait:
            return
        sleeptime = self.config["main.wait.listener.sleeptime"]
        try:
            # Short timed joins keep the main thread responsive to Ctrl-C;
            # is_alive() ends the loop if the listener dies on its own.
            while self.listener_thread.is_alive():
                self.listener_thread.join(timeout=sleeptime)
        except KeyboardInterrupt:
            self.run = False
            self._wake_listener()
            self.listener_thread.join()
        listener_socket = getattr(self, "listener_socket", None)
        if listener_socket is not None:
            listener_socket.close()
        self.print("PortMuxer for %s:%s exited." % self.listener_addrport)


DEFAULT_CONFIG = {
    "global.pause.sleeptime": 0.6,
    "thread.client.rx.recvlength": 256 * 1024,
    "thread.server.rx.recvlength": 256 * 1024,
    "main.wait.listener.sleeptime": 0.5,
    "listener.address": "0.0.0.0",
    "listener.port": 25567,
    "muxer.rules": {
        "header_length": 2,
        b"\x10\x00": ("192.168.8.198", 25565),
        b"GE": ("127.0.0.1", 80),
        b"PO": ("127.0.0.1", 80),
        b"PU": ("127.0.0.1", 80),
        b"OP": ("127.0.0.1", 80),
    },
}

# Only start the server when run as a script, so importing this module
# does not bind a port and block.
if __name__ == "__main__":
    portmuxer = PortMuxer(DEFAULT_CONFIG)
    portmuxer()