前言
还好这次没来ios的,刚接触了下dubbo的分布实现,没想到就被推了一篇python的分布实现技术,分享给大家吧,顺便自己也了解下。原文如下:
Orchestrating a Background Job Workflow in Celery for Python
By RUSTEM KAMUN
正文开篇
现代Web应用程序及其底层系统比以往任何时候都更快,更灵敏。然而,仍然有很多情况下,您希望将繁重的任务的执行转移到整个系统架构的其他部分,而不是在主线程上处理它们。识别这些任务简单到检查它们是否类似属于以下类别:
- 定期任务 – 您将计划在特定时间或间隔后运行的任务,例如每月报告生成或每天运行两次的web scraper。
- 第三方任务——web应用程序必须快速地为用户提供服务,而不需要等待其他的操作在页面加载时完成。例如发送电子邮件或通知或传播更新到内部工具(例如收集A/B测试或系统日志记录的数据)。
- 长时间运行的作业——在资源中花费昂贵的作业,用户在其计算结果时需要等待。例如复杂的工作流执行(DAG工作流程),图形生成,类似于任务的Map-Reduce,以及媒体内容的服务(视频,音频)。
执行后台任务的一个简单的解决方案是在单独的线程或进程中运行它。Python是一种高级的图灵完备的编程语言,,不幸的是,它没有在与Erlang、Go、Java、Scala或Akka的规模匹配上提供内置的并发性。那些都是基于Tony Hoare的通信顺序进程 (CSP)。另一方面,Python线程是由全局解释器锁(GIL)协调和调度的,它可以防止多个本机线程同时执行Python的编译器。摆脱GIL是Python开发人员中一个很有争议的话题,但这并不是本文的重点。Python中的并发编程是过时的,然而依旧欢迎您在由同为Toptal的作者Marcus McCurdy编写的Python多线程教程中阅读有关它的内容。因此,设计过程之间的通信始终是一个容易出错的过程,并导致代码耦合和糟糕的系统可维护性,更不用说它对可扩展性的负面影响。此外,Python进程是操作系统(OS)下的一个常规进程,并且与整个Python标准库一样,它也是重量级的。随着应用程序中的进程数量的增加,从一个这样的进程切换到另一个进程变得非常耗时。
为了更好地理解Python的并发性,看看David Beazley在PyCon’15上的这篇令人难以置信的演讲。
更好的解决方案是为分布式队列或其著名的被称为发布-订阅(publish-subscribe)的兄弟模式。如图1所示,有两种类型的应用程序,其中一种称为发布者,它发送消息,另一种称为订阅者,接收消息。这两个代理之间没有直接交互,甚至彼此都不知道。发布者向中央队列或代理发送消息,订阅者从该代理接收感兴趣的消息。这种方法有两个主要的优点:
可扩展性——代理不需要在网络中了解彼此。他们以话题(topic)为中心。因此,这意味着每个都可以继续正常工作,而不考虑其他的异步方式。
松耦合-每个代理都表示系统的一部分(服务,模块)。由于它们是松散耦合的,因此每个都可以单独扩展到数据中心之外。

什么是Celery
Celery 是Python世界中最受欢迎的后台工作管理者之一。Celery与像RabbitMQ或Redis这样的消息代理兼容,可以同时充当生产者和消费者。
Celery是基于分布式消息传递的异步任务队列/作业队列。它专注于实时操作,但也支持调度。执行单元,称为任务,在一个或多个使用多处理、Eventlet或gevent的工作服务器上并发执行。任务可以异步执行(在后台)或同步执行(等待准备就绪)。
要开始使用Celery,只需按照官方文档中的指南一步步进行即可。本文的重点是让您很好地了解哪些用例可以被Celery涉及。在本文中,我们不仅将展示一些有趣的示例,还将尝试学习如何将Celery应用于实际的任务,如后台邮件、报告生成、日志记录和错误报告。我将分享我的基于超仿真的测试任务,最后,我将提供一些在官方文档中没有(很好)文档化的技巧,这些技巧花费了我数小时的研究来发现。
如果你以前没有使用Celery的经验,我建议你先按官方教程学完后再尝试。
吊一下胃口
如果这篇文章激起您的兴趣,并让您想立即投入代码,那么请遵循本文中使用的代码的GitHub库。这里的README文件将为您提供比较粗糙的方法来运行和使用示例应用程序。
使用Celery第一步
对于初学者,我们将通过一系列实际的例子,向读者展示简单而优雅的Celery如何解决看似不平凡的任务。所有示例将在Django框架内呈现; 然而,他们中的大多数可以很容易地移植到其他Python框架(Flask,Pyramid)。
项目布局是由Cookiecutter Django产生的;然而,我只保留了一些依赖项,在我看来,这些依赖关系促进了这些用例的开发和准备。我还删除了这篇文章和应用程序的不必要的模块,以减少噪音,使代码更容易理解。
1 2 3 4 5 6 7 |
- celery_uncovered/ - celery_uncovered/__init__.py - celery_uncovered/{toyex,tricks,advex} - celery_uncovered/celery.py - config/settings/{base,local,test}.py - config/urls.py - manage.py |
celery_uncovered/{toyex,tricks,advex}
包含我们将在本文中介绍的不同应用程序。每个应用程序都包含一系列由Celery理解所要求的级别组织的示例。celery_uncovered/celery.py
定义了一个Celery实例。
文件: celery_uncovered/celery.py
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
from __future__ import absolute_import import os from celery import Celery, signals # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local') app = Celery('celery_uncovered') # Using a string here means the worker will not have to # pickle the object when using Windows. app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks() |
然后我们需要确保Celery将与Django一起开始。因此,我们导入该应用程序celery_uncovered/__init__.py
。
文件: celery_uncovered/__init__.py
:
1 2 3 4 5 6 7 8 9 10 |
from __future__ import absolute_import # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app # noqa __all__ = ['celery_app'] __version__ = '0.0.1' __version_info__ = tuple([int(num) if num.isdigit() else num for num in __version__.replace('-', '.', 1).split('.')]) |
config/settings
是我们的应用程序和Celery的配置源。根据执行环境,Django将启动相应的设置:local.py
用于开发或test.py
用于测试。如果你想的话,也可以通过创建一个新的python模块(例如 prod.py
)定义自己的环境。Celery配置为前缀CELERY_
。对于这篇文章,我配置了RabbitMQ作为代理和SQLite作为结果bac-end。
文件: config/local.py
:
1 2 |
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='amqp://guest:guest@localhost:5672//') CELERY_RESULT_BACKEND = 'django-db+sqlite:///results.sqlite' |
情景1 – 报告生成和导出
我们将介绍的第一个案例是报告生成和导出。在本例中,您将学习如何定义一个生成CSV报告的任务,并通过celerybeat定期调度。
用例说明:从GitHub按所选时间段(日,周,月)获取五百个最热的存储库,按主题(topics)分组,并将结果导出到CSV文件。
如果我们提供了一个HTTP服务,该服务将通过单击“生成报告”按钮触发该功能,那么应用程序将在发送HTTP响应之前停止并等待任务完成。这是不好的。我们希望我们的web应用程序是快速的,我们不希望当我们的后端计算结果时让我们的用户等待。与其等待结果生成,不如将任务通过Celery 中的注册队列排队,并将 task_id
响应到前端。然后,前端将使用task_id
以异步方式(例如AJAX)查询任务结果,并将保持用户对任务进度的更新。最后,当进程完成时,结果可以作为文件通过HTTP下载。
实现细节
首先,让我们将流程分解成最小的单位并创建管道:
1.Fetchers是负责从GitHub服务中获取存储库的workers 。
2.Aggregator 是负责将结果合并到一个列表中的workers 。
3.Importer 是在GitHub上生成最热门存储库的CSV报告的workers 。

获取存储库是使用GitHub Search API GET /search/repositories
.的HTTP请求。然而,GitHub API服务有一个限制应该被处理:API每个请求最多返回100个存储库,而不是500个。我们可以同时发送5个请求,但是我们不想让用户等待5个单独的请求,因为该HTTP请求是一个I/O操作。相反,我们可以使用适当的页面参数来执行5个并发的HTTP请求。所以页面的范围是[1..5]。让我们在toyex/tasks.py
module中定义一个名为fetch_hot_repos/3 -> list
的任务:
文件: celery_uncovered/toyex/local.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@shared_task def fetch_hot_repos(since, per_page, page): payload = { 'sort': 'stars', 'order': 'desc', 'q': 'created:>={date}'.format(date=since), 'per_page': per_page, 'page': page, 'access_token': settings.GITHUB_OAUTH} headers = {'Accept': 'application/vnd.github.v3+json'} connect_timeout, read_timeout = 5.0, 30.0 r = requests.get( 'https://api.github.com/search/repositories', params=payload, headers=headers, timeout=(connect_timeout, read_timeout)) items = r.json()[u'items'] return items |
因此,fetchhotrepos创建了一个对GitHub API的请求,并使用一个存储库列表对用户进行响应。它接收三个参数来定义我们的请求有效载荷:
since
– 创建日期过滤存储库。(即从since日期起创建的库)per_page
– 每个请求返回的结果数(限制为100)。page
– 请求的页码(范围[1..5])。
注意:为了使用GitHub Search API,您需要一个OAuth令牌才能通过身份验证。在我们的例子中,它保存在设置中的GITHUB_OAUTH
处。
接下来,我们需要定义一个主任务,负责汇总结果并将其导出为CSV文件: produce_hot_repo_report_task/2->filepath:
文件: celery_uncovered/toyex/local.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
@shared_task def produce_hot_repo_report(period, ref_date=None): # 1. parse date ref_date_str = strf_date(period, ref_date=ref_date) # 2. fetch and join fetch_jobs = group([ fetch_hot_repos.s(ref_date_str, 100, 1), fetch_hot_repos.s(ref_date_str, 100, 2), fetch_hot_repos.s(ref_date_str, 100, 3), fetch_hot_repos.s(ref_date_str, 100, 4), fetch_hot_repos.s(ref_date_str, 100, 5) |