python的分布式任务huey如何实现异步化任务讲解
本文我们来分享一个python的轻型的任务队列程序,他可以让python的分布式任务huey实现异步化任务,感兴趣的朋友可以看看。
一个轻型的任务队列,功能和相关的broker没有celery强大,重在轻型,而且代码读起来也比较的简单。
关于huey的介绍: (比celery轻型,比mrq、rq要好用 !)
a lightweight alternative.
written in python
no deps outside stdlib, except redis (or roll your own backend)
support for django
supports:
multi-threaded task execution
scheduled execution at a given time
periodic execution, like a crontab
retrying tasks that fail
task result storage
安装:
代码如下 |
|
Installing
代码如下 |
|
Installing
huey can be installed very easily using pip.
pip install huey
huey has no dependencies outside the standard library, but currently the only fully-implemented queue backend it ships with requires redis. To use the redis backend, you will need to install the python client.
pip install redis
Using git
If you want to run the very latest, feel free to pull down the repo from github and install by hand.
git clone https://github.com/coleifer/huey.git
cd huey
python setup.py install
You can run the tests using the test-runner:
python setup.py test |
huey can be installed very easily using pip.
pip install huey
huey has no dependencies outside the standard library, but currently the only fully-implemented queue backend it ships with requires redis. To use the redis backend, you will need to install the python client.
pip install redis
代码如下 |
|
from huey import RedisHuey, crontab
huey = RedisHuey('my-app', host='redis.myapp.com')
@huey.task()
def add_numbers(a, b):
return a b
@huey.periodic_task(crontab(minute='0', hour='3'))
def nightly_backup():
sync_all_data() |
Using git
If you want to run the very latest, feel free to pull down the repo from github and install by hand.
git clone https://github.com/coleifer/huey.git
cd huey
python setup.py install
You can run the tests using the test-runner:
python setup.py test |
关于huey的api,下面有详细的介绍及参数介绍的。
代码如下 |
|
from huey import RedisHuey, crontab
huey = RedisHuey('my-app', host='redis.myapp.com')
@huey.task()
def add_numbers(a, b):
return a b
@huey.periodic_task(crontab(minute='0', hour='3'))
def nightly_backup():
sync_all_data() |
juey作为woker的时候,一些cli参数。
常用的是:
-l 关于日志文件的执行 。
-w workers的数目,-w的数值大了,肯定是增加任务的处理能力
-p --periodic 启动huey worker的时候,他会从tasks.py里面找到 需要crontab的任务,会派出几个线程专门处理这些事情。
-n 不启动关于crontab里面的预周期执行,只有你触发的时候,才会执行周期星期的任务。
--threads 意思你懂的。
1
代码如下 |
|
# 原文:
代码如下 |
|
# 原文:
The following table lists the options available for the consumer as well as their default values.
-l, --logfile
Path to file used for logging. When a file is specified, by default Huey will use a rotating file handler (1MB / chunk) with a maximum of 3 backups. You can attach your own handler (huey.logger) as well. The default loglevel is INFO.
-v, --verbose
Verbose logging (equates to DEBUG level). If no logfile is specified and verbose is set, then the consumer will log to the console. This is very useful for testing/debugging.
-q, --quiet
Only log errors. The default loglevel for the consumer is INFO.
-w, --workers
Number of worker threads, the default is 1 thread but for applications that have many I/O bound tasks, increasing this number may lead to greater throughput.
-p, --periodic
Indicate that this consumer process should start a thread dedicated to enqueueing “periodic” tasks (crontab-like functionality). This defaults to True, so should not need to be specified in practice.
-n, --no-periodic
Indicate that this consumer process should not enqueue periodic tasks.
-d, --delay
When using a “polling”-type queue backend, the amount of time to wait between polling the backend. Default is 0.1 seconds.
-m, --max-delay
The maximum amount of time to wait between polling, if using weighted backoff. Default is 10 seconds.
-b, --backoff
The amount to back-off when polling for results. Must be greater than one. Default is 1.15.
-u, --utc
Indicates that the consumer should use UTC time for all tasks, crontabs and scheduling. Default is True, so in practice you should not need to specify this option.
--localtime
Indicates that the consumer should use localtime for all tasks, crontabs and scheduling. Default is False.
Examples
Running the consumer with 8 threads, a logfile for errors only, and a very short polling interval:
huey_consumer.py my.app.huey -l /var/log/app.huey.log -w 8 -b 1.1 -m 1.0 |
The following table lists the options available for the consumer as well as their default values.
-l, --logfile
Path to file used for logging. When a file is specified, by default Huey will use a rotating file handler (1MB / chunk) with a maximum of 3 backups. You can attach your own handler (huey.logger) as well. The default loglevel is INFO.
-v, --verbose
Verbose logging (equates to DEBUG level). If no logfile is specified and verbose is set, then the consumer will log to the console. This is very useful for testing/debugging.
-q, --quiet
Only log errors. The default loglevel for the consumer is INFO.
-w, --workers
Number of worker threads, the default is 1 thread but for applications that have many I/O bound tasks, increasing this number may lead to greater throughput.
代码如下 |
|
# config.py
from huey import Huey
from huey.backends.redis_backend import RedisBlockingQueue
queue = RedisBlockingQueue('test-queue', host='localhost', port=6379)
huey = Huey(queue) |
-p, --periodic
Indicate that this consumer process should start a thread dedicated to enqueueing “periodic” tasks (crontab-like functionality). This defaults to True, so should not need to be specified in practice.
-n, --no-periodic
Indicate that this consumer process should not enqueue periodic tasks.
-d, --delay
When using a “polling”-type queue backend, the amount of time to wait between polling the backend. Default is 0.1 seconds.
-m, --max-delay
The maximum amount of time to wait between polling, if using weighted backoff. Default is 10 seconds.
-b, --backoff
The amount to back-off when polling for results. Must be greater than one. Default is 1.15.
-u, --utc
Indicates that the consumer should use UTC time for all tasks, crontabs and scheduling. Default is True, so in practice you should not need to specify this option.
--localtime
Indicates that the consumer should use localtime for all tasks, crontabs and scheduling. Default is False.
Examples
Running the consumer with 8 threads, a logfile for errors only, and a very short polling interval:
huey_consumer.py my.app.huey -l /var/log/app.huey.log -w 8 -b 1.1 -m 1.0 |
任务队列huey 是靠着redis来实现queue的任务存储,所以需要咱们提前先把redis-server和redis-py都装好。 安装的方法就不说了,自己搜搜吧。
我们首先创建下huey的链接实例 :
代码如下 |
|
# config.py
from huey import Huey
from huey.backends.redis_backend import RedisBlockingQueue
queue = RedisBlockingQueue('test-queue', host='localhost', port=6379)
huey = Huey(queue) |
然后就是关于任务的,也就是你想让谁到任务队列这个圈子里面,和celey、rq,mrq一样,都是用tasks.py表示的。
代码如下 |
|
from config import huey # import the huey we instantiated in config.py
代码如下 |
|
from config import huey # import the huey we instantiated in config.py
@huey.task()
def count_beans(num):
print '-- counted %s beans --' % num |
@huey.task()
def count_beans(num):
print '-- counted %s beans --' % num
|
代码如下 |
|
main.py
from config import huey # import our "huey" object
from tasks import count_beans # import our task
if __name__ == '__main__':
beans = raw_input('How many beans? ')
count_beans(int(beans))
print 'Enqueued job to count %s beans' % beans
Ensure you have Redis running locally
Ensure you have installed huey
Start the consumer: huey_consumer.py main.huey (notice this is “main.huey” and not “config.huey”).
Run the main program: python main.py |
再来一个真正去执行的 。 main.py 相当于生产者,tasks.py相当于消费者的关系。 main.py负责喂数据。
代码如下 |
|
main.py
from config import huey # import our "huey" object
from tasks import count_beans # import our task
if __name__ == '__main__':
代码如下 |
|
from huey import Huey
from huey.backends.redis_backend import RedisBlockingQueue
from huey.backends.redis_backend import RedisDataStore # ADD THIS LINE
queue = RedisBlockingQueue('test-queue', host='localhost', port=6379)
result_store = RedisDataStore('results', host='localhost', port=6379) # ADDED
huey = Huey(queue, result_store=result_store) # ADDED result store |
beans = raw_input('How many beans? ')
count_beans(int(beans))
print 'Enqueued job to count %s beans' % beans
Ensure you have Redis running locally
代码如下 |
|
>>> from main import count_beans
>>> res = count_beans(100)
>>> res # what is "res" ?
>>> res.get() # get the result of this task
'Counted 100 beans' |
Ensure you have installed huey
Start the consumer: huey_consumer.py main.huey (notice this is “main.huey” and not “config.huey”).
Run the main program: python main.py |
和celery、rq一样,他的结果获取是需要在你的config.py或者主代码里面指明他的存储的方式,现在huey还仅仅是支持redis,但相对他的特点和体积,这已经很足够了 !
只是那几句话而已,导入RedisDataStore库,申明下存储的地址。
代码如下 |
|
from huey import Huey
from huey.backends.redis_backend import RedisBlockingQueue
from huey.backends.redis_backend import RedisDataStore # ADD THIS LINE
queue = RedisBlockingQueue('test-queue', host='localhost', port=6379)
result_store = RedisDataStore('results', host='localhost', port=6379) # ADDED
huey = Huey(queue, result_store=result_store) # ADDED result store |
这个时候,我们在ipython再次去尝试的时候,会发现可以获取到tasks.py里面的return值了 其实你在main.py里面获取的时候,他还是通过uuid从redis里面取出来的。
代码如下 |
|
>>> from main import count_beans
>>> res = count_beans(100)
>>> res # what is "res" ?
>>> res.get() # get the result of this task
'Counted 100 beans' |
huey也是支持celey的延迟执行和crontab的功能 。 这些功能很是重要,可以自定义的优先级或者不用再借助linux本身的crontab。
用法很简单,多加一个delay的时间就行了,看了下huey的源码,他默认是立马执行的。当然还是要看你的线程是否都是待执行的状态了。
代码如下 |
|
>>> import datetime
代码如下 |
|
>>> import datetime
>>> res = count_beans.schedule(args=(100,), delay=60)
>>> res
>>> res.get() # this returns None, no data is ready
>>> res.get() # still no data...
>>> res.get(blocking=True) # ok, let's just block until its ready
'Counted 100 beans' |
>>> res = count_beans.schedule(args=(100,), delay=60)
>>> res
>>> res.get() # this returns None, no data is ready
>>> res.get() # still no data...
>>> res.get(blocking=True) # ok, let's just block until its ready
'Counted 100 beans' |
代码如下 |
|
# tasks.py
from datetime import datetime
from config import huey
@huey.task(retries=3, retry_delay=10)
def try_thrice():
print 'trying....%s' % datetime.now()
raise Exception('nope') |
再来一个重试retry的介绍,huey也是有retry,这个很是实用的东西。 如果大家有看到我的上面文章关于celery重试机制的介绍,应该也能明白huey是个怎么个回事了。 是的,他其实也是在tasks里具体函数的前面做了装饰器,装饰器里面有个func try 异常重试的逻辑 。 大家懂的。
代码如下 |
|
# tasks.py
from datetime import datetime
代码如下 |
|
# count some beans
res = count_beans(10000000)
res.revoke()
The same applies to tasks that are scheduled in the future:
res = count_beans.schedule(args=(100000,), eta=in_the_future)
res.revoke()
@huey.task(crontab(minute='*'))
def print_time():
print datetime.now() |
from config import huey
@huey.task(retries=3, retry_delay=10)
def try_thrice():
print 'trying....%s' % datetime.now()
raise Exception('nope')
|
huey是给你反悔的机会饿 ~ 也就是说,你做了deley的计划任务后,如果你又想取消,那好看,直接revoke就可以了。
代码如下 |
|
# count some beans
res = count_beans(10000000)
res.revoke()
The same applies to tasks that are scheduled in the future:
res = count_beans.schedule(args=(100000,), eta=in_the_future)
res.revoke()
代码如下 |
|
from config import huey
from tasks import count_beans
if __name__ == '__main__':
beans = raw_input('How many beans? ')
count_beans(int(beans))
print('Enqueued job to count %s beans' % beans) |
@huey.task(crontab(minute='*'))
def print_time():
print datetime.now() |
task() - 透明的装饰器,让你的函数变得优美点。
periodic_task() - 这个是周期性的任务
crontab() - 启动worker的时候,附带的crontab的周期任务。
BaseQueue - 任务队列
BaseDataStore - 任务执行后,可以把 结果塞入进去。 BAseDataStore可以自己重写。
官方的huey的git库里面是提供了相关的测试代码的:
main.py
代码如下 |
|
from config import huey
from tasks import count_beans
if __name__ == '__main__':
beans = raw_input('How many beans? ')
count_beans(int(beans))
print('Enqueued job to count %s beans' % beans) |
任务.py
代码如下 |
|
随机导入
代码如下 |
|
import random
import time
from huey import crontab
from config import huey
@huey.task()
def count_beans(num):
print "start..."
print('-- counted %s beans --' % num)
time.sleep(3)
print "end..."
return 'Counted %s beans' % num
@huey.periodic_task(crontab(minute='*/5'))
def every_five_mins():
print('Consumer prints this every 5 mins')
@huey.task(retries=3, retry_delay=10)
def try_thrice():
if random.randint(1, 3) == 1:
print('OK')
else:
print('About to fail, will retry in 10 seconds')
raise Exception('Crap something went wrong')
@huey.task()
def slow(n):
time.sleep(n)
print('slept %s' % n) |
导入时间
从 Huey 导入 crontab
从配置导入huey
代码如下 |
|
#!/bin/bash
echo "HUEY CONSUMER"
echo "-------------"
echo "In another terminal, run 'python main.py'"
echo "Stop the consumer using Ctrl C"
PYTHONPATH=.:$PYTHONPATH
python ../../huey/bin/huey_consumer.py main.huey --threads=2
=> |
@huey.task()
def count_beans(num):
打印“开始...”
print('-- 计算了 %s 颗豆子 --' % num)
时间.睡眠(3)
代码如下 |
|
[xiaorui@devops /tmp ]$ git clone https://github.com/coleifer/huey.git
Cloning into 'huey'...
remote: Counting objects: 1423, done.
remote: Compressing objects: 100% (9/9), done.
Receiving objects: 34% (497/1423), 388.00 KiB | 29.00 KiB/s KiB/s
Receiving objects: 34% (498/1423), 628.00 KiB | 22.00 KiB/s
remote: Total 1423 (delta 0), reused 0 (delta 0)
Receiving objects: 100% (1423/1423), 2.24 MiB | 29.00 KiB/s, done.
Resolving deltas: 100% (729/729), done.
Checking connectivity... done.
[xiaorui@devops /tmp ]$cd huey/examples/simple
[xiaorui@devops simple (master)]$ ll
total 40
-rw-r--r-- 1 xiaorui wheel 79B 9 8 08:49 README
-rw-r--r-- 1 xiaorui wheel 0B 9 8 08:49 __init__.py
-rw-r--r-- 1 xiaorui wheel 56B 9 8 08:49 config.py
-rwxr-xr-x 1 xiaorui wheel 227B 9 8 08:49 cons.sh
-rw-r--r-- 1 xiaorui wheel 205B 9 8 08:49 main.py
-rw-r--r-- 1 xiaorui wheel 607B 9 8 08:49 tasks.py
[xiaorui@devops simple (master)]$ |
打印“结束...”
return '计数 %s beans' % num
@huey.periodic_task(crontab(分钟='*/5'))
def every_ Five_mins():
print('消费者每 5 分钟打印一次')
@huey.task(重试=3, retry_delay=10)
def try_thrice():
if random.randint(1, 3) == 1:
打印('确定')
其他:
print('即将失败,10秒后重试')
引发异常('糟糕,出了问题')
@huey.task()
def 慢(n):
time.sleep(n)
print('睡了 %s' % n) |
表>
运行.sh
代码如下 |
|
#!/bin/bash
回声“休伊消费者”
回声“-------------”
echo "在另一个终端中,运行 'python main.py'"
echo "使用 Ctrl C 停止消费者"
PYTHONPATH=.:$PYTHONPATH
python ../../huey/bin/huey_consumer.py main.huey --threads=2
=> |
表>
我们可以先clone下huey的代码库。里面有一个examples例子目录,可以看到他是支持django的,但这不是重点!
代码如下 |
|
[xiaorui@devops /tmp ]$ git 克隆 https://github.com/coleifer/huey.git
克隆成“huey”...
远程:计数对象:1423,完成。
远程:压缩对象:100% (9/9),完成。
接收对象:34% (497/1423), 388.00 KiB | 29.00 KiB/秒 KiB/秒
接收对象:34% (498/1423), 628.00 KiB | 22.00 KiB/秒
远程:总计 1423(增量 0),重用 0(增量 0)
接收对象:100% (1423/1423),2.24 MiB | 29.00 KiB/s,完成。
解决增量:100% (729/729),完成。
检查连接...完成。
[xiaorui@devops /tmp ]$cdhuey/examples/simple
[xiaorui@devops 简单(大师)]$ ll
共 40 个
-rw-r--r-- 1 小锐轮 79B 9 8 08:49 自述文件
-rw-r--r-- 1 小睿轮 0B 9 8 08:49 __init__.py
-rw-r--r-- 1 小睿轮 56B 9 8 08:49 config.py
-rwxr-xr-x 1 小睿轮 227B 9 8 08:49 cons.sh
-rw-r--r-- 1 小睿轮 205B 9 8 08:49 main.py
-rw-r--r-- 1 小睿轮 607B 9 8 08:49tasks.py
[xiaorui@devops简单(主)]$ |
表>