ruby实战-mysql数据库共享连接池实现
开源库ruby-mysql
ruby-mysql库是一个纯ruby版本的MsSQL connector
开源项目地址
https://github.com/tmtm/ruby-mysql
下载依赖包
gem install ruby-mysql
项目中引入
require ‘mysql’
ruby实现数据库连接池
本次实现是在第一个版本的基础上进行了升级,ruby-mysql提供的api,本身不保证线程安全性,,原版本在多线程同时进行数据库操作时存安全隐患
升级版连接池操作逻辑如下
- 初始创建一批连接,加入队列
- 数据库操作前从队列中取得一个连接
- 连接数达上限后稍作等待,超时则失败抛出异常
- 获取连接后测试连接有效性,失效则重新获取连接
- 数据库操作完成后立即归还连接至队列中
技术点说明
- 线程池对外仅暴露execute方法
- 数据库操作全部封装在代码块中执行
源码如下
class DbConnectionPool
  INIT_POOL_SIZE = 20
  def initialize
    @pool = []
    0.upto(INIT_POOL_SIZE - 1) do
      @pool << mysql_connect
    end
    @mutex = Mutex.new
    @actual_pool_size = INIT_POOL_SIZE
  end
  def execute
    return unless block_given?
    begin
      conn = get_conn
      test conn
      yield conn
      add_to_pool conn
    rescue Exception => e
      @mutex.synchronize {
        @actual_pool_size -= 1
      }
      conn.close
      LogUtil.error 'DbConnectionPool execute'
      LogUtil.error e.backtrace.inspect
      raise e
    end
    LogUtil.info "actual_pool_size: #{@actual_pool_size}"
  end
  private
  def mysql_connect
    Mysql.connect(DatabaseConfig::HOST, 'root', 'ant', 'yecai', 3306,
                  Mysql::OPT_CONNECT_TIMEOUT=>1000,
                  Mysql::OPT_READ_TIMEOUT=>1000,
                  Mysql::OPT_WRITE_TIMEOUT=>1000)
  end
  def get_conn
    Timeout.timeout(3) do
      while @actual_pool_size >= 30
        sleep 2
      end
    end
    @mutex.synchronize {
      if @pool.size > 0
        return @pool.pop
      else
        @actual_pool_size += 1
        return mysql_connect
      end
    }
  end
  def add_to_pool(conn)
    @mutex.synchronize {
      @pool.push conn
    }
  end
  def test(conn)
    begin
      conn.query('select 1')
    rescue Exception => e
      LogUtil.error 'reconnect'
      conn.close
      conn = mysql_connect
    end
    conn
  end
end
使用方式
通常创建一个封装数据库操作的基类 DaoBase
class DaoBase
  def initialize
    autowired(DbConnectionPool)
  end
  def execute(&action)
    @db_connection_pool.execute &action
  end
end
通过依赖注入的方式引入DbConnectionPool实例,提供方法execute
构造器中完成对数据库连接池对象的注入,关于使用autowaired方法实现依赖注入请参考我的另一篇博文20行ruby代码实现依赖注入框架
上层Dao实现举例
用户数据表
CREATE TABLE `v1_users` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(64) NOT NULL,
  `user_name` varchar(64) NOT NULL DEFAULT '',
  `lv` int(11) NOT NULL DEFAULT '1',
  `exp` int(11) NOT NULL DEFAULT '0',
  `mtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
每一个用户数据为该表一条记录,程序逻辑需要实现用户创建,用户查询操作,确保易维护性,数据库操作封装到一个独立的Dao类UserDataDao中,实现如下
class UserDataDao < DaoBase
  def create_user(user_id, user_name)
    execute do |conn|
      stmt = conn.prepare('insert into v1_users(user_id, user_name) values(?, ?)')
      stmt.execute user_id, user_name
    end
  end
  def has_user?(user_id)
    users = nil
    execute do |conn|
      users = conn.prepare('select user_id from v1_users where user_id = ?').execute(user_id).fetch
    end
    !users.nil? && users.size == 1
  end
  def update_user_name(user_id, user_name)
    execute do |conn|
      conn.prepare('update v1_users set user_name = ? where user_id = ?')
          .execute(user_name, user_id)
    end
  end
  def get_user_lv(user_id)
    r = nil
    execute do |conn|
      r = conn.prepare('select lv, exp from v1_users where user_id = ?').execute(user_id).fetch
    end
    return r[0].to_i, r[1].to_i
  end
end