我目前正在试验一个树莓派。 我正在运行Snort,它是数据包检测软件。 在Snort引发警报的情况下,我想执行一个(Python)脚本。
Snort被执行,在一个覆盆子pi如下:
sudo snort -q -A console -i eth0 -c /etc/snort/snort.conf
我创build了一个python脚本,当被调用时,它控制着一个覆盆子pi的GPIO引脚。 把它放在更多的背景下; 当树莓派接收到一个ping / ICMP数据包时,一个红色的警报灯将被同一台设备点亮和控制。
snort规则当前有效,当ICMP数据包到达时,警报输出到控制台。 然而,我不知道如何让snort执行python脚本
以下是3个选项,希望能够工作:
PIPE
的“严格” subprocess
方法 pexpect
方法 – “Pexpect是一个纯粹的Python模块,用于产生子应用程序,控制它们,并在输出中响应预期的模式。 – 而不是这是唯一的非标准包,你将不得不单独从默认的Python安装。 select
来读取文件描述符的方法 每种方法都捆绑在一个try_[SOME APPROACH]
函数中。 您应该能够更新顶部的3个参数,然后在底部对一个方法进行评论/取消注释,以便一蹴而就。
独立测试两半可能是值得的。 换句话说,snort +我的rpi.py
(下图)。 然后,如果这样的话,我的timed_printer.py
(下面)和你的python脚本来切换RPi GPIO。 如果他们都独立工作,那么您可以确信,要使整个工作流程正常运行,不需要做太多的工作。
码
import subprocess _cmd_lst = ['python', '-u', 'timed_printer.py'] # sudo snort -q -A console -i eth0 -c /etc/snort/snort.conf _rpi_lst = ['python', '-u', 'rpi.py'] # python script that toggles RPi _alert = 'TIME' # The keyword you're looking for # in snort output #=============================================================================== # Simple helper function that calls the RPi toggle script def toggle_rpi(): subprocess.call(_rpi_lst) def try_subprocess(cmd_lst, alert, rpi_lst): p = subprocess.Popen(' '.join(cmd_lst), shell=True, stdout=subprocess.PIPE, bufsize=1) try: while True: for line in iter(p.stdout.readline, b''): print("try_subprocess() read: %s" % line.strip()) if alert in line: print("try_subprocess() found alert: %s" % alert) toggle_rpi() except KeyboardInterrupt: print(" Caught Ctrl+C -- killing subprocess...") except Exception as ex: print ex finally: print("Cleaning up...") p.kill() print("Goodbye.") def try_pexpect(cmd_lst, alert, rpi_lst): import pexpect # http://pexpect.sourceforge.net/pexpect.html p = pexpect.spawn(' '.join(cmd_lst)) try: while True: p.expect(alert) # This blocks until <alert> is found in the output of cmd_str print("try_pexpect() found alert: %s" % alert) toggle_rpi() except KeyboardInterrupt: print(" Caught Ctrl+C -- killing subprocess...") except Exception as ex: print ex finally: print("Cleaning up...") p.close(force=True) print("Goodbye.") def try_pty(cmd_lst, alert, rpi_lst, MAX_READ=2048): import pty, os, select mfd, sfd = pty.openpty() p = subprocess.Popen(' '.join(cmd_lst), shell=True, stdout=sfd, bufsize=1) try: while True: rlist, _, _, = select.select([mfd], [], []) if rlist: data = os.read(mfd, MAX_READ) print("try_pty() read: %s" % data.strip()) if not data: print("try_pty() got EOF -- exiting") break if alert in data: print("try_pty() found alert: %s" % alert) toggle_rpi() elif p.poll() is not None: print("try_pty() had subprocess end -- exiting") break except KeyboardInterrupt: print(" Caught Ctrl+C -- killing subprocess...") except Exception as ex: print ex finally: print("Cleaning up...") os.close(sfd) os.close(mfd) p.kill() print("Goodbye.") #=============================================================================== try_subprocess(_cmd_lst, _alert, _rpi_lst) #try_pexpect(_cmd_lst, _alert, _rpi_lst) #try_pty(_cmd_lst, _alert, _rpi_lst)
测试笔记
为了模拟你的snort脚本(一个脚本“挂起”,然后打印一些东西,然后回到挂起等),我写了这个简单的python脚本,我叫timed_printer.py
:
import time while True: print("TIME: %s" % time.time()) time.sleep(5)
而我的rpi.py
文件很简单:
print("TOGGLING OUTPUT PIN")
这里没有明确的输出清除,试图最好地模拟正常输出。
最后的考虑
第一种方法是一次只读一整行。 所以如果你希望你的alert
被包含在一行中,你会没事的。
第二种方法( pexpect
)会阻塞,直到遇到alert
。
第三种方法将尽快读取数据 ,我应该指出,不一定是完整的。 如果您看到try_pty() read:
snort输出行的碎片,导致您错过警报,则需要添加某种缓冲解决方案。
文件
subprocess
pexpect
pty
, select
参考文献: 1,2
您可以改为将警报记录到文件中,然后执行类似的操作,但是只需要调用python脚本而不是notify-send。
snort中的实验内容可能很难弄清楚,因为在出现问题时没有太多的支持。
如果管道输出延迟接收警报,直到snort的stdout缓冲区被刷新:
#!/usr/bin/env python from __future__ import print_function from subprocess import Popen, PIPE, STDOUT snort_process = Popen(['snort', '-A', 'console', '-c', 'snort.conf'], stdout=PIPE, stderr=STDOUT, bufsize=1, universal_newlines=True, close_fds=True) with snort_process.stdout: for line in iter(snort_process.stdout.readline, ''): #XXX run python script here: # subprocess.call([sys.executable or 'python', '-m', 'your_module']) print(line, end='') rc = snort_process.wait()
那么你可以尝试一个伪tty来启用snort的边界线buffereing 。
或者运行snort -A unsock
命令并在使用Unix域套接字生成后立即打印每个警报:
#!/usr/bin/env python import ctypes import os import socket from subprocess import Popen from snort import Alertpkt # listen for alerts using unix domain sockets (UDS) snort_log_dir = os.getcwd() server_address = os.path.join(snort_log_dir, 'snort_alert') sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) try: os.remove(server_address) except OSError: pass sock.bind(server_address) # start snort process snort_process = Popen(['snort', '-A', 'unsock', '-l', snort_log_dir, '-c', 'snort.conf'], close_fds=True) # receive alerts alert = Alertpkt() try: while 1: if sock.recv_into(alert) != ctypes.sizeof(alert): break # EOF #XXX run python script here `subprocess.call([sys.executable or 'python', '-m', 'your_module'])` print("{:03d} {}".format(alert.val, alert.data)) except KeyboardInterrupt: pass finally: sock.close() os.remove(server_address) if snort_process.poll() is None: # the process is still running snort_process.kill() snort_process.wait() # wait for snort process to exit
在你的情况下,你可以在每个警报上运行脚本而不是打印。
snort.Alertpkt
是C struct Alertpkt
的ctypes定义 。
要尝试它, 除了所有的python模块之外 ,还可以下载包含一个虚拟snort
脚本的要点,并运行run-script-on-alert-unsock.py
(或run-script-on-alert-pty.py
)。