首頁>技術>

tomcat如何建立連線,處理請求

學習探討tomcat如何建立網路連線協議,並處理客戶端過來的請求

建立http網路連線,指定通訊協議

tomcat在建立時,會建立連線物件,負責處理客戶端的請求,基於socket

connector 連線 protocol 協議 endpoint終端 socket插座,埠連線

建立初始化

connector -> protocol -> endpoint -> socket

接收請求建立任務

acceptor.socket.acceptor()->

socketWrapper(攜帶通訊資訊)

​ -> poller(socketWrapper)

​ -> execute(socketWrapper) 建立執行緒

建立聯結器

Conector類

org.apache.catalina.connector.Connector

空參構造connector() -> connector(http/1.1)

/** * Defaults to using HTTP/1.1 NIO implementation. */public Connector() {    this("HTTP/1.1");}
指定網路連線協議http11

org.apache.coyote.http11.Http11NioProtocol

-> new Http11NioProtocol()

public Http11NioProtocol() {       super(new NioEndpoint());   }
指定服務終端處理模型非阻塞nio

org.apache.tomcat.util.net.NioEndpoint

-> new NioEndPoint()

建立之後如何被啟動?見springboot啟動tomcat方式

終端處理執行緒和執行緒池初始化

啟動之後

NioEndpoint執行bind()方法,

一些初始化,繫結埠

@Override    public void bind() throws Exception {        initServerSocket();        setStopLatch(new CountDownLatch(1));        // Initialize SSL if needed        initialiseSsl();        selectorPool.open(getName());    }		//socket相關  initServerSocket()具體如下	// Separated out to make it easier for folks that extend NioEndpoint to    // implement custom [server]sockets    protected void initServerSocket() throws Exception {            //.......        	//根據平臺不同,反回具體底層類物件(windows,linux,unix)            serverSock = ServerSocketChannel.open();            socketProperties.setProperties(serverSock.socket());            //繫結地址和埠號            InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());            serverSock.socket().bind(addr,getAcceptCount());        	//.......    }

NioEndpoint初始化之後,呼叫start()執行startInternal()

程式碼如下

// Create worker collectionif (getExecutor() == null) {    //建立執行緒池    createExecutor();}initializeConnectionLatch();// Start poller thread// 建立客戶端佇列(客戶端過來的請求)poller = new Poller();Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");pollerThread.setPriority(threadPriority);pollerThread.setDaemon(true);pollerThread.start();//建立接收遠端請求執行緒startAcceptorThread();
初始化執行緒池配置

-> createExecutor() 用於處理使用者請求

指定 備用執行緒,對大執行緒數,佇列型別,超時時間,和執行緒工廠

public void createExecutor() {        internalExecutor = true;        TaskQueue taskqueue = new TaskQueue();        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);        taskqueue.setParent( (ThreadPoolExecutor) executor);    }
建立Poller執行緒
poller = new Poller();Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");pollerThread.setPriority(threadPriority);pollerThread.setDaemon(true);pollerThread.start();
建立Acceptor執行緒
protected void startAcceptorThread() {        acceptor = new Acceptor<>(this);        String threadName = getName() + "-Acceptor";        acceptor.setThreadName(threadName);        Thread t = new Thread(acceptor, threadName);        t.setPriority(getAcceptorThreadPriority());        t.setDaemon(getDaemon());        t.start();    }
處理請求的相關物件(執行緒)Acceptor

org.apache.tomcat.util.net.Acceptor

Acceptor 負責迴圈等待遠端請求,將請求以socket形式攜帶資訊,呼叫setSocketOptions()將socket包裝配置為socketWrapper,

setSocketOptions: 對socket包裝處理配置,使用poller物件註冊到佇列,讓poller執行緒做後續的處理

Acceptor 類的run方法:

public void run() {        int errorDelay = 0;		//......以下省略部分程式碼        try {            // Loop until we receive a shutdown command            // 一直迴圈等待遠端請求            while (!stopCalled) {                // Accept the next incoming connection from the server socket                // 1 接收請求                socket = endpoint.serverSocketAccept();                                    // setSocketOptions() will hand the socket off to                // 2 處理請求,setSocketOptions() 內部呼叫poller 將新請求任務放入佇列                if (!endpoint.setSocketOptions(socket)) {                    endpoint.closeSocket(socket);                }                                    }        } finally {            stopLatch.countDown();        }        state = AcceptorState.ENDED;    }
Poller

org.apache.tomcat.util.net.NioEndpoint.Poller

Poller負責接收包裝後的socket請求,放入佇列,

並在run方法中迴圈去poll()請求任務,將與流讀寫有關的元件IOChannel Selector socketWrapper 繫結關聯

再透過selector獲取selectionKeys

迭代迴圈獲取對應的socket,提交任務(執行緒),執行緒讀寫處理socketWrapper等後續操作

public void run() {            // Loop until destroy() is called            while (true) {                // poller佇列任務處理  將IOChannel Selector socketWrapper 關聯 				hasEvents = events();                //......省略				                Iterator<SelectionKey> iterator =                    keyCount > 0 ? selector.selectedKeys().iterator() : null;                // Walk through the collection of ready keys and dispatch                // 非阻塞io api 任務處理                while (iterator != null && iterator.hasNext()) {                    SelectionKey sk = iterator.next();                    iterator.remove();                    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();                    // Attachment may be null if another thread has called                    // cancelledKey()                    if (socketWrapper != null) {                        // 如果有等待處理的任務,則處理                        processKey(sk, socketWrapper);                        //processKey內部會呼叫processSocket方法,最終用執行緒池提交任務                    }                }                // Process timeouts                timeout(keyCount,hasEvents);            }            getStopLatch().countDown();        }
其他

events佇列

private final SynchronizedQueue<PollerEvent> events =                new SynchronizedQueue<>(); //事件佇列(socket請求)//註冊請求到佇列public void rigister(final NioSocketWrapper socketWrapper){    event = new PollerEvent(socketWrapper, OP_REGISTER);	addEvent(event);}private void addEvent(PollerEvent event) {    events.offer(event);    if (wakeupCounter.incrementAndGet() == 0) {        selector.wakeup();    }}

events()繫結及後面的 processSocket()最終提交實際處理任務到執行緒

/**         * Processes events in the event queue of the Poller.         *         * @return <code>true</code> if some events were processed,         *   <code>false</code> if queue was empty         */        public boolean events() {            boolean result = false;            PollerEvent pe = null;            for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {                result = true;                NioSocketWrapper socketWrapper = pe.getSocketWrapper();                SocketChannel sc = socketWrapper.getSocket().getIOChannel();                int interestOps = pe.getInterestOps();                if (sc == null) {                    log.warn(sm.getString("endpoint.nio.nullSocketChannel"));                    socketWrapper.close();                } else if (interestOps == OP_REGISTER) {                    try {                        //註冊繫結                        sc.register(getSelector(), SelectionKey.OP_READ, socketWrapper);                    } catch (Exception x) {                        log.error(sm.getString("endpoint.nio.registerFail"), x);                    }                } else {                    final SelectionKey key = sc.keyFor(getSelector());                    if (key == null) {                        // The key was cancelled (e.g. due to socket closure)                        // and removed from the selector while it was being                        // processed. Count down the connections at this point                        // since it won't have been counted down when the socket                        // closed.                        socketWrapper.close();                    } else {                        final NioSocketWrapper attachment = (NioSocketWrapper) key.attachment();                        if (attachment != null) {                            // We are registering the key to start with, reset the fairness counter.                            try {                                int ops = key.interestOps() | interestOps;                                attachment.interestOps(ops);                                key.interestOps(ops);                            } catch (CancelledKeyException ckx) {                                cancelledKey(key, socketWrapper);                            }                        } else {                            cancelledKey(key, socketWrapper);                        }                    }                }                if (running && !paused && eventCache != null) {                    pe.reset();//清空任務socketWrapper                    eventCache.push(pe);                }            }            return result;        }

setSocketOptions 中的socket任務註冊

protected boolean setSocketOptions(SocketChannel socket) {        NioSocketWrapper socketWrapper = null;        try {            // Allocate channel and wrapper            NioChannel channel = null;            if (nioChannels != null) {                channel = nioChannels.pop();            }            //...... 部分省略            NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);                        socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());            poller.register(socketWrapper);            return true;        } catch (Throwable t) {            ExceptionUtils.handleThrowable(t);            try {                log.error(sm.getString("endpoint.socketOptionsError"), t);            } catch (Throwable tt) {                ExceptionUtils.handleThrowable(tt);            }            if (socketWrapper == null) {                destroySocket(socket);            }        }

來源:https://www.tuicool.com/articles/qaiaYjF

12
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • netty 解決拆包和粘包問題的 4 種方法,你會幾種?