Using inotifywait with a FIFO queue in Bash

I wrote a small Bash script that uses inotify-tools and the inotify interface. My problem is that one of the commands in this function can block execution until it finishes, and the whole function gets stuck that way.

To work around this, I would like to queue the detected files (caught via the close event) and read the queue from another function. Does anyone have a clue how to do this in Bash?

The variables below are simple strings for looking up directories or assembling file names.

    inotifywait -mrq -e close --format %w%f /some/dir/ | while read FILE
    do
        NAME="$(echo $CAP)_$(date +"%F-%H-%M-%S").pcap"
        logger -i "$FILE was just closed"
        # cp "$FILE" "$DATA/$CAP/$ENV/$NAME"
        rsync -avz --stats --log-file=/root/rsync.log "$FILE" "$DATA/$CAP/$ENV/$NAME" >> /root/rsync_stats.log
        RESULT=$?
        if [ $RESULT -eq 0 ] ; then
            logger -i "Success: $FILE copied to SAN $DATA/$CAP/$ENV/$NAME, code $RESULT"
        else
            logger -i "Fail: $FILE copy failed to SAN for $DATA/$CAP/$ENV/$NAME, code $RESULT"
        fi
        rm "$FILE"
        RESULT=$?
        if [ $RESULT -eq 0 ] ; then
            logger -i "Success: deletion successful for $FILE, code $RESULT"
        else
            logger -i "Fail: deletion failed for $FILE on SSD, code $RESULT"
        fi
        do_something
        logger -i "$NAME was handled"
        # for stdout
        echo "$(date): Moved file"
    done

I am copying these files to a SAN volume that sometimes has latency spikes, which is why this function can get stuck for a while. I replaced cp with rsync because I need throughput statistics, which cp (from coreutils) apparently does not provide.

Some ideas:

1) You can use a named pipe as a queue of limited size:

    mkfifo pipe

    your_message_source | while read MSG
    do
        # collect files into the pipe
        echo "$MSG" >> pipe
    done &

    while read MSG
    do
        # do your blocking work here
    done < pipe

This blocks at echo "$MSG" >> pipe when the pipe's buffer fills up. You can get the size of that buffer with ulimit -p (multiplied by 512), which may be sufficient for some cases.
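As a quick sanity check, the size the shell reports can be printed like this (a sketch; the numbers depend on your system, and on Linux since 2.6.11 the kernel's real pipe capacity is typically larger, 64 KiB by default):

```shell
# print the pipe buffer size as reported by the shell
# (ulimit -p counts 512-byte blocks; the kernel's actual pipe
#  capacity on modern Linux is usually 64 KiB)
blocks=$(ulimit -p)
bytes=$((blocks * 512))
echo "pipe buffer per ulimit: ${blocks} blocks = ${bytes} bytes"
```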

2) You can use a regular file as a message queue, flock-ing it for every operation:

    # Feeder part
    your_message_source | while read MSG
    do
        (
            flock 9
            echo "$MSG" >> file_based_queue
        ) 9>> file_based_queue   # append-mode open: a plain 9> would truncate the queue
    done &

    # Worker part
    while :
    do
        # Lock the shared queue and cut'n'paste its content
        # into the worker's private queue
        (
            flock 9
            cp file_based_queue workers_queue
            truncate -s0 file_based_queue
        ) 9>> file_based_queue

        # process the private queue
        while read MSG
        do
            # do your blocking work here
        done < workers_queue
    done

inotifywait is only blocked while the worker loop is inside the ( flock ... ) subshell that locks the queue, and only once it is past the flock command. You can keep the queue on a RAM disk (/dev/shm) to make that window as short as possible, so you don't miss FS events.
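Here is a minimal, self-contained run of that cut'n'paste pattern (the /tmp paths are placeholders; on a real setup you would point them at /dev/shm as suggested above). Note the append-mode 9>> open for the lock descriptor, so that the redirection itself cannot truncate the shared queue:

```shell
# demo of the flock-based queue: feed five messages, then one worker pass
qdir=$(mktemp -d /tmp/flockq-XXXXXX)
queue="$qdir/file_based_queue"
work="$qdir/workers_queue"
touch "$queue"

# feeder: append five messages, each under an exclusive lock
for i in 1 2 3 4 5; do
    (
        flock 9
        echo "msg $i" >> "$queue"
    ) 9>> "$queue"
done

# worker: one cut-and-paste pass under the lock, then process unlocked
(
    flock 9
    cp "$queue" "$work"
    : > "$queue"          # empty the shared queue
) 9>> "$queue"

count=0
while read -r msg; do
    echo "handled: $msg"
    count=$((count + 1))
done < "$work"

rm -r "$qdir"
```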

3) Or you could use a Bash interface to a database-backed message queue, or to SysV message queues (or run the script in a language that has such an interface).
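For the SysV route, note that no standard shell tool can actually send or receive messages; msgsnd/msgrcv need a binding in a real programming language. The util-linux tools can at least create and inspect queues, which shows the plumbing:

```shell
# create a SysV message queue, confirm it exists, then remove it
# (ipcmk/ipcs/ipcrm are util-linux; the shell can only manage the
#  queue -- sending/receiving needs msgsnd/msgrcv from C, Python, etc.)
msqid=$(ipcmk -Q | awk '{print $NF}')   # ipcmk prints "Message queue id: N"
ipcs -q | grep -wq "${msqid}" && echo "queue ${msqid} exists"
ipcrm -q "${msqid}"
```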

Here is a sample that uses a regular file as a FIFO queue of unlimited size that survives system restarts and allows multiple readers and multiple writers.

    #!/bin/bash
    # manages a FIFO queue on a system file.
    # every message is a line of text.
    # supports multiple writers and multiple readers.
    #
    # Requires: bash, inotify-tools: /usr/bin/inotifywait,
    #           ed, util-linux: /usr/bin/flock

    set -e

    # Retrieves one element
    # param:
    #   pipe_name
    # writes to stdout:
    #   element_string
    # returns:
    #   true on success, false on error or end of data
    _pipe_pull() {
        local pipe="${1}"
        local msg pid

        _pipe_pop() {
            local fd1
            (
                if ! flock --timeout 1 --exclusive ${fd1}; then
                    echo "Error: _pipe_pop can't get a lock." >&2
                    return 1
                fi
                [ ! -s "${pipe}" ] || \
                    ed -s "${pipe}" <<< $'1p\n1d\nw'
            ) {fd1}< "${pipe}"
            :
        }

        msg=""
        while [ -z "${msg}" ]; do
            if [ ! -s "${pipe}" ]; then
                inotifywait -e modify "${pipe}" > /dev/null 2>&1 &
                pid="${!}"
                wait "${pid}" || return 1
            fi
            msg="$(_pipe_pop)" || \
                return 1
            if [ "${msg}" = $'\x04' ]; then
                echo "_pipe_pull: end of data." >&2
                return 1
            fi
        done
        printf '%s\n' "${msg}"
        :
    }

    # Adds multiple elements at once
    # param:
    #   pipe_name elem_1 ... elem_N
    # returns:
    #   true on success, false on error
    _pipe_push() {
        local pipe="${1}"
        local fd1
        shift
        (
            if ! flock --timeout 10 --exclusive ${fd1}; then
                echo "Error: _pipe_push can't get a lock." >&2
                return 1
            fi
            printf '%s\n' "${@}" >> "${pipe}"
        ) {fd1}< "${pipe}"
    }

    pipe_file_1="$(mktemp /tmp/pipe-XXXXXX.txt)"

    # submit first reader process
    while msg="$(_pipe_pull "${pipe_file_1}")"; do
        printf 'Reader 1:%s\n' "${msg}"
    done &

    # submit second reader process
    while msg="$(_pipe_pull "${pipe_file_1}")"; do
        printf 'Reader 2:%s\n' "${msg}"
    done &

    # submit first writer process
    for i in {1..10}; do
        _pipe_push "${pipe_file_1}" "Next comes ${i}" "${i}"
    done &
    pid1="${!}"

    # submit second writer process
    for i in {11..20}; do
        _pipe_push "${pipe_file_1}" "${i}" "Previous was ${i}"
    done &
    pid2="${!}"

    # submit third writer process
    for i in {21..30}; do
        _pipe_push "${pipe_file_1}" "${i}"
    done &
    pid3="${!}"

    # waiting for the end of writer processes
    wait ${pid1} ${pid2} ${pid3}

    # signal end of data to the two readers
    _pipe_push "${pipe_file_1}" $'\x04' $'\x04'

    # waiting for the end of reader processes
    wait

    # cleaning
    rm -vf "${pipe_file_1}"
    :
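One Bash feature the sample leans on is automatic file-descriptor allocation: a redirection written as {fd1}< file opens the file and stores the newly allocated descriptor number (always >= 10) in the variable fd1 (Bash 4.1+), so flock can lock it without hard-coding fd 9. A tiny isolated sketch of just that mechanism:

```shell
# demonstrate {var}< redirection: bash picks a free fd, stores its
# number in the variable, and flock locks that fd inside the subshell
tmp=$(mktemp)
msg=$(
    (
        flock --exclusive ${fd}
        echo "locked via fd ${fd}"
    ) {fd}< "${tmp}"
)
echo "${msg}"
rm -f "${tmp}"
```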