使用Amazon SWF在服务器之间传递消息?
我正在努力研究如何将Boto和SWF结合使用来做到这一点。 我没有完成一些完整的代码,但是我所追求的是如果任何人都可以更多地解释涉及的内容。
正如你所看到的,我真的很困惑这一切,如果有人可以对此有所了解,我将不胜感激。
我想你会问一些非常好的问题,这些问题突出了SWF如何能够作为服务。 总之,你不要告诉你的服务器之间协调工作。 您的决策者在SWF服务的帮助下为您编排所有这些内容。
你的工作流程的执行过程如下:
将凭据提供给boto.swf的代码有很多种方法。 为了这个练习的目的,我建议在运行下面的代码之前将它们导出到环境中:
export AWS_ACCESS_KEY_ID=<your access key> export AWS_SECRET_ACCESS_KEY=<your secret key>
1)注册域,工作流和活动执行以下操作:
# ab_setup.py import boto.swf.layer2 as swf DOMAIN = 'stackoverflow' ACTIVITY1 = 'serverAActivity' ACTIVITY2 = 'serverBActivity' VERSION = '1.0' swf.Domain(name=DOMAIN).register() swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register() swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register() swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()
2)实施和运行决策者和工作者。
# ab_decider.py import time import boto.swf.layer2 as swf DOMAIN = 'stackoverflow' ACTIVITY1 = 'serverAActivity' ACTIVITY2 = 'serverBActivity' VERSION = '1.0' class ABDecider(swf.Decider): domain = DOMAIN task_list = 'default_tasks' version = VERSION def run(self): history = self.poll() # Print history to familiarize yourself with its format. print history if 'events' in history: # Get a list of non-decision events to see what event came in last. workflow_events = [e for e in history['events'] if not e['eventType'].startswith('Decision')] decisions = swf.Layer1Decisions() # Record latest non-decision event. last_event = workflow_events[-1] last_event_type = last_event['eventType'] if last_event_type == 'WorkflowExecutionStarted': # At the start, get the worker to fetch the first assignment. decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()), ACTIVITY1, VERSION, task_list='a_tasks') elif last_event_type == 'ActivityTaskCompleted': # Take decision based on the name of activity that has just completed. # 1) Get activity's event id. last_event_attrs = last_event['activityTaskCompletedEventAttributes'] completed_activity_id = last_event_attrs['scheduledEventId'] - 1 # 2) Extract its name. activity_data = history['events'][completed_activity_id] activity_attrs = activity_data['activityTaskScheduledEventAttributes'] activity_name = activity_attrs['activityType']['name'] # 3) Optionally, get the result from the activity. result = last_event['activityTaskCompletedEventAttributes'].get('result') # Take the decision. if activity_name == ACTIVITY1: # Completed ACTIVITY1 just came in. Kick off ACTIVITY2. decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()), ACTIVITY2, VERSION, task_list='b_tasks', input=result) elif activity_name == ACTIVITY2: # server B completed activity. We're done. decisions.complete_workflow_execution() self.complete(decisions=decisions) return True
工作人员要简单得多,如果你不想要的话,你不需要使用继承。
# ab_worker.py import os import time import boto.swf.layer2 as swf DOMAIN = 'stackoverflow' ACTIVITY1 = 'serverAActivity' ACTIVITY2 = 'serverBActivity' VERSION = '1.0' class MyBaseWorker(swf.ActivityWorker): domain = DOMAIN version = VERSION task_list = None def run(self): activity_task = self.poll() print activity_task if 'activityId' in activity_task: # Get input. # Get the method for the requested activity. try: self.activity(activity_task.get('input')) except Exception, error: self.fail(reason=str(error)) raise error return True def activity(self, activity_input): raise NotImplementedError class WorkerA(MyBaseWorker): task_list = 'a_tasks' def activity(self, activity_input): result = str(time.time()) print 'worker a reporting time: %s' % result self.complete(result=result) class WorkerB(MyBaseWorker): task_list = 'b_tasks' def activity(self, activity_input): result = str(os.getpid()) print 'worker b returning pid: %s' % result self.complete(result=result)
3)运行你的决策者和工人。 您的决策者和工作人员可以从不同的主机或同一台机器上运行。 打开四个终端,运行你的演员:
首先你的决定
$ python -i ab_decider.py >>> while ABDecider().run(): pass ...
然后工人A,你可以从服务器A做到这一点:
$ python -i ab_workers.py >>> while WorkerA().run(): pass
然后工作者B,可能来自服务器B,但如果你从笔记本电脑全部运行它,它也将工作:
$ python -i ab_workers.py >>> while WorkerB().run(): pass ...
4)最后,启动工作流程。
$ python Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41) [GCC 4.4.3] on linux2 Type "help", "copyright", "credits" or "license" for more information. >>> import boto.swf.layer2 as swf >>> workflows = swf.Domain(name='stackoverflow').workflows() >>> workflows [<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>] >>> execution = workflows[0].start(task_list='default_tasks') >>>
切换回来看看你的演员发生了什么。 一分钟不活动之后,他们可能会断开服务。 如果发生这种情况,按箭头键+输入重新进入轮询循环。
您现在可以转到您的AWS管理控制台的SWF面板,查看执行过程如何执行并查看其历史记录。 或者,您可以通过命令行查询它。
>>> execution.history() [{'eventId': 1, 'eventType': 'WorkflowExecutionStarted', 'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'}, 'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy': 'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version': '1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2, 'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes': {'startToCloseTimeout': '300', 'taskList': {'name': ...
这只是一个串行执行活动的工作流示例,但决策者也可以安排和协调活动的并行执行 。
我希望这至少能让你开始。 对于一个稍微复杂的连续工作流程示例,我建议您查看一下 。
我没有任何示例代码可以共享,但是您绝对可以使用SWF来协调跨两台服务器的脚本执行。 这个主要的想法是创建三个与SWF交谈的代码:
第一个组件决定器调用两个SWF API:PollForDecisionTask和RespondDecisionTaskCompleted。 轮询请求将为决策者组件提供执行工作流的当前历史记录,基本上是脚本运行者的“我在哪里”状态信息。 你编写看这些事件的代码,找出哪个脚本应该执行。 执行脚本的这些“命令”将以活动任务的调度的形式出现,该任务作为调用RespondDecisionTaskCompleted的一部分返回。
您编写的第二个组件,活动工作者,每个调用两个SWF API:PollForActivityTask和RespondActivityTaskCompleted。 轮询请求将为活动工作者指示它应该执行它所了解的脚本,SWF称为活动任务。 从轮询请求返回给SWF的信息可以包括作为活动任务计划的一部分发送给SWF的单个特定于执行的数据。 您的每个服务器都将独立轮询SWF以执行活动任务,以指示在该主机上执行本地脚本。 工作人员完成脚本的执行后,通过RespondActivityTaskCompleted API调用SWF。
从您的活动工作人员到SWF的回调将导致将新的历史记录发送给我已经提到的决策器组件。 它会查看历史记录,看到第一个脚本已经完成,并安排第二个脚本执行。 一旦看到第二个完成,就可以使用另一种类型的决定来“关闭”工作流程。
您可以通过调用StartWorkflowExecution API来启动在每个主机上执行脚本的整个过程。 这创建了SWF中整个过程的记录,并将第一个历史记录发送给决策者进程,以安排第一个主机上第一个脚本的执行。
希望这会提供更多关于如何使用SWF实现这种类型的工作流的上下文。 如果你还没有,我会看看SWF页面上的开发指南了解更多信息。
您可以使用SNS,当脚本A完成时,它应该触发SNS,并且会触发到服务器B的通知
好例子,
另外,如果您不想将您的凭证导出到环境中,则可以在您的课程中调用:
swf.set_default_credentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)