I have recently been writing an FTP upload tool based on Apache's FTPClient. To improve upload efficiency I adopted a multi-threaded approach, but having every thread repeatedly create and destroy FTPClient objects causes unnecessary overhead, so an FTPClient connection pool is the better choice here. I looked carefully through the Apache API and found no FTPClientPool implementation, so I had to write one myself. This article walks through the whole process of developing the connection pool, for your reference.
About Object Pool
Some objects are relatively expensive to create, such as database connections. To reduce the cost of frequently creating and destroying such objects, we can use object pooling to reuse them. An object pool manages the life cycle of the objects it holds, provides methods to obtain and release them, and lets clients use pooled objects easily.
If we want to implement an object pool ourselves, it generally needs to provide the following functions:
1. If objects are available in the pool, the pool should be able to hand one to the client.
2. After a client puts an object back into the pool, the object can be reused.
3. The pool can create new objects to meet growing client demand.
4. There is a mechanism to close the pool properly and end the life cycle of its objects.
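The four functions above can be sketched in a few lines of plain Java. This is only a minimal illustration under stated assumptions: SimplePool, borrow, giveBack and idleCount are hypothetical names invented here, not part of any Apache library, and the pool grows without an upper bound.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical minimal pool; all names are illustrative only.
class SimplePool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> creator;
    private final Consumer<T> destroyer;
    private boolean closed;

    SimplePool(Supplier<T> creator, Consumer<T> destroyer) {
        this.creator = creator;
        this.destroyer = destroyer;
    }

    // Functions 1 and 3: hand out an idle object, or create one on demand.
    synchronized T borrow() {
        if (closed) {
            throw new IllegalStateException("pool is closed");
        }
        T obj = idle.pollFirst();
        return obj != null ? obj : creator.get();
    }

    // Function 2: a returned object becomes available for reuse.
    synchronized void giveBack(T obj) {
        if (closed) {
            destroyer.accept(obj);
        } else {
            idle.addFirst(obj);
        }
    }

    // Function 4: closing the pool ends the life cycle of every idle object.
    synchronized void close() {
        closed = true;
        T obj;
        while ((obj = idle.pollFirst()) != null) {
            destroyer.accept(obj);
        }
    }

    synchronized int idleCount() {
        return idle.size();
    }
}
```

Borrowing, returning and borrowing again should hand back the same instance, which is the reuse that pooling is meant to provide.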
Apache's Object Pool Toolkit
To make it easier to develop our own object pool, Apache's commons-pool toolkit provides interfaces and implementation classes for building general-purpose object pools. The two most basic interfaces are ObjectPool and PoolableObjectFactory.
The ObjectPool interface has several basic methods:
1. addObject(): adds an object to the pool
2. borrowObject(): the client borrows an object from the pool
3. returnObject(): the client returns an object to the pool
4. close(): closes the object pool and releases its resources
5. setFactory(PoolableObjectFactory factory): sets the factory that creates the objects in the pool
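On the client side, the important habit is to pair every borrowObject() with a returnObject() in a finally block. The sketch below shows that pattern with a stand-in interface. MiniPool, BufferPool and BorrowReturnDemo are hypothetical names made up for this illustration; MiniPool only borrows two method names from ObjectPool and, unlike the real interface, declares no checked exceptions.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Stand-in that reuses two ObjectPool method names; NOT the commons-pool interface.
interface MiniPool<T> {
    T borrowObject();
    void returnObject(T obj);
}

// Hypothetical pool of reusable StringBuilder buffers.
class BufferPool implements MiniPool<StringBuilder> {
    private final BlockingQueue<StringBuilder> idle;

    BufferPool(int size) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(new StringBuilder());
        }
    }

    @Override
    public StringBuilder borrowObject() {
        StringBuilder sb = idle.poll(); // null when nothing is idle
        if (sb == null) {
            throw new NoSuchElementException("pool exhausted");
        }
        return sb;
    }

    @Override
    public void returnObject(StringBuilder sb) {
        sb.setLength(0); // reset state before the next borrower sees it
        idle.offer(sb);
    }

    int idleCount() {
        return idle.size();
    }
}

class BorrowReturnDemo {
    static String greet(MiniPool<StringBuilder> pool, String name) {
        StringBuilder sb = pool.borrowObject();
        try {
            return sb.append("hello ").append(name).toString();
        } finally {
            pool.returnObject(sb); // runs even if the work above throws
        }
    }
}
```

Returning the object in finally keeps the pool from slowly draining when a task fails halfway through.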
Several basic methods in the PoolableObjectFactory interface:
1. makeObject(): Make an object
2. destroyObject(): Destroy an object
3. validateObject(): Verify whether an object is still available
Through the above two interfaces, we can implement an object pool by ourselves.
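To see how a pool and a factory cooperate, here is a minimal sketch in which the pool validates an idle object before lending it out and asks the factory for a replacement when validation fails. MiniFactory, FakeConnection, FakeConnectionFactory and ValidatingPool are hypothetical names used only for this illustration; MiniFactory merely reuses three method names from PoolableObjectFactory and is not the real interface.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Stand-in that reuses three PoolableObjectFactory method names; not the real interface.
interface MiniFactory<T> {
    T makeObject();
    void destroyObject(T obj);
    boolean validateObject(T obj);
}

// Hypothetical connection type used only for this illustration.
class FakeConnection {
    boolean open = true;
}

class FakeConnectionFactory implements MiniFactory<FakeConnection> {
    int made;
    int destroyed;

    @Override
    public FakeConnection makeObject() {
        made++;
        return new FakeConnection();
    }

    @Override
    public void destroyObject(FakeConnection c) {
        c.open = false;
        destroyed++;
    }

    @Override
    public boolean validateObject(FakeConnection c) {
        return c.open;
    }
}

class ValidatingPool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final MiniFactory<T> factory;

    ValidatingPool(MiniFactory<T> factory) {
        this.factory = factory;
    }

    synchronized T borrowObject() {
        T obj;
        while ((obj = idle.pollFirst()) != null) {
            if (factory.validateObject(obj)) {
                return obj;
            }
            factory.destroyObject(obj); // stale object: discard it and keep looking
        }
        return factory.makeObject();    // nothing usable is idle, so create a new one
    }

    synchronized void returnObject(T obj) {
        idle.addFirst(obj);
    }
}
```

The pool never needs to know how a connection is opened or checked; that knowledge stays in the factory.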
Example: Develop an FTPClient object pool
I have recently been working on a project that uploads files from HDFS to a group of FTP servers. To improve upload efficiency, we naturally considered uploading with multiple threads. The FTP tool I use is the FTPClient in the Apache commons-net package, but Apache does not provide an FTPClientPool. So, to reduce the number of times FTPClient objects are created and destroyed, we developed our own FTPClientPool to reuse FTPClient connections.
As introduced above, the commons-pool package provided by Apache can help us develop a connection pool. To develop a simple object pool, you only need to implement the ObjectPool and PoolableObjectFactory interfaces from commons-pool. Here is the implementation I wrote:
Write an ObjectPool interface implementation FTPClientPool
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;

/**
 * Implement an FTPClient connection pool
 * @author heaven
 */
public class FTPClientPool implements ObjectPool<FTPClient> {

    private static final int DEFAULT_POOL_SIZE = 10;
    private final BlockingQueue<FTPClient> pool;
    private final FtpClientFactory factory;

    /**
     * Initialize the connection pool with the default size. A factory must be
     * injected to provide the FTPClient instances.
     * @param factory
     * @throws Exception
     */
    public FTPClientPool(FtpClientFactory factory) throws Exception {
        this(DEFAULT_POOL_SIZE, factory);
    }

    /**
     * @param poolSize
     * @param factory
     * @throws Exception
     */
    public FTPClientPool(int poolSize, FtpClientFactory factory) throws Exception {
        this.factory = factory;
        pool = new ArrayBlockingQueue<FTPClient>(poolSize * 2);
        initPool(poolSize);
    }

    /**
     * Fill the pool with poolSize connected clients.
     * @param poolSize
     * @throws Exception
     */
    private void initPool(int poolSize) throws Exception {
        for (int i = 0; i < poolSize; i++) {
            // Add an object to the pool
            addObject();
        }
    }

    /* (non-Javadoc)
     * @see org.apache.commons.pool.ObjectPool#borrowObject()
     */
    public FTPClient borrowObject() throws Exception, NoSuchElementException, IllegalStateException {
        // take() blocks until a client is available, so it never returns null
        FTPClient client = pool.take();
        if (!factory.validateObject(client)) {
            // Validation failed: discard the stale client and create a replacement
            invalidateObject(client);
            client = factory.makeObject();
        }
        return client;
    }

    /* (non-Javadoc)
     * @see org.apache.commons.pool.ObjectPool#returnObject(java.lang.Object)
     */
    public void returnObject(FTPClient client) throws Exception {
        // If the queue is still full after 3 seconds, destroy the client instead of leaking it
        if ((client != null) && !pool.offer(client, 3, TimeUnit.SECONDS)) {
            factory.destroyObject(client);
        }
    }

    public void invalidateObject(FTPClient client) throws Exception {
        // Remove the invalid client from the queue (if present) and close its connection
        pool.remove(client);
        factory.destroyObject(client);
    }

    /* (non-Javadoc)
     * @see org.apache.commons.pool.ObjectPool#addObject()
     */
    public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {
        // Insert a new object into the queue; destroy it if the queue stays full
        FTPClient client = factory.makeObject();
        if (!pool.offer(client, 3, TimeUnit.SECONDS)) {
            factory.destroyObject(client);
        }
    }

    public int getNumIdle() throws UnsupportedOperationException {
        return pool.size();
    }

    public int getNumActive() throws UnsupportedOperationException {
        // Borrowed clients are not tracked by this simple pool; a negative value signals that
        return -1;
    }

    public void clear() throws Exception, UnsupportedOperationException {
    }

    /* (non-Javadoc)
     * @see org.apache.commons.pool.ObjectPool#close()
     */
    public void close() throws Exception {
        FTPClient client;
        // poll() returns null once the queue is empty
        while ((client = pool.poll()) != null) {
            factory.destroyObject(client);
        }
    }

    public void setFactory(PoolableObjectFactory<FTPClient> factory) throws IllegalStateException, UnsupportedOperationException {
        // The factory is injected through the constructor, so this method does nothing
    }
}

Write another PoolableObjectFactory interface implementation FTPClientFactory
import java.io.IOException;

import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.pool.PoolableObjectFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.hdfstoftp.util.FTPClientException;

/**
 * FTPClient factory class; FTPClient instances are created and destroyed through this factory
 * @author heaven
 */
public class FtpClientFactory implements PoolableObjectFactory<FTPClient> {

    private static Logger logger = LoggerFactory.getLogger("file");
    private FTPClientConfigure config;

    // Pass a parameter object to the factory to make it easy to configure the FTPClient
    public FtpClientFactory(FTPClientConfigure config) {
        this.config = config;
    }

    /* (non-Javadoc)
     * @see org.apache.commons.pool.PoolableObjectFactory#makeObject()
     */
    public FTPClient makeObject() throws Exception {
        FTPClient ftpClient = new FTPClient();
        ftpClient.setConnectTimeout(config.getClientTimeout());
        // The control encoding only takes effect if it is set before connecting
        ftpClient.setControlEncoding(config.getEncoding());
        try {
            ftpClient.connect(config.getHost(), config.getPort());
            int reply = ftpClient.getReplyCode();
            if (!FTPReply.isPositiveCompletion(reply)) {
                logger.warn("FTPServer refused connection");
                throw new FTPClientException("FTPServer refused connection, reply code: " + reply);
            }
            boolean result = ftpClient.login(config.getUsername(), config.getPassword());
            if (!result) {
                // The password is left out of the message so that it does not end up in logs
                throw new FTPClientException("ftpClient login failed! userName:" + config.getUsername());
            }
            ftpClient.setFileType(config.getTransferFileType());
            ftpClient.setBufferSize(1024);
            if ("true".equals(config.getPassiveMode())) {
                ftpClient.enterLocalPassiveMode();
            }
            return ftpClient;
        } catch (Exception e) {
            // Never hand a half-initialized client to the pool: close it and report the failure
            if (ftpClient.isConnected()) {
                try {
                    ftpClient.disconnect();
                } catch (IOException ignored) {
                    // the original failure is the one worth reporting
                }
            }
            throw e;
        }
    }

    /* (non-Javadoc)
     * @see org.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object)
     */
    public void destroyObject(FTPClient ftpClient) throws Exception {
        if (ftpClient == null) {
            return;
        }
        try {
            if (ftpClient.isConnected()) {
                ftpClient.logout();
            }
        } catch (IOException io) {
            logger.warn("FTP logout failed", io);
        } finally {
            // The connection must be closed in the finally block, otherwise the FTP connection stays occupied
            try {
                ftpClient.disconnect();
            } catch (IOException io) {
                logger.warn("FTP disconnect failed", io);
            }
        }
    }

    /* (non-Javadoc)
     * @see org.apache.commons.pool.PoolableObjectFactory#validateObject(java.lang.Object)
     */
    public boolean validateObject(FTPClient ftpClient) {
        try {
            return ftpClient.sendNoOp();
        } catch (IOException e) {
            // A failed NOOP means the connection is no longer usable
            return false;
        }
    }

    public void activateObject(FTPClient ftpClient) throws Exception {
    }

    public void passivateObject(FTPClient ftpClient) throws Exception {
    }
}

Finally, it is best to pass a parameter object to the factory so that the FTPClient parameters are easy to set:
package org.apache.commons.pool.impl.contrib;

/**
 * FTPClient configuration class, encapsulates the relevant configuration of FTPClient
 *
 * @author heaven
 */
public class FTPClientConfigure {

    private String host;
    private int port;
    private String username;
    private String password;
    private String passiveMode;
    private String encoding;
    private int clientTimeout;
    private int threadNum;
    private int transferFileType;
    private boolean renameUploaded;
    private int retryTimes;

    public String getHost() { return host; }
    public void setHost(String host) { this.host = host; }

    public int getPort() { return port; }
    public void setPort(int port) { this.port = port; }

    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }

    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }

    public String getPassiveMode() { return passiveMode; }
    public void setPassiveMode(String passiveMode) { this.passiveMode = passiveMode; }

    public String getEncoding() { return encoding; }
    public void setEncoding(String encoding) { this.encoding = encoding; }

    public int getClientTimeout() { return clientTimeout; }
    public void setClientTimeout(int clientTimeout) { this.clientTimeout = clientTimeout; }

    public int getThreadNum() { return threadNum; }
    public void setThreadNum(int threadNum) { this.threadNum = threadNum; }

    public int getTransferFileType() { return transferFileType; }
    public void setTransferFileType(int transferFileType) { this.transferFileType = transferFileType; }

    public boolean isRenameUploaded() { return renameUploaded; }
    public void setRenameUploaded(boolean renameUploaded) { this.renameUploaded = renameUploaded; }

    public int getRetryTimes() { return retryTimes; }
    public void setRetryTimes(int retryTimes) { this.retryTimes = retryTimes; }

    @Override
    public String toString() {
        // The password is masked so that it does not leak into logs
        return "FTPClientConfig [host=" + host + "\n port=" + port + "\n username=" + username
                + "\n password=****" + "\n passiveMode=" + passiveMode + "\n encoding=" + encoding
                + "\n clientTimeout=" + clientTimeout + "\n threadNum=" + threadNum
                + "\n transferFileType=" + transferFileType + "\n renameUploaded=" + renameUploaded
                + "\n retryTimes=" + retryTimes + "]";
    }
}

The FTPClientPool connection pool class manages the life cycle of the FTPClient objects and is responsible for lending them out, taking them back, and destroying the pool. FTPClientPool depends on the FtpClientFactory class, the factory that creates and destroys the objects; FtpClientFactory in turn relies on the FTPClientConfigure class, which encapsulates the FTPClient configuration parameters. At this point, our FTPClient connection pool is complete.
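To show how several upload threads might share such a pool, here is a self-contained sketch that runs without an FTP server. It is an illustration under stated assumptions, not the project's actual upload code: StubClient is a hypothetical stand-in for FTPClient, a plain ArrayBlockingQueue stands in for FTPClientPool, and PooledUploadDemo.run is a name invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a pooled FTP connection; it only counts uploads.
class StubClient {
    final AtomicInteger uploads = new AtomicInteger();

    void upload(String file) {
        uploads.incrementAndGet();
    }
}

class PooledUploadDemo {
    // Runs fileCount simulated uploads on threadCount threads that share poolSize
    // clients, and returns the total number of uploads the clients performed.
    static int run(int poolSize, int threadCount, int fileCount) {
        BlockingQueue<StubClient> pool = new ArrayBlockingQueue<>(poolSize);
        List<StubClient> all = new ArrayList<>();
        for (int i = 0; i < poolSize; i++) {
            StubClient c = new StubClient();
            all.add(c);
            pool.add(c);
        }
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (int i = 0; i < fileCount; i++) {
                final String file = "file-" + i;
                pending.add(executor.submit(() -> {
                    StubClient client = pool.take(); // blocks until a client is free
                    try {
                        client.upload(file);
                    } finally {
                        pool.put(client);            // always hand the client back
                    }
                    return null;
                }));
            }
            for (Future<?> f : pending) {
                f.get(); // wait for every upload and surface any failure
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        int total = 0;
        for (StubClient c : all) {
            total += c.uploads.get();
        }
        return total;
    }
}
```

With more threads than clients, the extra threads simply wait in take() until a client is returned, so the number of open connections never exceeds the pool size.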
It should be noted that an ArrayBlockingQueue is used in FTPClientPool to manage and store FTPClient objects. For blocking queues, please refer to my article: [Java Concurrency] BlockingQueue
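As a quick reminder of the queue operations the pool relies on, the sketch below exercises a capacity-1 ArrayBlockingQueue: a timed offer succeeds while there is room and gives up with false once the queue is full, take removes the head, and a timed poll yields null when nothing arrives in time. BlockingQueueDemo and describe are names made up for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingQueueDemo {
    // Returns the four observed results joined by spaces.
    static String describe() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            boolean first = queue.offer("a", 50, TimeUnit.MILLISECONDS);  // room available
            boolean second = queue.offer("b", 50, TimeUnit.MILLISECONDS); // full: times out
            String taken = queue.take();                                  // removes "a"
            String none = queue.poll(50, TimeUnit.MILLISECONDS);          // empty: times out
            return first + " " + second + " " + taken + " " + none;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        }
    }
}
```

Calling BlockingQueueDemo.describe() returns "true false a null". This is why returnObject in FTPClientPool checks the result of offer: a false return means the client was not stored and must be destroyed.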