# frozen_string_literal: true

require 'socket'

module StreamingProxy
  # Mixed into accepted client sockets (via +extend+) to give each one an
  # outgoing byte buffer, plus methods to append to it and flush from it.
  module SocketBuffer
    # Upper bound, in bytes, on the data kept per client.
    MAX_BUF_SIZE = 1024 * 1024

    # Returns this socket's pending outgoing data, creating it on first use.
    # The buffer is a mutable binary String so that all size arithmetic
    # below is in bytes and appends never hit an encoding mismatch.
    def buffer
      @buffer ||= String.new
    end

    # Appends +data+ to the buffer. If the buffer then exceeds MAX_BUF_SIZE
    # bytes, the oldest bytes are dropped so that only the newest
    # MAX_BUF_SIZE bytes remain.
    def add_to_buffer(data)
      # String#b yields a binary copy, so text (headers) and raw stream
      # chunks can be mixed freely in one buffer.
      buffer << data.b
      overflow = buffer.bytesize - MAX_BUF_SIZE
      return unless overflow > 0

      puts "buffer overflow (#{info})"
      @buffer = buffer.byteslice(overflow, MAX_BUF_SIZE)
    end

    # Writes as much of the buffer as a single +syswrite+ accepts and keeps
    # whatever was not written for the next call. Does nothing when the
    # buffer is empty. Errors raised by +syswrite+ propagate to the caller.
    def send_from_buffer
      return if buffer.empty?

      pending = buffer.bytesize
      bytes_written = syswrite(buffer)
      puts "incomplete write: #{bytes_written}/#{pending}" if bytes_written < pending
      # syswrite reports bytes, so the remainder must be cut by bytes too.
      @buffer = buffer.byteslice(bytes_written, pending - bytes_written)
    end

    # Short "address:port" label for log messages; 'closed ?' if the address
    # cannot be obtained.
    # NOTE(review): +addr+ is the local end of the socket; +peeraddr+ may be
    # what was intended for identifying a client - confirm before changing.
    def info
      "#{addr[-1]}:#{addr[1]}"
    rescue StandardError
      'closed ?'
    end
  end

  # Accepts local TCP clients and relays to all of them the data read from a
  # single upstream connection. The upstream connection is opened when the
  # first client arrives and closed when the last one leaves.
  class Proxy
    # Maximum number of bytes read from the upstream per pass.
    READ_CHUNK_SIZE = 65_536

    attr_accessor :tcp_clients

    # Creates a new proxy instance.
    # +host+ and +port+ are the local hostname and port to listen on for
    # client connections; +remote_host+, +remote_port+ and +remote_path+
    # locate the remote stream.
    def initialize(host, port, remote_host, remote_port, remote_path = '/')
      @remote_host = remote_host
      @remote_port = remote_port
      @remote_path = remote_path
      @host = host
      @port = port
      # Start with empty collections so the helper methods are usable
      # before #run has been called.
      @tcp_clients = []
      @servers = []
      @headers = []
    end

    # Accepts a new connection on the listening socket +sock+, connecting to
    # the upstream first if there is no upstream connection yet. The new
    # client is sent the upstream's response header lines followed by a
    # newline.
    def accept_client(sock)
      client = sock.accept
      return unless client

      begin
        connect_to_remote unless @stream_srv
      rescue StandardError => e
        # Without an upstream there is nothing to serve; close the client
        # instead of leaving an untracked open socket behind.
        puts "could not connect to remote (#{e.class}: #{e.message}), rejecting client"
        client.close
        return
      end

      client.extend SocketBuffer
      tcp_clients << client
      client.add_to_buffer "#{@headers.join}\n"
      client.send_from_buffer
      puts "accepted new client (total clients now: #{num_clients})"
    end

    # Disconnects every connected client.
    def disconnect_all_clients
      # Iterate over a snapshot: disconnect_client removes entries from
      # tcp_clients, and deleting while iterating would skip clients.
      tcp_clients.dup.each { |c| disconnect_client(c) }
    end

    # Disconnects the given client socket; when it was the last client, the
    # upstream connection is closed as well.
    def disconnect_client(client)
      tcp_clients.delete client
      puts "closing connection (#{client.info}), remaining: #{num_clients}"
      client.close unless client.closed?
      return unless num_clients.zero?

      puts 'no clients left, disconnecting from remote'
      disconnect_from_remote
    end

    # Disconnects from the remote stream, if connected.
    def disconnect_from_remote
      return unless @stream_srv

      @stream_srv.close unless @stream_srv.closed?
      @servers.delete(@stream_srv)
      @stream_srv = nil
    end

    # Connects to the remote stream server, sends the request and reads the
    # response header lines (everything up to the first blank line) into
    # @headers. Raises if the connection or the header read fails; in that
    # case no instance state is changed and the socket is closed.
    def connect_to_remote
      stream = TCPSocket.new(@remote_host, @remote_port)
      begin
        stream.print "GET #{@remote_path} HTTP/1.1\n\n"
        headers = []
        # readline raises EOFError if the remote closes before the blank line.
        until (line = stream.readline).strip.empty?
          headers << line
        end
      rescue StandardError
        stream.close
        raise
      end

      puts "received headers: \n#{headers.join}"
      @headers = headers
      @stream_srv = stream
      @servers << stream
    end

    # Number of currently connected clients.
    def num_clients
      tcp_clients.size
    end

    # Core method: uses select() to find sockets ready to read from / write
    # to, then accepts new clients, relays upstream data and flushes client
    # buffers. +timeout+ (seconds, nil = wait indefinitely) is passed to
    # select; returns without doing anything if select times out.
    def accept_and_process_clients(timeout = nil)
      # Watch for writability only where there is something to write;
      # otherwise select returns immediately on every call.
      writers = tcp_clients.reject { |c| c.buffer.empty? }
      readable, writable, errored = IO.select(@servers, writers, tcp_clients, timeout)
      return unless readable

      # disconnect clients with errors
      errored.each do |sock|
        writable.delete(sock)
        disconnect_client(sock)
      end

      # input from sockets: accept new clients or read from input stream
      readable.each do |sock|
        if sock == @tcp_srv
          accept_client(sock)
        elsif sock == @stream_srv
          relay_stream_data(sock)
        end
      end

      # output to sockets: write data to clients
      writable.each do |sock|
        # May have been closed earlier in this pass (upstream loss).
        next if sock.closed?

        begin
          sock.send_from_buffer
        rescue StandardError => e
          puts "error (client: #{sock.info}): #{e}\n"
          disconnect_client(sock)
        end
      end
    end

    # Runs the proxy: listens on the configured host/port and processes
    # sockets until interrupted. Other StandardErrors are logged and the
    # loop is restarted after one second. All sockets are closed on exit.
    def run
      @tcp_srv = TCPServer.new(@host, @port)
      @tcp_clients = []
      @servers = [@tcp_srv]
      puts "listening on port ##{@port}"
      begin
        # select blocks while nothing is ready, so no sleep is needed here.
        loop { accept_and_process_clients }
      rescue Interrupt, NoMemoryError, SystemExit => e
        puts "Caught exception: #{e.inspect} at #{Array(e.backtrace).first} - shutting down..."
      rescue StandardError => e
        # Deliberately unbounded: the proxy keeps serving after a failure.
        # Only StandardError is retried so that signals and other fatal
        # conditions still terminate the process.
        puts "Caught exception: #{e.inspect} at #{Array(e.backtrace).first} - ignoring and continuing..."
        sleep 1
        retry
      ensure
        disconnect_all_clients
        @tcp_srv.close
        disconnect_from_remote
      end
    end

    private

    # Reads the next chunk from the upstream socket +stream+ and queues it
    # for every client. If the upstream is at EOF or the read fails, all
    # clients are disconnected and the upstream connection is closed.
    def relay_stream_data(stream)
      # readpartial rather than recv: the header lines were read through
      # Ruby's buffered IO (readline), and recv refuses to run while that
      # buffer still holds data. readpartial drains the buffer first.
      chunk = begin
        stream.readpartial(READ_CHUNK_SIZE)
      rescue IOError, SystemCallError => e
        puts "remote stream lost (#{e.class}), disconnecting all clients"
        nil
      end

      if chunk
        tcp_clients.each { |c| c.add_to_buffer chunk }
      else
        disconnect_all_clients
        disconnect_from_remote
      end
    end
  end
end

# example remote location: '192.168.2.102', 80, '/stream-location'
trap('INT') { exit }

StreamingProxy::Proxy.new(
  '192.168.2.104', 85,
  '192.168.2.102', 80, '/'
).run