古之立大事者,不惟有超世之才,亦必有坚忍不拔之志。

在Python中用Celery安排管理后台工作流

Python admin 267℃ 0评论

前言

还好这次没来ios的,刚接触了下dubbo的分布实现,没想到就被推了一篇python的分布实现技术,分享给大家吧,顺便自己也了解下。原文如下:

Orchestrating a Background Job Workflow in Celery for Python

By RUSTEM KAMUN 

正文开篇

现代Web应用程序及其底层系统比以往任何时候都更快,更灵敏。然而,仍然有很多情况下,您希望将繁重的任务的执行转移到整个系统架构的其他部分,而不是在主线程上处理它们。识别这些任务简单到检查它们是否类似属于以下类别:

  • 定期任务 – 您将计划在特定时间或间隔后运行的任务,例如每月报告生成或每天运行两次的web scraper。
  • 第三方任务——web应用程序必须快速地为用户提供服务,而不需要等待其他的操作在页面加载时完成。例如发送电子邮件或通知或传播更新到内部工具(例如收集A/B测试或系统日志记录的数据)。
  • 长时间运行的作业——在资源中花费昂贵的作业,用户在其计算结果时需要等待。例如复杂的工作流执行(DAG工作流程),图形生成,类似于任务的Map-Reduce,以及媒体内容的服务(视频,音频)。

执行后台任务的一个简单的解决方案是在单独的线程或进程中运行它。Python是一种高级的图灵完备的编程语言,,不幸的是,它没有在与Erlang、Go、Java、Scala或Akka的规模匹配上提供内置的并发性。那些都是基于Tony Hoare的通信顺序进程 (CSP)。另一方面,Python线程是由全局解释器锁(GIL)协调和调度的,它可以防止多个本机线程同时执行Python的编译器。摆脱GIL是Python开发人员中一个很有争议的话题,但这并不是本文的重点。Python中的并发编程是过时的,然而依旧欢迎您在由同为Toptal的作者Marcus McCurdy编写的Python多线程教程中阅读有关它的内容。因此,设计过程之间的通信始终是一个容易出错的过程,并导致代码耦合和糟糕的系统可维护性,更不用说它对可扩展性的负面影响。此外,Python进程是操作系统(OS)下的一个常规进程,并且与整个Python标准库一样,它也是重量级的。随着应用程序中的进程数量的增加,从一个这样的进程切换到另一个进程变得非常耗时。

为了更好地理解Python的并发性,看看David Beazley在PyCon’15上的这篇令人难以置信的演讲。

更好的解决方案是为分布式队列或其著名的被称为发布-订阅(publish-subscribe)的兄弟模式。如图1所示,有两种类型的应用程序,其中一种称为发布者,它发送消息,另一种称为订阅者,接收消息。这两个代理之间没有直接交互,甚至彼此都不知道。发布者向中央队列或代理发送消息,订阅者从该代理接收感兴趣的消息。这种方法有两个主要的优点:

可扩展性——代理不需要在网络中了解彼此。他们以话题(topic)为中心。因此,这意味着每个都可以继续正常工作,而不考虑其他的异步方式。

松耦合-每个代理都表示系统的一部分(服务,模块)。由于它们是松散耦合的,因此每个都可以单独扩展到数据中心之外。

在Python中用Celery安排管理后台工作流
图1:发布-订阅模式

什么是Celery

Celery 是Python世界中最受欢迎的后台工作管理者之一。Celery与像RabbitMQ或Redis这样的消息代理兼容,可以同时充当生产者和消费者。

Celery是基于分布式消息传递的异步任务队列/作业队列。它专注于实时操作,但也支持调度。执行单元,称为任务,在一个或多个使用多处理、Eventletgevent的工作服务器上并发执行。任务可以异步执行(在后台)或同步执行(等待准备就绪)。

– Celery Project

要开始使用Celery,只需按照官方文档中的指南一步步进行即可。本文的重点是让您很好地了解哪些用例可以被Celery涉及。在本文中,我们不仅将展示一些有趣的示例,还将尝试学习如何将Celery应用于实际的任务,如后台邮件、报告生成、日志记录和错误报告。我将分享我的基于超仿真的测试任务,最后,我将提供一些在官方文档中没有(很好)文档化的技巧,这些技巧花费了我数小时的研究来发现。

如果你以前没有使用Celery的经验,我建议你先按官方教程学完后再尝试。

吊一下胃口

如果这篇文章激起您的兴趣,并让您想立即投入代码,那么请遵循本文中使用的代码的GitHub库。这里的README文件将为您提供比较粗糙的方法来运行和使用示例应用程序。

使用Celery第一步

对于初学者,我们将通过一系列实际的例子,向读者展示简单而优雅的Celery如何解决看似不平凡的任务。所有示例将在Django框架内呈现; 然而,他们中的大多数可以很容易地移植到其他Python框架(Flask,Pyramid)。

项目布局是由Cookiecutter Django产生的;然而,我只保留了一些依赖项,在我看来,这些依赖关系促进了这些用例的开发和准备。我还删除了这篇文章和应用程序的不必要的模块,以减少噪音,使代码更容易理解。

  • celery_uncovered/{toyex,tricks,advex}包含我们将在本文中介绍的不同应用程序。每个应用程序都包含一系列由Celery理解所要求的级别组织的示例。
  • celery_uncovered/celery.py 定义了一个Celery实例。

文件: celery_uncovered/celery.py:

然后我们需要确保Celery将与Django一起开始。因此,我们导入该应用程序celery_uncovered/__init__.py

文件: celery_uncovered/__init__.py:

config/settings是我们的应用程序和Celery的配置源。根据执行环境,Django将启动相应的设置:local.py用于开发或test.py用于测试。如果你想的话,也可以通过创建一个新的python模块(例如 prod.py)定义自己的环境。Celery配置为前缀CELERY_。对于这篇文章,我配置了RabbitMQ作为代理和SQLite作为结果bac-end。

文件: config/local.py:

情景1 – 报告生成和导出

我们将介绍的第一个案例是报告生成和导出。在本例中,您将学习如何定义一个生成CSV报告的任务,并通过celerybeat定期调度。

用例说明:从GitHub按所选时间段(日,周,月)获取五百个最热的存储库,按主题(topics)分组,并将结果导出到CSV文件。

如果我们提供了一个HTTP服务,该服务将通过单击“生成报告”按钮触发该功能,那么应用程序将在发送HTTP响应之前停止并等待任务完成。这是不好的。我们希望我们的web应用程序是快速的,我们不希望当我们的后端计算结果时让我们的用户等待。与其等待结果生成,不如将任务通过Celery 中的注册队列排队,并将 task_id响应到前端。然后,前端将使用task_id以异步方式(例如AJAX)查询任务结果,并将保持用户对任务进度的更新。最后,当进程完成时,结果可以作为文件通过HTTP下载。

实现细节

首先,让我们将流程分解成最小的单位并创建管道:

1.Fetchers是负责从GitHub服务中获取存储库的workers 。

2.Aggregator 是负责将结果合并到一个列表中的workers 。

3.Importer 是在GitHub上生成最热门存储库的CSV报告的workers 。

在Python中用Celery安排管理后台工作流
 图2:Celery和Python的workers的流程图

获取存储库是使用GitHub Search API GET /search/repositories.的HTTP请求。然而,GitHub API服务有一个限制应该被处理:API每个请求最多返回100个存储库,而不是500个。我们可以同时发送5个请求,但是我们不想让用户等待5个单独的请求,因为该HTTP请求是一个I/O操作。相反,我们可以使用适当的页面参数来执行5个并发的HTTP请求。所以页面的范围是[1..5]。让我们在toyex/tasks.py module中定义一个名为fetch_hot_repos/3 -> list的任务:

 文件: celery_uncovered/toyex/local.py

因此,fetchhotrepos创建了一个对GitHub API的请求,并使用一个存储库列表对用户进行响应。它接收三个参数来定义我们的请求有效载荷:

  • since – 创建日期过滤存储库。(即从since日期起创建的库)
  • per_page – 每个请求返回的结果数(限制为100)。
  • page – 请求的页码(范围[1..5])。

注意:为了使用GitHub Search API,您需要一个OAuth令牌才能通过身份验证。在我们的例子中,它保存在设置中的GITHUB_OAUTH处。

接下来,我们需要定义一个主任务,负责汇总结果并将其导出为CSV文件: produce_hot_repo_report_task/2->filepath:

文件: celery_uncovered/toyex/local.py

此任务用于celery.canvas.group执行五个并发调用fetch_hot_repos/3。这些结果等待然后简化到一个存储库对象列表。然后我们的结果集按主题(topic )分组,最后导出到MEDIA_ROOT/目录下的生成的CSV文件中。

为了定期安排任务,您可能需要在配置文件中的计划列表中添加一个条目:

文件: config/local.py

试试看

为了启动和测试任务如何工作,首先我们需要启动Celery进程:

接下来,我们需要创建celery_uncovered/media/目录。然后,您将可以通过Shell或Celerybeat测试其功能:

Shell:

Celerybeat:

您可以在MEDIA_ROOT/目录下观看结果。

情景2 – 通过电子邮件报告服务器500错误

Celery最常见的用例之一是发送电子邮件通知。电子邮件通知是使用本地SMTP服务器或第三方SES的脱机I / O绑定操作。有许多用例涉及发送电子邮件,并且对于大多数用户,在接收到HTTP响应之前,用户不需要等待此过程完成。这就是为什么在后台执行这样的任务并立即响应用户的原因。

用例描述:通过Celery向管理员发送的50X错误报告。

Python和Django有必要的系统日志记录。我不会详细介绍Python的日志记录是如何工作的。但是,如果您以前从未尝试过,或者需要复习一下,请阅读内置的日志模块的文档。您肯定希望在生产环境中这样。Django有一个专门的日志程序处理程序,名为AdminEmailHandler,它为每一个日志信息发送电子邮件。

实现细节

其主要思想是扩展AdminEmailHandler类的send_mail的方法,使得它可以通过Celery发送邮件的方式。这可以如下图(图3:使用Celery和Python处理管理电子邮件)所示完成:

在Python中用Celery安排管理后台工作流

首先,我们需要创建一个叫 report_error_task 的任务,该任务使用所提供的subject和message调用mail_admins:

文件: celery_uncovered/toyex/tasks.py

接下来,我们实际上扩展了AdminEmailHandler,以便内部调用定义的Celery任务:

文件: celery_uncovered/toyex/admin_email.py

最后,我们需要设置日志记录。在Django中进行日志的配置非常简单。您需要的是覆盖 LOGGING,以便日志引擎开始使用新定义的处理程序:

文件 config/settings/local.py

请注意,我故意设置了处理程序过滤器 require_debug_true ,以便在应用程序在调试模式下运行时测试此功能。

试试看

为了测试它,我准备了一个Django视图,它在t localhost:8000/report-error中为 “division-by-zero” operation提供服务。您还需要启动MailHog Docker容器来测试邮件实是否真的发送了。

额外的细节

作为一个邮件测试工具,我设置了MailHog并配置了Django邮件,将其用于SMTP传送。有很多方法来部署和运行 MailHog。我决定和Docker container一起。您可以在相应的README文件中找到详细信息:

文件: docker/mailhog/README.md

要配置您的应用程序使用MailHog,您需要在您的配置中添加以下行:

文件: config/settings/local.py

默认的Celery任务之外

可以通过任何可调用函数创建Celery任务。默认情况下,任何用户自定义的Task都将被作为父类(抽象)的celery.app.task.Task 注入。这个类包含异步运行任务的功能(通过网络传递给一个Celery worker),或者同步(用于测试目的),创建签名和许多其他实用工具。在下一个示例中,我们将尝试扩展 Celery.app.task.Task。然后将它用作基类,以便为我们的任务添加一些有用的行为。

情景3 – 每个任务的文件记录

在我的一个项目中,我开发了一个应用程序,它为终端用户提供了一个提取、转换、加载(ETL)的工具,这个工具能够接收并过滤大量的分层数据。后端被分为两个模块:

  • 用Celery协调数据处理流水线
  • 用Go进行数据处理

芹菜部署了一个Celerybeat实例和40多个workers。有二十多个不同的任务组成了管道和编排活动。每个这样的任务可能会在某些时候失败。所有这些故障都被转储到每个workers的系统日志中。在某些时候,它开始变得不方便调试和维护Celery 层。最终,我们决定将任务日志隔离到任务特定的文件中。

用例说明:扩展Celery ,以便每个任务将其标准输出和错误记录到文件中。

Celery为Python应用程序提供了强大的控制,可以控制它在内部的工作。它附有一个熟悉的信号框架。使用Celery的应用程序可以订阅其中的一些,以增强某些操作的行为。我们将利用任务级别的信号,对各个任务生命周期进行详细跟踪。Celery总是带有日志记录的后端,我们将从中受益,而在一些地方只有略微覆盖重写,才能实现我们的目标。

实现细节

Celery已经支持每项任务的日志记录。为了保存到文件,需要将日志输出发送到适当的位置。在我们的例子中,任务的正确位置是一个务名称同名的文件。在Celery实例中,我们将使用动态推断的日志处理程序来覆盖内置的日志配置。可以订阅celeryd_after_setup 信号,然后在那里配置系统日志:

文件: celery_uncovered/toyex/celery_conf.py

请注意,对于在Celery应用程序中注册的每个任务,我们正在使用它的处理程序构建相应的日志记录器。每个handler 都是logging.FileHandler类型,因此每个这样的实例都会接收一个文件名作为输入。你所需要做的就是将这个模块导入到 celery_uncovered/celery.py 文件末尾:

可以通过调用get_task_logger(task_name)来接收特定的task logger。为了推广每个任务的这种行为,有必要用一些实用方法稍微扩展celery.current_app.Task

文件: celery_uncovered/tricks/celery_ext.py

现在,在调用task.log_msg("Hello, my name is: %s", task.request.id)的情况下,日志输出将被路由到任务名称下的相应文件。

试试看

为了启动和测试这个任务如何工作,首先启动Celery进程:

然后您将能够通过Shell测试功能:

最后,要查看结果,导航到celery_uncovered/logs目录并打开相应名为的celery_uncovered.tricks.tasks.add.log的日志文件。运行此任务多次后可能会看到类似的内容:

情景4 – 范围感知任务

让我们假设一个基于Celery和Django的面向国际用户的Python应用程序。用户可以设置他们使用您的应用程序的语言(语言环境)。

您必须设计一个多语言,区域设置的电子邮件通知系统。为了发送电子邮件通知,您已注册了由特定队列处理的特殊Celery任务。此任务将接收一些关键参数作为输入和当前用户区域设置,以便电子邮件将以用户选择的语言发送。

现在想象我们有很多这样的任务,但是这些任务中的每一个都接受一个locale参数。在这种情况下,在更高层次的抽象上解决它不是更好吗?在这里,我们看到如何做到这一点。

实现细节

同样,正如我们在任务日志记录中所做的那样,我们希望扩展一个基本任务类celery.current_app.Task,并覆盖一些负责调用任务的方法。为了演示,我将重写 celery.current_app.Task::apply_async 模块。这个模块有额外的任务,它将帮助您生成一个完全功能的替换。

文件: celery_uncovered/tricks/celery_ext.py

关键的线索是,在默认情况下,将当前的语言环境作为键值参数传递给调用任务。如果一个任务以某个地区作为参数调用,那么它就没有变化。

试试看

为了测试这个功能,我们来定义一个ScopeBasedTask类型的虚拟任务。它根据区域设置查找文件,并将其内容读为JSON:

文件: celery_uncovered/tricks/tasks.py

现在您需要做的是重复启动Celery,启动shell以及在不同方案下测试此任务的执行步骤。装置位于celery_uncovered/tricks/fixtures/locales/目录下。

结语

这篇文章旨在从不同的角度探索Celery。我在传统的例子中展示了Celery,例如邮件和报告生成,以及一些有趣的小众商业用例的共享技巧。Celery是建立在数据驱动的哲学基础上的,你的团队可以把它作为系统堆栈的一部分来简化他们的生活。如果您有基本的Python经验,开发基于Celery-based的服务并不是很复杂,您应该能够很快地找到它。默认配置对于大多数应用来说都是足够好的,但是如果需要的话,它们可以非常灵活。

我们的团队选择使用芹菜作为后台作业和长时间运行的任务的后端。我们广泛地使用它来做各种各样的用例,在这篇文章中只提到了几个。我们每天摄取和分析千兆字节的数据,但这只是水平扩展技术的开始。

了解基础知识

什么是Pub-Sub?

发布订阅(或生产者 – 消费者)模式是计算机系统中的分布式消息传递模式,其中发布者通过消息代理广播消息,并且订阅者监听消息。两者都可以是系统的隔离组件,既不知道也不与其他组件直接通信。

什么是Celery for Python?

芹菜是Python世界中最受欢迎的后台工作经理之一。Celery与几个消息经纪人(如RabbitMQ或Redis)兼容,可以兼顾生产者和消费者。

附录

扩展资料

图灵完备(Turing complete )–百度百科

转自 https://windcoder.com/zaipythonzhongyongceleryanpaiguanlihoutaigongzuoliu

转载请注明:北凉柿子 » 在Python中用Celery安排管理后台工作流

喜欢 (0)or分享 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址