• 技术文章 >后端开发 >php教程

    php自己实现memcached的队列类_PHP教程

    2016-07-13 09:54:51原创265

    php自己实现memcached的队列类


    add('1asdf');
     *      $obj->getQueueLength();
     *      $obj->read(11);
     *      $obj->get(8);
     */
    
    class memcacheQueue{
        public static   $client;            //memcache客户端连接
        public          $access;            //队列是否可更新   
        private         $currentSide;       //当前轮值的队列面:A/B
        private         $lastSide;          //上一轮值的队列面:A/B
        private         $sideAHead;         //A面队首值
        private         $sideATail;         //A面队尾值
        private         $sideBHead;         //B面队首值
        private         $sideBTail;         //B面队尾值
        private         $currentHead;       //当前队首值
        private         $currentTail;       //当前队尾值
        private         $lastHead;          //上轮队首值
        private         $lastTail;          //上轮队尾值 
        private         $expire;            //过期时间,秒,1~2592000,即30天内;0为永不过期
        private         $sleepTime;         //等待解锁时间,微秒
        private         $queueName;         //队列名称,唯一值
        private         $retryNum;          //重试次数,= 10 * 理论并发数
    
        const   MAXNUM      = 2000;                 //(单面)最大队列数,建议上限10K
        const   HEAD_KEY    = '_lkkQueueHead_';     //队列首kye
        const   TAIL_KEY    = '_lkkQueueTail_';     //队列尾key
        const   VALU_KEY    = '_lkkQueueValu_';     //队列值key
        const   LOCK_KEY    = '_lkkQueueLock_';     //队列锁key
        const   SIDE_KEY    = '_lkkQueueSide_';     //轮值面key
    
        /*
         * 构造函数
         * @param   [config]    array   memcache服务器参数
         * @param   [queueName] string  队列名称
         * @param   [expire]    string  过期时间
         * @return  NULL
         */
        public function __construct($queueName ='',$expire='',$config =''){
            if(empty($config)){
                self::$client = memcache_pconnect('localhost',11211);
            }elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')
                self::$client = memcache_pconnect($config['host'],$config['port']);
            }elseif(is_string($config)){//"127.0.0.1:11211"
                $tmp = explode(':',$config);
                $conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
                $conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
                self::$client = memcache_pconnect($conf['host'],$conf['port']);     
            }
            if(!self::$client) return false;
    
            ignore_user_abort(TRUE);//当客户断开连接,允许继续执行
            set_time_limit(0);//取消脚本执行延时上限
    
            $this->access = false;
            $this->sleepTime = 1000;
            $expire = (empty($expire) && $expire!=0) ? 3600 : (int)$expire;
            $this->expire = $expire;
            $this->queueName = $queueName;
            $this->retryNum = 10000;
    
            $side = memcache_add(self::$client, $queueName . self::SIDE_KEY, 'A',false, $expire);
            $this->getHeadNTail($queueName);
            if(!isset($this->sideAHead) || empty($this->sideAHead)) $this->sideAHead = 0;
            if(!isset($this->sideATail) || empty($this->sideATail)) $this->sideATail = 0;
            if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
            if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
        }
    
        /*
         * 获取队列首尾值
         * @param   [queueName] string  队列名称
         * @return  NULL
         */
        private function getHeadNTail($queueName){
            $this->sideAHead = (int)memcache_get(self::$client, $queueName.'A'. self::HEAD_KEY);
            $this->sideATail = (int)memcache_get(self::$client, $queueName.'A'. self::TAIL_KEY);
            $this->sideBHead = (int)memcache_get(self::$client, $queueName.'B'. self::HEAD_KEY);
            $this->sideBTail = (int)memcache_get(self::$client, $queueName.'B'. self::TAIL_KEY);
        }
    
        /*
         * 获取当前轮值的队列面
         * @return  string  队列面名称
         */
        public function getCurrentSide(){
            $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
            if($currentSide == 'A'){
                $this->currentSide = 'A';
                $this->lastSide = 'B';  
    
                $this->currentHead  = $this->sideAHead;
                $this->currentTail  = $this->sideATail;
                $this->lastHead     = $this->sideBHead;
                $this->lastTail     = $this->sideBTail;         
            }else{
                $this->currentSide = 'B';
                $this->lastSide = 'A';
    
                $this->currentHead  = $this->sideBHead;
                $this->currentTail  = $this->sideBTail;
                $this->lastHead     = $this->sideAHead;
                $this->lastTail     = $this->sideATail;                     
            }
    
            return $this->currentSide;
        }
    
        /*
         * 队列加锁
         * @return boolean
         */
        private function getLock(){
            if($this->access === false){
                while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){
                    usleep($this->sleepTime);
                    @$i++;
                    if($i > $this->retryNum){//尝试等待N次
                        return false;
                        break;
                    }
                }
                return $this->access = true;
            }
            return false;
        }
    
        /*
         * 队列解锁
         * @return NULL
         */
        private function unLock(){
            memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);
            $this->access = false;
        }
    
        /*
         * 添加数据
         * @param   [data]  要存储的值
         * @return  boolean
         */
        public function add($data){
            $result = false;
            if(!$this->getLock()){
                return $result;
            } 
            $this->getHeadNTail($this->queueName);
            $this->getCurrentSide();
    
            if($this->isFull()){
                $this->unLock();
                return false;
            }
    
            if($this->currentTail < self::MAXNUM){
                $value_key = $this->queueName .$this->currentSide . self::VALU_KEY . $this->currentTail;
                if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){
                    $this->changeTail();
                    $result = true;
                }
            }else{//当前队列已满,更换轮值面
                $this->unLock();
                $this->changeCurrentSide();
                return $this->add($data);
            }
    
            $this->unLock();
            return $result;
        }
    
        /*
         * 取出数据
         * @param   [length]    int 数据的长度
         * @return  array
         */
        public function get($length=0){
            if(!is_numeric($length)) return false;
            if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
            if(!$this->getLock()) return false;
    
            if($this->isEmpty()){
                $this->unLock();
                return false;
            }
    
            $keyArray   = $this->getKeyArray($length);
            $lastKey    = $keyArray['lastKey'];
            $currentKey = $keyArray['currentKey'];
            $keys       = $keyArray['keys'];
            $this->changeHead($this->lastSide,$lastKey);
            $this->changeHead($this->currentSide,$currentKey);
    
            $data   = @memcache_get(self::$client, $keys);
            foreach($keys as $v){//取出之后删除
                @memcache_delete(self::$client, $v, 0);
            }
            $this->unLock();
    
            return $data;
        }
    
        /*
         * 读取数据
         * @param   [length]    int 数据的长度
         * @return  array
         */
        public function read($length=0){
            if(!is_numeric($length)) return false;
            if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
            $keyArray   = $this->getKeyArray($length);
            $data   = @memcache_get(self::$client, $keyArray['keys']);
            return $data;
        }
    
        /*
         * 获取队列某段长度的key数组
         * @param   [length]    int 队列长度
         * @return  array
         */
        private function getKeyArray($length){
            $result = array('keys'=>array(),'lastKey'=>array(),'currentKey'=>array());
            $this->getHeadNTail($this->queueName);
            $this->getCurrentSide();
            if(empty($length)) return $result;
    
            //先取上一面的key
            $i = $result['lastKey'] = 0;
            for($i=0;$i<$length;$i++){
                $result['lastKey'] = $this->lastHead + $i;
                if($result['lastKey'] >= $this->lastTail) break;
                $result['keys'][] = $this->queueName .$this->lastSide . self::VALU_KEY . $result['lastKey'];
            }
    
            //再取当前面的key
            $j = $length - $i;
            $k = $result['currentKey'] = 0;
            for($k=0;$k<$j;$k++){
                $result['currentKey'] = $this->currentHead + $k;
                if($result['currentKey'] >= $this->currentTail) break;
                $result['keys'][] = $this->queueName .$this->currentSide . self::VALU_KEY . $result['currentKey'];
            }
    
            return $result;
        }
    
        /*
         * 更新当前轮值面队列尾的值
         * @return  NULL
         */
        private function changeTail(){
            $tail_key = $this->queueName .$this->currentSide . self::TAIL_KEY;
            memcache_add(self::$client, $tail_key, 0,false, $this->expire);//如果没有,则插入;有则false;
            //memcache_increment(self::$client, $tail_key, 1);//队列尾+1
            $v = memcache_get(self::$client, $tail_key) +1;
            memcache_set(self::$client, $tail_key,$v,false,$this->expire);
        }
    
        /*
         * 更新队列首的值
         * @param   [side]      string  要更新的面
         * @param   [headValue] int     队列首的值
         * @return  NULL
         */
        private function changeHead($side,$headValue){
            if($headValue < 1) return false;
            $head_key = $this->queueName .$side . self::HEAD_KEY;
            $tail_key = $this->queueName .$side . self::TAIL_KEY;
            $sideTail = memcache_get(self::$client, $tail_key);
            if($headValue < $sideTail){
                memcache_set(self::$client, $head_key,$headValue+1,false,$this->expire);
            }elseif($headValue >= $sideTail){
                $this->resetSide($side);
            }
        }
    
        /*
         * 重置队列面,即将该队列面的队首、队尾值置为0
         * @param   [side]  string  要重置的面
         * @return  NULL
         */
        private function resetSide($side){
            $head_key = $this->queueName .$side . self::HEAD_KEY;
            $tail_key = $this->queueName .$side . self::TAIL_KEY;
            memcache_set(self::$client, $head_key,0,false,$this->expire);
            memcache_set(self::$client, $tail_key,0,false,$this->expire);
        }
    
        /*
         * 改变当前轮值队列面
         * @return  string
         */
        private function changeCurrentSide(){
            $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
            if($currentSide == 'A'){
                memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'B',false,$this->expire);
                $this->currentSide = 'B';
            }else{
                memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'A',false,$this->expire);
                $this->currentSide = 'A';
            }
            return $this->currentSide;
        }
    
        /*
         * 检查当前队列是否已满
         * @return  boolean
         */
        public function isFull(){
            $result = false;
            if($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){
                $result = true;
            }
            return $result;
        }
    
        /*
         * 检查当前队列是否为空
         * @return  boolean
         */
        public function isEmpty(){
            $result = true;
            if($this->sideATail > 0 || $this->sideBTail > 0){
                $result = false;
            }
            return $result;
        }
    
        /*
         * 获取当前队列的长度
         * 该长度为理论长度,某些元素由于过期失效而丢失,真实长度小于或等于该长度
         * @return  int
         */
        public function getQueueLength(){
            $this->getHeadNTail($this->queueName);
            $this->getCurrentSide();
    
            $sideALength = $this->sideATail - $this->sideAHead;
            $sideBLength = $this->sideBTail - $this->sideBHead;
            $result = $sideALength + $sideBLength;
    
            return $result;
        }
    
        /*
         * 清空当前队列数据,仅保留HEAD_KEY、TAIL_KEY、SIDE_KEY三个key
         * @return  boolean
         */
        public function clear(){
            if(!$this->getLock()) return false;
            for($i=0;$iqueueName.'A'. self::VALU_KEY .$i, 0);
                @memcache_delete(self::$client, $this->queueName.'B'. self::VALU_KEY .$i, 0);
            }
            $this->unLock();
            $this->resetSide('A');
            $this->resetSide('B');
            return true;
        }
    
        /*
         * 清除所有memcache缓存数据
         * @return  NULL
         */
        public function memFlush(){
            memcache_flush(self::$client);
        }
    
    }
    


    www.bkjia.comtruehttp://www.bkjia.com/PHPjc/994954.htmlTechArticlephp自己实现memcached的队列类 add(1asdf); * $obj-getQueueLength(); * $obj-read(11); * $obj-get(8); */class memcacheQueue{ public static $client; //memcache客户端连接 pu...

    声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn核实处理。
    专题推荐:队列
    上一篇:php操作redies封装的类_PHP教程 下一篇:PHP单文件上传原理及上传函数的封装_PHP教程
    Web大前端开发直播班

    相关文章推荐

    • Destoon实现多表查询示例_php实例• ThinkPHP模板判断输出Defined标签用法详解_php实例• ThinkPHP2.0读取MSSQL提示Incorrect syntax near the keyword 'AS'的解决方法_php实例• php图片缩放实现方法_php实例• php堆排序(heapsort)练习_php实例

    全部评论我要评论

  • 取消发布评论发送
  • 1/1

    PHP中文网