C# Parallel.ForEach equivalent in JavaScript

6

I'm trying to write methods with behavior similar to Array.prototype.forEach and Array.prototype.map , but using multiple threads.

For Array.prototype.parallelMap , I did the following:

/**
 * Maps every element of the array through `callback`, running each call in
 * its own Web Worker, and hands the collected results to `complete`.
 *
 * `callback` is serialized with toString() and re-created inside the worker,
 * so it must be self-contained: it cannot use variables from the scope in
 * which it was written. Arguments and results are copied between threads.
 *
 * @param {function(*, number): *} callback - Run in a worker as callback(element, index).
 * @param {function(Array): void} complete - Called exactly once, asynchronously,
 *   with the results stored at the same indices as the source elements.
 */
Array.prototype.parallelMap = function (callback, complete) {
  var self = this;
  var result = [];

  // forEach skips the holes of a sparse array, so count only the elements
  // that will really get a worker.
  var pending = 0;
  self.forEach(function () {
    pending++;
  });

  // Nothing to process (empty array or only holes): no worker would ever
  // answer, so report completion here. Deferred so that `complete` is
  // asynchronous in every case.
  if (pending === 0) {
    setTimeout(function () {
      complete(result);
    }, 0);
    return;
  }

  // Worker-side handler. `callback` below is the global defined by
  // callbackSource inside the worker script, not the parameter above.
  var workerSource = "onmessage = " + function (event) {
    var indice = event.data[0];
    var object = event.data[1];

    postMessage([indice, callback(object, indice)]);
  }.toString();

  var callbackSource = "var callback = " + callback.toString();

  var blob = new Blob([workerSource, ";\n", callbackSource, ";"], { type: 'text/javascript' });
  var _url = URL.createObjectURL(blob);

  self.forEach(function (object, indice) {
    var worker = new Worker(_url);

    worker.onmessage = function (event) {
      result[event.data[0]] = event.data[1];
      // Each worker handles a single element; release its thread right away.
      worker.terminate();

      pending--;
      if (pending === 0) {
        // Every worker has loaded the script by now, so the URL can be freed.
        URL.revokeObjectURL(_url);
        complete(result);
      }
    };

    worker.postMessage([indice, object]);
  });
};

// Sample input: ten integers in no particular order.
var numeros = [0, 2, 4, 6, 8, 1, 3, 5, 7, 9];

// Per-element step: offsets the number by a random fraction in [0, 1).
var adicionarRuido = function (numero, indice) {
  return numero + Math.random();
};

// Completion step: prints the array of mapped values.
var mostrarResultado = function (result) {
  console.log(result);
};

numeros.parallelMap(adicionarRuido, mostrarResultado);

But for Array.prototype.parallelForEach I need to be able to modify the collection from within the function; my callback function should only serve to signal that I have finished iterating over the collection:

// Desired signature: `callback` is run for each element and should be able to
// modify the collection; `finish` only signals that the whole collection has
// been processed. The body is intentionally left open here.
Array.prototype.parallelForEach = function (callback, finish) { ... };

var numeros = [0, 2, 4, 6, 8, 1, 3, 5, 7, 9];

// Per-element step: changes only the local parameter, and this is the
// modification that fails to persist in the collection.
var alterarNumero = function (numero, indice) {
  numero += Math.random();
};

// Completion step: the values in numeros were expected to be changed by now.
var aoTerminar = function () {
  console.log(numeros);
};

numeros.parallelMap(alterarNumero, aoTerminar);
    
asked by anonymous 19.01.2016 / 21:24

1 answer

4

To modify the collection within the onmessage function, you need to use a reference to the collection itself, instead of the variable 'result' :

self[event.data[0]] = event.data[1];

Just to complement the answer:

The Parallel.ForEach method in the .NET framework performs a processing on each of the elements of an array and this processing can be done using parallelism.

At runtime, .NET evaluates the available resources (eg, number of processors and memory) and the amount of processing available on the machine to determine whether:

  • whether it is worthwhile to execute the map operation in parallel at all
  • in how many threads the work will be divided (if there is advantage in parallelism)

Assuming your Javascript code runs on a webpage, you can not access all of this system information to decide whether it's worth running parallel.

Even if you prefer to 'force' parallel execution, you need to determine how many threads will do the work, and for this you need to know, for example, how many processors (physical and logical) are available in the system. If the number of threads exceeds the number of cores available, there may be a performance loss.

In your code, you are creating one thread per array element, and this is likely to reduce performance because the effort to create the threads will be much greater than the gain from parallel processing.

A common solution to this type of implementation is to create a threads pool and split the array into parts, which will be processed separately on each thread.

In the onmessage method, the pool code collects the results and updates the elements of the array.

After the last thread is executed, you call the finish callback.

Another important point to evaluate is that communication between the main thread and the workers is done by copying the parameters, not by reference, and this can lead to a loss of performance depending on the algorithm you are going to implement.

Below is a commented example of implementing this concept with a pool of 4 threads, however I do not recommend using this code in any production environment.

One suggestion for testing the above concepts is to implement a "timer" and run several tests with different array sizes (small, medium, giant) and different thread counts and, if possible, on different machines (with different numbers of processors) to observe the behavior and performance of the implementation.

// Number of workers kept in the pool
var NUMERO_DE_THREADS = 4;

/**
 * Maps every element of the array through `callback` using a pool of at most
 * NUMERO_DE_THREADS Web Workers, writing the results back into this array.
 *
 * The array is cut into contiguous chunks of near-equal size, one per worker.
 * `callback` is serialized with toString() and re-created inside the worker
 * script, so it must be self-contained (no variables from the enclosing
 * scope). Chunks and results are copied between threads, not shared.
 *
 * @param {function(*, number): *} callback - Run in a worker as
 *   callback(element, index), where index is the element's position in the
 *   whole array (not in the chunk).
 * @param {function(Array): void} finish - Called once, asynchronously, with
 *   this same array after every chunk has been written back.
 */
Array.prototype.parallelMap = function (callback, finish) {
    var self = this;
    var total = self.length;

    // Empty array: there is no work to hand out, so do not start any worker.
    // Deferred so that `finish` is asynchronous in every case.
    if (total === 0) {
        setTimeout(function () {
            finish(self);
        }, 0);
        return;
    }

    // Never use more workers than elements; treat a pool size below 1 as 1.
    var limite = Math.max(1, Math.min(NUMERO_DE_THREADS, total));
    // Chunk size rounded up so that no worker gets a large leftover tail
    var tamanhoDoTrabalho = Math.ceil(total / limite);
    // Rounding up can leave fewer non-empty chunks than `limite`
    var numeroDeWorkers = Math.ceil(total / tamanhoDoTrabalho);

    // Worker script: the callback is defined once as `cb`, followed by the
    // message handler. `cb` below refers to that worker-side global.
    var source = "var cb = (" + callback.toString() + ");\n" +
        "onmessage = " + function (event) {
            // Index of the chunk's first element in the original array
            var posicao = event.data[0];
            // Chunk of the array to be processed
            var dados = event.data[1];

            // Process the chunk, passing the absolute index to the callback
            for (var i = 0; i < dados.length; i++)
                dados[i] = cb(dados[i], posicao + i);
            // Send the result back
            postMessage([posicao, dados]);
            // Shut the worker down
            close();
        }.toString() + ";";

    var blob = new Blob([source], { type: 'text/javascript' });
    var _url = URL.createObjectURL(blob);

    // Number of workers that have not answered yet
    var pendentes = numeroDeWorkers;

    // Handles a worker's answer: copies its chunk back and checks for completion
    var aoReceber = function (e) {
        var posicao = e.data[0];
        var resultado = e.data[1];

        // Update the array with the results
        for (var j = 0; j < resultado.length; j++)
            self[posicao + j] = resultado[j];

        pendentes--;
        if (pendentes === 0) {
            // Every worker has loaded the script by now, so the URL can be freed
            URL.revokeObjectURL(_url);
            finish(self);
        }
    };

    // Start the pool and send one chunk of the array to each worker
    for (var i = 0; i < numeroDeWorkers; i++) {
        var inicio = i * tamanhoDoTrabalho;
        var worker = new Worker(_url);

        worker.onmessage = aoReceber;
        worker.postMessage([inicio, self.slice(inicio, inicio + tamanhoDoTrabalho)]);
    }
};

var numeros = [7, 2, 4, 6, 8, 1, 3, 5, 7, 9];

// Per-element step: offsets the number by a random amount in [0, 200).
var adicionarRuido = function (numero, indice) {
    return numero + Math.random()*200;
};

// Completion step: the values of the numeros collection should be changed by this point.
var mostrarResultado = function (resultado) {
    console.log(resultado);
};

numeros.parallelMap(adicionarRuido, mostrarResultado);

Update:

In response to the comment about not using return within the callback: in this case the array element must be accessed through the indice parameter, because the element itself is not passed to the callback function by reference and so cannot be updated directly.

Changing the line:

dados[i] = cb(dados[i], i);

To:

cb(dados, i);

The callback function will not need return .

So the call of the parallelMap function is as follows:

// With the worker line changed to cb(dados, i), the first argument is the
// array chunk itself, so the element is updated in place through the index
// and no return value is needed.
var alterarNoLugar = function (numero, indice) {
    numero[indice] += Math.random()*200;
};

// Completion step: prints the updated collection.
var mostrarResultado = function (resultado) {
    console.log(resultado);
};

numeros.parallelMap(alterarNoLugar, mostrarResultado);

Libraries for parallel execution in Javascript:

Parallel.js - Parallel Computing with Javascript

Hamsters.js | Parallel Javascript

threads.js

Inline Worker

    
25.01.2016 / 22:08