Python script uses a while loop to keep checking for updated job scripts and multiprocess the tasks in a queue

I am trying to write a Python script that scans a folder, collects updated SQL scripts, and then automatically pulls data for each SQL script. In the code, a while loop scans for new SQL files and sends them to the data-pull function. What I am having a hard time with is how to use the while loop to build a dynamic queue while also having multiple processes run the tasks in that queue.

The problem with the code below is that each while-loop iteration waits for the long jobs in the current batch to finish (the done_queue.get() result loop blocks until every submitted task has returned) before it moves on to the next iteration and collects other jobs to fill the vacant processors.

Update:

  1. Thanks to @pbacterio for catching the bug; the error message is gone now. After changing the code, the script can now pick up all the job scripts in one iteration and distribute them to four processors. However, the next iteration takes a very long time before it scans and submits the newly added job scripts. Any idea how to restructure the code?

  2. I finally figured out the solution; see my answer below. It turns out that what I was looking for is (a minimal sketch of this pattern follows this list):

    the_queue = Queue()
    the_pool = Pool(4, worker_main, (the_queue,))

  3. For anyone who runs into a similar idea, here is the whole architecture of this automation script, which turns a shared drive into a "server for SQL pulls" or any other job-queue "server".

    a. The Python script auto_data_pull.py, as shown in the answer. You need to add your own job function.

    b. A "batch script" as follows:

        start C:\Anaconda2\python.exe C:\Users\bin\auto_data_pull.py

    c. Add a task triggered at computer startup that runs the "batch script". It works.
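For context on the Pool(4, worker_main, (the_queue,)) line in update 2: passing an initializer function and its arguments to Pool makes every worker process run that function as soon as it starts, so each worker can sit in a loop consuming a shared Queue while the main process keeps scanning for new files. Here is a minimal, self-contained sketch of the pattern; the do_job function and the example file names are placeholders, not part of the actual script:

    from multiprocessing import Pool, Queue
    import os, time

    def do_job(item):
        # placeholder for a real job function such as query_sql
        print os.getpid(), "processing", item

    def worker_main(queue):
        # runs in each pool worker; blocks on get() until a job arrives
        while True:
            item = queue.get(True)
            do_job(item)

    if __name__ == '__main__':
        the_queue = Queue()
        # four workers, all consuming jobs from the same queue
        the_pool = Pool(4, worker_main, (the_queue,))
        for job in ['a.jsl', 'b.jsl', 'c.jsl']:
            the_queue.put(job)   # the main process only enqueues work
        time.sleep(2)            # give the demo workers time to run before exiting

The key difference from the question code is that the workers are started once and never torn down, so a long job only occupies one worker while the main loop keeps enqueuing new files.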

Python code:

    from glob import glob
    import os, time
    import sys
    import csv
    import re
    import subprocess
    import pandas as pd
    import pypyodbc
    from multiprocessing import Process, Queue, current_process, freeze_support

    #
    # Function run by worker processes
    #
    def worker(input, output):
        for func, args in iter(input.get, 'STOP'):
            result = compute(func, args)
            output.put(result)

    #
    # Function used to compute result
    #
    def compute(func, args):
        result = func(args)
        return '%s says that %s%s = %s' % \
            (current_process().name, func.__name__, args, result)

    def query_sql(sql_file): #test func
        #jsl file processing and SQL querying, data table will be saved to csv.
        fo_name = os.path.splitext(sql_file)[0] + '.csv'
        fo = open(fo_name, 'w')
        print sql_file
        fo.write("sql_file {0} is done\n".format(sql_file))
        return "Query is done for {0}\n".format(sql_file)

    def check_files(path):
        """
        arguments -- root path to monitor
        returns -- dictionary of {file: timestamp, ...}
        """
        sql_query_dirs = glob(path + "/*/IDABox/")
        files_dict = {}
        for sql_query_dir in sql_query_dirs:
            for root, dirs, filenames in os.walk(sql_query_dir):
                [files_dict.update({(root + filename): os.path.getmtime(root + filename)})
                 for filename in filenames if filename.endswith('.jsl')]
        return files_dict

    ##### working in single thread
    def single_thread():
        path = "Y:/"
        before = check_files(path)
        sql_queue = []
        while True:
            time.sleep(3)
            after = check_files(path)
            added = [f for f in after if not f in before]
            deleted = [f for f in before if not f in after]
            overlapped = list(set(list(after)) & set(list(before)))
            updated = [f for f in overlapped if before[f] < after[f]]
            before = after
            sql_queue = added + updated
            # print sql_queue
            for sql_file in sql_queue:
                try:
                    query_sql(sql_file)
                except:
                    pass

    ##### not working in queue
    def multiple_thread():
        NUMBER_OF_PROCESSES = 4
        path = "Y:/"
        sql_queue = []
        before = check_files(path) # get the current dictionary of sql_files
        task_queue = Queue()
        done_queue = Queue()
        while True: # while loop to check the changes of the files
            time.sleep(5)
            after = check_files(path)
            added = [f for f in after if not f in before]
            deleted = [f for f in before if not f in after]
            overlapped = list(set(list(after)) & set(list(before)))
            updated = [f for f in overlapped if before[f] < after[f]]
            before = after
            sql_queue = added + updated

            TASKS = [(query_sql, sql_file) for sql_file in sql_queue]
            # submit tasks
            for task in TASKS:
                task_queue.put(task)
            for i in range(NUMBER_OF_PROCESSES):
                p = Process(target=worker, args=(task_queue, done_queue)).start()
            # try:
            #     p = Process(target=worker, args=(task_queue))
            #     p.start()
            # except:
            #     pass

            # Get and print results
            print 'Unordered results:'
            for i in range(len(TASKS)):
                print '\t', done_queue.get()
            # Tell child processes to stop
            for i in range(NUMBER_OF_PROCESSES):
                task_queue.put('STOP')

    # single_thread()
    if __name__ == '__main__':
        # freeze_support()
        multiple_thread()

References:

  1. Watch a directory for file changes with a Python script: http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html
  2. Multiprocessing: https://docs.python.org/2/library/multiprocessing.html

Where do you define sql_file in multiple_thread()?

    multiprocessing.Process(target=query_sql, args=(sql_file)).start()

You have not defined sql_file in that method, and you have used the variable inside a for loop. The scope of the variable is limited to the for loop.

Try replacing this:

    result = func(*args)

with this:

    result = func(args)
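For anyone wondering why this matters: each task is a (query_sql, sql_file) tuple, so args arrives in compute() as a single string rather than a tuple of arguments. func(*args) unpacks the string into individual characters, while func(args) passes the filename through intact. A quick illustration (show is just a hypothetical demo function):

    def show(*params):
        print params

    args = 'report.jsl'   # a single string, as produced by (query_sql, sql_file)
    show(args)            # ('report.jsl',) -- one argument, the whole filename
    show(*args)           # ('r', 'e', 'p', 'o', ...) -- unpacked character by character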

I have figured this out. Thanks for the responses that inspired the idea. Now the script can run a while loop to monitor the folder for newly updated/added SQL script files, and then distribute the data pulls to multiple processes. The solution comes from queue.get() and queue.put(). I assume the queue object takes care of the communication by itself.

Here is the final code:

    from glob import glob
    import os, time
    import sys
    import pypyodbc
    from multiprocessing import Process, Queue, Event, Pool, current_process, freeze_support

    def query_sql(sql_file): #test func
        #jsl file processing and SQL querying, data table will be saved to csv.
        fo_name = os.path.splitext(sql_file)[0] + '.csv'
        fo = open(fo_name, 'w')
        print sql_file
        fo.write("sql_file {0} is done\n".format(sql_file))
        return "Query is done for {0}\n".format(sql_file)

    def check_files(path):
        """
        arguments -- root path to monitor
        returns -- dictionary of {file: timestamp, ...}
        """
        sql_query_dirs = glob(path + "/*/IDABox/")
        files_dict = {}
        try:
            for sql_query_dir in sql_query_dirs:
                for root, dirs, filenames in os.walk(sql_query_dir):
                    [files_dict.update({(root + filename): os.path.getmtime(root + filename)})
                     for filename in filenames if filename.endswith('.jsl')]
        except:
            pass
        return files_dict

    def worker_main(queue):
        # each pool worker runs this loop, pulling jobs off the shared queue
        print os.getpid(), "working"
        while True:
            item = queue.get(True)
            query_sql(item)

    def main():
        the_queue = Queue()
        the_pool = Pool(4, worker_main, (the_queue,))

        path = "Y:/"
        before = check_files(path) # get the current dictionary of sql_files
        while True: # while loop to check the changes of the files
            time.sleep(5)
            sql_queue = []
            after = check_files(path)
            added = [f for f in after if not f in before]
            deleted = [f for f in before if not f in after]
            overlapped = list(set(list(after)) & set(list(before)))
            updated = [f for f in overlapped if before[f] < after[f]]
            before = after
            sql_queue = added + updated
            if sql_queue:
                for jsl_file in sql_queue:
                    try:
                        the_queue.put(jsl_file)
                    except:
                        print "{0} failed with error {1}. \n".format(jsl_file, str(sys.exc_info()[0]))
                        pass
            else:
                pass

    if __name__ == "__main__":
        main()
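One design note on the final code: main() runs forever and the pool workers block on queue.get(True) indefinitely, which is fine for a monitoring job launched at startup. If a clean shutdown is ever needed, the sentinel idea already used in the question code (iter(input.get, 'STOP')) should carry over; here is a minimal sketch, assuming the string 'STOP' can never collide with a real file path:

    def worker_main(queue):
        # consume jobs until a 'STOP' sentinel arrives, then return
        # so the pool can be closed and joined cleanly
        for item in iter(queue.get, 'STOP'):
            query_sql(item)

    # shutdown: push one sentinel per worker, then close and join the pool
    # for _ in range(4):
    #     the_queue.put('STOP')
    # the_pool.close()
    # the_pool.join()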