vovach777/node.pas

multi thread

Closed this issue · 5 comments

procedure __connection_cb (server: puv_stream_t; Status: integer); cdecl;
begin

// can multithreading uv_accept be used here?

end;

what is multithreading uv_accept? Libuv not designed to be multithreaded (except uv_async_send).
Normaly, You no need multithreading when event system.
For multiplex reason, see:
https://github.com/willemt/uv_multiplex

http://docs.libuv.org/en/v1.x/guide/processes.html#pipes
Please see example of use pipe's IPC handle.

you can pass the handle to the workers threads (ready with uv_loop) but it is better to use several processes for balancing.


{$APPTYPE CONSOLE}

{$R *.res}

uses
  System.SysUtils,
  np.Core,
  np.libuv,
  np.buffer,
  np.Ut;

  type
     TNPTCPSocketStream = class( TNPTCPStream )
     public
        constructor CreateSocket(fd : uv_os_sock_t );
     end;

  var
    workers : array [1..4] of TLoop;
//    barrier : uv_barrier_t;

  procedure init_worker(num : integer);
  begin
    thread_create(
        procedure
        begin
           workers[num] := loop;
           loop.addTask;
           loop.once(ev_loop_beforeTerminate,
              procedure
              begin
                loop.removeTask;
              end);
           OutputDebugStr('%d worker start!',[num]);
//           uv_barrier_wait(@barrier);
           loophere;
           OutputDebugStr('%d worker end!',[num]);
        end
     );

  end;

  procedure init_workers;
  var  i : integer;
  begin
//     uv_barrier_init(@barrier,HIGH(workers)+1);
     for I := Low(workers) to High(workers) do
     begin
       init_worker(i);
     end;
//     uv_barrier_wait(@barrier);
//     uv_barrier_destroy(@barrier);
  end;

  procedure run_server;
  var
    server : INPTCPServer;
    next_worker: integer;
  begin
          server := TNPTCPServer.Create();
          server.set_nodelay(true);
          server.bind('127.0.0.1',9999);
          next_worker:=0;

          server.setOnClient(
                  procedure (_server: INPTCPServer)
                  var
                    uvclient : uv_tcp_t;
                    socket: uv_os_fd_t;
                  begin
                     stdout.PrintLn('Client connected');
                     inc(next_worker);
                     if next_worker = 5 then
                        next_worker := 1;
                    duv_ok( uv_tcp_init(loop.uvloop, @uvclient ));
                    duv_ok( uv_accept( puv_stream_t( _server._uv_handle ), @uvclient ) );
                    duv_ok( uv_fileno( @uvclient, @socket ) );
                    workers[next_worker].setImmediate(
                      procedure
                      var
                        client : INPTCPStream;
                      begin
                        client := TNPTCPSocketStream.CreateSocket(socket);
                        client.set_nodelay(true);
                        client.setOnData(
                            procedure (data:PBufferRef)
                            begin
                              client.write(data^);
                            end);
                      end
                    );
                  end
               );
          server.listen(1);
  end;

{ TNPTCPSocketStream }

constructor TNPTCPSocketStream.CreateSocket(fd: uv_os_sock_t);
begin
  inherited Create;
  uv_tcp_open(puv_tcp_t( FHandle ),fd);
end;


procedure run_client;
var
  connect: INPTCPConnect;
  addr: TSockAddr_in_any;
begin
  //echo server
  connect := TNPTCPStream.CreateConnect();
  connect.set_nodelay(true);
  uv_ip4_addr('127.0.0.1',9999,addr.ip4 );
  connect.connect(addr);
  connect.setOnConnect(
      procedure
      var
        OutBuf : BufferRef;
        InputBuf: BufferRef;
      begin
        OutBuf := Buffer.Create(8);
        OutBuf.write_as<int64>(0,1);
        InputBuf := Buffer.Null;

        connect.write(OutBuf);
          connect.setOnData(
                procedure (data:PBufferRef)
                var
                  data64: int64;
                begin
                    InputBuf := Buffer.Create( [InputBuf, data^] );
                    while InputBuf.HasSize(8) do
                    begin
                      data64 := InputBuf.unpack<int64>;
                      InputBuf.TrimL(8);
                      if data64 and $FFF = 0 then
                      begin
                        stdout.Print(#27'[2K'#13+IntToStr(data64*100 div $10000)+'%' );
                      end;
                      if data64 <= $10000 then
                      begin
                        inc(data64);
                        connect.write(BufferRef.Pack<int64>(data64));
                      end
                      else
                      begin
                        stdout.PrintLn('');
                        connect.shutdown();
                      end;
                    end;
                end
            );
      end
    );
end;


begin
  try
    IsMultiThread := true;
    loop.SetImmediate(
        procedure
        begin
          init_workers();
          run_server;
          SetInterval(procedure
          begin
             run_client;
          end,2000 );
        end);
    LoopHere;
    WriteLn('loop exit');
    readln;
  except
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;
end.

barrier commented by issue (((

You have a very special idea! :)