做了一些在线search,find简单的“教程”使用命名pipe道。 但是,当我做任何与后台工作,我似乎失去了大量的数据。
[[编辑:find一个更简单的解决scheme,请参阅回复post。 所以我提出的问题现在是学术 – 如果有人可能想要一个工作服务器]]
使用Ubuntu 10.04与Linux 2.6.32-25-generic#45-Ubuntu SMP Sat Oct 16 19:52:42 UTC 2010 x86_64 GNU / Linux
GNU bash,版本4.1.5(1) – 发行版(x86_64-pc-linux-gnu)。
我的bashfunction是:
function jqs { pipe=/tmp/__job_control_manager__ trap "rm -f $pipe; exit" EXIT SIGKILL if [[ ! -p "$pipe" ]]; then mkfifo "$pipe" fi while true do if read txt <"$pipe" then echo "$(date +'%Y'): new text is [[$txt]]" if [[ "$txt" == 'quit' ]] then break fi fi done }
我在后台运行这个:
> jqs& [1] 5336
现在我喂它:
for i in 1 2 3 4 5 6 7 8 do (echo aaa$i > /tmp/__job_control_manager__ && echo success$i &) done
输出不一致。 我经常没有得到所有成功的回声。 我得到的文字与成功回声一样多,有时甚至更less。
如果我从“feed”中删除“&”,它似乎工作,但是我被阻塞,直到读取输出。 因此,我想让子stream程被阻止,但不是主stream程。
其目的是编写一个简单的作业控制脚本,以便最多可以并行运行10个作业,并排队等待以后处理,但可靠地知道它们确实运行。
下面是全职工作经理:
function jq_manage { export __gn__="$1" pipe=/tmp/__job_control_manager_"$__gn__"__ trap "rm -f $pipe" EXIT trap "break" SIGKILL if [[ ! -p "$pipe" ]]; then mkfifo "$pipe" fi while true do date jobs if (($(jobs | egrep "Running.*echo '%#_Group_#%_$__gn__'" | wc -l) < $__jN__)) then echo "Waiting for new job" if read new_job <"$pipe" then echo "new job is [[$new_job]]" if [[ "$new_job" == 'quit' ]] then break fi echo "In group $__gn__, starting job $new_job" eval "(echo '%#_Group_#%_$__gn__' > /dev/null; $new_job) &" fi else sleep 3 fi done } function jq { # __gn__ = first parameter to this function, the job group name (the pool within which to allocate __jN__ jobs) # __jN__ = second parameter to this function, the maximum of job numbers to run concurrently export __gn__="$1" shift export __jN__="$1" shift export __jq__=$(jobs | egrep "Running.*echo '%#_GroupQueue_#%_$__gn__'" | wc -l) if (($__jq__ '<' 1)) then eval "(echo '%#_GroupQueue_#%_$__gn__' > /dev/null; jq_manage $__gn__) &" fi pipe=/tmp/__job_control_manager_"$__gn__"__ echo $@ >$pipe }
调用
jq <name> <max processes> <command> jq abc 2 sleep 20
将启动一个进程。 这部分工作正常。 开始第二个,很好。 手一个接一个似乎工作正常。 但是,从循环开始10似乎失去了系统,就像上面更简单的例子。
任何暗示,我可以做什么来解决IPC数据的这种明显的损失将不胜感激。
问候,阿兰。
你的问题是if
语句如下:
while true do if read txt <"$pipe" .... done
发生什么事是你的作业队列服务器每次在循环中打开和关闭管道。 这意味着有些客户在写入管道时会出现“断管”错误 – 也就是说,在编写者打开管道后,管道的读者会消失。
为了解决这个问题,在服务器中改变你的循环在整个循环中打开管道一次:
while true do if read txt .... done < "$pipe"
这样做,管道打开一次,保持开放。
你将需要小心你在循环中运行的内容,因为循环中的所有处理都会将stdin附加到命名管道。 你将要确保你重定向从其他地方的循环内的所有进程的标准输入,否则他们可能会消耗管道中的数据。
编辑:现在的问题是,当你最后一个客户端关闭管道时,你正在读取EOF,你可以使用jilles方法来复制文件描述符,或者你可以确保你也是一个客户端,并保持写端打开管道:
while true do if read txt .... done < "$pipe" 3> "$pipe"
这将保持管道的写入侧在fd 3上打开。与stdin一样,此文件描述符也适用于此警告。 您将需要关闭它,所以任何子进程不会继承它。 这可能比stdin更重要,但它会更干净。
正如在其他答案中所说的,你需要始终保持FIFO的开放,以避免数据丢失。
但是,一旦所有的作者在fifo被打开之后离开(所以有一个作者),读取立即返回(并且poll()
返回POLLHUP
)。 清除这个状态的唯一方法就是重新开放fifo。
POSIX不提供这样的解决方案,但至少Linux和FreeBSD是这样做的:如果读取失败,再次打开fifo,同时保持原始描述符打开。 这是有效的,因为在Linux和FreeBSD中,“挂断”状态对于特定的打开文件描述来说是本地的,而在POSIX中对于fifo是全局的。
这可以在这样的shell脚本中完成:
while :; do exec 3<tmp/testfifo exec 4<&- while read x; do echo "input: $x" done <&3 exec 4<&3 exec 3<&- done
像卡米和丹尼斯·威廉姆森说,不要打破管道。
现在我有更小的例子,直接在命令行上:
服务器:
( for i in {0,1,2,3,4}{0,1,2,3,4,5,6,7,8,9}; do if read s; then echo ">>$i--$s//"; else echo "<<$i"; fi; done < tst-fifo )&
客户:
( for i in {%a,#b}{1,2}{0,1}; do echo "Test-$i" > tst-fifo; done )&
可以用下面的代码替换关键字:
(echo "Test-$i" > tst-fifo&);
发送到管道的所有客户端数据都被读取,但是在读取所有数据之前,可能需要多次启动服务器的选项2。
但是尽管读取等待数据在管道中开始,一旦数据被压入,永远读空串。
任何方法来阻止这个?
再次感谢您的任何见解。
只是对于那些可能感兴趣的人来说,下面是两个测试服务器脚本的新版本,它们是由camh和jilles评论的[[re-ed]]。
现在两个版本的工作正如所希望的一样。
camh的管道管理版本:
function jqs # Job queue manager { pipe=/tmp/__job_control_manager__ trap "rm -f $pipe; exit" EXIT TERM if [[ ! -p "$pipe" ]]; then mkfifo "$pipe" fi while true do if read -u 3 txt then echo "$(date +'%Y'): new text is [[$txt]]" if [[ "$txt" == 'quit' ]] then break else sleep 1 # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand fi fi done 3< "$pipe" 4> "$pipe" # 4 is just to keep the pipe opened so any real client does not end up causing read to return EOF }
jille的管道管理版本:
function jqs # Job queue manager { pipe=/tmp/__job_control_manager__ trap "rm -f $pipe; exit" EXIT TERM if [[ ! -p "$pipe" ]]; then mkfifo "$pipe" fi exec 3< "$pipe" exec 4<&- while true do if read -u 3 txt then echo "$(date +'%Y'): new text is [[$txt]]" if [[ "$txt" == 'quit' ]] then break else sleep 1 # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand fi else # Close the pipe and reconnect it so that the next read does not end up returning EOF exec 4<&3 exec 3<&- exec 3< "$pipe" exec 4<&- fi done }
感谢大家的帮助。
一方面,这个问题比我想的还要糟糕:现在,在我的更复杂的例子(jq_manage)中似乎有一个例子,即管道中一遍又一遍地读取相同的数据(即使没有写入新的数据到它)。
另一方面,我找到了一个简单的解决方案(编辑丹尼斯的评论):
function jqn # compute the number of jobs running in that group { __jqty__=$(jobs | egrep "Running.*echo '%#_Group_#%_$__groupn__'" | wc -l) } function jq { __groupn__="$1"; shift # job group name (the pool within which to allocate $__jmax__ jobs) __jmax__="$1"; shift # maximum of job numbers to run concurrently jqn while (($__jqty__ '>=' $__jmax__)) do sleep 1 jqn done eval "(echo '%#_Group_#%_$__groupn__' > /dev/null; $@) &" }
奇迹般有效。 不涉及插座或管道。 简单。
最多并行运行10个作业,排队等待以后处理,但可靠地知道他们确实运行
你可以用GNU并行来做到这一点。 你将不需要这个脚本。
http://www.gnu.org/software/parallel/man.html#options
您可以设置max-procs“作业数量,最多并行运行N个作业”。 有一个选项可以设置要使用的CPU内核的数量。 您可以将已执行作业的列表保存到日志文件,但这是一个测试版功能。