EventWriter-style effect which survives from within PushM and PullM
parenthetical opened this issue · 3 comments
Hi all! I hope this is an appropriate venue for this question.
As some of you may know I'm trying to implement a distributed extension to Reflex.
So far I've been implementing a single primitive: a distributed fold over all known/transmitted event occurrences:
class ( ... ) => Distributed t m where
  foldDistributed :: (CommutativeMonoid a) => Event t a -> m (Dynamic t a)
I was able to implement this in a pure way (like Reflex.Pure), but an efficient implementation on top of an arbitrary Reflex t instance is proving to be more difficult.
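To illustrate why the fold asks for a commutative monoid, here is a minimal pure sketch that uses only base. The `CommutativeMonoid` class below is a hypothetical stand-in (a `Monoid` with an unchecked commutativity law), and `foldOccurrences`, `localOccs` and `remoteOccs` are names invented for this example, not part of Reflex or of the implementation discussed here.

```haskell
import Data.List (permutations)
import Data.Monoid (Sum (..))

-- Hypothetical stand-in for the CommutativeMonoid constraint: a Monoid
-- whose (<>) is also required to commute. The law is not checked by
-- the compiler.
class Monoid a => CommutativeMonoid a

instance Num a => CommutativeMonoid (Sum a)

-- Pure model of the distributed fold: the value is the combination of
-- every occurrence received so far, in whatever order it arrived.
foldOccurrences :: CommutativeMonoid a => [a] -> a
foldOccurrences = mconcat

-- Occurrences produced by two hypothetical program instances.
localOccs, remoteOccs :: [Sum Int]
localOccs = [Sum 1, Sum 2]
remoteOccs = [Sum 10]

-- Every delivery order folds to the same value.
allOrdersAgree :: Bool
allOrdersAgree =
  all (== foldOccurrences (localOccs ++ remoteOccs))
      (map foldOccurrences (permutations (localOccs ++ remoteOccs)))
```

Because the result does not depend on delivery order, each instance can apply remote occurrences whenever they happen to arrive.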
My strategy is to uniquely identify the sites where the distributed fold is used and then collect the local event occurrences that went into the fold. These occurrences then have to be transmitted to other distributed instances, where they are combined into the value of the dynamic with the same identifier, and vice versa.
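The bookkeeping described above could be sketched roughly as follows, assuming identifiers are plain `Int`s and that occurrences are combined as soon as they are collected. `Identifier`, `Collected`, `collect` and `mergeRemote` are hypothetical names for this example only; the transport between instances is left out entirely.

```haskell
import qualified Data.Map.Strict as Map
import Data.Monoid (Sum (..))

-- Hypothetical identifier for a foldDistributed call site.
type Identifier = Int

-- Occurrences collected per call site, already combined with (<>).
type Collected w = Map.Map Identifier w

-- Record one local occurrence for the fold at the given site.
collect :: Semigroup w => Identifier -> w -> Collected w -> Collected w
collect = Map.insertWith (<>)

-- Combine what another instance transmitted with the local state,
-- matching folds by identifier.
mergeRemote :: Semigroup w => Collected w -> Collected w -> Collected w
mergeRemote = Map.unionWith (<>)

-- Two hypothetical program instances that both use the fold at site 0.
instanceA, instanceB :: Collected (Sum Int)
instanceA = collect 0 (Sum 1) (collect 1 (Sum 5) Map.empty)
instanceB = collect 0 (Sum 2) Map.empty
```

With a commutative `(<>)`, `mergeRemote instanceA instanceB` and `mergeRemote instanceB instanceA` give the same map, so both sides converge on one value per identifier.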
I've managed to solve the identity problem with Ryan's help, but I'm going in circles trying to solve the next part.
What I'd need is to somehow exfiltrate the input events going into each foldDistributed, wherever that fold is. If the value of a distributed fold touches the output of the program in any way, its existence needs to be communicated to the other program instances. It sounds a lot like an EventWriter, but one whose tellEvent effect survives from within push and the like.
Is such a thing reasonably possible to implement?
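To make the question concrete, here is a toy model of what "surviving from within push" could mean. It uses only base and does not use Reflex at all: events are modelled as lists of timestamped occurrences, and the writer effect is modelled as a plain pair. `Occs`, `PushW`, `pushW` and `evensLogged` are all hypothetical names for this sketch.

```haskell
import Data.Maybe (mapMaybe)

-- Toy model of an event: occurrences tagged with a logical time.
-- This stands in for Reflex's Event only for illustration.
type Occs a = [(Int, a)]

-- A push-style callback that may also emit output of type w,
-- modelled as a plain pair instead of a Writer transformer.
type PushW w a b = a -> (w, Maybe b)

-- Run the callback on every occurrence. The first result is the
-- filtered event; the second is the side output, which "survives"
-- because it is returned as an event of its own.
pushW :: PushW w a b -> Occs a -> (Occs b, Occs w)
pushW f occs = (mapMaybe keep results, [(t, w) | (t, (w, _)) <- results])
  where
    results = [(t, f a) | (t, a) <- occs]
    keep (t, (_, mb)) = fmap (\b -> (t, b)) mb

-- Example callback: keep even numbers, and report every input seen.
evensLogged :: PushW [String] Int Int
evensLogged n = (["saw " ++ show n], if even n then Just n else Nothing)
```

In this model the side output is produced even for occurrences that the callback filters out, which is the behaviour an ordinary `push` cannot express because it only returns the `Maybe b`.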
I've been trying to abstract this aspect of my implementation in a module below, but I'm having a lot of trouble with the interaction between PushM/PullM/MonadSample/etc. so I'm probably not going about it the right way.
In the end w should be something like Map Identifier (Event t Updates).
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
module CollectingTimeline where
import Prelude hiding (filter)
import Control.Monad.Fix
import Control.Monad.Reader
import Control.Monad.Writer.Strict
import Reflex.Class
import Reflex.Dynamic (foldDyn)
import Data.Witherable (catMaybes)
data CollectingTimeline w t
newtype CollectM w t m a =
  CollectM { unCollectM :: (WriterT (Updates w t) m) a }
  deriving (Functor, Applicative, Monad, MonadFix, MonadIO)
type Updates w t = Dynamic t w -- Event or Incremental probably better.
type WithUpdates w t a = ((PushM t) (Updates w t), a)
instance ( Reflex t
         , Monoid w
         ) => Reflex (CollectingTimeline w t) where
  newtype Behavior (CollectingTimeline w t) a =
    Behavior' { _unBehavior' :: (WithUpdates w t (Behavior t a)) }
  newtype Event (CollectingTimeline w t) a =
    Event' { unEvent' :: (WithUpdates w t (Event t a)) }
  newtype Dynamic (CollectingTimeline w t) a =
    Dynamic' { unDynamic' :: (WithUpdates w t (Dynamic t a)) }
  newtype Incremental (CollectingTimeline w t) a =
    Incremental' { unIncremental' :: (WithUpdates w t (Incremental t a)) }
  type PushM (CollectingTimeline w t) = CollectM w t (PushM t)
  type PullM (CollectingTimeline w t) = CollectM w t (PullM t)
  push f (Event' (mw, e)) =
    let e' = pushAlways (runWriterT . unCollectM . f) e
    in Event' ( (<>) <$> mw <*> (fmap join . foldDyn mappend mempty . fmap snd $ e')
              , catMaybes . fmap fst $ e'
              )
  pull :: forall a. PullM (CollectingTimeline w t) a -> Behavior (CollectingTimeline w t) a
  pull a =
    let x :: (PullM t) (a, Dynamic t w) = runWriterT . unCollectM $ a
        y :: Behavior t (a, Dynamic t w) = pull x
    in Behavior' ( holdDyn mempty . switch . fmap (updated . snd) $ y
                 , fmap fst y
                 )
  -- ...
instance (Reflex t) => Functor (Dynamic (CollectingTimeline w t)) where
-- ...
instance (Reflex t) => Applicative (Dynamic (CollectingTimeline w t)) where
-- ...
instance (Reflex t) => Monad (Dynamic (CollectingTimeline w t)) where
-- ...
instance (Monoid w, Reflex t, MonadSample t m) => MonadSample (CollectingTimeline w t) (CollectM w t m) where
  sample :: forall a. Behavior (CollectingTimeline w t) a -> (CollectM w t m) a
  sample (Behavior' (mw, b)) = CollectM $ do
    -- "tell" mw somehow? (PushM t) =/ m... ***
    lift $ sample b
instance ( Reflex t
         , MonadHold t m
         , Monoid w
         ) => MonadHold (CollectingTimeline w t) (CollectM w t m) where
  hold :: a -> Event (CollectingTimeline w t) a -> CollectM w t m (Behavior (CollectingTimeline w t) a)
  hold a (Event' (mw, e)) = CollectM $ do
    -- What to do with mw here? ***
    b <- lift $ hold a e
    return $ Behavior' (mw, b)
  holdDyn :: a -> Event (CollectingTimeline w t) a -> CollectM w t m (Dynamic (CollectingTimeline w t) a)
  holdDyn = undefined
  holdIncremental :: Patch p
                  => PatchTarget p
                  -> Event (CollectingTimeline w t) p
                  -> CollectM w t m (Incremental (CollectingTimeline w t) p)
  holdIncremental = undefined
Would it work to just WriterT-transform all of the things in your timeline transformer? (PushM and PullM of course, but also Event, Behavior, and Dynamic?) Does that allow you to read the result out in the kind of context you need, or is there some obstacle? Seems like it would be straightforward given that Reflex t (including MonadSample and MonadHold) is a simple combination of algebraic effects and functorial interpreters.
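One way to read this suggestion, as a rough sketch with base only: pair every timeline value with the output collected while constructing it, and make every combinator merge those outputs. `WithOutput`, `mapWithOutput`, `zipWithOutput` and `runWithOutput` are hypothetical names, and `f` stands in for any of Event, Behavior or Dynamic. This is an illustration of the idea, not the Reflex API.

```haskell
-- Toy stand-in for a timeline value (Event, Behavior, Dynamic, ...)
-- paired with the output collected while constructing it.
data WithOutput w f a = WithOutput w (f a)

-- Mapping over the wrapped value leaves the collected output alone.
mapWithOutput :: (f a -> g b) -> WithOutput w f a -> WithOutput w g b
mapWithOutput h (WithOutput w x) = WithOutput w (h x)

-- Combining two wrapped values also combines their outputs, which is
-- what a Writer-style transformation of every primitive would do.
zipWithOutput
  :: Semigroup w
  => (f a -> f b -> f c)
  -> WithOutput w f a -> WithOutput w f b -> WithOutput w f c
zipWithOutput h (WithOutput w1 x) (WithOutput w2 y) =
  WithOutput (w1 <> w2) (h x y)

-- Read the result out at the top level.
runWithOutput :: WithOutput w f a -> (w, f a)
runWithOutput (WithOutput w x) = (w, x)
```

The open question in the thread is whether the output can still be read out when the value is only constructed inside `PushM` or `PullM`; this sketch does not address that part.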
I'm definitely more concerned about the next step after that, since I'm not sure it's even possible to implement headE in a way that makes sense if foldDistributed returns a Dynamic.
I'm going to give that a try, thanks. Unfortunately I found out that my identifier generation wasn't being done correctly, so I'm going to try to work that out first. Maybe I was doing the "collecting" bit right before, but my tests just weren't working for this other reason.
The code below seems to work as far as I've been able to test. How does it look?
It might be possible to generalize this quite a bit if there's ever a need for that.
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE UndecidableInstances #-}
module EventWriterTimeline where
import Prelude hiding (filter)
import Control.Monad.Identity
import Control.Monad.Reader
import Control.Monad.State.Strict
import Reflex.Class
import Reflex.Dynamic (foldDyn, switchDyn)
import Unsafe.Coerce (unsafeCoerce)
import Reflex.EventWriter.Class
import Data.Function ((&))
import Data.Functor.Misc
import qualified Data.Map.Strict as Map
data EventWriterTimeline w t
data Updates w t
  = NoUpdates
  | Singleton (Event t w)
  | JoinE (Event t (Updates w t))
  | JoinB (Behavior t (Updates w t))
  | JoinPush ((PushM t) (Updates w t))
  | JoinList [Updates w t]
  | Cons (Updates w t) (Updates w t)

instance Semigroup (Updates w t) where
  (<>) = Cons

instance Monoid (Updates w t) where
  mempty = NoUpdates

runUpdates :: forall w t.
              ( Reflex t
              , Semigroup w
              ) => Updates w t -> (PushM t) (Event t w)
runUpdates = \case
  NoUpdates -> return never
  Singleton e -> return e
  JoinE es -> do
    fmap switchDyn . foldDyn (<>) never . pushAlways runUpdates $ es
  JoinB b -> runUpdates =<< sample b
  JoinPush mu -> runUpdates =<< mu
  JoinList us -> fmap mconcat (mapM runUpdates us)
  Cons a b -> (<>) <$> runUpdates a <*> runUpdates b
newtype EventWriterTimelineT w t m a =
  EventWriterTimelineT { unEventWriterTimelineT :: (StateT (Updates w t) m) a }
  deriving (Functor, Applicative, Monad, MonadFix, MonadIO)

instance ( Monad m
         , Semigroup w
         ) => EventWriter (EventWriterTimeline w t) w (EventWriterTimelineT w t m) where
  tellEvent (Event' w) = do
    w' <- mappendState w
    modify (<> Singleton w')

instance (Monad m) => MonadState (Updates w t) (EventWriterTimelineT w t m) where
  get = EventWriterTimelineT get
  put = EventWriterTimelineT . put
  state = EventWriterTimelineT . state

instance MonadTrans (EventWriterTimelineT w t) where
  lift = EventWriterTimelineT . lift

instance ( Monoid w
         , Reflex t
         , MonadReader r m
         ) => MonadReader r (EventWriterTimelineT w t m) where
  ask = lift ask
  local f (EventWriterTimelineT ma) = EventWriterTimelineT $ do
    (a,s') <- lift $ local f (flip runStateT mempty ma)
    modify (<> s')
    return a

mappendState :: ( MonadState s m
                , Monoid s
                )
             => State s a
             -> m a
mappendState a = do
  let (a',s') = flip runState mempty a
  modify (<> s')
  return a'

type WithUpdates w t a = (State (Updates w t) a)

runEventWriterTimeline :: ( Reflex t
                          , Semigroup w
                          )
                       => EventWriterTimelineT w t (PushM t) (WithUpdates w t a)
                       -> (PushM t) (a, Event t w)
runEventWriterTimeline ma = do
  (a,u) <- flip runStateT mempty . unEventWriterTimelineT $ ma
  let (a',u') = flip runState mempty a
  u'' <- runUpdates (Cons u u')
  return (a',u'')
instance ( Reflex t
         , Monoid w
         ) => Reflex (EventWriterTimeline w t) where
  newtype Behavior (EventWriterTimeline w t) a =
    Behavior' { _unBehavior' :: (WithUpdates w t (Behavior t a)) }
  newtype Event (EventWriterTimeline w t) a =
    Event' { unEvent' :: (WithUpdates w t (Event t a)) }
  newtype Dynamic (EventWriterTimeline w t) a =
    Dynamic' { unDynamic' :: (WithUpdates w t (Dynamic t a)) }
  newtype Incremental (EventWriterTimeline w t) a =
    Incremental' { unIncremental' :: (WithUpdates w t (Incremental t a)) }
  type PushM (EventWriterTimeline w t) = EventWriterTimelineT w t (PushM t)
  type PullM (EventWriterTimeline w t) = EventWriterTimelineT w t (PullM t)

  push :: forall a b. (a -> PushM (EventWriterTimeline w t) (Maybe b)) -> Event (EventWriterTimeline w t) a -> Event (EventWriterTimeline w t) b
  push f (Event' me) = Event' $ do
    e' :: Event t (Maybe b, Updates w t) <-
      push (fmap Just . flip runStateT mempty . unEventWriterTimelineT . f) <$> me
    modify (<> JoinE (fmap snd e'))
    return $ Reflex.Class.mapMaybe fst e'

  pushCheap f (Event' me) = Event' $ do
    e' <- pushCheap (fmap Just . flip runStateT mempty . unEventWriterTimelineT . f) <$> me
    modify (<> JoinE (fmap snd e'))
    return $ Reflex.Class.mapMaybe fst e'

  coincidence (Event' e) = Event' $ do
    e' <- e
    let e'' = fmap (flip runState mempty . unEvent') e'
    modify (<> JoinE (fmap snd e''))
    return $ coincidence (fmap fst e'')

  unsafeBuildDynamic :: forall a.
                        PullM (EventWriterTimeline w t) a
                     -> Event (EventWriterTimeline w t) a
                     -> Dynamic (EventWriterTimeline w t) a
  unsafeBuildDynamic readV0 (Event' e) = Dynamic' $ do
    let mv0 :: PullM t (a, Updates w t) =
          flip runStateT mempty . unEventWriterTimelineT $ readV0
    modify (<> JoinB (fmap snd . pull $ mv0))
    unsafeBuildDynamic (fst <$> mv0) <$> e

  unsafeBuildIncremental :: forall p.
                            Patch p
                         => PullM (EventWriterTimeline w t) (PatchTarget p)
                         -> Event (EventWriterTimeline w t) p
                         -> Incremental (EventWriterTimeline w t) p
  unsafeBuildIncremental readV0 (Event' e) = Incremental' $ do
    let mv0 :: PullM t (PatchTarget p, Updates w t) =
          flip runStateT mempty . unEventWriterTimelineT $ readV0
    modify (<> JoinB (fmap snd . pull $ mv0))
    unsafeBuildIncremental (fst <$> mv0) <$> e

  pull :: forall a. PullM (EventWriterTimeline w t) a -> Behavior (EventWriterTimeline w t) a
  pull (EventWriterTimelineT a) = Behavior' $ do
    let x :: Behavior t (a, Updates w t) = pull (flip runStateT mempty a)
    modify (<> JoinB (fmap snd x))
    return (fmap fst x)

  switch (Behavior' b) = Event' $ do
    b' <- b
    let b'' = fmap (flip runState mempty . unEvent') b'
    modify (<> JoinB (fmap snd b''))
    return $ switch (fst <$> b'')

  never = Event' . return $ never
  constant = Behavior' . return . constant
  updated = Event' . fmap updated . unDynamic'
  current = Behavior' . fmap current . unDynamic'
  currentIncremental = Behavior' . fmap currentIncremental . unIncremental'
  updatedIncremental = Event' . fmap updatedIncremental . unIncremental'
  incrementalToDynamic = Dynamic' . fmap incrementalToDynamic . unIncremental'
  behaviorCoercion = unsafeCoerce
  eventCoercion = unsafeCoerce
  dynamicCoercion = unsafeCoerce
  incrementalCoercion = unsafeCoerce

  -- mergeG :: forall k1 (k2 :: k1 -> *) (q :: k1 -> *) (v :: k1 -> *) (a0 :: k1).
  --           GCompare k2 =>
  --           (forall (a :: k1). q a -> Event (EventWriterTimeline w t) (v a))
  --        -> DMap k2 q
  --        -> Event (EventWriterTimeline w t) (DMap k2 v)
  mergeG f m = Event' $ do
    modify (<> (JoinList . Map.elems . weakenDMapWith (flip execState mempty . unEvent' . f) $ m))
    return . mergeG (flip evalState mempty . unEvent' . f) $ m

  mergeIncrementalG f (Incremental' m) = Event' $ do
    m' <- m
    let x :: Event t [Updates w t] =
          updatedIncremental m'
          & fmap (patchMapNewElements
                  . weakenPatchDMapWith (flip execState mempty . unEvent' . f))
    modify (<> (JoinE (fmap JoinList x)))
    return . (mergeIncrementalG (flip evalState mempty . unEvent' . f)) $ m'

  mergeIncrementalWithMoveG f (Incremental' m) = Event' $ do
    m' <- m
    let x :: Event t [Updates w t] =
          updatedIncremental m'
          & fmap (patchMapWithMoveNewElements
                  . weakenPatchDMapWithMoveWith (flip execState mempty . unEvent' . f))
    modify (<> (JoinE (fmap JoinList x)))
    return . (mergeIncrementalWithMoveG (flip evalState mempty . unEvent' . f)) $ m'

  mergeIntIncremental (Incremental' i) = Event' $ do
    i' <- i
    let x :: Event t [Updates w t] =
          updatedIncremental i'
          & fmap (patchIntMapNewElements
                  . fmap (flip execState mempty . unEvent'))
    modify (<> (JoinE (fmap JoinList x)))
    return
      . mergeIntIncremental
      . unsafeMapIncremental
          (fmap (\(Event' e) -> flip evalState mempty e))
          (fmap (\(Event' e) -> flip evalState mempty e))
      $ i'

  fanG (Event' e) = EventSelectorG $ \k -> Event' $ do
    e' <- e
    return . selectG (fanG e') $ k

  fanInt (Event' e) = EventSelectorInt $ \k -> Event' $ do
    e' <- e
    return . selectInt (fanInt e') $ k
instance (Reflex t, Semigroup w) => Functor (Dynamic (EventWriterTimeline w t)) where
  fmap f = Dynamic' . fmap (fmap f) . unDynamic'

instance (Reflex t, Semigroup w) => Applicative (Dynamic (EventWriterTimeline w t)) where
  -- Wrap a constant underlying Dynamic with no collected updates.
  -- (The original `pure = pure` refers to itself and would loop.)
  pure = Dynamic' . pure . pure
  (Dynamic' df) <*> (Dynamic' da) = Dynamic' $ (<*>) <$> df <*> da

instance (Reflex t, Semigroup w) => Monad (Dynamic (EventWriterTimeline w t)) where
  return = pure
  (>>=) :: forall a b
         . (Dynamic (EventWriterTimeline w t)) a
        -> (a -> (Dynamic (EventWriterTimeline w t)) b)
        -> (Dynamic (EventWriterTimeline w t)) b
  Dynamic' mda >>= f = Dynamic' $ do
    da :: Dynamic t a <- mda
    let g :: a -> Dynamic t (b, Updates w t) =
          ((\(d,updates) -> fmap (,updates) d)
           . flip runState mempty
           . unDynamic'
           . f)
    let dbw :: Dynamic t (b, Updates w t) = (da >>= g)
    modify (<> (JoinE . updated . fmap snd $ dbw))
    return $ fmap fst dbw
instance (Monoid w, Reflex t, MonadSample t m) => MonadSample (EventWriterTimeline w t) (EventWriterTimelineT w t m) where
  sample :: forall a. Behavior (EventWriterTimeline w t) a -> (EventWriterTimelineT w t m) a
  sample (Behavior' mb) = EventWriterTimelineT $ do
    b <- mappendState mb
    lift $ sample b

instance ( Reflex t
         , MonadHold t m
         , Monoid w
         ) => MonadHold (EventWriterTimeline w t) (EventWriterTimelineT w t m) where
  hold :: forall a
        . a
       -> Event (EventWriterTimeline w t) a
       -> EventWriterTimelineT w t m (Behavior (EventWriterTimeline w t) a)
  hold a (Event' me) = EventWriterTimelineT $ do
    e <- mappendState me
    b <- lift $ hold a e
    return . Behavior' . return $ b

  holdDyn :: forall a
           . a
          -> Event (EventWriterTimeline w t) a
          -> EventWriterTimelineT w t m (Dynamic (EventWriterTimeline w t) a)
  holdDyn a (Event' me) =
    fmap (Dynamic' . return) . lift . holdDyn a =<< mappendState me

  holdIncremental :: Patch p
                  => PatchTarget p
                  -> Event (EventWriterTimeline w t) p
                  -> EventWriterTimelineT w t m (Incremental (EventWriterTimeline w t) p)
  holdIncremental p (Event' e) =
    fmap (Incremental' . return) . lift . holdIncremental p =<< mappendState e

  buildDynamic :: forall a.
                  EventWriterTimelineT w t (PushM t) a
               -> Event (EventWriterTimeline w t) a
               -> EventWriterTimelineT w t m (Dynamic (EventWriterTimeline w t) a)
  buildDynamic pushV0 (Event' e) = do
    let mv0 :: PushM t (a, Updates w t) =
          flip runStateT mempty . unEventWriterTimelineT $ pushV0
    modify (<> JoinPush (fmap snd mv0))
    e' <- mappendState e
    Dynamic' . return <$> (lift $ buildDynamic (fst <$> mv0) e')

  headE :: forall a.
           Event (EventWriterTimeline w t) a
        -> EventWriterTimelineT w t m (Event (EventWriterTimeline w t) a)
  headE (Event' e) = do
    e' <- mappendState e
    EventWriterTimelineT . fmap (Event' . return) . headE $ e'