I have a job that needs to be split across multiple threads. It works like this: the job has phases, every phase has a queue of processes, and each process has several files to handle.
For example: Phase1 > Queue1, Queue2, Queue3 > Queue1: Process1: File1, Process1: FileN
So in the main routine I open one thread per queue:
If I have 3 phases and the number of queues is 10, I open 10 threads.
My problem is this: I open a pool for the queues, and as soon as the queue threads are running, each of them opens N threads for its processes.
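The layout described above (an outer pool with one thread per queue, each of which opens its own inner pool of N threads) can be sketched roughly as follows. This is only an illustration under assumptions: `process_file`, `run_queue`, `run_all` and the sample queue names are hypothetical and do not come from my real code.

```python
from concurrent.futures import ThreadPoolExecutor


def process_file(path):
    # Hypothetical stand-in for the real per-file work.
    return path.upper()


def run_queue(queue_name, files, n_threads):
    # Inner pool: N threads that process the files of one queue.
    with ThreadPoolExecutor(max_workers=n_threads,
                            thread_name_prefix=queue_name + "_") as inner:
        # list() consumes every result, so a worker exception is re-raised here.
        return list(inner.map(process_file, files))


def run_all(queues, n_threads=4):
    # Outer pool: one thread per queue.
    with ThreadPoolExecutor(max_workers=len(queues)) as outer:
        futures = {name: outer.submit(run_queue, name, files, n_threads)
                   for name, files in queues.items()}
        # result() blocks until the queue is finished and re-raises its error, if any.
        return {name: fut.result() for name, fut in futures.items()}
```

Calling `run_all({"Queue1": ["file1", "file2"], "Queue2": ["file3"]})` returns one list of results per queue. The `with` blocks keep the caller waiting until every inner pool has finished.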
The first process in a queue runs an SQL query against the database, and it usually works. The next process works on the output of the previous one. That process either dies early or fails with a RuntimeError.
Library:
from concurrent.futures import ThreadPoolExecutor
Code that opens the threads for the phases:
self.pools_process = ThreadPoolExecutor(len(self.mapping_execute))
result = []
for p in self.mapping_execute:
    result.append(self.pools_process.submit(ThreadManager, 4, p))
Phase code that finds the files and submits them for processing:
# Standard-library imports this snippet relies on
import sys
import time
from os import listdir
from os.path import isfile, join

def ThreadManager(threads, params):
    # The second positional argument is thread_name_prefix
    pool_exec = ThreadPoolExecutor(threads, params["output_name"] + "_")
    result = []
    input_file = ""
    output_file = ""
    if params["input_name"] == "None":
        folder_input = definitions.DIR_DATA + str(params["output_name"] + "\\")
        folder_output = definitions.DIR_DATA + str(params["output_name"] + "\\")
        onlyfiles = [f for f in listdir(folder_input) if isfile(join(folder_input, f))]
        for file in onlyfiles:
            #print("pass")
            pool_exec.submit(params["function"], folder_input + "\\" + file)
    else:
        folder_input = str(definitions.DIR_DATA + str(params["input_name"] + "\\"))
        folder_output = str(definitions.DIR_DATA + str(params["output_name"] + "\\"))
        execs = 30
        file_result = []
        while True:
            execs = execs - 1
            time.sleep(2)
            inputfiles = [f for f in listdir(folder_input) if isfile(join(folder_input, f))]
            outputfiles = [f for f in listdir(folder_output) if isfile(join(folder_output, f))]
            #print(inputfiles)
            for i in inputfiles:
                if i.find("done") > 0 and i not in file_result:
                    print("File Done: ", i)
                    file_result.append(i)
                    input_f = i.split(".")[0].split("_")
                    input_id = input_f[len(input_f) - 1]
                    output_f = i.split(".")[0].split("_")
                    output_id = output_f[len(output_f) - 1]
                    input_file = str(folder_input.replace("\\", "\\\\")) + params["input_name"] + "_" + input_id + ".done"
                    output_file = str(folder_output.replace("\\", "\\\\")) + params["output_name"] + "_" + output_id + ".make"
                    print("\n", input_file, output_file)
                    if input_id != output_id:
                        print("Ids Diferentes")  # the IDs differ
                    else:
                        try:
                            print(params["function"], input_file, output_file)
                            #etl_paid_search(input_file, output_file)  # <-- this works
                            pool_exec.submit(etl_paid_search, input_file, output_file)  # <== this prints: Unexpected error: <class 'RuntimeError'>
                        except OSError as err:
                            print("OS error: {0}".format(err))
                        except ValueError:
                            print("Could not convert data to an integer.")
                        except:
                            print("Unexpected error:", sys.exc_info()[0])
                            raise
            if execs < 0:
                print("Saida por TimeOut")  # exit on timeout
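For reference, the try/except in the code above wraps only the call to `submit`, so it catches what `submit` itself raises (for example a RuntimeError when the executor has already been shut down), not what happens inside the worker function. Below is a minimal sketch of how both cases can be observed separately. `flaky_task`, `run_tasks` and `submit_after_shutdown` are hypothetical names used only for illustration, and whether either case matches the RuntimeError I am seeing is an assumption.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def flaky_task(input_file, output_file):
    # Hypothetical stand-in for etl_paid_search; fails on purpose for one input.
    if "bad" in input_file:
        raise ValueError("cannot process " + input_file)
    return output_file


def run_tasks(pairs, n_threads=4):
    done, failed = [], []
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        # Keep every future so its outcome can be inspected afterwards.
        futures = {pool.submit(flaky_task, i, o): i for i, o in pairs}
        for fut in as_completed(futures):
            try:
                done.append(fut.result())  # re-raises the worker's exception
            except Exception as err:
                failed.append((futures[fut], repr(err)))
    return sorted(done), failed


def submit_after_shutdown():
    # submit() itself raises RuntimeError once the executor is shut down.
    pool = ThreadPoolExecutor(max_workers=1)
    pool.shutdown()
    try:
        pool.submit(flaky_task, "a.done", "a.make")
    except RuntimeError as err:
        return str(err)
    return None
```

Printing `sys.exc_info()[1]` (the exception instance) instead of only `sys.exc_info()[0]` (its class) would also show the message attached to the RuntimeError.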