编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

巧用GenericObjectPool创建自定义对象池

wxchong 2024-07-05 01:43:38 开源技术 49 ℃ 0 评论

作者:京东物流 高圆庆

1 前言

通常一个对象创建、销毁非常耗时的时候,我们不会频繁的创建和销毁它,而是考虑复用。复用对象的一种做法就是对象池,将创建好的对象放入池中维护起来,下次再用的时候直接拿池中已经创建好的对象继续用,这就是池化的思想。在java中,有很多池管理的概念,典型的如线程池,数据库连接池,socket连接池。本文章讲介绍apache提供的通用对象池框架GenericObjectPool,以及基于GenericObjectPool实现的sftp连接池在国际物流调度履约系统中的应用。

2 GenericObjectPool剖析

Apache Commons Pool是一个对象池的框架,他提供了一整套用于实现对象池化的API。它提供了三种对象池:GenericKeyedObjectPool,SoftReferenceObjectPool和GenericObjectPool,其中GenericObjectPool是我们最常用的对象池,内部实现也最复杂。GenericObjectPool的UML图如下所示:

2.1 核心接口ObjectPool

从图中可以看出,GenericObjectPool实现了ObjectPool接口,而ObjectPool就是对象池的核心接口,它定义了一个对象池应该实现的行为。

  • addObject方法:往池中添加一个对象
  • borrowObject方法:从池中借走到一个对象
  • returnObject方法:把对象归还给对象池
  • invalidateObject:验证对象的有效性
  • getNumIdle:返回对象池中有多少对象是空闲的,也就是能够被借走的对象的数量。
  • getNumActive:返回对象池中有对象对象是活跃的,也就是已经被借走的,在使用中的对象的数量。
  • clear:清理对象池。注意是清理不是清空,该方法要求的是,清理所有空闲对象,释放相关资源。
  • close:关闭对象池。这个方法可以达到清空的效果,清理所有对象以及相关资源。

2.2 对象工厂BasePooledObjectFactory

对象的创建需要通过对象工厂来创建,对象工厂需要实现BasePooledObjectFactory接口。ObjectPool接口中往池中添加一个对象,就需要使用对象工厂来创建一个对象。该接口说明如下:

public interface PooledObjectFactory<T> {

  /**
   * 创建一个可由池提供服务的实例,并将其封装在由池管理的PooledObject中。
   */
  PooledObject<T> makeObject() throws Exception;
  /**
   * 销毁池不再需要的实例
   */
  void destroyObject(PooledObject<T> p) throws Exception;

  /**
   * 确保实例可以安全地由池返回
   */
  boolean validateObject(PooledObject<T> p);

  /**
   * 重新初始化池返回的实例
   */
  void activateObject(PooledObject<T> p) throws Exception;

  /**
   * 取消初始化要返回到空闲对象池的实例
   */
  void passivateObject(PooledObject<T> p) throws Exception;
}

2.3 配置类GenericObjectPoolConfig

GenericObjectPoolConfig是封装GenericObject池配置的简单“结构”,此类不是线程安全的;它仅用于提供创建池时使用的属性。大多数情况,可以使用GenericObjectPoolConfig提供的默认参数就可以满足日常的需求,GenericObjectPoolConfig是一个抽象类,实际应用中需要新建配置类,然后继承它。

2.4 工作原理流程

  1. 构造方法
    当我们执行构造方法时,主要工作就是创建了一个存储对象的LinkedList类型容器,也就是概念意义上的“池”
  2. 从对象池中获取对象
    获取池中的对象是通过borrowObject()命令,源码比较复杂,简单而言就是去LinkedList中获取一个对象,如果不存在的话,要调用构造方法中第一个参数Factory工厂类的makeObject()方法去创建一个对象再获取,获取到对象后要调用validateObject方法判断该对象是否是可用的,如果是可用的才拿去使用。LinkedList容器减一
  3. 归还对象到线程池
    简单而言就是先调用validateObject方法判断该对象是否是可用的,如果可用则归还到池中,LinkedList容器加一,如果是不可以的则则调用destroyObject方法进行销毁

上面三步就是最简单的流程,由于取和还的流程步骤都在borrowObject和returnObject方法中固定的,所以我们只要重写Factory工厂类的makeObject()和validateObject以及destroyObject方法即可实现最简单的池的管理控制,通过构造方法传入该Factory工厂类对象则可以创建最简单的对象池管理类。这算是比较好的解耦设计模式,借和还的流程如下图所示:

3 开源框架如何使用GenericObjectPool

redis的java客户端jedis就是基于Apache Commons Pool对象池的框架来实现的。

3.1 对象工厂类JedisFactory

对象工厂类只需实现activateObject、destroyObject、makeObject、validateObject方法即可,源码如下:

class JedisFactory implements PooledObjectFactory<Jedis> {
    private final String host;
    private final int port;
    private final int timeout;
    private final int newTimeout;
    private final String password;
    private final int database;
    private final String clientName;

    public JedisFactory(String host, int port, int timeout, String password, int database) {
        this(host, port, timeout, password, database, (String)null);
    }

    public JedisFactory(String host, int port, int timeout, String password, int database, String clientName) {
        this(host, port, timeout, timeout, password, database, clientName);
    }

    public JedisFactory(String host, int port, int timeout, int newTimeout, String password, int database, String clientName) {
        this.host = host;
        this.port = port;
        this.timeout = timeout;
        this.newTimeout = newTimeout;
        this.password = password;
        this.database = database;
        this.clientName = clientName;
    }

    public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
        BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();
        if (jedis.getDB() != (long)this.database) {
            jedis.select(this.database);
        }
    }

    public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
        BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();
        if (jedis.isConnected()) {
            try {
                try {
                    jedis.quit();
                } catch (Exception var4) {
                }

                jedis.disconnect();
            } catch (Exception var5) {
            }
        }
    }

    public PooledObject<Jedis> makeObject() throws Exception {
        Jedis jedis = new Jedis(this.host, this.port, this.timeout, this.newTimeout);
        jedis.connect();
        if (null != this.password) {
            jedis.auth(this.password);
        }

        if (this.database != 0) {
            jedis.select(this.database);
        }

        if (this.clientName != null) {
            jedis.clientSetname(this.clientName);
        }

        return new DefaultPooledObject(jedis);
    }

    public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception {
    }

    public boolean validateObject(PooledObject<Jedis> pooledJedis) {
        BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();

        try {
            return jedis.isConnected() && jedis.ping().equals("PONG");
        } catch (Exception var4) {
            return false;
        }
    }
}

3.2 配置类JedisPoolConfig

public class JedisPoolConfig extends GenericObjectPoolConfig {
    public JedisPoolConfig() {
        this.setTestWhileIdle(true);
        this.setMinEvictableIdleTimeMillis(60000L);
        this.setTimeBetweenEvictionRunsMillis(30000L);
        this.setNumTestsPerEvictionRun(-1);
    }
}

4 国际物流履约系统中的应用

在国际物流履约系统中,我们和客户交互文件经常使用sftp服务器,因为创建sftp服务器的连接比较耗时,所以基于Apache Commons Pool对象池的框架来实现的我们自己的sftp链接池。

4.1 sftp对象池

SftpPool比较简单,直接继承GenericObjectPool。

public class SftpPool extends GenericObjectPool<Sftp> {
    public SftpPool(SftpFactory factory, SftpPoolConfig config, SftpAbandonedConfig abandonedConfig) {
        super(factory, config, abandonedConfig);
    }
}

4.2 对象工厂SftpFactory

这是基于Apache Commons Pool框架实现自定义对象池的核心类,代码如下:

public class SftpFactory extends BasePooledObjectFactory<Sftp> {

    private static final String CHANNEL_TYPE = "sftp";
    private static Properties sshConfig = new Properties();
    private String host;
    private int port;
    private String username;
    private String password;

    static {
        sshConfig.put("StrictHostKeyChecking", "no");
    }

    @Override
    public Sftp create() {
        try {
            JSch jsch = new JSch();
            Session sshSession = jsch.getSession(username, host, port);
            sshSession.setPassword(password);
            sshSession.setConfig(sshConfig);
            sshSession.connect();
            ChannelSftp channel = (ChannelSftp) sshSession.openChannel(CHANNEL_TYPE);
            channel.connect();
            log.info("sftpFactory创建sftp");
            return new Sftp(channel);
        } catch (JSchException e) {
            log.error("连接sftp失败:", e);
            throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);
        }
    }

    /**
     * @param sftp 被包装的对象
     * @return 对象包装器
     */
    @Override
    public PooledObject<Sftp> wrap(Sftp sftp) {
        return new DefaultPooledObject<>(sftp);
    }

    /**
     * 销毁对象
     * @param p 对象包装器
     */
    @Override
    public void destroyObject(PooledObject<Sftp> p) {
        log.info("开始销毁channelSftp");
        if (p!=null) {
            Sftp sftp = p.getObject();
            if (sftp!=null) {
                ChannelSftp channelSftp = sftp.getChannelSftp();
                if (channelSftp!=null) {
                    channelSftp.disconnect();
                    log.info("销毁channelSftp成功");
                }
            }
        }
    }

    /**
     * 检查连接是否可用
     *
     * @param p 对象包装器
     * @return {@code true} 可用,{@code false} 不可用
     */
    @Override
    public boolean validateObject(PooledObject<Sftp> p) {
        if (p!=null) {
            Sftp sftp = p.getObject();
            if (sftp!=null) {
                try {
                    sftp.getChannelSftp().cd("./");
                    log.info("验证连接是否可用,结果为true");
                    return true;
                } catch (SftpException e) {
                    log.info("验证连接是否可用,结果为false",e);
                    return false;
                }
            }
        }
        log.info("验证连接是否可用,结果为false");
        return false;
    }

    public static class Builder {
        private String host;
        private int port;
        private String username;
        private String password;
        public SftpFactory build() {
            return new SftpFactory(host, port, username, password);
        }
        public Builder host(String host) {
            this.host = host;
            return this;
        }
        public Builder port(int port) {
            this.port = port;
            return this;
        }
        public Builder username(String username) {
            this.username = username;
            return this;
        }
        public Builder password(String password) {
            this.password = password;
            return this;
        }
    }
}

4.3 配置类SftpPoolConfig

配置类继承了GenericObjectPoolConfig,可继承该类的默认属性,也可自定义配置参数。

public class SftpPoolConfig extends GenericObjectPoolConfig<Sftp> {
    public static class Builder {
        private int maxTotal;
        private int maxIdle;
        private int minIdle;
        private boolean lifo;
        private boolean fairness;
        private long maxWaitMillis;
        private long minEvictableIdleTimeMillis;
        private long evictorShutdownTimeoutMillis;
        private long softMinEvictableIdleTimeMillis;
        private int numTestsPerEvictionRun;
        private EvictionPolicy<Sftp> evictionPolicy; // 仅2.6.0版本commons-pool2需要设置
        private String evictionPolicyClassName;
        private boolean testOnCreate;
        private boolean testOnBorrow;
        private boolean testOnReturn;
        private boolean testWhileIdle;
        private long timeBetweenEvictionRunsMillis;
        private boolean blockWhenExhausted;
        private boolean jmxEnabled;
        private String jmxNamePrefix;
        private String jmxNameBase;
        public SftpPoolConfig build() {
            SftpPoolConfig config = new SftpPoolConfig();
            config.setMaxTotal(maxTotal);
            config.setMaxIdle(maxIdle);
            config.setMinIdle(minIdle);
            config.setLifo(lifo);
            config.setFairness(fairness);
            config.setMaxWaitMillis(maxWaitMillis);
            config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
            config.setEvictorShutdownTimeoutMillis(evictorShutdownTimeoutMillis);
            config.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis);
            config.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
            config.setEvictionPolicy(evictionPolicy);
            config.setEvictionPolicyClassName(evictionPolicyClassName);
            config.setTestOnCreate(testOnCreate);
            config.setTestOnBorrow(testOnBorrow);
            config.setTestOnReturn(testOnReturn);
            config.setTestWhileIdle(testWhileIdle);
            config.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
            config.setBlockWhenExhausted(blockWhenExhausted);
            config.setJmxEnabled(jmxEnabled);
            config.setJmxNamePrefix(jmxNamePrefix);
            config.setJmxNameBase(jmxNameBase);
            return config;
        }
}

4.4 SftpClient配置类

读取配置文件,创建SftpFactory、SftpPoolConfig、SftpPool,代码如下:

@Configuration
@ConditionalOnClass(SftpPool.class)
@EnableConfigurationProperties(SftpClientProperties.class)
public class SftpClientAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public ISftpClient sftpClient(SftpClientProperties sftpClientProperties) {
        if (sftpClientProperties.isMultiple()) {
            MultipleSftpClient multipleSftpClient = new MultipleSftpClient();
            sftpClientProperties.getClients().forEach((name, properties) -> {
                SftpFactory sftpFactory = createSftpFactory(properties);
                SftpPoolConfig sftpPoolConfig = createSftpPoolConfig(properties);
                SftpAbandonedConfig sftpAbandonedConfig = createSftpAbandonedConfig(properties);
                SftpPool sftpPool = new SftpPool(sftpFactory, sftpPoolConfig, sftpAbandonedConfig);
                ISftpClient sftpClient = new SftpClient(sftpPool);
                multipleSftpClient.put(name, sftpClient);
            });
            return multipleSftpClient;
        }
        SftpFactory sftpFactory = createSftpFactory(sftpClientProperties);
        SftpPoolConfig sftpPoolConfig = createSftpPoolConfig(sftpClientProperties);
        SftpAbandonedConfig sftpAbandonedConfig = createSftpAbandonedConfig(sftpClientProperties);
        SftpPool sftpPool = new SftpPool(sftpFactory, sftpPoolConfig, sftpAbandonedConfig);
        return new SftpClient(sftpPool);
    }

    public SftpFactory createSftpFactory(SftpClientProperties properties) {
        return new SftpFactory.Builder()
                .host(properties.getHost())
                .port(properties.getPort())
                .username(properties.getUsername())
                .password(properties.getPassword())
                .build();
    }

    public SftpPoolConfig createSftpPoolConfig(SftpClientProperties properties) {
        SftpClientProperties.Pool pool = properties.getPool();
        return new SftpPoolConfig.Builder()
                .maxTotal(pool.getMaxTotal())
                .maxIdle(pool.getMaxIdle())
                .minIdle(pool.getMinIdle())
                .lifo(pool.isLifo())
                .fairness(pool.isFairness())
                .maxWaitMillis(pool.getMaxWaitMillis())
                .minEvictableIdleTimeMillis(pool.getMinEvictableIdleTimeMillis())
                .evictorShutdownTimeoutMillis(pool.getEvictorShutdownTimeoutMillis())
                .softMinEvictableIdleTimeMillis(pool.getSoftMinEvictableIdleTimeMillis())
                .numTestsPerEvictionRun(pool.getNumTestsPerEvictionRun())
                .evictionPolicy(null)
                .evictionPolicyClassName(DefaultEvictionPolicy.class.getName())
                .testOnCreate(pool.isTestOnCreate())
                .testOnBorrow(pool.isTestOnBorrow())
                .testOnReturn(pool.isTestOnReturn())
                .testWhileIdle(pool.isTestWhileIdle())
                .timeBetweenEvictionRunsMillis(pool.getTimeBetweenEvictionRunsMillis())
                .blockWhenExhausted(pool.isBlockWhenExhausted())
                .jmxEnabled(pool.isJmxEnabled())
                .jmxNamePrefix(pool.getJmxNamePrefix())
                .jmxNameBase(pool.getJmxNameBase())
                .build();
    }

    public SftpAbandonedConfig createSftpAbandonedConfig(SftpClientProperties properties) {
        SftpClientProperties.Abandoned abandoned = properties.getAbandoned();
        return new SftpAbandonedConfig.Builder()
                .removeAbandonedOnBorrow(abandoned.isRemoveAbandonedOnBorrow())
                .removeAbandonedOnMaintenance(abandoned.isRemoveAbandonedOnMaintenance())
                .removeAbandonedTimeout(abandoned.getRemoveAbandonedTimeout())
                .logAbandoned(abandoned.isLogAbandoned())
                .requireFullStackTrace(abandoned.isRequireFullStackTrace())
                .logWriter(new PrintWriter(System.out))
                .useUsageTracking(abandoned.isUseUsageTracking())
                .build();
    }
}

4.5 对象SftpClient

SftpClient是实际工作的类,从SftpClient 中可获取到一个sftp链接,使用完成后,归还给sftpPool。SftpClient代码如下:

public class SftpClient implements ISftpClient {

    private SftpPool sftpPool;

    /**
     * 从sftp连接池获取连接并执行操作
     *
     * @param handler sftp操作
     */
    @Override
    public void open(ISftpClient.Handler handler) {
        Sftp sftp = null;
        try {
            sftp = sftpPool.borrowObject();
            ISftpClient.Handler policyHandler = new DelegateHandler(handler);
            policyHandler.doHandle(sftp);
        } catch (Exception e) {
            log.error("sftp异常:", e);
            throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);
        } finally {
            if (sftp != null) {
                sftpPool.returnObject(sftp);
            }
        }
    }

    @AllArgsConstructor
    static class DelegateHandler implements ISftpClient.Handler {

        private ISftpClient.Handler target;

        @Override
        public void doHandle(Sftp sftp) {
            try {
                target.doHandle(sftp);
            } catch (Exception e) {
                log.error("sftp异常:", e);
                throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);
            }
        }
    }
}

4.6 实战代码示例

通过sftp上传文件到XX服务器

 //通过SFTP上传到XX
((MultipleSftpClient) sftpClient).choose("XX");
sftpClient.open(sftp -> {
    boolean exist = sftp.isExist(inventoryPath);
    if(!exist){
        sftp.mkdirs(inventoryPath);
    }
    // 执行sftp操作
    InputStream is = new FileInputStream(oneColumnCSVFile);
    sftp.upload(inventoryPath, titleName, is);
    log.info("inventory upload over");
});

5 总结

通过本文的介绍可以知道,Apache Commons Pool定义了一个对象池的行为,提供了可扩展的配置类和对象工厂,封装了对象创建、从池中获取对象、归还对象的核心流程。还介绍了开源框架Jedis是如何基于GenericObjectPool来实现的连接池。最后介绍了国际物流履约系统中是如何基于GenericObjectPool来管理Sftp连接的。
掌握了GenericObjectPool的核心原理,我们就可以通过实现几个关键的接口,创建一个对象池管理工具,在项目中避免了对象的频繁创建和销毁,从而显著提升程序的性能。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表