English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Explicação detalhada do pool de threads no Java e exemplo de código

Contexto técnico do pool de threads

Na programação orientada a objetos, a criação e destruição de objetos são muito custosas em termos de tempo, porque a criação de um objeto requer a obtenção de recursos de memória ou outros recursos adicionais. No Java, isso é ainda mais verdadeiro, pois o virtual machine tenta rastrear cada objeto para que possa realizar a coleta de lixo após a destruição do objeto.

Portanto, uma maneira de melhorar a eficiência do programa de serviço é reduzir ao máximo o número de criações e destruições de objetos, especialmente a criação e destruição de objetos que consomem muitos recursos. Como usar objetos existentes para fornecer serviços é um problema chave a ser resolvido, na verdade, isso é a razão pela qual algumas tecnologias de 'pooling de recursos' surgiram.

Por exemplo, muitos componentes genéricos comuns encontrados no Android geralmente não podem se livrar do conceito de 'pool', como bibliotecas de carregamento de imagens, bibliotecas de solicitações de rede, mesmo que o mecanismo de comunicação de mensagens do Android, Message, quando usa Message.obtain(), é um objeto do pool de Message, então este conceito é muito importante. A tecnologia de pool de threads que será introduzida neste artigo também se alinha com essa ideia.

Vantagens do pool de threads:

1.Reutiliza threads do pool de threads, reduzindo o overhead de criação e destruição de objetos,

2.Pode controlar eficazmente o número máximo de concorrências de threads, melhorar a utilização dos recursos do sistema, ao mesmo tempo em que evita a competição excessiva por recursos e o bloqueio;

3.Pode gerenciar multi-threading de forma simples, tornando o uso de threads simples e eficiente.

Framework de pool de threads Executor

Java tem pools de threads implementados pelo framework Executor, o framework Executor inclui classes: Executor, Executors, ExecutorService, ThreadPoolExecutor, Callable e Future, FutureTask, etc.

Executor: Interface de todos os pools de threads, possui apenas um método.

public interface Executor {  
 void execute(Runnable command);  
}

ExecutorService: Adiciona comportamento ao Executor, é a interface mais direta para a implementação do Executor.

Executors: Fornece uma série de métodos fábrica para criar pools de threads, os pools de threads retornados implementam a interface ExecutorService.

ThreadPoolExecutor: Implementação específica da classe de pool de threads, geralmente todas as implementações de pool de threads usadas são baseadas neste classe. O método de construção é o seguinte:

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

corePoolSize: O número de threads do núcleo do pool de threads, o número de threads em execução no pool de threads nunca ultrapassará corePoolSize, por padrão, pode continuar a viver. Pode ser configurado para allowCoreThreadTimeOut como True, neste caso, o número de threads do núcleo é 0, e neste momento, keepAliveTime controla o tempo de espera de todos os threads.

maximumPoolSize: O número máximo de threads permitido pelo pool de threads;

keepAliveTime: Refere-se ao tempo de espera de conclusão do thread ocioso;

unit: É uma enumeração que representa a unidade de keepAliveTime;

workQueue: Representa a fila deBlockingQueue<Runnable> que armazena tarefas.

BlockingQueue: A fila de bloqueio (BlockingQueue) é uma ferramenta principal usada no java.util.concurrent para controlar a sincronização de threads. Se a BlockingQueue estiver vazia, a operação de extração da BlockingQueue será bloqueada e entrará no estado de espera até que a BlockingQueue receba algo, então ela será acordada. Da mesma forma, se a BlockingQueue estiver cheia, qualquer tentativa de inserção de elementos também será bloqueada e entrará no estado de espera até que haja espaço na BlockingQueue, então continuará a operação. A fila de bloqueio é frequentemente usada nas situações de produtor e consumidor, onde o produtor é a thread que adiciona elementos à fila e o consumidor é a thread que pega elementos da fila. A fila de bloqueio é o contêiner onde o produtor armazena elementos, e o consumidor também só pega elementos do contêiner. As classes de implementação específicas incluem LinkedBlockingQueue, ArrayBlockingQueue, etc. Geralmente, o bloqueio e o acorda são implementados internamente através de Lock e Condition (aprendizado e uso do Lock e Condition).

O processo de trabalho do pool de threads é o seguinte:

Quando o pool de threads é criado, ele não possui nenhuma thread. A fila de tarefas é passada como parâmetro. No entanto, mesmo que a fila contenha tarefas, o pool de threads não as executará imediatamente.

Quando o método execute() é chamado para adicionar uma tarefa, o pool de threads faz as seguintes avaliações:

Se o número de threads em execução for menor que corePoolSize, crie imediatamente uma thread para executar essa tarefa;

Se o número de threads em execução for maior ou igual a corePoolSize, coloque essa tarefa na fila;

Se a fila estiver cheia e o número de threads em execução for menor que maximumPoolSize, ainda assim é necessário criar uma thread não nuclear para executar essa tarefa imediatamente;

如果队列满了,而且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会抛出异常RejectExecutionException。

Quando um thread completa uma tarefa, ele pega a próxima tarefa da fila para executar.

Quando um thread não tem nada a fazer, após um tempo determinado (keepAliveTime), o pool de threads avalia. Se o número de threads em execução for maior que corePoolSize, então essa thread será desligada. Portanto, após a conclusão de todas as tarefas do pool de threads, ele se encolherá até o tamanho de corePoolSize.

Criação e uso do pool de threads

A criação do pool de threads utiliza métodos estáticos da classe de utilitário Executors. A seguir estão alguns tipos comuns de pools de threads.

SingleThreadExecutor: thread de fundo único (seu buffer de fila é ilimitado)

public static ExecutorService newSingleThreadExecutor() {  
 return new FinalizableDelegatedExecutorService (
  new ThreadPoolExecutor(1, 1,         
  0L, TimeUnit.MILLISECONDS,         
  new LinkedBlockingQueue<Runnable>())); 
}

Crie um pool de threads de única linha. Este pool de threads possui apenas um thread de núcleo em funcionamento, o que é equivalente a executar todas as tarefas em série em uma única linha. Se este único thread for encerrado devido a uma exceção, um novo thread substituirá ele. Este pool de threads garante que todas as tarefas sejam executadas na ordem de submissão das tarefas.

FixedThreadPool: um pool de threads que possui apenas threads de núcleo, de tamanho fixo (seu buffer de fila é ilimitado).

public static ExecutorService newFixedThreadPool(int nThreads) {        
        return new ThreadPoolExecutor(nThreads, nThreads,                                      
            0L, TimeUnit.MILLISECONDS,                                        
            new LinkedBlockingQueue<Runnable>());    
}
Cria um pool de threads de tamanho fixo. Cada vez que uma tarefa é submetida, é criada uma nova thread até que o tamanho do pool atinja o tamanho máximo. Uma vez atingido o tamanho máximo, o tamanho do pool permanece inalterado. Se uma thread terminar devido a uma exceção, o pool de threads complementa com uma nova thread.

CachedThreadPool: um pool de threads ilimitado, com recolha automática de threads.

public static ExecutorService newCachedThreadPool() {   
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,           
   60L, TimeUnit.SECONDS,          
   new SynchronousQueue<Runnable>());  
}

se o tamanho do pool de threads exceder o necessário para processar as tarefas, algumas threads ociosas serão recolhidas (6um thread que não executa tarefas por 0 segundos) pode inteligentemente adicionar novas threads para lidar com o aumento do número de tarefas. Este pool de threads não faz limites ao tamanho do pool, o tamanho do pool depende completamente da capacidade do sistema operacional (ou JVM) de criar o maior tamanho de thread possível. SynchronousQueue é um buffer sem1da fila de bloqueio.

ScheduledThreadPool: um pool de threads fixo no tamanho, com threads ilimitadas. Este pool de threads suporta a execução programada e periódica de tarefas.

public static ExecutorService newScheduledThreadPool(int corePoolSize) {   
 return new ScheduledThreadPool(corePoolSize, 
    Integer.MAX_VALUE,             
    DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,             
    new DelayedWorkQueue()); 
}

Crie um pool de threads que executa periodicamente tarefas. Se estiver ocioso, a thread de pool não nuclear será recolhida dentro do tempo DEFAULT_KEEPALIVEMILLIS.

Existem dois métodos mais comuns de submissão de tarefas no pool de threads:

execute:

ExecutorService.execute(Runnable runable);

submit:

FutureTask task = ExecutorService.submit(Runnable runnable);
FutureTask<T> task = ExecutorService.submit(Runnable runnable, T Result);

FutureTask<T> task = ExecutorService.submit(Callable<T> callable);

A implementação do submit(Callable callable), submit(Runnable runnable) é semelhante.

public <T> Future<T> submit(Callable<T> task) {
 if (task == null) throw new NullPointerException();
 FutureTask<T> ftask = newTaskFor(task);
 execute(ftask);
 return ftask;
}

Pode-se ver que o submit abre uma tarefa com retorno de resultado, que retorna um objeto FutureTask, assim sendo possível obter o resultado através do método get(). O submit também chama execute(Runnable runable) no final, o submit apenas encapsula um objeto Callable ou Runnable em um objeto FutureTask, porque o FutureTask é um Runnable, então pode ser executado em execute. Sobre como encapsular Callable e Runnable em um objeto FutureTask, veja o uso de Callable, Future e FutureTask.

O princípio de implementação do pool de threads

Se apenas falarmos sobre o uso de pools de threads, este blog não tem muita valor, no máximo é um processo de familiarização com as APIs relacionadas ao Executor. O processo de implementação do pool de threads não usa a palavra-chave Synchronized, usa Volatile, Lock e filas de sincronização (bloqueio), classes relacionadas a Atomic, FutureTask etc., porque o desempenho do último é melhor. O processo de compreensão pode aprender bem as ideias de controle concorrente no código-fonte.

No início, mencionamos os benefícios da thread pool, que podem ser resumidos em três pontos:

Reaproveitabilidade de threads

Controlar o número máximo de concorrências

Gerenciar threads

1.Processo de reaproveitabilidade de threads

Para entender o princípio da reaproveitabilidade de threads, é necessário primeiro entender o ciclo de vida da thread.

No ciclo de vida da thread, ela passa pelos seguintes estados: Novo (New), Pronto (Runnable), Em Execução (Running), Bloqueado (Blocked) e Morto (Dead)5Este estado.

A Thread cria uma nova thread usando new, este processo é inicializar algumas informações da thread, como nome da thread, id, grupo da thread etc., pode ser considerado um objeto comum. Após chamar o start() da Thread, o Java Virtual Machine criará uma pilha de chamadas e um contador de programa para ele, além de definir hasBeenStarted como true, após o chamado do método start haverá exceção.

As threads neste estado não começaram a executar, apenas indicam que a thread pode ser executada. Quando a thread começar a executar, depende do gerenciador de threads do JVM. Quando a thread obtém o CPU, o método run() será chamado. Não chame o método run() da Thread manualmente. Após a programação da CPU, a thread alternará entre pronto, em execução e bloqueada, até que o método run() termine ou a thread seja encerrada de outra forma, entrando no estado morto.

Portanto, o princípio da implementação da reaproveitabilidade de threads deve ser manter a thread em estado de vida (pronta, em execução ou bloqueada). Vamos ver como o ThreadPoolExecutor implementa a reaproveitabilidade de threads.

No ThreadPoolExecutor, a principal classe Worker é usada para controlar a reaproveitabilidade de threads. Vamos ver o código simplificado da classe Worker, o que facilita a compreensão:

private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Runnable tarefa = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null){
task.run();
}
}

Worker是一个Runnable,同时拥有一个thread,这个thread就是要启动的线程。在创建Worker对象时,同时创建一个Thread对象,并将Worker自身作为参数传入TThread。这样当Thread的start()方法被调用时,实际运行的是Worker的run()方法,接着到runWorker()中,有一个while循环,不断从getTask()中获取Runnable对象,顺序执行。getTask()又是如何获取Runnable对象的呢?

依旧是简化后的代码:

private Runnable getTask() {
 if(一些特殊情况) {
  return null;
 }
Runnable r = workQueue.take();
return r;
}

这个workQueue就是在初始化ThreadPoolExecutor时存放任务的BlockingQueue队列,这个队列中存放的都是将要执行的Runnable任务。因为BlockingQueue是一个阻塞队列,BlockingQueue.take()如果得到的是空,则进入等待状态,直到BlockingQueue有新的对象被加入时唤醒阻塞的线程。所以一般情况下Thread的run()方法就不会结束,而是不断执行从workQueue中的Runnable任务,这样就实现了线程复用的原理。

2.控制最大并发数

那么Runnable是什么时候放入workQueue的?Worker又是何时创建的,Worker中的Thread又是何时调用start()来启动新线程执行Worker的run()方法的呢?从上面的分析可以看出,Worker中的runWorker()在执行任务时是一个接一个,串行进行的,那么并发是如何体现的呢?

容易想到是在execute(Runnable runnable)时执行上述一些任务。看看execute是如何操作的。

execute:

Código simplificado

public void execute(Runnable command) {
 if (command == null)
  throw new NullPointerException();
int c = ctl.get();
// Número de threads atuais < corePoolSize
if (workerCountOf(c) < corePoolSize) {
// Inicie diretamente uma nova thread.
if (addWorker(command, true))
return;
c = ctl.get();
}
// Número de threads ativas >= corePoolSize
// runState for RUNNING e a fila não está cheia
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// .Verificação novamente do estado RUNNING
// .Se não estiver em estado RUNNING, remova a tarefa da workQueue e rejeite
if (!isRunning(recheck) && remove(command))
reject(command);// Rejeição de tarefas com a estratégia especificada pelo pool de threads
// Dois casos:
// 1.Rejeição de novas tarefas em estado não RUNNING
// 2.Falha ao iniciar uma nova thread devido à fila estar cheia (workCount > maximumPoolSize)
} else if (!addWorker(command, false))
reject(command);
}

addWorker:

Código simplificado

private boolean addWorker(Runnable firstTask, boolean core) {
int wc = workerCountOf(c);
if (wc >= (core63; corePoolSize : maximumPoolSize)) {
return false;
}
w = new Worker(firstTask);
final Thread t = w.thread;
t.start();
}

Com base no código, veja novamente a situação de adição de tarefas no processo de trabalho do pool de threads mencionado anteriormente:

* Se o número de threads em execução for menor que corePoolSize, crie imediatamente uma thread para executar essa tarefa;  
* Se o número de threads em execução for maior ou igual a corePoolSize, coloque essa tarefa na fila;
* Se a fila estiver cheia e o número de threads em execução for menor que maximumPoolSize, ainda assim é necessário criar uma thread não nuclear para executar essa tarefa imediatamente;
* 如果队列满了,而且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会抛出异常RejectExecutionException。

这就是Android的AsyncTask在并行执行时超出最大任务数会抛出RejectExecutionException的原因,详见基于最新版本的AsyncTask源码解析及AsyncTask的阴暗面。

通过addWorker如果成功创建新的线程,则通过start()启动新线程,同时将firstTask作为这个Worker中的run()中执行的第一个任务。

虽然每个Worker的任务是串行处理,但如果创建了多个Worker,因为共用一个workQueue,所以就会并行处理。

因此,根据corePoolSize和maximumPoolSize来控制最大并发数。大致过程可以用下图表示。

上面的讲解和图可以帮助你很好地理解这个过程。

如果你是做Android开发的,并且对Handler原理比较熟悉,你可能会觉得这个图很熟悉,其中的一些过程和Handler,Looper,Message的使用中,非常相似。Handler.send(Message)相当于execute(Runnable),Looper中维护的Message队列相当于BlockingQueue,只不过需要自己通过同步来维护这个队列,Looper中的loop()函数循环从Message队列取Message和Worker中的runWork()不断从BlockingQueue取Runnable是同样的道理。

3.管理线程

通过线程池可以很好地管理线程的重用,控制并发数,以及销毁等过程,线程的重用和控制并发已经在前面讲解过,而线程的管理过程已经穿插在其中,也很容易理解。

在ThreadPoolExecutor中有一个ctl的AtomicInteger变量。通过这个变量保存了两个内容:

所有线程的数量 每个线程所处的状态 其中低29位存储线程数,高3位存储runState,通过位运算来得到不同的值。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//获得线程的状态
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
//Obter o número de Workers
private static int workerCountOf(int c) {
return c & CAPACITY;
}
// Verificar se a thread está em execução
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

Aqui, principalmente através de shutdown e shutdownNow() para analisar o processo de fechamento do pool de threads. Primeiro, o pool de threads tem cinco estados para controlar a adição e execução de tarefas. Introduziremos os seguintes três principais:

Estado RUNNING: O pool de threads está funcionando normalmente, pode aceitar novas tarefas e processar as tarefas na fila;

Estado SHUTDOWN: Não aceita novas tarefas, mas executa as tarefas na fila;

Estado STOP: Não aceita novas tarefas, não processa as tarefas na fila; o método shutdown define o runState como SHUTDOWN, terminará todas as threads ociosas, mas as threads que estão trabalhando não serão afetadas, então as tarefas na fila também não serão executadas.

O método shutdownNow define o runState como STOP. A diferença em relação ao método shutdown, este método terminará todas as threads, então as tarefas na fila também não serão executadas.

Resumo
Através da análise do código-fonte de ThreadPoolExecutor, compreendi globalmente o processo de criação, adição e execução de threads, familiarizar-me com esses processos tornará o uso do pool de threads mais fácil.

E algumas coisas aprendidas sobre controle de concorrência e o uso do modelo de produtor-consumidor para a tarefa de processamento, que serão muito úteis para entender ou resolver outros problemas futuros. Por exemplo, o mecanismo Handler no Android, e a fila de Messager no Looper que pode ser processada com uma BlookQueue também é aceitável, isso é o que se aprende lendo o código-fonte.

Aqui está a compilação de materiais sobre Pool de Threads Java, continuaremos a complementar materiais relacionados, obrigado pelo apoio da comunidade!

Você também pode gostar