• 技术文章 >php教程 >PHP源码

    php的memcache队列类

    PHP中文网PHP中文网2016-05-25 17:09:32原创393

    memcacheQueue.class.php

    <?php
    /*
     * memcache队列类
     * 支持多进程并发写入、读取
     * 边写边读,AB面轮值替换
     * @author lkk/lianq.net
     * @create on 9:25 2012-9-28
     *
     *
     * @example:
     *		$obj = new memcacheQueue('duilie');
     *		$obj->add('1asdf');
     *		$obj->getQueueLength();
     *		$obj->read(11);
     *		$obj->get(8);
     */
    
    class memcacheQueue{
    	public static	$client;			//memcache客户端连接
    	public			$access;			//队列是否可更新	
    	private 		$currentSide;		//当前轮值的队列面:A/B
    	private			$lastSide;			//上一轮值的队列面:A/B
    	private 		$sideAHead;			//A面队首值
    	private 		$sideATail;			//A面队尾值
    	private 		$sideBHead;			//B面队首值
    	private 		$sideBTail;			//B面队尾值
    	private			$currentHead;		//当前队首值
    	private			$currentTail;		//当前队尾值
    	private			$lastHead;			//上轮队首值
    	private			$lastTail;			//上轮队尾值	
    	private 		$expire;			//过期时间,秒,1~2592000,即30天内;0为永不过期
    	private			$sleepTime;			//等待解锁时间,微秒
    	private			$queueName;			//队列名称,唯一值
    	private			$retryNum;			//重试次数,= 10 * 理论并发数
    	
    	const	MAXNUM		= 2000;					//(单面)最大队列数,建议上限10K
    	const	HEAD_KEY	= '_lkkQueueHead_';		//队列首kye
    	const	TAIL_KEY	= '_lkkQueueTail_';		//队列尾key
    	const	VALU_KEY	= '_lkkQueueValu_';		//队列值key
    	const	LOCK_KEY	= '_lkkQueueLock_';		//队列锁key
    	const	SIDE_KEY	= '_lkkQueueSide_';		//轮值面key
    	
    	/*
    	 * 构造函数
    	 * @param	[config]	array	memcache服务器参数
    	 * @param	[queueName]	string	队列名称
    	 * @param	[expire]	string	过期时间
    	 * @return	NULL
    	 */
    	public function __construct($queueName ='',$expire='',$config =''){
    		if(empty($config)){
    			self::$client = memcache_pconnect('localhost',11211);
    		}elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')
    			self::$client = memcache_pconnect($config['host'],$config['port']);
    		}elseif(is_string($config)){//"127.0.0.1:11211"
    			$tmp = explode(':',$config);
    			$conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
    			$conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
    			self::$client = memcache_pconnect($conf['host'],$conf['port']);		
    		}
    		if(!self::$client) return false;
    		
    		ignore_user_abort(TRUE);//当客户断开连接,允许继续执行
    		set_time_limit(0);//取消脚本执行延时上限
    		
    		$this->access = false;
    		$this->sleepTime = 1000;
    		$expire = (empty($expire) && $expire!=0) ? 3600 : (int)$expire;
    		$this->expire = $expire;
    		$this->queueName = $queueName;
    		$this->retryNum = 10000;
    		
    		$side = memcache_add(self::$client, $queueName . self::SIDE_KEY, 'A',false, $expire);
    		$this->getHeadNTail($queueName);
    		if(!isset($this->sideAHead) || empty($this->sideAHead)) $this->sideAHead = 0;
    		if(!isset($this->sideATail) || empty($this->sideATail)) $this->sideATail = 0;
    		if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
    		if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
    	}
    	
    	/*
    	 * 获取队列首尾值
    	 * @param	[queueName]	string	队列名称
    	 * @return	NULL
    	 */
    	private function getHeadNTail($queueName){
    		$this->sideAHead = (int)memcache_get(self::$client, $queueName.'A'. self::HEAD_KEY);
    		$this->sideATail = (int)memcache_get(self::$client, $queueName.'A'. self::TAIL_KEY);
    		$this->sideBHead = (int)memcache_get(self::$client, $queueName.'B'. self::HEAD_KEY);
    		$this->sideBTail = (int)memcache_get(self::$client, $queueName.'B'. self::TAIL_KEY);
    	}
    	
    	/*
    	 * 获取当前轮值的队列面
    	 * @return	string	队列面名称
    	 */
    	public function getCurrentSide(){
    		$currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
    		if($currentSide == 'A'){
    			$this->currentSide = 'A';
    			$this->lastSide = 'B';	
    
    			$this->currentHead	= $this->sideAHead;
    			$this->currentTail	= $this->sideATail;
    			$this->lastHead		= $this->sideBHead;
    			$this->lastTail		= $this->sideBTail;			
    		}else{
    			$this->currentSide = 'B';
    			$this->lastSide = 'A';
    
    			$this->currentHead	= $this->sideBHead;
    			$this->currentTail	= $this->sideBTail;
    			$this->lastHead		= $this->sideAHead;
    			$this->lastTail		= $this->sideATail;						
    		}
    		
    		return $this->currentSide;
    	}
    	
    	/*
    	 * 队列加锁
    	 * @return boolean
    	 */
    	private function getLock(){
    		if($this->access === false){
    			while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){
    				usleep($this->sleepTime);
    				@$i++;
    				if($i > $this->retryNum){//尝试等待N次
    					return false;
    					break;
    				}
    			}
    			return $this->access = true;
    		}
    		return false;
    	}
    	
    	/*
    	 * 队列解锁
    	 * @return NULL
    	 */
    	private function unLock(){
    		memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);
    		$this->access = false;
    	}
    	
    	/*
    	 * 添加数据
    	 * @param	[data]	要存储的值
    	 * @return	boolean
    	 */
    	public function add($data){
    		$result = false;
    		if(!$this->getLock()){
    			return $result;
    		} 
    		$this->getHeadNTail($this->queueName);
    		$this->getCurrentSide();
    		
    		if($this->isFull()){
    			$this->unLock();
    			return false;
    		}
    		
    		if($this->currentTail < self::MAXNUM){
    			$value_key = $this->queueName .$this->currentSide . self::VALU_KEY . $this->currentTail;
    			if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){
    				$this->changeTail();
    				$result = true;
    			}
    		}else{//当前队列已满,更换轮值面
    			$this->unLock();
    			$this->changeCurrentSide();
    			return $this->add($data);
    		}
    
    		$this->unLock();
    		return $result;
    	}
    	
    	/*
    	 * 取出数据
    	 * @param	[length]	int	数据的长度
    	 * @return	array
    	 */
    	public function get($length=0){
    		if(!is_numeric($length)) return false;
    		if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
    		if(!$this->getLock()) return false;
    
    		if($this->isEmpty()){
    			$this->unLock();
    			return false;
    		}
    		
    		$keyArray	= $this->getKeyArray($length);
    		$lastKey	= $keyArray['lastKey'];
    		$currentKey	= $keyArray['currentKey'];
    		$keys		= $keyArray['keys'];
    		$this->changeHead($this->lastSide,$lastKey);
    		$this->changeHead($this->currentSide,$currentKey);
    		
    		$data	= @memcache_get(self::$client, $keys);
    		foreach($keys as $v){//取出之后删除
    			@memcache_delete(self::$client, $v, 0);
    		}
    		$this->unLock();
    
    		return $data;
    	}
    	
    	/*
    	 * 读取数据
    	 * @param	[length]	int	数据的长度
    	 * @return	array
    	 */
    	public function read($length=0){
    		if(!is_numeric($length)) return false;
    		if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
    		$keyArray	= $this->getKeyArray($length);
    		$data	= @memcache_get(self::$client, $keyArray['keys']);
    		return $data;
    	}
    	
    	/*
    	 * 获取队列某段长度的key数组
    	 * @param	[length]	int	队列长度
    	 * @return	array
    	 */
    	private function getKeyArray($length){
    		$result = array('keys'=>array(),'lastKey'=>array(),'currentKey'=>array());
    		$this->getHeadNTail($this->queueName);
    		$this->getCurrentSide();
    		if(empty($length)) return $result;
    		
    		//先取上一面的key
    		$i = $result['lastKey'] = 0;
    		for($i=0;$i<$length;$i++){
    			$result['lastKey'] = $this->lastHead + $i;
    			if($result['lastKey'] >= $this->lastTail) break;
    			$result['keys'][] = $this->queueName .$this->lastSide . self::VALU_KEY . $result['lastKey'];
    		}
    		
    		//再取当前面的key
    		$j = $length - $i;
    		$k = $result['currentKey'] = 0;
    		for($k=0;$k<$j;$k++){
    			$result['currentKey'] = $this->currentHead + $k;
    			if($result['currentKey'] >= $this->currentTail) break;
    			$result['keys'][] = $this->queueName .$this->currentSide . self::VALU_KEY . $result['currentKey'];
    		}
    
    		return $result;
    	}
    	
    	/*
    	 * 更新当前轮值面队列尾的值
    	 * @return	NULL
    	 */
    	private function changeTail(){
    		$tail_key = $this->queueName .$this->currentSide . self::TAIL_KEY;
    		memcache_add(self::$client, $tail_key, 0,false, $this->expire);//如果没有,则插入;有则false;
    		//memcache_increment(self::$client, $tail_key, 1);//队列尾+1
    		$v = memcache_get(self::$client, $tail_key) +1;
    		memcache_set(self::$client, $tail_key,$v,false,$this->expire);
    	}
    	
    	/*
    	 * 更新队列首的值
    	 * @param	[side]		string	要更新的面
    	 * @param	[headValue]	int		队列首的值
    	 * @return	NULL
    	 */
    	private function changeHead($side,$headValue){
    		if($headValue < 1) return false;
    		$head_key = $this->queueName .$side . self::HEAD_KEY;
    		$tail_key = $this->queueName .$side . self::TAIL_KEY;
    		$sideTail = memcache_get(self::$client, $tail_key);
    		if($headValue < $sideTail){
    			memcache_set(self::$client, $head_key,$headValue+1,false,$this->expire);
    		}elseif($headValue >= $sideTail){
    			$this->resetSide($side);
    		}
    	}
    	
    	/*
    	 * 重置队列面,即将该队列面的队首、队尾值置为0
    	 * @param	[side]	string	要重置的面
    	 * @return	NULL
    	 */
    	private function resetSide($side){
    		$head_key = $this->queueName .$side . self::HEAD_KEY;
    		$tail_key = $this->queueName .$side . self::TAIL_KEY;
    		memcache_set(self::$client, $head_key,0,false,$this->expire);
    		memcache_set(self::$client, $tail_key,0,false,$this->expire);
    	}
    	
    	
    	/*
    	 * 改变当前轮值队列面
    	 * @return	string
    	 */
    	private function changeCurrentSide(){
    		$currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
    		if($currentSide == 'A'){
    			memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'B',false,$this->expire);
    			$this->currentSide = 'B';
    		}else{
    			memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'A',false,$this->expire);
    			$this->currentSide = 'A';
    		}
    		return $this->currentSide;
    	}
    	
    	/*
    	 * 检查当前队列是否已满
    	 * @return	boolean
    	 */
    	public function isFull(){
    		$result = false;
    		if($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){
    			$result = true;
    		}
    		return $result;
    	}
    	
    	/*
    	 * 检查当前队列是否为空
    	 * @return	boolean
    	 */
    	public function isEmpty(){
    		$result = true;
    		if($this->sideATail > 0 || $this->sideBTail > 0){
    			$result = false;
    		}
    		return $result;
    	}
    	
    	/*
    	 * 获取当前队列的长度
    	 * 该长度为理论长度,某些元素由于过期失效而丢失,真实长度小于或等于该长度
    	 * @return	int
    	 */
    	public function getQueueLength(){
    		$this->getHeadNTail($this->queueName);
    		$this->getCurrentSide();
    
    		$sideALength = $this->sideATail - $this->sideAHead;
    		$sideBLength = $this->sideBTail - $this->sideBHead;
    		$result = $sideALength + $sideBLength;
    		
    		return $result;
    	}
    	
    
    	/*
    	 * 清空当前队列数据,仅保留HEAD_KEY、TAIL_KEY、SIDE_KEY三个key
    	 * @return	boolean
    	 */
    	public function clear(){
    		if(!$this->getLock()) return false;
    		for($i=0;$i<self::MAXNUM;$i++){
    			@memcache_delete(self::$client, $this->queueName.'A'. self::VALU_KEY .$i, 0);
    			@memcache_delete(self::$client, $this->queueName.'B'. self::VALU_KEY .$i, 0);
    		}
    		$this->unLock();
    		$this->resetSide('A');
    		$this->resetSide('B');
    		return true;
    	}
    	
    	/*
    	 * 清除所有memcache缓存数据
    	 * @return	NULL
    	 */
    	public function memFlush(){
    		memcache_flush(self::$client);
    	}
    
    
    }
    声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn核实处理。
    专题推荐:php
    上一篇:写给所爱的人,爱人之间共用博客系统!--优化版。 下一篇:搜索引擎来源关键字分析程序
    千万级数据并发解决方案

    相关文章推荐

    • php 怎么跨域写cookie实现同步登陆代码• php调用js文件的好办法• 用于 php-5.2 的 php.ini 中文版• win2003系统下IIS6+php5+Zend+MySql+phpMyAdmin的安装与配置• php简单文件上传代码
    1/1

    PHP中文网