Abbiamo già visto nei capitoli precedenti, diversi esempi di network programming. Questi esempi mostravano come creare una connessione e comunicare tramite essa, ma non avevano a che fare con una delle caratteristiche principali del network programming: il fatto che la network comunication è asincrona.
Nel capitolo Networking abbiamo visto l'esempio di un server server Echo che rispondeva alle richieste del client e come avevamo notato, l'implementazione permetteva di gestire le richieste di un solo client alla volta; come se fossimo in un ufficio con un solo sportello e un solo addetto a servire i clienti: il cliente si mette in fila, aspetta il suo turno fino a quando il cliente che era arrivato prima di lui non ha finito, solo allora viene chiamato per essere servito. Questa non è chiaramente un'implementazione di server che vogliamo. Nei server più clienti che si connettono devono essere serviti contemporaneamente.
Ora che abbiamo visto un modo per creare un thread, utilizziamolo nel codice del nostro server per gestire ogni client che si connette con un nuovo thread e rendere così in server capace di gestire la connessione di più client in contemporanea.
Di seguito la versione multithread del nuovo serverio.checksound.networking.MultiEchoServer e la classeio.checksound.networking.EchoServerClientHandler che implementa java.lang.Runnable.
Per far si che il server riesca a servire più clienti contemporaneamente bisogna instanziare un thread per ogni ogni nuova connessione dei client: il main thread è in un ciclo che aspetta le connessioni dei client sul metodo bloccante, accept, della java.net.ServerSocket. Quando un client si connette al server, il metodo accept della ServerSocket, ritorna un'istanza di java.net.Socket e questa viene passata a un thread che viene creato per gestire la comunicazione con il client: in questo modo il main thread del server non rimane bloccato a servire il client (di questo se ne occupa il nuovo thread creato) e può in questo modo ritornare al metodo acceptdella java.net.ServerSocketper aspettare le connessione di eventuali nuovi client.
packageio.checksound.networking;importjava.io.IOException;importjava.io.PrintWriter;importjava.net.Socket;importjava.util.Scanner;publicclassEchoServerClientHandlerimplementsRunnable {privateSocket socket;publicEchoServerClientHandler(Socket socket) {this.socket= socket; }publicvoidrun() {try {Scanner in =newScanner(socket.getInputStream());PrintWriter out =newPrintWriter(socket.getOutputStream());// leggo e scrivo nella connessione finche' non ricevo "quit"while (true) {String line =in.nextLine();if (line.equals("quit")) {break; } else {out.println("Received:"+ line);out.flush(); } }// chiudo gli stream e il socketin.close();out.close();socket.close(); } catch (IOException e) {System.err.println(e.getMessage()); } }}
PIU' AVANZATO (per ora non necessario): Si potrebbe in alternativa, invece di instanziare direttamente i thread, utilizzare un oggetto dell'interfaccia java.util.concurrent.Executor per eseguire un oggetto che implementa l'interfaccia java.lang.Runnable.
Un Executor è utilizzato al posto della creazione esplicita di thread. Per esempio piuttosto che invocare new Thread(new RunnableTask()).start() per ogni task da eseguire, si potrebbe usare:
Riprendendo l'esempio Ulteriori esempi di networking sul io.checksound.networking.DateServer creiamo ora la versione multithread, io.checksound.networking.DateServerWithThreads:
packageio.checksound.networking;importjava.net.*;importjava.io.*;importjava.util.Date;/** * This program is a server that takes connection requests on * the port specified by the constant LISTENING_PORT. When a * connection is opened, the program sends the current time to * the connected socket. The program will continue to receive * and process connections until it is killed (by a CONTROL-C, * for example). * * This version of the program creates a new thread for * every connection request. */publicclassDateServerWithThreads {publicstaticfinalint LISTENING_PORT =32007;publicstaticvoidmain(String[] args) {ServerSocket listener; // Listens for incoming connections.Socket connection; // For communication with the connecting program./* Accept and process connections forever, or until some error occurs. */try { listener =newServerSocket(LISTENING_PORT);System.out.println("Listening on port "+ LISTENING_PORT);while (true) {// Accept next connection request and handle it. connection =listener.accept(); ConnectionHandler handler =newConnectionHandler(connection);handler.start(); } }catch (Exception e) {System.out.println("Sorry, the server has shut down.");System.out.println("Error: "+ e);return; } } // end main() /** * Defines a thread that handles the connection with one * client. */privatestaticclassConnectionHandlerextendsThread {Socket client;ConnectionHandler(Socket socket) { client = socket; }publicvoidrun() {String clientAddress =client.getInetAddress().toString();try {System.out.println("Connection from "+ clientAddress );Date now =newDate(); // The current date and time.PrintWriter outgoing; // Stream for sending data. outgoing =newPrintWriter( client.getOutputStream() );outgoing.println( now.toString() );outgoing.flush(); // Make sure the data is actually sent!client.close(); }catch (Exception e){System.out.println("Error on connection with: "+ clientAddress +": "+ e); } } }} //end class DateServerWithThreads
Nel main thread del server c'è il loop per accettare le connessioni, e ad ogni nuova connessione viene creato un thread che risponde al client, inviandogli la stringa della data corrente.
ESERCIZIO: modificate il codice del server per utilizzare java.io.ObjectOutputStream e inviare l'oggetto di tipo java.util.Date. Da modificare anche il codice del client io.checksound.networking.DateClient.
Utilizzare un Thread Pool
Esempio di versione che usa un thread pool, io.checksound.networking.DateServerWithThreadPool:
packageio.checksound.networking;importjava.net.*;importjava.io.*;importjava.util.Date;importjava.util.concurrent.ArrayBlockingQueue;/** * This program is a server that takes connection requests on * the port specified by the constant LISTENING_PORT. When a * connection is opened, the program sends the current time to * the connected socket. The program will continue to receive * and process connections until it is killed (by a CONTROL-C, * for example). * * This version of the program uses a thread pool of worker * threads that handle the connections. */publicclassDateServerWithThreadPool {publicstaticfinalint LISTENING_PORT =32007;privatestaticfinalint THREAD_POOL_SIZE =5;privatestaticfinalint QUEUE_CAPACITY =10; /** * The connectionQueue is used to send connected sockets from the * main program to the worker threads. When a connection request * is received, the connected socket is placed into the queue. * Worker threads retrieve sockets from the queue as they become * available. This is an ArrayBlockingQueue, with a limited * capacity, to prevent the number of clients who are waiting * for service in the queue from becoming too large. */privatestaticArrayBlockingQueue<Socket> connectionQueue;publicstaticvoidmain(String[] args) {ServerSocket listener; // Listens for incoming connections.Socket connection; // For communication with the connecting program./* Create a listening socket, create the thread pool, then accept and * process connection requests forever. Note that the connection queue * MUST be created before the threads are created, since a thread tries * to use the queue as soon as it is started. Once created, the thread * will immediately block until a socket becomes available in the queue. */try { listener =newServerSocket(LISTENING_PORT); connectionQueue =newArrayBlockingQueue<Socket>(QUEUE_CAPACITY);for (int i =0; i < THREAD_POOL_SIZE; i++) {newConnectionHandler(); // Create the thread; it starts itself. }System.out.println("Listening on port "+ LISTENING_PORT);while (true) {// Accept next connection request and put it in the queue. connection =listener.accept();try {connectionQueue.put(connection); // Blocks if queue is full. }catch (InterruptedException e) { } } }catch (Exception e) {System.out.println("Sorry, the server has shut down.");System.out.println("Error: "+ e);return; } } // end main() /** * Defines one of the threads in the thread pool. Each thread runs * in an infinite loop in which it takes a connection from the connection * queue and handles communication with that client. The thread starts * itself in its constructor. The constructor also sets the thread * to be a daemon thread. (A program will end if all remaining * threads are daemon threads.) */privatestaticclassConnectionHandlerextendsThread {ConnectionHandler() {setDaemon(true);start(); }publicvoidrun() {while (true) {Socket client;try { client =connectionQueue.take(); }catch (InterruptedException e) {continue; // (If interrupted, just go back to start of while loop.) }String clientAddress =client.getInetAddress().toString();try {System.out.println("Connection from "+ clientAddress );System.out.println("Handled by thread "+this);Date now =newDate(); // The current date and time.PrintWriter outgoing; // Stream for sending data. outgoing =newPrintWriter( client.getOutputStream() );outgoing.println( now.toString() );outgoing.flush(); // Make sure the data is actually sent!client.close(); }catch (Exception e){System.out.println("Error on connection with: "+ clientAddress +": "+ e); } } } }} //end class DateServerWithThreadPool
La coda in questo programma è di tipo ArrayBlockingQueue<Socket>. Quindi, ha una capacità limitata, l'operazione di put() sarà bloccante se la coda ha raggiunto il limite. Ma noi non volevamo impedire il blocking nel main program? Quando il main program è bloccato, il server non accetta più connessioni, e i client che cercano di connettersi, devono aspettare. Sarebbe forse meglio utilizzare unaLinkedBlockingQueue e quindi avere una capacità illimitata?
In effetti, connessioni in una blocking queue devono aspettare comunque; esse non sono servite. Se la coda diventa eccessivamente lunga, le connessioni nella coda devono aspettare per un tempo molto lungo prima di essere servite. Se la coda cresce troppo, vuol dire che il server sta ricevendo connessioni più velocemente di quelle che riesce a servire. Può succedere per diversi motivi: il tuo server potrebbe semplicemente non essere potente a sufficienza per gestire il traffico che sta arrivando. Hai bisogno di comprare un nuovo server. O magari il thread pool non ha un numero sufficiente di thread per utilizzare il tuo server; dovresti incrementare la dimensione del thread pool per sfruttare le capacità del server. O magari il tuo server è sotto attacco di tipo "Denial Of Service", in questo caso qualcuno sta mandando deliberatamente richieste al tuo server di più di quelle che questo possa gestire, in modo da impedire agli altri utenti di utilizzare il servizio.
In ogni caso, un ArrayBlockingQueue con una capacità limitata è la scelta corretta.