bakwc/PySyncObj

change notification

Closed this issue · 5 comments

I'd like to have some kind of notification mechanism which I can tap into when synchronised objects are changing. I suppose the PipeNotifier maybe something that does this. Unfortunately there is no documentation or an example available for it.

If the PipeNotifier is not the right way, how would one be able to trigger a mechanism (e.g. a callback function) in case a particular synchronised object managed by the SyncObject is updated? Maybe an example for this (I presume relatively common) use case would be good to provide, too.

bakwc commented

The easiest way is to add callbacks manually:

class MyCounter(SyncObj):
    def __init__(self, myCallback):
        self.__myCallback = myCallback # you should store it before calling super
        super(MyCounter, self).__init__('serverA:4321', ['serverB:4321', 'serverC:4321'])
        self.__counter = 0

    @replicated
    def incCounter(self):
        self.__counter += 1
        self.__myCallback()

    def getCounter(self):
        return self.__counter

myCallback will be called on each node each time incCounter is called. Keep in mind that callback will be called from separate thread, so you should use a mutex if your callback modifies some global state.

+1
Thanks for the swift response. Very helpful!

I've just tried it, and come to experience some odd phenomena:

  • There seems to be an endless loop, that keeps syncing, and setting the new value on all nodes over and over again, starting from the same old value (seems like the new value is never set).
  • The second set on the MyReplDict is never propagated (or never shows up in the callback).

Here's the simple sample code I've used to test it (using Python 3.6).

Note: I've also tried the same with a global callback function rather than a method on a different class with the same result.

"""
Start in three terminals with the following, just using different ports:
$ python pysyncobj_trial_callback.py localhost:4321
"""

import sys
from pysyncobj import SyncObj
from pysyncobj.batteries import ReplDict, replicated

HOSTS = ['localhost:4321', 'localhost:4322', 'localhost:4323']


class MyReplDict(ReplDict):
    def __init__(self, my_callback):
        # Callback needs to be stored before calling super.
        self._my_callback = my_callback
        super().__init__()

    @replicated
    def set(self, key, value):
        old_value = self.get(key)
        super().set(key, value)
        self._my_callback(key, old_value, value)
    
    
class SyncerTest:
    def __init__(self, me):
        self.me = me
        self.others = HOSTS.copy()
        del self.others[HOSTS.index(me)]
        self.my_dict = MyReplDict(self.on_set_cb)
        self.sync_obj = SyncObj(self.me, self.others, consumers=[self.my_dict])
        self.update_count = 0

    def on_set_cb(self, key, old_value, new_value):
        self.update_count += 1
        print('key: {}, old_value: {}, new_value: {}, count: {}'
              .format(key, old_value, new_value, self.update_count))
    
    def do_stuff(self):
        print('*** Sync set.')
        self.my_dict.set('test_key_1', 'test_value_1', sync=True)
        print('*** Async set.')
        # The same as previous, but asynchronous (non-blocking).
        self.my_dict['test_key_2'] = 'test_value_2'
        print('*** Stuff done.')

def main():
    me = sys.argv[1]
    my_syncer = SyncerTest(me)
    if me == 'localhost:4323':
        my_syncer.do_stuff()
    print('Exit with ENTER key press.'))
    input()
    
    
if __name__ == '__main__':
    main()
bakwc commented

You should add _doApply=True argument when calling parent methods in classes inherited from batteries. Your set method should be following:

    @replicated
    def set(self, key, value):
        old_value = self.get(key)
        super().set(key, value, _doApply=True)
        self._my_callback(key, old_value, value)

Without _doApply=True methods tries to replicate and you get a recursion.

Also don't forget to override __setitem__ method, otherwise you won't get callback triggered at self.my_dict['test_key_2'] = 'test_value_2'

Sweet, that works now.
And yes, obviously I had forgotten to remove the direct key/value assignment from the simplified example, but doing it analogously obviously now works perfectly. Cheers.