reflex-frp/reflex

EventWriter-style effect which survives from within PushM and PullM

parenthetical opened this issue · 3 comments

Hi all! I hope this is an appropriate venue for this question.

As some of you may know, I'm trying to implement a distributed extension to Reflex.
So far I've been implementing a single primitive: a distributed fold over all known/transmitted event occurrences:

class ( ... ) => Distributed t m where
  foldDistributed :: (CommutativeMonoid a) => Event t a -> m (Dynamic t a)

I was able to implement this in a pure way (like Reflex.Pure), but an efficient implementation on top of an arbitrary Reflex t instance is proving to be more difficult.
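For readers unfamiliar with the idea, here is a minimal, Reflex-free sketch of what such a pure model of the fold could look like. It is not the implementation referred to above: `Occurrence`, `foldDistributedPure` and `exampleOccs` are hypothetical names, and a plain `Monoid` that is assumed to be commutative stands in for `CommutativeMonoid`.

```haskell
import Data.List (permutations)
import Data.Monoid (Sum (..))

-- Hypothetical model: one event occurrence, tagged with the node that
-- produced it and the discrete time at which it became known.
data Occurrence a = Occurrence
  { occNode  :: String
  , occTime  :: Int
  , occValue :: a
  }

-- Value of the fold once every occurrence up to time t is known.
-- The Monoid is assumed to be commutative; the types do not check this.
foldDistributedPure :: Monoid a => Int -> [Occurrence a] -> a
foldDistributedPure t occs = mconcat [occValue o | o <- occs, occTime o <= t]

exampleOccs :: [Occurrence (Sum Int)]
exampleOccs =
  [ Occurrence "a" 1 (Sum 2)
  , Occurrence "b" 1 (Sum 5)
  , Occurrence "a" 3 (Sum 1)
  ]

-- With a commutative monoid, the order in which occurrences arrive
-- does not change the result.
orderIndependent :: Bool
orderIndependent =
  all (\p -> foldDistributedPure 3 p == foldDistributedPure 3 exampleOccs)
      (permutations exampleOccs)
```

Commutativity is what lets different instances receive the same occurrences in different orders and still agree on the value.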

My strategy is to uniquely identify the sites where the distributed fold is used and then collect the local event occurrences that went into the fold. These occurrences then have to be transmitted to other distributed instances, where they are combined into the value of the dynamic with the same identifier, and vice versa.
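The exchange step of this strategy can be sketched without Reflex as a map keyed by fold site. This is only an illustration under assumptions: `Identifier` is taken to be an `Int`, and `Pending`, `tellLocal` and `mergeRemote` are hypothetical names, not part of any library.

```haskell
import qualified Data.Map.Strict as Map
import Data.Monoid (Sum (..))

-- Hypothetical stand-in for the identifier of a foldDistributed use site.
type Identifier = Int

-- Occurrences collected locally, combined per fold site.
type Pending w = Map.Map Identifier w

-- Record a local occurrence for one fold site.
tellLocal :: Semigroup w => Identifier -> w -> Pending w -> Pending w
tellLocal = Map.insertWith (<>)

-- Combine updates received from another instance with the local ones.
-- Sites known to only one side are kept as they are.
mergeRemote :: Semigroup w => Pending w -> Pending w -> Pending w
mergeRemote = Map.unionWith (<>)

pendingA, pendingB :: Pending (Sum Int)
pendingA = tellLocal 1 (Sum 3) Map.empty
pendingB = tellLocal 2 (Sum 10) (tellLocal 1 (Sum 4) Map.empty)

-- After a full exchange both instances hold the same value for every site.
viewA, viewB :: Pending (Sum Int)
viewA = mergeRemote pendingB pendingA
viewB = mergeRemote pendingA pendingB
```

Here site 1 ends up as `Sum 7` and site 2 as `Sum 10` on both sides, which relies on the monoid being commutative.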

I've managed to solve the identity problem with Ryan's help, but I'm going in circles trying to solve the next part.

What I need is to somehow exfiltrate the input events going into each foldDistributed, wherever that fold is. If the value of a distributed fold touches the output of the program in any way, its existence needs to be communicated to the other program instances. It sounds a lot like an EventWriter, but one whose tellEvent effect survives from within push and the like.

Is such a thing reasonably possible to implement?

I've been trying to abstract this aspect of my implementation in a module below, but I'm having a lot of trouble with the interaction between PushM/PullM/MonadSample/etc. so I'm probably not going about it the right way.
In the end w should be something like Map Identifier (Event t Updates).

{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

module CollectingTimeline where

import Prelude hiding (filter)
import Control.Monad.Fix
import Control.Monad.Reader
import Control.Monad.Writer.Strict
import Reflex.Class
import Reflex.Dynamic (foldDyn)
import Data.Witherable (catMaybes)

data CollectingTimeline w t

newtype CollectM w t m a =
  CollectM { unCollectM :: (WriterT (Updates w t) m) a }
  deriving (Functor, Applicative, Monad, MonadFix, MonadIO)

type Updates w t = Dynamic t w -- Event or Incremental probably better.
type WithUpdates w t a = ((PushM t) (Updates w t), a)

instance ( Reflex t
         , Monoid w
         ) => Reflex (CollectingTimeline w t) where
  newtype Behavior (CollectingTimeline w t) a =
    Behavior' { _unBehavior' :: (WithUpdates w t (Behavior t a)) }
  newtype Event (CollectingTimeline w t) a =
    Event' { unEvent' :: (WithUpdates w t (Event t a)) }
  newtype Dynamic (CollectingTimeline w t) a =
    Dynamic' { unDynamic' :: (WithUpdates w t (Dynamic t a)) }
  newtype Incremental (CollectingTimeline w t) a =
    Incremental' { unIncremental' :: (WithUpdates w t (Incremental t a)) }
  type PushM (CollectingTimeline w t) = CollectM w t (PushM t)
  type PullM (CollectingTimeline w t) = CollectM w t (PullM t)
  push f (Event' (mw, e)) =
    let e' = pushAlways (runWriterT . unCollectM . f) e
    in Event' ( (<>) <$> mw <*> (fmap join . foldDyn mappend mempty . fmap snd $ e')
              , catMaybes . fmap fst $ e'
              )
  pull :: forall a. PullM (CollectingTimeline w t) a -> Behavior (CollectingTimeline w t) a
  pull a =
    let x :: (PullM t) (a, Dynamic t w) = runWriterT . unCollectM $ a
        y :: Behavior t (a, Dynamic t w) = pull x
    in Behavior' ( holdDyn mempty . switch . fmap (updated . snd) $ y
                 , fmap fst y
                 )
  -- ...

instance (Reflex t) => Functor (Dynamic (CollectingTimeline w t)) where
-- ...

instance (Reflex t) => Applicative (Dynamic (CollectingTimeline w t)) where
-- ...

instance (Reflex t) => Monad (Dynamic (CollectingTimeline w t)) where
-- ...

instance (Monoid w, Reflex t, MonadSample t m) => MonadSample (CollectingTimeline w t) (CollectM w t m) where
  sample :: forall a. Behavior (CollectingTimeline w t) a -> (CollectM w t m) a
  sample (Behavior' (mw, b)) = CollectM $ do
    -- "tell" mw somehow? (PushM t) =/ m... ***
    lift $ sample b

instance ( Reflex t
         , MonadHold t m
         , Monoid w
         ) => MonadHold (CollectingTimeline w t) (CollectM w t m) where
  hold :: a -> Event (CollectingTimeline w t) a -> CollectM w t m (Behavior (CollectingTimeline w t) a)
  hold a (Event' (mw, e)) = CollectM $ do
    -- What to do with mw here? ***
    b <- lift $ hold a e
    return $ Behavior' (mw, b)
  holdDyn :: a -> Event (CollectingTimeline w t) a -> CollectM w t m (Dynamic (CollectingTimeline w t) a)
  holdDyn = undefined
  holdIncremental :: Patch p
                  => PatchTarget p
                  -> Event (CollectingTimeline w t) p
                  -> CollectM w t m (Incremental (CollectingTimeline w t) p)
  holdIncremental = undefined
xplat commented

Would it work to just WriterT-transform all of the things in your timeline transformer? (PushM and PullM of course, but also Event, Behavior, and Dynamic?) Does that allow you to read the result out in the kind of context you need, or is there some obstacle? It seems like it would be straightforward, given that Reflex t (including MonadSample and MonadHold) is a simple combination of algebraic effects and functorial interpreters.
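To make the suggestion concrete, here is a toy model rather than Reflex itself: an event is a list of frames, and the writer-transformed event carries the output written while computing each frame. All names (`Ev`, `EvW`, `fromEv`, `pushW`, `clicks`, `doubledEvens`) are hypothetical and chosen for illustration only.

```haskell
-- Toy model, not Reflex: an event is a list of frames with at most one
-- occurrence per frame.
type Ev a = [Maybe a]

-- Hypothetical writer-transformed event: the occurrences plus the output
-- written while computing them, frame by frame.
data EvW w a = EvW
  { written     :: Ev w
  , occurrences :: Ev a
  }

-- Lift a plain event; nothing has been written yet.
fromEv :: Ev a -> EvW w a
fromEv es = EvW (map (const Nothing) es) es

-- Analogue of push whose function may write output as well as filter.
-- Output is kept even for frames where the occurrence is filtered out.
pushW :: Semigroup w => (a -> (w, Maybe b)) -> EvW w a -> EvW w b
pushW f (EvW ws es) = EvW (zipWith (<>) ws newWrites) (map (>>= snd) results)
  where
    results   = map (fmap f) es
    newWrites = map (fmap fst) results

clicks :: Ev Int
clicks = [Just 1, Nothing, Just 4, Just 7]

-- Logs every input it sees, but only lets even numbers through (doubled).
doubledEvens :: EvW [String] Int
doubledEvens = pushW step (fromEv clicks)
  where
    step n = (["saw " ++ show n], if even n then Just (n * 2) else Nothing)
```

In this model `occurrences doubledEvens` only fires in the third frame, while `written doubledEvens` still records all three inputs, which is the "effect survives push" property being asked for.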

I'm definitely more concerned about the next step after that, since I'm not sure it's even possible to implement headE in a way that makes sense if foldDistributed returns a Dynamic.

I'm going to give that a try, thanks. Unfortunately, I found out that my identifier generation wasn't being done correctly, so I'm going to try to work that out first. Maybe I was doing the "collecting" bit right before, but my tests just weren't working for this other reason.

The code below seems to work as far as I've been able to test. How does it look?
It might be possible to generalize this quite a bit if there's ever a need for that.

{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE UndecidableInstances #-}

module EventWriterTimeline where

import Prelude hiding (filter)
import Control.Monad.Identity
import Control.Monad.Reader
import Control.Monad.State.Strict
import Reflex.Class
import Reflex.Dynamic (foldDyn, switchDyn)
import Unsafe.Coerce (unsafeCoerce)
import Reflex.EventWriter.Class
import Data.Function ((&))
import Data.Functor.Misc
import qualified Data.Map.Strict as Map


data EventWriterTimeline w t

data Updates w t
  = NoUpdates
  | Singleton (Event t w)
  | JoinE (Event t (Updates w t))
  | JoinB (Behavior t (Updates w t))
  | JoinPush ((PushM t) (Updates w t))
  | JoinList [Updates w t]
  | Cons (Updates w t) (Updates w t)

instance Semigroup (Updates w t) where
  (<>) = Cons
instance Monoid (Updates w t) where
  mempty = NoUpdates

runUpdates :: forall w t.
         ( Reflex t
         , Semigroup w
         ) => Updates w t -> (PushM t) (Event t w)
runUpdates = \case
  NoUpdates -> return never
  Singleton e -> return e
  JoinE es -> do
    fmap switchDyn . foldDyn (<>) never . pushAlways runUpdates $ es
  JoinB b -> runUpdates =<< sample b
  JoinPush mu -> runUpdates =<< mu
  JoinList us -> fmap mconcat (mapM runUpdates us)
  Cons a b -> (<>) <$> runUpdates a <*> runUpdates b

newtype EventWriterTimelineT w t m a =
  EventWriterTimelineT { unEventWriterTimelineT :: (StateT (Updates w t) m) a }
  deriving (Functor, Applicative, Monad, MonadFix, MonadIO)

instance ( Monad m
         , Semigroup w
         ) => EventWriter (EventWriterTimeline w t) w (EventWriterTimelineT w t m) where
  tellEvent (Event' w) = do
    w' <- mappendState w
    modify (<> Singleton w')

instance (Monad m) => MonadState (Updates w t) (EventWriterTimelineT w t m) where
  get = EventWriterTimelineT get
  put = EventWriterTimelineT . put
  state = EventWriterTimelineT . state
                      
instance MonadTrans (EventWriterTimelineT w t) where
  lift = EventWriterTimelineT . lift

instance ( Monoid w
         , Reflex t
         , MonadReader r m
         ) => MonadReader r (EventWriterTimelineT w t m) where
  ask = lift ask
  local f (EventWriterTimelineT ma) = EventWriterTimelineT $ do
    (a,s') <- lift $ local f (flip runStateT mempty ma)
    modify (<> s')
    return a

mappendState :: ( MonadState s m
                , Monoid s
                )
             => State s a
             -> m a
mappendState a = do
    let (a',s') = flip runState mempty a
    modify (<> s')
    return a'

type WithUpdates w t a = (State (Updates w t) a)

runEventWriterTimeline :: ( Reflex t
                         , Semigroup w
                         )
                      => EventWriterTimelineT w t (PushM t) (WithUpdates w t a)
                      -> (PushM t) (a, Event t w)
runEventWriterTimeline ma = do
  (a,u) <- flip runStateT mempty . unEventWriterTimelineT $ ma
  let (a',u') = flip runState mempty a
  u'' <- runUpdates (Cons u u')
  return (a',u'')

instance ( Reflex t
         , Monoid w
         ) => Reflex (EventWriterTimeline w t) where
  newtype Behavior (EventWriterTimeline w t) a =
    Behavior' { _unBehavior' :: (WithUpdates w t (Behavior t a)) }
  newtype Event (EventWriterTimeline w t) a =
    Event' { unEvent' :: (WithUpdates w t (Event t a)) }
  newtype Dynamic (EventWriterTimeline w t) a =
    Dynamic' { unDynamic' :: (WithUpdates w t (Dynamic t a)) }
  newtype Incremental (EventWriterTimeline w t) a =
    Incremental' { unIncremental' :: (WithUpdates w t (Incremental t a)) }
  type PushM (EventWriterTimeline w t) = EventWriterTimelineT w t (PushM t)
  type PullM (EventWriterTimeline w t) = EventWriterTimelineT w t (PullM t)
  push :: forall a b. (a -> PushM (EventWriterTimeline w t) (Maybe b)) -> Event (EventWriterTimeline w t) a -> Event (EventWriterTimeline w t) b
  push f (Event' me) = Event' $ do
    e' :: Event t (Maybe b, Updates w t) <-
      push (fmap Just . flip runStateT mempty . unEventWriterTimelineT . f) <$> me
    modify (<> JoinE (fmap snd e'))
    return $ Reflex.Class.mapMaybe fst e'
  pushCheap f (Event' me) = Event' $ do
    e' <- pushCheap (fmap Just . flip runStateT mempty . unEventWriterTimelineT . f) <$> me
    modify (<> JoinE (fmap snd e'))
    return $ Reflex.Class.mapMaybe fst e'
  coincidence (Event' e) = Event' $ do
    e' <- e
    let e'' = fmap (flip runState mempty . unEvent') e'
    modify (<> JoinE (fmap snd e''))
    return $ coincidence (fmap fst e'')
  unsafeBuildDynamic :: forall a.
                        PullM (EventWriterTimeline w t) a
                        -> Event (EventWriterTimeline w t) a
                        -> Dynamic (EventWriterTimeline w t) a
  unsafeBuildDynamic readV0 (Event' e) = Dynamic' $ do
    let mv0 :: PullM t (a, Updates w t) =
          flip runStateT mempty . unEventWriterTimelineT $ readV0
    modify (<> JoinB (fmap snd . pull $ mv0))
    unsafeBuildDynamic (fst <$> mv0) <$> e
  unsafeBuildIncremental :: forall p.
                            Patch p
                         => PullM (EventWriterTimeline w t) (PatchTarget p)
                         -> Event (EventWriterTimeline w t) p
                         -> Incremental (EventWriterTimeline w t) p
  unsafeBuildIncremental readV0 (Event' e) = Incremental' $ do
    let mv0 :: PullM t (PatchTarget p, Updates w t) =
          flip runStateT mempty . unEventWriterTimelineT $ readV0
    modify (<> JoinB (fmap snd . pull $ mv0))
    unsafeBuildIncremental (fst <$> mv0) <$> e
  pull :: forall a. PullM (EventWriterTimeline w t) a -> Behavior (EventWriterTimeline w t) a
  pull (EventWriterTimelineT a) = Behavior' $ do
    let x :: Behavior t (a, Updates w t) = pull (flip runStateT mempty a)
    modify (<> JoinB (fmap snd x))
    return (fmap fst x)
  switch (Behavior' b) = Event' $ do
    b' <- b
    let b'' = fmap (flip runState mempty . unEvent') b'
    modify (<> JoinB (fmap snd b''))
    return $ switch (fst <$> b'')
  never = Event' . return $ never
  constant = Behavior' . return . constant
  updated = Event' . fmap updated . unDynamic'
  current = Behavior' . fmap current . unDynamic'
  currentIncremental = Behavior' . fmap currentIncremental . unIncremental'
  updatedIncremental = Event' . fmap updatedIncremental . unIncremental'
  incrementalToDynamic = Dynamic' . fmap incrementalToDynamic . unIncremental'
  behaviorCoercion = unsafeCoerce
  eventCoercion = unsafeCoerce
  dynamicCoercion = unsafeCoerce
  incrementalCoercion = unsafeCoerce
  -- mergeG :: forall k1 (k2 :: k1 -> *) (q :: k1 -> *) (v :: k1 -> *) (a0 :: k1).
  --           GCompare k2 =>
  --           (forall (a :: k1). q a -> Event (EventWriterTimeline w t) (v a))
  --           -> DMap k2 q
  --           -> Event (EventWriterTimeline w t) (DMap k2 v)
  mergeG f m = Event' $ do
    modify (<> (JoinList . Map.elems . weakenDMapWith (flip execState mempty . unEvent' . f) $ m))
    return . mergeG (flip evalState mempty . unEvent' . f) $ m
  mergeIncrementalG f (Incremental' m) = Event' $ do
    m' <- m
    let x :: Event t [Updates w t] =
          updatedIncremental m'
          & fmap (patchMapNewElements
                  . weakenPatchDMapWith (flip execState mempty . unEvent' . f))
    modify (<> (JoinE (fmap JoinList x)))
    return . (mergeIncrementalG (flip evalState mempty . unEvent' . f)) $ m'
  mergeIncrementalWithMoveG f (Incremental' m) = Event' $ do
    m' <- m
    let x :: Event t [Updates w t] =
          updatedIncremental m'
          & fmap (patchMapWithMoveNewElements
                  . weakenPatchDMapWithMoveWith (flip execState mempty . unEvent' . f))
    modify (<> (JoinE (fmap JoinList x)))
    return . (mergeIncrementalWithMoveG (flip evalState mempty . unEvent' . f)) $ m'
  mergeIntIncremental (Incremental' i) = Event' $ do
    i' <- i
    let x :: Event t [Updates w t] =
          updatedIncremental i'
          & fmap (patchIntMapNewElements
                  . fmap (flip execState mempty . unEvent'))
    modify (<> (JoinE (fmap JoinList x)))
    return
      . mergeIntIncremental
      . unsafeMapIncremental
        (fmap (\(Event' e) -> flip evalState mempty e))
        (fmap (\(Event' e) -> flip evalState mempty e))
      $ i'
  fanG (Event' e) = EventSelectorG $ \k -> Event' $ do
    e' <- e
    return . selectG (fanG e') $ k
  fanInt (Event' e) = EventSelectorInt $ \k -> Event' $ do
    e' <- e
    return . selectInt (fanInt e') $ k

instance (Reflex t, Semigroup w) => Functor (Dynamic (EventWriterTimeline w t)) where
  fmap f = Dynamic' . fmap (fmap f) . unDynamic'

instance (Reflex t, Semigroup w) => Applicative (Dynamic (EventWriterTimeline w t)) where
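  -- Lift into the underlying Dynamic with no pending updates;
  -- defining this as 'pure = pure' would refer to itself and never terminate.
  pure = Dynamic' . pure . pure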
  (Dynamic' df) <*> (Dynamic' da) = Dynamic' $ (<*>) <$> df <*> da

instance (Reflex t, Semigroup w) => Monad (Dynamic (EventWriterTimeline w t)) where
  return = pure
  (>>=) :: forall a b
    .  (Dynamic (EventWriterTimeline w t)) a
    -> (a -> (Dynamic (EventWriterTimeline w t)) b)
    -> (Dynamic (EventWriterTimeline w t)) b
  Dynamic' mda >>= f = Dynamic' $ do
    da :: Dynamic t a <- mda
    let g :: a -> Dynamic t (b, Updates w t) =
          ((\(d,updates) -> fmap (,updates) d)
            . flip runState mempty
            . unDynamic'
            . f)
    let dbw :: Dynamic t (b, Updates w t) = (da >>= g)
    modify (<> (JoinE . updated . fmap snd $ dbw))
    return $ fmap fst dbw

instance (Monoid w, Reflex t, MonadSample t m) => MonadSample (EventWriterTimeline w t) (EventWriterTimelineT w t m) where
  sample :: forall a. Behavior (EventWriterTimeline w t) a -> (EventWriterTimelineT w t m) a
  sample (Behavior' mb) = EventWriterTimelineT $ do
    b <- mappendState mb
    lift $ sample b

instance ( Reflex t
         , MonadHold t m
         , Monoid w
         ) => MonadHold (EventWriterTimeline w t) (EventWriterTimelineT w t m) where
  hold :: forall a
    . a
    -> Event (EventWriterTimeline w t) a
    -> EventWriterTimelineT w t m (Behavior (EventWriterTimeline w t) a)
  hold a (Event' me) = EventWriterTimelineT $ do
    e <- mappendState me
    b <- lift $ hold a e
    return . Behavior' . return $ b
  holdDyn :: forall a
    . a
    -> Event (EventWriterTimeline w t) a
    -> EventWriterTimelineT w t m (Dynamic (EventWriterTimeline w t) a)
  holdDyn a (Event' me) =
    fmap (Dynamic' . return) . lift . holdDyn a =<< mappendState me
  holdIncremental :: Patch p
                  => PatchTarget p
                  -> Event (EventWriterTimeline w t) p
                  -> EventWriterTimelineT w t m (Incremental (EventWriterTimeline w t) p)
  holdIncremental p (Event' e) =
    fmap (Incremental' . return) . lift . holdIncremental p =<< mappendState e
  buildDynamic :: forall a.
                  EventWriterTimelineT w t (PushM t) a
               -> Event (EventWriterTimeline w t) a
               -> EventWriterTimelineT w t m (Dynamic (EventWriterTimeline w t) a)
  buildDynamic pushV0 (Event' e) = do
    let mv0 :: PushM t (a, Updates w t) =
          flip runStateT mempty . unEventWriterTimelineT $ pushV0
    modify (<> JoinPush (fmap snd mv0))
    e' <- mappendState e
    Dynamic' . return <$> (lift $ buildDynamic (fst <$> mv0) e')
  headE :: forall a.
           Event (EventWriterTimeline w t) a
        -> EventWriterTimelineT w t m (Event (EventWriterTimeline w t) a)
  headE (Event' e) = do
    e' <- mappendState e
    EventWriterTimelineT . fmap (Event' . return) . headE $ e'
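
The Updates type in the module above is essentially a syntax tree of pending outputs that runUpdates interprets once at the end. A Reflex-free sketch of that design, with the JoinE, JoinB and JoinPush cases left out, might look as follows. `UpdatesTree`, `runUpdatesTree` and `exampleTree` are hypothetical names used only for this illustration.

```haskell
import Data.Monoid (Sum (..))

-- Hypothetical, Reflex-free analogue of the Updates type: it only records
-- how outputs were combined and defers the actual combination.
data UpdatesTree w
  = NoUpdates
  | Singleton w
  | JoinList [UpdatesTree w]
  | Cons (UpdatesTree w) (UpdatesTree w)

-- Appending is O(1). Associativity holds only up to interpretation,
-- not structurally.
instance Semigroup (UpdatesTree w) where
  (<>) = Cons

instance Monoid (UpdatesTree w) where
  mempty = NoUpdates

-- Interpret the tree once, at the edge of the program.
runUpdatesTree :: Monoid w => UpdatesTree w -> w
runUpdatesTree u = case u of
  NoUpdates   -> mempty
  Singleton w -> w
  JoinList us -> mconcat (map runUpdatesTree us)
  Cons a b    -> runUpdatesTree a <> runUpdatesTree b

exampleTree :: UpdatesTree (Sum Int)
exampleTree =
  Singleton (Sum 1) <> JoinList [Singleton (Sum 2), mempty] <> Singleton (Sum 3)
```

Deferring interpretation this way keeps every `modify (<> ...)` cheap, at the cost of a Semigroup instance that is lawful only after running the tree.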