阅读 110

Redis使用zset有序集合做延迟队列

把所有需要在未来执行的任务都添加到有序集合里面,并将任务的执行时间设置为分值,另外再使用一个进程来查找有序集合里面是否存在可以立即执行的任务,如果有的话,就从有序集合里面移除那个任务,并将它添加到适当的任务队列里面。

--出自《Redis实战》

创建函数 addFutureJob,负责将延迟任务添加到有序集合job中。

有序集合里存储的元素,可以使用json格式保存。

内部结构可以类似如下这种:

array(
   "identifier"=>uniqid('j',true),   //标识符
   "list"=>"message:list",           //分配到某个具体队列
   "fun"=>null,                          //需要处理的函数名称,可以传递匿名函数
   "parameter"=>null              //传递参数
)

参数:
$job: 存储延迟任务的有序集合的名字,叫job

$queue: 当任务到达执行时间时,转存到具体的队列里执行

$fun: 负责执行的函数名称或匿名函数

$time: 延迟任务执行的具体时间

$parameter: 传递的参数

function addFutureJob($job,$queue,$fun,$time,$parameter=null){
        global $redis;
        $uniqid=uniqid('j',true);
        $json=json_encode(array(
           "identifier"=>$uniqid,
           "queue"=>$queue,
           "fun"=>$fun,
           "parameter"=>$parameter
        ));

        $result=$redis->zadd($job,$time,$json);

        if($result){
                return true;
        }else{
                return false;
        }

}

$queue="queue:list";    //执行任务队列 list结构
$job="job";  //存储的未来执行的任务 zset结构

$fun=function(){
        file_put_contents("/tmp/jujulafen2022","天气很好呀\n",FILE_APPEND);
};


$bool=addFutureJob($job,$queue,$fun,1624353251);


if($bool){
        echo "添加延迟任务成功";
}else{
        echo "添加延迟任务失败";
}

执行脚本后,将存储到job 这个有序集合里

另一个脚本中,读取job集合,检查是否有需要执行的任务

这个函数getQueue()的基本流程是这样:
(1)根据分数从小到大排列,读取第一个元素。如果元素不存在返回false
(2) 如果元素任务存在,并且它的分数(执行时间)小于等于当前时间,说明这个任务可以执行了。
(3) json转化成数组,读取任务的queue参数,将它添加到指定的队列里,然后从job中删除这个任务。
(4) 上述转移操作时,如果成功,记录日志。while继续循环检查job有序集合
(5) 如果转移操作失败,返回false
(6) 后续没有要执行的任务时,停止循环,返回false

function getQueue(){
   global $redis;
   $current=time();
   //查询范围,找出小于等于当前时间的任务
   
   while(true){
      //根据分数从小到大排序,找出第一个有序集合元素
      $value=$redis->zrange("job",0,0,true);
      //存在元素
      if($value){
        $time=array_values($value)[0];

        //如果小于等于当前时间,说明这个任务已经到执行时间了
        if($time<=time()){
            $job=array_keys($value)[0];
            $jobArr=json_decode($job,true);
                        //将这个任务,添加到指定的list列表里。
     
               $result=$redis->rpush($jobArr['queue'],$value);
            if($result){
                //从job队列移除这条任务
                $redis->zrem("job",$job);

                //记录日志

            }else{
                return false;
            }

        }else{
            //如果没有需要执行的任务,停止循环。
            break;
            return false;
        }

      }else{
         //队列为空时返回
         return false;
      }

   }

}

getQueue();

方案2:

此处循环读取也可以使用zrangeByScore()函数,根据分数范围进行读取返回集合内的指定元素。

$result=zrangeByScore("lafen",'-inf',time(),['withscores'=>true]);

其中time() 代表当前时间,如果在这个时间范围内有计划任务需要执行,就会返回结果。

可以遍历上面的返回结果,如果没有需要执行的计划任务,可以直接退出程序。等待下一次计划任务执行。

作者:辣粉和五斤

原文链接:https://www.jianshu.com/p/418781d8ae76

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐