How to manage my message flow?

8

I have this JavaScript code that represents my connection to the WebSockets server, where it receives and sends messages to the server:

var connection = new WebSocket('ws://127.0.0.1:2333');

connection.onopen = function () { // abriu a conexão com o servidor 
};

connection.onmessage = function (e) { // recebe do servidor
};

function sendToServer(data) { // envia mensagens para o servidor
   connection.send(data)
}

Since I do not want messages to be sent and received simultaneously, for example, if the onmessage gate is busy receiving messages, the sendToServer() gate will not send anything until all the entries are completed. With these three criteria:

  • No messages should be missed
  • Everything must happen automatically
  • Sequence must be respected

When I say that no message should be lost I say that if a message from the server comes while an output message is being worked it will be "saved" and "crafted" as soon as possible.

When I say that everything has to happen automatically I mean that nothing can be stopped, if a message was saved to be executed as soon as possible, it has to be executed as soon as possible, it can not be "forgotten" ..

And finally the sequence, the sequence must be respected, but the sequence in which messages arrive / exit within the script. Assuming a A message coming from the server arrives and then while the message A is "worked" another message arrives from the server named B and after B another one arrives from the server called C and while all these, (A, B and C) , wait in queue to be "worked" the user orders that three other messages should be sent to the server, first D , then E and then F , in that order respectively.

Assuming all of this, the final output / input or "work" sequence of this script should be:

A - > B - > C - > D - > E - > F

I think of it as an airport .. but it's kind of complex for me. I tried to apply semaphore but I did not succeed so I have to resort ..

"work" = send / receive pro / from the server

    
asked by anonymous 24.07.2015 / 17:44

2 answers

10

You need a queue : Whenever sendToServer is called, put the data in the queue instead of sending it straight. At the end of the two one:

  • If the system is in a state that allows messages to be sent, send them at once (you only need one);
  • Otherwise do not send.

When the system changes from the "locked" state to the "available" state, send everything in the queue (so you do not have to wait for the next call to sendToServer .)

var connection = new WebSocket('ws://127.0.0.1:2333');

    var fila = [];
    function esvaziaFila() {
        while ( fila.length > 0 ) {
            if ( !podeProcessar )
                return;
          
            var proximo = fila.shift();
            if ( proximo.envio )
                connection.send(proximo.dados);
            if ( proximo.recebimento )
                processar(proximo.evento);
        }
    }
    var podeProcessar = false;
    
    connection.onopen = function () { // abriu a conexão com o servidor 
        document.body.innerHTML += "<pre>Abriu a conexão</pre>";
        podeProcessar = true;
    };
    
    connection.onerror = function (error) { // ocorreu um erro na comunicação
    };
    
    connection.onmessage = function (e) { // recebe do servidor
        rec("cliente", e);
        fila.push({ recebimento:true, evento:e });
        esvaziaFila();
        
    };

    function processar(evento) { // recebe mensagens do servidor
      document.body.innerHTML += "<pre>Cliente vai processar " + JSON.stringify(evento) + "; não faz mais nada até estar pronto!";
      podeProcessar = false;
      
      tarefaDemorada(evento);
    }
    
    function sendToServer(data) { // envia mensagens para o servidor
       fila.push({envio:true, dados:data});
       esvaziaFila();
    }

/********** Mockups para testar ***************/
function rec(classe, dados) {
    document.body.innerHTML += "<pre class='" + classe + "'>" + classe + " recebeu: " + JSON.stringify(dados) + "</pre>";
}
function env(classe, dados) {
    document.body.innerHTML += "<pre class='" + classe + "'>" + classe + " enviou: " + JSON.stringify(dados) + "</pre>";
}

function WebSocket() {
  this.send = function(data) {
    rec("servidor", data);
  }
  // Manda algumas mensagens no futuro
  var self = this;
  setTimeout(function() { self.onopen() }, 500);
  setTimeout(function() { env("servidor","A"); self.onmessage("A") }, 2000);
  setTimeout(function() { env("servidor","B"); self.onmessage("B") }, 4000);
  setTimeout(function() { env("servidor","C"); self.onmessage("C") }, 6000);
}

setTimeout(function() { env("cliente","D"); sendToServer("D"); }, 8000);
setTimeout(function() { env("cliente","E"); sendToServer("E"); }, 10000);

function tarefaDemorada(evento) {
    setTimeout(function() {
        document.body.innerHTML += "<pre>Terminou de processar " + JSON.stringify(evento) + "; pode processar o resto.";
        podeProcessar = true;
        esvaziaFila();
    }, evento == "B" ? 5500 : 1250);
}
.servidor {
  color: blue;
}

.cliente {
  color: red;
}

Alternative: dependency graph

If you have a complex send / receive stream, one way to manage this complexity is to create a dependency graph between your incoming and outgoing messages. I suggest this - not a total order - because it is complicated to say that "event A occurred before event B" when A originated on one machine (eg client) and B on another (eg server) [1] .

An example combining the queue technique with a dependency graph would be as follows:

var fila = [];
function processaFila() {
    while ( fila.length > 0 ) {
        var proximo = fila[fila.length-1];

        if ( !proximo.pronto ) // Se está aguardando outro processamento
            break;             // Não processa mais nada

        fila.pop();
        if ( proximo.envio ) // Se está pronta pra ser enviada
             connection.send(proximo.dados);
        if ( proximo.recebimento ) // Se foi recebida e tem de ser processada
            proximo.processar(proximo.evento);
    }
}

function incluiNaFila(job) {
    var i = fila.length;
    while ( i > 0 ) {
        // Se o próximo da fila precisa ser executado depois da tarefa atual
        if ( dependeDe(fila[i-1], job) )
            break; // Coloca a atual no começo da fila

        // Senão, coloca a atual depois do primeiro da fila
        fila[i] = fila[i-1];
        i--; // Repete a lógica pro segundo da fila, etc
    }
    fila[i] = job;
}

connection.onmessage = function (e) {
    incluiNaFila({
        recebimento:true, pronto:true,
        evento:e,
        processar:function(e) { ... }, // Função que processa a entrada
        ... // outros campos que determinam a dependência
    });
    processaFila();
}

function sendToServer(data) {
    incluiNaFila({
        envio: true, pronto:true,
        dados: data,
        ... // Outros campos que determinam a dependência
    });
    processarFila();
}

function dependeDe(tarefaA, tarefaB) {
    /* Aqui entra sua lógica específica.
       Ela deve retornar false se a tarefa B pode ser executada depois
       de A (B é uma tarefa mais nova que A), ou true se A precisa
       esperar B estar pronta pra executar.
    */
}

After a quick read on the example in pastebin (in the comments), I would suggest something like this:

var sequencial = 0; // Cada mensagem de iniciativa do servidor ou do cliente
                    // recebe um id sequencial; mensagens que são respostas
                    // a outra mensagem recebem o mesmo id da mensagem original.

connection.onmessage = function(e) {
    var id = sequencial++;
    incluiNaFila({
        recebimento:true,
        evento:e,
        processar:function(e) {
            var aguradando = {
                recebimento:true, processar:function(){},
                pronto:false, id:id
            }

            incluiNaFila(aguardando);
            funcaoquetrataamensagem(.......).then(function() {
                aguardando.pronto = true;
                processaFila();
            });
        },
        id:id
    });
    processaFila();
}

// Envia uma mensagem, sem impor nenhuma ordem específica (iniciativa do cliente)
function sendToServer(data) {
    incluiNaFila({
        envio: true, pronto:true,
        dados: data,
        id:sequencial++
    });
    processarFila();        
}

// Envia uma mensagem como resposta a uma mensagem recebida
function respondToServer(idMensagemOriginal, data) {
    incluiNaFila({
        envio: true, pronto:true,
        dados: data,
        id:idMensagemOriginal+1 // Garante que só será enviada quando a
                                // mensagem original tiver sido processada
    });
    processarFila();        
}

function dependeDe(a, b) {
    return ( a.id > b.id );
}

In this example, incoming messages and messages sent on the client's initiative will be processed in the (queue) order, while messages that are replying to another message will be processed only after the original message has been fully processed. Any message that needs to "do more things" before finishing its processing blocks the entire queue - by inserting a new task aguardando with the same id, and consequently the same priority, as the original message.

[1]: There are a number of attempts to solve this problem, but none is perfect. Choosing one side of communication to be authoritative can cause starvation in the other; The technique Operational Tranform is promising, but does not apply to all cases; In some scenarios (eg the BitCoin network), the proof of work can be used to reduce the problem, but it does not completely eliminate it. Etc.

    
24.07.2015 / 19:56
2

I do not know if the WebSocket object has some type control, but you can control this manually

var connection = new WebSocket('ws://127.0.0.1:2333');
var available = true;

connection.onopen = function () { // abriu a conexão com o servidor 
    // mantém available = false
};

connection.onerror = function (error) { // ocorreu um erro na comunicação
    available = true;
};

connection.onmessage = function (e) { // recebe do servidor
    processa(e);
    available = true;
};

function sendToServer(data) { // envia mensagens para o servidor 
    var interval = setInterval(function () {
        if (available) {
            clearInterval(interval);
            available = false;
            connection.send(data);
        }
    }, 1000)
}
    
24.07.2015 / 18:43