我是新来的消息队列,现在我在我的Linux服务器上使用ZeroMQ
。 我正在使用PHP
编写客户端和服务器。 这主要用于处理推送通知。
我在单个I / O线程的ZMQContext
实例上使用基本的REQ
– REP
forms通信模式,就像他们已经certificate的那样。
这是最小化的zeromqServer.php
代码:
include("someFile.php"); $context = new ZMQContext(1); // Socket to talk to clients $responder = new ZMQSocket($context, ZMQ::SOCKET_REP); $responder->bind("tcp://*:5555"); while (true) { $request = $responder->recv(); printf ("Received request: [%s]\n", $request); // ----------------------------------------------------------------- // Process push notifications here // sleep (1); // ----------------------------------------------------------------- // Send reply back to client $responder->send("Basic Reply"); }
这里是一个最小化的ZeroMQ
客户端 :
$context = new ZMQContext(); // Socket to talk to server echo "Connecting to hello world server…\n"; $requester = new ZMQSocket($context, ZMQ::SOCKET_REQ); $check = $requester->connect("tcp://localhost:5555"); var_dump($check); $requester->send("json string payload with data required to process push notifications."); //$reply = $requester->recv();
那么,我做了什么? 我使用linux命令运行zeromqServer.php
作为后台服务
nohup php zeromqServer.php &
这作为后台进程运行。 现在,当客户调用它时,它完成了所需的工作。
但是问题在于,每当任何文件发生更改时(包括在zeromqServer
文件中包含的文件),都需要重新启动进程。
而且,2-3天之后,它会停止工作。 这个过程不会停止,但它只是停止工作。
我觉得它一定是一些套接字的问题,也许套接字不再是开放的。 那时我必须重新启动zeromqServer.php
文件进程。
Q1:
可能是什么问题?
Q2:
做这件事的正确方法是什么?
A1:问题是你的服务器终于必须阻塞,因为客户端代码在REQ/REP
-pattern需要的时候没有检索到任何答案。 REP
端的zeromqserver.php
将不会尝试从关联客户端(在正式通信模式的REQ
端recv()
另一个消息,直到客户端被物理传送(进入内部缓冲区)并且recv()
来自zeromqserver.php
端的“回复”消息。
早期版本的ZeroMQ,ver。 2.1等,在将数据复制到O / S内核之前,曾经有一个节点的内部消息队列和内存管理的无限的,无限的,默认的限制大小,用于低级的I / O线程缓冲资源并从ZeroMQ内存占用释放。
新版本的ZeroMQ版本3.x +在缺省情况下只有1000个消息“short”,所以叫做HWM -s(又名High-Water-Mark-s),在这之后,相应的这种ZeroMQ资源开始阻止或放弃消息。
虽然显式增加HWM设置管理的反应性尝试看起来像是解决主要设计错误的一种肮脏的方法,但另一个ZeroMQ警告在这方面是公平的,因为要进一步小心这个方向(ØMQ并不保证套接字将接受很多是ZMQ_SNDHWM
消息,根据套接字上的消息流量, 实际的限制可能会 低 60-70%)。
忘记或无法做到这一点( 参考: OP代码已经显示):
//$reply = $requester->recv();
意味着你的REQ/REP
正式沟通模式“钟摆”变得不可逆转地僵持(永远)。
A2:基本REQ/REP
形式化沟通模式听起来很直接,但有一些危险的特征,观察到的阻塞只是其中之一。 一些额外的步骤可能会采取代码方式,部署XREQ/XREP
, DEALER/ROUTER
和其他工具,但设计应该修改,而不仅仅是由SLOC
,因为似乎有很多事情要实现,在设计代码之前。 其中一个主要的错误是假定一个send()
方法已经被排序后send()
一个消息。 在ZeroMQ中不是这种情况。
另外,代码设计应该假设消息的不确定性,并且正确处理丢失的消息问题和任何类型的分布式服务阻塞事件(死锁,活锁,缓冲区阈值溢出,旧/新API冲突,因为那里没有明确的保证任何消息对等体(在ZeroMQ中没有中央消息代理)在本地主机端实现了与ZeroMQ API /协议完全相同的版本 – 所以确实有很多新的观点,代码设计)
如果你能相信并相信一个实际操作的经验,那么你最好的下一步应该是下载并阅读Pieter HINTJENS的书“Code Connected,Volume 1” ,Pieter对分布式处理有很多见解,包括许多关于可靠模式的提示和方向,因为您会喜欢实现。
阅读这本书,这是值得你花时间的,如果你留在分布式软件设计中,你很可能会多次重读这本书,所以不要犹豫,现在就跳到这个硕士硕士的400多页的食谱,Pieter HINTJENS出的问题是。
只是从上述图60中的一幅图中,忘记了个体原型的重用,并认识到需要一个适当的端到端分布式系统设计的观点,包括阻塞避免和死锁解决策略:
只是有一些想法,看看下面的代码示例,从一个简单的分布式消息,其中一个aMiniRESPONDER
进程使用多个ZeroMQ渠道。
学习如何防止(设计方面)和处理(deux-ex-machina型)其他碰撞。
PHP自己拥有所有适合这种类型算法的语法构造函数,但是从开始到结束,架构和设计都掌握在你的手中。
只是要意识到,ZeroMQ信令架构设置/系统部分/ ZeroMQ优雅终止努力的碰撞感知{ try:, except:, finally: }
风格有[SoW]
,请按照行号检查[SoW]
:
14544 - 14800 // a safe infrastructure setup on aMiniRESPONDER side ~ 256 SLOCs 15294 - 15405 // a safe infrastructure graceful termination ~ 110 SLOCs
相比之下, aMiniRESPONDER
例子的事件处理部分的核心逻辑
14802 - 15293 // aMiniRESPONDER logic, incl. EXC-HANDLERs ~ 491 SLOCs
苛刻? 是的,但是非常强大,可扩展,快速,而且在正确使用方面的确值得。 不要犹豫,花时间和精力去获取和管理你在这个领域的知识。 所有的进一步的软件项目可能只是从这个专业投资中受益。
我只能部分回答这个问题。 我不知道为什么这个过程会在2-3天后挂起。
总的来说,PHP脚本在脚本执行时加载一次,似乎没有任何解决方法。 但是,您现在的代码可以重写如下:
someFile.php
$context = new ZMQContext(1); // Socket to talk to clients $responder = new ZMQSocket($context, ZMQ::SOCKET_REP); $responder->bind("tcp://*:5555"); while (true) { $request = $responder->recv(); printf ("Received request: [%s]\n", $request); $subscriptParams = [ "Request" => $request //Add more parameters here as needed ]; $result = shell_exec("php work.php ".base64_encode(serialize($subscriptParams))); // Send reply back to client $responder->send("Basic Reply"); }
work.php
if (!isset($argv[0])) { die("Need an argument"); } $params = unserialize(base64_decode($argv[0])); //Validate parameters $request = $params["Request"]; // Do some 'work' //Process push notifications here! sleep (1);
这是我的假设:
脚本的设置部分,例如设置$context
和$responder
将永远保持不变(或者因为更改而重启脚本所需的停机时间)。
循环的开始和结束保持不变,我的想法是shell_exec
将返回响应者可以使用的响应作为实际响应。
更多的澄清:
我正在使用serialize
来传递一个work.php
将需要使用的参数数组。 我正在做一个base64
编码解码,因为我想确保整个参数适合$argv[0]
而不是分裂在参数中找到的潜在空间。
相同的serialize -> base64_encode
和base64_decode -> deserialize
组合可以用于work.php
的结果。
请注意,我没有亲自尝试过这个代码,所以我不能保证它的工作。 我只是不明白为什么它不应该工作(确保php
是在你的路径或调用shell_exec
/usr/bin/php
中,如果不是)。
值得注意的是,这个解决方案将比在一个文件中包含所有代码慢得多,但这是在每次迭代时刷新脚本的代价。
Q2:做这件事的正确方法是什么?
在我看来,做任何事情的正确方法是在开始时选择适合您的最佳选择,以免花费时间来获得二级成绩。 我没有反对ZeroMQ
尽管我遵循的逻辑,程序员应该总是努力做最干净的代码,并使用最好的工具。 在使用PHP创建消息队列的情况下,使用Pheanstalk
更多成功: https : //github.com/pda/pheanstalk
它是一个强烈推荐的在线开源选项,在linux上完美工作。 安装队列非常简单,我写了一个关于如何在以下主题中安装pheanst的完整答案: 无法获取Beanstalkd队列以用于PHP
Pheanstalk使用beanstalkd库,这是非常轻量级和高效率。 要按照您的问题建议消息队列,可以使用两个简单的PHP脚本来完成:
消息制作者:
<?php $pheanstalk = new Pheanstalk('127.0.0.1:11300'); $pheanstalk ->useTube("my_queue") ->put("Hello World"); ?>
工作者脚本:
<?php if ($job = $pheanstalk ->watch('testtube') ->ignore('default') ->reserve())//retreives the job if there is one in the queue { echo $job->getData();//echos the message $pheanstalk->delete($job);//deletes the job from the queue } } ?>
消息生产者将被包含在用户创建消息的页面中,并且将被发送到由beanstalkd生成的队列。 工人脚本可以用不同的方式来设计。 您可以将它放置在一个while循环中,以便每秒搜索一个新的队列,甚至可以让多个工作人员搜索队列。 Pheanstalk是非常有效的,强烈推荐。