I Thread e networking

Abbiamo già visto nei capitoli precedenti, diversi esempi di network programming. Questi esempi mostravano come creare una connessione e comunicare tramite essa, ma non avevano a che fare con una delle caratteristiche principali del network programming: il fatto che la network comunication è asincrona.

Nel capitolo Networking abbiamo visto l'esempio di un server server Echo che rispondeva alle richieste del client e come avevamo notato, l'implementazione permetteva di gestire le richieste di un solo client alla volta; come se fossimo in un ufficio con un solo sportello e un solo addetto a servire i clienti: il cliente si mette in fila, aspetta il suo turno fino a quando il cliente che era arrivato prima di lui non ha finito, solo allora viene chiamato per essere servito. Questa non è chiaramente un'implementazione di server che vogliamo. Nei server più clienti che si connettono devono essere serviti contemporaneamente.

Ora che abbiamo visto un modo per creare un thread, utilizziamolo nel codice del nostro server per gestire ogni client che si connette con un nuovo thread e rendere così in server capace di gestire la connessione di più client in contemporanea.

Di seguito la versione multithread del nuovo serverio.checksound.networking.MultiEchoServer e la classeio.checksound.networking.EchoServerClientHandler che implementa java.lang.Runnable.

Per far si che il server riesca a servire più clienti contemporaneamente bisogna instanziare un thread per ogni ogni nuova connessione dei client: il main thread è in un ciclo che aspetta le connessioni dei client sul metodo bloccante, accept, della java.net.ServerSocket. Quando un client si connette al server, il metodo accept della ServerSocket, ritorna un'istanza di java.net.Socket e questa viene passata a un thread che viene creato per gestire la comunicazione con il client: in questo modo il main thread del server non rimane bloccato a servire il client (di questo se ne occupa il nuovo thread creato) e può in questo modo ritornare al metodo accept della java.net.ServerSocket per aspettare le connessione di eventuali nuovi client.

package io.checksound.networking;

import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Scanner;

public class EchoServerClientHandler implements Runnable {
	private Socket socket;

	public EchoServerClientHandler(Socket socket) {
		this.socket = socket;
	}

	public void run() {
		try {
			Scanner in = new Scanner(socket.getInputStream());
			PrintWriter out = new PrintWriter(socket.getOutputStream());
			// leggo e scrivo nella connessione finche' non ricevo "quit"
			while (true) {
				String line = in.nextLine();
				if (line.equals("quit")) {
					break;
				} else {
					out.println("Received:" + line);
					out.flush();
				}
			}
			// chiudo gli stream e il socket
			in.close();
			out.close();
			socket.close();
		} catch (IOException e) {
			System.err.println(e.getMessage());
		}
	}
}
package io.checksound.networking;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiEchoServer {
	private int port;

	public MultiEchoServer(int port) {
		this.port = port;
	}

	public void startServer() {
		//  ExecutorService executor = Executors.newCachedThreadPool();
		ServerSocket serverSocket;
		try {
			serverSocket = new ServerSocket(port);
		} catch (IOException e) {
			System.err.println(e.getMessage());
			// porta non disponibile
			return;
		}
		System.out.println("Server ready");
		while (true) {
			try {

				Socket socket = serverSocket.accept();
				System.out.println("Received client	connection");
				
				Thread t = new Thread(new EchoServerClientHandler(socket));
				t.start();
				
				// executor.submit(new EchoServerClientHandler(socket));
			} catch (IOException e) {
				break;
				// entrerei qui se serverSocket venisse chiuso
			}
		}
		// executor.shutdown();
	}

	public static void main(String[] args) {
		MultiEchoServer echoServer = new MultiEchoServer(1337);
		echoServer.startServer();
	}
}

PIU' AVANZATO (per ora non necessario): Si potrebbe in alternativa, invece di instanziare direttamente i thread, utilizzare un oggetto dell'interfaccia java.util.concurrent.Executor per eseguire un oggetto che implementa l'interfaccia java.lang.Runnable.

Un Executor è utilizzato al posto della creazione esplicita di thread. Per esempio piuttosto che invocare new Thread(new RunnableTask()).start() per ogni task da eseguire, si potrebbe usare:

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

Vedi doc java.util.concurrent.Executor.

Riprendendo l'esempio Ulteriori esempi di networking sul io.checksound.networking.DateServer creiamo ora la versione multithread, io.checksound.networking.DateServerWithThreads:

package io.checksound.networking;

import java.net.*;
import java.io.*;
import java.util.Date;

/**
 * This program is a server that takes connection requests on
 * the port specified by the constant LISTENING_PORT.  When a
 * connection is opened, the program sends the current time to
 * the connected socket.  The program will continue to receive
 * and process connections until it is killed (by a CONTROL-C,
 * for example). 
 * 
 * This version of the program creates a new thread for
 * every connection request.
 */
public class DateServerWithThreads {

    public static final int LISTENING_PORT = 32007;

    public static void main(String[] args) {

        ServerSocket listener;  // Listens for incoming connections.
        Socket connection;      // For communication with the connecting program.

        /* Accept and process connections forever, or until some error occurs. */

        try {
            listener = new ServerSocket(LISTENING_PORT);
            System.out.println("Listening on port " + LISTENING_PORT);
            while (true) {
                // Accept next connection request and handle it.
                connection = listener.accept(); 
                ConnectionHandler handler = new ConnectionHandler(connection);
                handler.start();
            }
        }
        catch (Exception e) {
            System.out.println("Sorry, the server has shut down.");
            System.out.println("Error:  " + e);
            return;
        }

    }  // end main()


    /**
     *  Defines a thread that handles the connection with one
     *  client.
     */
    private static class ConnectionHandler extends Thread {
        Socket client;
        
        ConnectionHandler(Socket socket) {
            client = socket;
        }
        
        public void run() {
            String clientAddress = client.getInetAddress().toString();
            try {
                System.out.println("Connection from " + clientAddress );
                Date now = new Date();  // The current date and time.
                PrintWriter outgoing;   // Stream for sending data.
                outgoing = new PrintWriter( client.getOutputStream() );
                outgoing.println( now.toString() );
                outgoing.flush();  // Make sure the data is actually sent!
                client.close();
            }
            catch (Exception e){
                System.out.println("Error on connection with: " 
                        + clientAddress + ": " + e);
            }
        }
    }


} //end class DateServerWithThreads

Nel main thread del server c'è il loop per accettare le connessioni, e ad ogni nuova connessione viene creato un thread che risponde al client, inviandogli la stringa della data corrente.

ESERCIZIO: modificate il codice del server per utilizzare java.io.ObjectOutputStream e inviare l'oggetto di tipo java.util.Date. Da modificare anche il codice del client io.checksound.networking.DateClient.

Utilizzare un Thread Pool

Esempio di versione che usa un thread pool, io.checksound.networking.DateServerWithThreadPool:

package io.checksound.networking;

import java.net.*;
import java.io.*;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;

/**
 * This program is a server that takes connection requests on
 * the port specified by the constant LISTENING_PORT.  When a
 * connection is opened, the program sends the current time to
 * the connected socket.  The program will continue to receive
 * and process connections until it is killed (by a CONTROL-C,
 * for example). 
 * 
 * This version of the program uses a thread pool of worker
 * threads that handle the connections.
 */
public class DateServerWithThreadPool {

    public static final int LISTENING_PORT = 32007;

    private static final int THREAD_POOL_SIZE = 5;

    private static final int QUEUE_CAPACITY = 10;

    /**
     * The connectionQueue is used to send connected sockets from the
     * main program to the worker threads.  When a connection request
     * is received, the connected socket is placed into the queue.
     * Worker threads retrieve sockets from the queue as they become
     * available.  This is an ArrayBlockingQueue, with a limited
     * capacity, to prevent the number of clients who are waiting
     * for service in the queue from becoming too large.
     */
    private static ArrayBlockingQueue<Socket> connectionQueue;

    public static void main(String[] args) {

        ServerSocket listener;  // Listens for incoming connections.
        Socket connection;      // For communication with the connecting program.

        /* Create a listening socket, create the thread pool, then accept and 
         * process connection requests forever.  Note that the connection queue
         * MUST be created before the threads are created, since a thread tries
         * to use the queue as soon as it is started.  Once created, the thread
         * will immediately block until a socket becomes available in the queue.
         */

        try {

            listener = new ServerSocket(LISTENING_PORT);

            connectionQueue = new ArrayBlockingQueue<Socket>(QUEUE_CAPACITY);
            
            for (int i = 0; i < THREAD_POOL_SIZE; i++) {
                new ConnectionHandler();  // Create the thread; it starts itself.
            }

            System.out.println("Listening on port " + LISTENING_PORT);
            while (true) {
                    // Accept next connection request and put it in the queue.
                connection = listener.accept();
                try {
                    connectionQueue.put(connection); // Blocks if queue is full.
                }
                catch (InterruptedException e) {
                }
            }
        }
        catch (Exception e) {
            System.out.println("Sorry, the server has shut down.");
            System.out.println("Error:  " + e);
            return;
        }

    }  // end main()


    /**
     *  Defines one of the threads in the thread pool.  Each thread runs
     *  in an infinite loop in which it takes a connection from the connection
     *  queue and handles communication with that client.  The thread starts
     *  itself in its constructor.  The constructor also sets the thread
     *  to be a daemon thread.  (A program will end if all remaining
     *  threads are daemon threads.)
     */
    private static class ConnectionHandler extends Thread {
        ConnectionHandler() {
            setDaemon(true);
            start();
        }
        public void run() {
            while (true) {
                Socket client;
                try {
                    client = connectionQueue.take();
                }
                catch (InterruptedException e) {
                    continue; // (If interrupted, just go back to start of while loop.)
                }
                String clientAddress = client.getInetAddress().toString();
                try {
                    System.out.println("Connection from " + clientAddress );
                    System.out.println("Handled by thread " + this);
                    Date now = new Date();  // The current date and time.
                    PrintWriter outgoing;   // Stream for sending data.
                    outgoing = new PrintWriter( client.getOutputStream() );
                    outgoing.println( now.toString() );
                    outgoing.flush();  // Make sure the data is actually sent!
                    client.close();
                }
                catch (Exception e){
                    System.out.println("Error on connection with: " 
                            + clientAddress + ": " + e);
                }
            }
        }
    }


} //end class DateServerWithThreadPool

La coda in questo programma è di tipo ArrayBlockingQueue<Socket>. Quindi, ha una capacità limitata, l'operazione di put() sarà bloccante se la coda ha raggiunto il limite. Ma noi non volevamo impedire il blocking nel main program? Quando il main program è bloccato, il server non accetta più connessioni, e i client che cercano di connettersi, devono aspettare. Sarebbe forse meglio utilizzare unaLinkedBlockingQueue e quindi avere una capacità illimitata?

In effetti, connessioni in una blocking queue devono aspettare comunque; esse non sono servite. Se la coda diventa eccessivamente lunga, le connessioni nella coda devono aspettare per un tempo molto lungo prima di essere servite. Se la coda cresce troppo, vuol dire che il server sta ricevendo connessioni più velocemente di quelle che riesce a servire. Può succedere per diversi motivi: il tuo server potrebbe semplicemente non essere potente a sufficienza per gestire il traffico che sta arrivando. Hai bisogno di comprare un nuovo server. O magari il thread pool non ha un numero sufficiente di thread per utilizzare il tuo server; dovresti incrementare la dimensione del thread pool per sfruttare le capacità del server. O magari il tuo server è sotto attacco di tipo "Denial Of Service", in questo caso qualcuno sta mandando deliberatamente richieste al tuo server di più di quelle che questo possa gestire, in modo da impedire agli altri utenti di utilizzare il servizio.

In ogni caso, un ArrayBlockingQueue con una capacità limitata è la scelta corretta.

ESEMPI

Last updated