Threads in Python 3.4


Well, I'm having a hard time using threads. I need to do the following:

I have several items to compare against a variable, but each comparison takes a while. So I would like to run several comparisons at the same time and, if any comparison is true, stop all the others and move on to another part of the code.

I tried using join, but it ended up delaying the program, because I had to wait for comparisons that became irrelevant the moment I found what I needed. I also tried without join, but then the other threads kept running and disturbed the flow of my program.

Something like this:

For every 10 items in the list
Use each item in a separate comparison
If any of the 10 comparisons returns True, stop all the other comparisons and go to another function
Otherwise, take 10 more items from the list and repeat the comparison

More generally I did this:

import time
import _thread

palavras = ['palavra1', 'palavra2', 'palavra3', 'palavra4']
nth = 2
threads = []

def execute(palavra):
    print('\ntrying word ' + palavra)
    time.sleep(5)  # stand-in for the slow comparison
    print(palavra + ' finished')

for nome in palavras:
    threads.append(nome)
    if len(threads) >= nth:
        for item in threads:
            _thread.start_new_thread(execute, (item,))
        threads = []

However, this just starts threads in batches. I need to keep exactly 10 running: take 10 items, compare them, and as each comparison finishes, take one more item, so that there are always 10 comparisons in flight.

    
asked by anonymous 15.04.2015 / 16:51

2 answers

The solution closest to what you have in this case is to keep a record of how many threads you have already fired, and only start new ones when there is room. Your program creates a maximum number of threads, but it has no code to start new threads (or pass new elements to the existing ones) after reaching that maximum. The logic for this needs a while, an if, and one or two variables to count how many threads are active, firing more whenever the count is below the limit (10, in your case).
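That counting logic can be sketched with a semaphore, which does the bookkeeping for you: each running comparison holds one "slot", and releasing a slot lets the next thread start. This is a minimal illustration, not the answer's code; the `compare`/`search` names are made up, the limit is lowered to 3, and the slow comparison is simulated with a sleep:

```python
import threading
import time

LIMIT = 3                            # max comparisons running at once (10 in the question)
slots = threading.Semaphore(LIMIT)   # counts free "slots"
found = threading.Event()            # set when some comparison is True

def compare(item, target):
    try:
        time.sleep(0.1)              # stand-in for the slow comparison
        if item == target:
            found.set()
    finally:
        slots.release()              # free a slot so another thread can start

def search(items, target):
    threads = []
    for item in items:
        if found.is_set():           # stop launching once a match was found
            break
        slots.acquire()              # blocks while LIMIT threads are running
        t = threading.Thread(target=compare, args=(item, target))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    return found.is_set()

print(search(list(range(20)), 7))    # → True
```

Note that a few comparisons already in flight still run to completion after the match; only the launching of new ones stops.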

The "standard" solution for this kind of problem is a little more elegant, though: it involves creating a fixed set of threads, with the desired number of them. In the literature this set is called a "thread pool"; in practice it is a collection (it can be a list) where each element is one of your threads, and each thread in this context is called a "worker".

In this case a data structure called a "queue" is used: it is fed by the main thread, and each worker thread pulls elements from it. That way, a worker thread can pull a new element to process as soon as it finishes the previous job, regardless of what the others are doing.

In other words: you put elements into the queue from the main thread, and each of the previously created worker threads runs a continuous loop, taking the next element from the queue and processing it.

You also need some way to tell the worker threads that the processing is done and they can terminate. Typically this is done by placing a "marker" object in the queue; each thread stops when it finds it. In your scenario, however, it would take a bit more logic to feed the elements into the queue gradually, so that the marker does not just sit at the end of the queue (which would bring back your initial problem). So, simpler solutions for simpler scenarios: a global variable COMPLETE is used, set by the worker thread that finds the result.

Note that both in thread theory and in implementations in lower-level languages this would be much more complicated: there are race conditions on the shared global variable that would have to be taken into account. In Python's case, the GIL (Global Interpreter Lock) takes care of that for us, and the Queue class already uses the required locks internally, so it is very simple to use them without major concerns.

The price paid for this is performance: if the CPU-intensive work in your threads is a pure-Python algorithm, the GIL is not released while that algorithm runs, and the gain from using threads is small. The alternative is to use "multiprocessing" instead of "threading", which puts each worker in a separate process and ends the GIL problem (but you will need another mechanism, not the global variable, to synchronize the workers). Or, write your execute function in Cython and use the call available in that Python superset to release the GIL.

Here's the example using Python3's threading and Queue with your scenario:

from threading import Thread
from queue import Queue, Empty
import random
import time

COMPLETE = False

class Worker(Thread):
    def __init__(self, queue, output):
        self.queue = queue
        self.output = output
        super().__init__(daemon=True)  # daemon threads don't block program exit

    def run(self):
        while not COMPLETE:
            try:
                # use a timeout so the loop re-checks COMPLETE instead of
                # blocking forever on an empty queue
                element = self.queue.get(timeout=0.1)
            except Empty:
                continue
            self.execute(element)

    def execute(self, palavra):
        global COMPLETE
        print('\ntrying word ' + palavra)
        time.sleep(1)
        print(palavra + ' finished')
        if random.randint(0, 10) == 5:
            COMPLETE = True
            self.output.put(palavra)

def main(palavras, nworkers):
    queue = Queue()
    result_queue = Queue()
    threads = [Worker(queue, result_queue) for _ in range(nworkers)]
    for t in threads:
        t.start()
    for p in palavras:
        queue.put(p)
    result = result_queue.get()
    print("The final result is:", result)


palavras = ['palavra_{}'.format(i) for i in range(50)]
main(palavras, nworkers=10)

To learn more, see the Queue documentation: link (it even has an example similar to this one there).
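The multiprocessing alternative mentioned earlier could look roughly like this. It is only a sketch under assumptions: `check` and `find` are made-up names, the "hit" is hard-coded, and `imap_unordered` plus `terminate` replace the COMPLETE flag, since a plain global variable is not shared between processes:

```python
import multiprocessing
import time

def check(palavra):
    # stand-in for the slow comparison (assumption: the hit is 'palavra_7')
    time.sleep(0.1)
    return palavra if palavra == 'palavra_7' else None

def find(palavras, nworkers):
    # imap_unordered yields each result as soon as any worker finishes it,
    # so we can stop the whole pool at the first hit
    with multiprocessing.Pool(processes=nworkers) as pool:
        for result in pool.imap_unordered(check, palavras):
            if result is not None:
                pool.terminate()   # stop the workers still comparing
                return result
    return None

if __name__ == '__main__':
    palavras = ['palavra_{}'.format(i) for i in range(50)]
    print('The final result is:', find(palavras, 10))
```

Because the workers are separate processes, `check` must be a picklable top-level function, and the `if __name__ == '__main__'` guard is required on platforms that spawn rather than fork.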

    
16.04.2015 / 20:09

Wouldn't it be better to create an executor and hand it your tasks? For example, the following takes the numbers from 0 to 999 and returns their factorials. It calls math.factorial on each element of the list and puts the results in a dictionary, running only 10 operations at a time.

from math import factorial
from pprint import pprint
from concurrent.futures import ThreadPoolExecutor 

lista = range(1000)

with ThreadPoolExecutor(max_workers=10) as executor:
    pprint({num: fat for num, fat in zip(lista, executor.map(factorial, lista))})

Basically, the executor's map() applies the function to each element of the list.

If you want, multiprocessing.Pool also works in place of concurrent.futures.ThreadPoolExecutor; then you would get true parallelism, with multiple processes instead of threads.
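For the question's actual goal (stop as soon as one comparison is True), the executor can also submit the comparisons and cancel the pending ones at the first hit. A sketch, where `compara` and `primeira_verdadeira` are made-up names and the sleep stands in for the slow comparison:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def compara(item, alvo):
    time.sleep(0.1)            # stand-in for the slow comparison
    return item == alvo

def primeira_verdadeira(itens, alvo, max_workers=10):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map each future back to the item it is comparing
        futures = {executor.submit(compara, item, alvo): item for item in itens}
        for fut in as_completed(futures):
            if fut.result():
                # cancel comparisons that have not started yet;
                # the ones already running still finish
                for other in futures:
                    other.cancel()
                return futures[fut]
    return None

print(primeira_verdadeira(range(30), 7))   # → 7
```

Future.cancel() only affects tasks still waiting in the pool's queue, so with 10 workers at most 10 in-flight comparisons run to completion after the match.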

    
05.08.2017 / 21:30