Mazyod/PhoenixSharp

DelayedExecutor confusion

Closed this issue · 3 comments

I'm trying to use this library with Unity3D. At first I thought the built in timer would be acceptable but it executes on a background thread, so I'm unable to update my gameobjects through that since Unity3d's API is not thread-safe.

So I tried to implement a delayedexecutor with coroutines using Unity3d and I'm unable to get it working. My client sends a message two times to the channel two times, but then never sends a message again. Additionally, my callback:

        roomChannel.On("room_update:0,0", msg => _process_server_state(msg.payload));

Never gets called with this new delayedExecutor (it did get called with the default Timer executor);

Here is my delayedexecutor implementation:

    using System;
    using System.Collections;
    using System.Collections.Generic;
    using UnityEngine;

    namespace Phoenix {

        public class CoroutineExecutor : MonoBehaviour, IDelayedExecutor 
        {		
            private uint id = 1;
            private Dictionary<uint, Coroutine> coroutines = new Dictionary<uint, Coroutine>();


            public uint Execute(Action action, TimeSpan delay)
            {
                var id = this.id++;
                var coroutine = ActionCoroutine(action, delay);
                var coroutineStarted = StartCoroutine(coroutine);
                coroutines[id] = coroutineStarted;
                return id;
            }

            IEnumerator ActionCoroutine(Action action, TimeSpan delay) { 
                Debug.Log("started actioncoroutine with delay" + delay.TotalSeconds);
                yield return new WaitForSeconds((float) delay.TotalSeconds); 
                action();
                Debug.Log("Finished actioncoroutine");
            }
            
            public void Cancel(uint id)
            {
                if (coroutines.ContainsKey(id)) {
                    StopCoroutine(coroutines[id]);
                    coroutines.Remove(id);
                }
            }
        }
    }

I attached this script to a unity3d gameobject, and then in the Options section, added it to the value of timer:

        var socket = new Socket(socketFactory, new Socket.Options
        {
            logger = new BasicLogger(),
            delayedExecutor = GetComponent<CoroutineExecutor>()
        });

I'm a little confused why a delayedexector is needed, but I assume the delay is when you check for a response from the channel? So for a realtime game I would want this very small, correct? Perhaps 100ms or something.

Anyways, do you have any ideas why my implementation might not work for the timer?

Here is some logging:

Debug, socket, on open
UnityEngine.Debug:Log(Object)
BasicLogger:Log(LogLevel, String, String) (at Assets/Scripts/Game/GameServer.cs:13)
Phoenix.Socket:Log(LogLevel, String, String) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:147)
Phoenix.Socket:WebsocketOnOpen(IWebsocket) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:219)
Phoenix.WebsocketSharpAdapter:OnWebsocketOpen(Object, EventArgs) (at Assets/Scripts/Vendor/Phoenix/WebSocketSharp.cs.cs:55)
WebSocketSharp.Ext:Emit(EventHandler, Object, EventArgs)
WebSocketSharp.WebSocket:open()
WebSocketSharp.WebSocket:Connect()
Phoenix.WebsocketSharpAdapter:Connect() (at Assets/Scripts/Vendor/Phoenix/WebSocketSharp.cs.cs:35)
Phoenix.Socket:Connect(String, Dictionary`2) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:191)
GameServer:Start() (at Assets/Scripts/Game/GameServer.cs:51)
Trace, push, {"topic":"phoenix","event":"heartbeat","ref":null,"payload":{}}
UnityEngine.Debug:Log(Object)
BasicLogger:Log(LogLevel, String, String) (at Assets/Scripts/Game/GameServer.cs:13)
Phoenix.Socket:Log(LogLevel, String, String) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:147)
Phoenix.Socket:Push(Message) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:134)
Phoenix.Socket:SendHeartbeat() (at Assets/Scripts/Vendor/Phoenix/Socket.cs:102)
Phoenix.Socket:WebsocketOnOpen(IWebsocket) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:223)
Phoenix.WebsocketSharpAdapter:OnWebsocketOpen(Object, EventArgs) (at Assets/Scripts/Vendor/Phoenix/WebSocketSharp.cs.cs:55)
WebSocketSharp.Ext:Emit(EventHandler, Object, EventArgs)
WebSocketSharp.WebSocket:open()
WebSocketSharp.WebSocket:Connect()
Phoenix.WebsocketSharpAdapter:Connect() (at Assets/Scripts/Vendor/Phoenix/WebSocketSharp.cs.cs:35)
Phoenix.Socket:Connect(String, Dictionary`2) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:191)
GameServer:Start() (at Assets/Scripts/Game/GameServer.cs:51)
Trace, push, {"topic":"room:0,0","event":"phx_join","ref":"1","payload":{}}
UnityEngine.Debug:Log(Object)
BasicLogger:Log(LogLevel, String, String) (at Assets/Scripts/Game/GameServer.cs:13)
Phoenix.Socket:Log(LogLevel, String, String) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:147)
Phoenix.Socket:Push(Message) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:134)
Phoenix.Channel:Push(Push, Nullable`1) (at Assets/Scripts/Vendor/Phoenix/Channel.cs:144)
Phoenix.Channel:Push(Message, Nullable`1) (at Assets/Scripts/Vendor/Phoenix/Channel.cs:131)
Phoenix.Channel:Join(Dictionary`2, Nullable`1) (at Assets/Scripts/Vendor/Phoenix/Channel.cs:70)
GameServer:Start() (at Assets/Scripts/Game/GameServer.cs:58)
Trace, socket, received: [] phoenix: phx_reply
UnityEngine.Debug:Log(Object)
BasicLogger:Log(LogLevel, String, String) (at Assets/Scripts/Game/GameServer.cs:13)
Phoenix.Socket:Log(LogLevel, String, String) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:147)
Phoenix.Socket:WebsocketOnMessage(IWebsocket, String) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:275)
Phoenix.WebsocketSharpAdapter:OnWebsocketMessage(Object, MessageEventArgs) (at Assets/Scripts/Vendor/Phoenix/WebSocketSharp.cs.cs:70)
WebSocketSharp.Ext:Emit(EventHandler`1, Object, MessageEventArgs)
WebSocketSharp.WebSocket:acceptDataFrame(WsFrame)
WebSocketSharp.WebSocket:acceptFrame(WsFrame)
WebSocketSharp.<startReceiving>c__AnonStoreyE:<>m__F(WsFrame)
WebSocketSharp.<ParseAsync>c__AnonStorey1E:<>m__22(Byte[])
WebSocketSharp.<ReadBytesAsync>c__AnonStorey9:<>m__2(IAsyncResult)
System.Threading._ThreadPoolWaitCallback:PerformWaitCallback()

Trace, socket, received: [1] room:0,0: phx_reply
UnityEngine.Debug:Log(Object)
BasicLogger:Log(LogLevel, String, String) (at Assets/Scripts/Game/GameServer.cs:13)
Phoenix.Socket:Log(LogLevel, String, String) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:147)
Phoenix.Socket:WebsocketOnMessage(IWebsocket, String) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:275)
Phoenix.WebsocketSharpAdapter:OnWebsocketMessage(Object, MessageEventArgs) (at Assets/Scripts/Vendor/Phoenix/WebSocketSharp.cs.cs:70)
WebSocketSharp.Ext:Emit(EventHandler`1, Object, MessageEventArgs)
WebSocketSharp.WebSocket:acceptDataFrame(WsFrame)
WebSocketSharp.WebSocket:acceptFrame(WsFrame)
WebSocketSharp.<startReceiving>c__AnonStoreyE:<>m__F(WsFrame)
WebSocketSharp.<ParseAsync>c__AnonStorey1E:<>m__22(Byte[])
WebSocketSharp.<ReadBytesAsync>c__AnonStorey9:<>m__2(IAsyncResult)
System.Threading._ThreadPoolWaitCallback:PerformWaitCallback()

Trace, push, {"topic":"room:0,0","event":"client_input","ref":"2","payload":{"move":{"x":0.0,"y":0.0,"z":0.0}}}
UnityEngine.Debug:Log(Object)
BasicLogger:Log(LogLevel, String, String) (at Assets/Scripts/Game/GameServer.cs:13)
Phoenix.Socket:Log(LogLevel, String, String) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:147)
Phoenix.Socket:Push(Message) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:134)
Phoenix.Channel:Push(Push, Nullable`1) (at Assets/Scripts/Vendor/Phoenix/Channel.cs:144)
Phoenix.Channel:Push(Message, Nullable`1) (at Assets/Scripts/Vendor/Phoenix/Channel.cs:131)
Phoenix.Channel:PushJson(String, JObject, Nullable`1) (at Assets/Scripts/Vendor/Phoenix/Channel.cs:122)
Phoenix.Channel:Push(String, Dictionary`2, Nullable`1) (at Assets/Scripts/Vendor/Phoenix/Channel.cs:118)
PlayerControl:Update() (at Assets/Scripts/Game/PlayerControl.cs:28)

Debug, channel, join ok room:0,0
UnityEngine.Debug:Log(Object)
BasicLogger:Log(LogLevel, String, String) (at Assets/Scripts/Game/GameServer.cs:13)
Phoenix.Socket:Log(LogLevel, String, String) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:147)
Phoenix.Channel:OnJoinReply(Reply) (at Assets/Scripts/Vendor/Phoenix/Channel.cs:216)
Phoenix.Channel:OnReply(Message) (at Assets/Scripts/Vendor/Phoenix/Channel.cs:240)
Phoenix.Channel:Trigger(Message, String) (at Assets/Scripts/Vendor/Phoenix/Channel.cs:286)
Phoenix.Socket:WebsocketOnMessage(IWebsocket, String) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:278)
Phoenix.WebsocketSharpAdapter:OnWebsocketMessage(Object, MessageEventArgs) (at Assets/Scripts/Vendor/Phoenix/WebSocketSharp.cs.cs:70)
WebSocketSharp.Ext:Emit(EventHandler`1, Object, MessageEventArgs)
WebSocketSharp.WebSocket:acceptDataFrame(WsFrame)
WebSocketSharp.WebSocket:acceptFrame(WsFrame)
WebSocketSharp.<startReceiving>c__AnonStoreyE:<>m__F(WsFrame)
WebSocketSharp.<ParseAsync>c__AnonStorey1E:<>m__22(Byte[])
WebSocketSharp.<ReadBytesAsync>c__AnonStorey9:<>m__2(IAsyncResult)
System.Threading._ThreadPoolWaitCallback:PerformWaitCallback()

Info, socket, An exception has occurred while receiving a message.
UnityEngine.Debug:Log(Object)
BasicLogger:Log(LogLevel, String, String) (at Assets/Scripts/Game/GameServer.cs:13)
Phoenix.Socket:Log(LogLevel, String, String) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:147)
Phoenix.Socket:WebsocketOnError(IWebsocket, String) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:255)
Phoenix.WebsocketSharpAdapter:OnWebsocketError(Object, ErrorEventArgs) (at Assets/Scripts/Vendor/Phoenix/WebSocketSharp.cs.cs:65)
WebSocketSharp.Ext:Emit(EventHandler`1, Object, ErrorEventArgs)
WebSocketSharp.WebSocket:error(String)
WebSocketSharp.WebSocket:acceptException(Exception, String)
WebSocketSharp.<startReceiving>c__AnonStoreyE:<>m__10(Exception)
WebSocketSharp.<ReadBytesAsync>c__AnonStorey9:<>m__2(IAsyncResult)
System.Threading._ThreadPoolWaitCallback:PerformWaitCallback()

Info, channel, room:0,0: socket error
UnityEngine.Debug:Log(Object)
BasicLogger:Log(LogLevel, String, String) (at Assets/Scripts/Game/GameServer.cs:13)
Phoenix.Socket:Log(LogLevel, String, String) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:147)
Phoenix.Channel:TriggerError(String) (at Assets/Scripts/Vendor/Phoenix/Channel.cs:262)
Phoenix.Channel:Trigger(Message, String) (at Assets/Scripts/Vendor/Phoenix/Channel.cs:294)
Phoenix.Channel:SocketTerminated(String) (at Assets/Scripts/Vendor/Phoenix/Channel.cs:209)
Phoenix.<>c__DisplayClass27_0:<TriggerChannelError>b__0(Channel) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:115)
System.Collections.Generic.List`1:ForEach(Action`1)
Phoenix.Socket:TriggerChannelError(String) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:113)
Phoenix.Socket:WebsocketOnError(IWebsocket, String) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:258)
Phoenix.WebsocketSharpAdapter:OnWebsocketError(Object, ErrorEventArgs) (at Assets/Scripts/Vendor/Phoenix/WebSocketSharp.cs.cs:65)
WebSocketSharp.Ext:Emit(EventHandler`1, Object, ErrorEventArgs)
WebSocketSharp.WebSocket:error(String)
WebSocketSharp.WebSocket:acceptException(Exception, String)
WebSocketSharp.<startReceiving>c__AnonStoreyE:<>m__10(Exception)
WebSocketSharp.<ReadBytesAsync>c__AnonStorey9:<>m__2(IAsyncResult)
System.Threading._ThreadPoolWaitCallback:PerformWaitCallback()

And then it gets logging this event over and over again, but the server does not receive these message:

Trace, push, {"topic":"room:0,0","event":"client_input","ref":"3","payload":{"move":{"x":0.0,"y":0.0,"z":0.0}}}
UnityEngine.Debug:Log(Object)
BasicLogger:Log(LogLevel, String, String) (at Assets/Scripts/Game/GameServer.cs:13)
Phoenix.Socket:Log(LogLevel, String, String) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:147)
Phoenix.Socket:Push(Message) (at Assets/Scripts/Vendor/Phoenix/Socket.cs:134)
Phoenix.Channel:Push(Push, Nullable`1) (at Assets/Scripts/Vendor/Phoenix/Channel.cs:144)
Phoenix.Channel:Push(Message, Nullable`1) (at Assets/Scripts/Vendor/Phoenix/Channel.cs:131)
Phoenix.Channel:PushJson(String, JObject, Nullable`1) (at Assets/Scripts/Vendor/Phoenix/Channel.cs:122)
Phoenix.Channel:Push(String, Dictionary`2, Nullable`1) (at Assets/Scripts/Vendor/Phoenix/Channel.cs:118)
PlayerControl:Update() (at Assets/Scripts/Game/PlayerControl.cs:28)

I apologize for not responding sooner to this.

Regarding the "delay" aspect, it's not a "delay" in the sense that it will slow down the network communication or anything like that, it's just a bad name I chose for "async execution" that happens to usually be executed after a certain period of time.

For example, we need to trigger timeout exception if we don't get a reply from the server within a certain period of time. This is where "DelayedExecutor" comes in.

As for the implementation, this is my own implementation which is working for my game:

using System;
using System.Collections.Generic;
using Phoenix;
using UnityEngine;


namespace DKNetwork {

	public sealed class CoroutineDelayedExecutor: IDelayedExecutor {

		private uint id = 0;
		private Dictionary<uint, Coroutine> coroutines = new Dictionary<uint, Coroutine>();


		public uint Execute(Action action, TimeSpan delay) {

			if (CoroutineManager.Instance == null) {
				return 0; // application quitting
			}

			var id = ++this.id;
			coroutines[id] = CoroutineManager.Instance.PerformAsync(() => {
				action();
				coroutines.Remove(id);
			}, delay);

			return id;
		}

		public void Cancel(uint id) {

			if (CoroutineManager.Instance == null) {
				return; // application quitting
			}

			if (coroutines.ContainsKey(id)) {
				CoroutineManager.Instance.StopCoroutine(coroutines[id]);
				coroutines.Remove(id);
			}
		}
	}
}
using System;
using UnityEngine;
using System.Collections;

/** Simple singleton that we use to attach coroutines
 */
public sealed class CoroutineManager : Singleton<CoroutineManager> {

	private IEnumerator CallAsync(Action action, TimeSpan? delay) {

		if (delay.HasValue) {
			yield return new WaitForSeconds((float)delay.Value.TotalSeconds);
		} else {
			yield return new WaitForEndOfFrame();
		}

		action();
	}

	private IEnumerator CallContinuously(Action action, TimeSpan? interval) {

		while (true) {
			if (interval.HasValue) {
				yield return new WaitForSecondsRealtime((float)interval.Value.TotalSeconds);
			} else {
				yield return new WaitForEndOfFrame();
			}

			action();
		}
	}

	// convenient method to quickly schedule stuff for next frame
	public Coroutine PerformAsync(Action action, TimeSpan? delay = null) {
		return StartCoroutine(CallAsync(action, delay));
	}

	public Coroutine RunForever(Action action, TimeSpan? interval = null) {
		return StartCoroutine(CallContinuously(action, interval));
	}
}

Kindly reopen in case the above doesn't answer your query.