change notification
I'd like to have some kind of notification mechanism that I can tap into when synchronised objects change. I suppose the PipeNotifier may be something that does this. Unfortunately, there is no documentation or example available for it.
If the PipeNotifier is not the right way, how would one trigger a mechanism (e.g. a callback function) when a particular synchronised object managed by the SyncObj is updated? An example for this (I presume relatively common) use case would be good to provide, too.
The easiest way is to add callbacks manually:
from pysyncobj import SyncObj, replicated

class MyCounter(SyncObj):
    def __init__(self, myCallback):
        self.__myCallback = myCallback  # you should store it before calling super
        super(MyCounter, self).__init__('serverA:4321', ['serverB:4321', 'serverC:4321'])
        self.__counter = 0

    @replicated
    def incCounter(self):
        self.__counter += 1
        self.__myCallback()

    def getCounter(self):
        return self.__counter
myCallback will be called on each node each time incCounter is called. Keep in mind that the callback will be called from a separate thread, so you should use a mutex if your callback modifies some global state.
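As a minimal sketch of the mutex advice above: the callback below guards its shared state with a threading.Lock. The names UpdateTracker, on_update and run_concurrent_updates are hypothetical and not part of pysyncobj; the worker threads only stand in for whichever library thread ends up invoking the callback.

```python
import threading


class UpdateTracker:
    """Hypothetical holder of shared state that a callback mutates."""

    def __init__(self):
        self._lock = threading.Lock()
        self.count = 0

    def on_update(self):
        # This is what you would pass as the callback. It may run on a
        # thread other than the main one, so guard the shared counter.
        with self._lock:
            self.count += 1


def run_concurrent_updates(tracker, n_threads=4, calls_per_thread=1000):
    """Stand-in for the library: invoke the callback from several threads."""

    def worker():
        for _ in range(calls_per_thread):
            tracker.on_update()

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


if __name__ == '__main__':
    tracker = UpdateTracker()
    run_concurrent_updates(tracker)
    print('updates seen:', tracker.count)
```

With the counter example, you would presumably pass tracker.on_update as myCallback. Keeping the lock next to the state it protects means any code that reads or writes the state can take the same lock.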
+1
Thanks for the swift response. Very helpful!
I've just tried it and ran into some odd behaviour:
- There seems to be an endless loop that keeps syncing and setting the new value on all nodes over and over again, starting from the same old value (it seems the new value is never set).
- The second set on the MyReplDict is never propagated (or never shows up in the callback).
Here's the simple sample code I've used to test it (using Python 3.6).
Note: I've also tried the same with a global callback function rather than a method on a different class with the same result.
"""
Start in three terminals with the following, just using different ports:
$ python pysyncobj_trial_callback.py localhost:4321
"""
import sys

from pysyncobj import SyncObj
from pysyncobj.batteries import ReplDict, replicated

HOSTS = ['localhost:4321', 'localhost:4322', 'localhost:4323']


class MyReplDict(ReplDict):
    def __init__(self, my_callback):
        # Callback needs to be stored before calling super.
        self._my_callback = my_callback
        super().__init__()

    @replicated
    def set(self, key, value):
        old_value = self.get(key)
        super().set(key, value)
        self._my_callback(key, old_value, value)


class SyncerTest:
    def __init__(self, me):
        self.me = me
        self.others = HOSTS.copy()
        del self.others[HOSTS.index(me)]
        self.my_dict = MyReplDict(self.on_set_cb)
        self.sync_obj = SyncObj(self.me, self.others, consumers=[self.my_dict])
        self.update_count = 0

    def on_set_cb(self, key, old_value, new_value):
        self.update_count += 1
        print('key: {}, old_value: {}, new_value: {}, count: {}'
              .format(key, old_value, new_value, self.update_count))

    def do_stuff(self):
        print('*** Sync set.')
        self.my_dict.set('test_key_1', 'test_value_1', sync=True)
        print('*** Async set.')
        # The same as previous, but asynchronous (non-blocking).
        self.my_dict['test_key_2'] = 'test_value_2'
        print('*** Stuff done.')


def main():
    me = sys.argv[1]
    my_syncer = SyncerTest(me)
    if me == 'localhost:4323':
        my_syncer.do_stuff()
    print('Exit with ENTER key press.')
    input()


if __name__ == '__main__':
    main()
You should add the _doApply=True argument when calling parent methods in classes inherited from batteries. Your set method should look like this:
@replicated
def set(self, key, value):
    old_value = self.get(key)
    super().set(key, value, _doApply=True)
    self._my_callback(key, old_value, value)
Without _doApply=True, the parent method tries to replicate the call again and you get a recursion.
Also, don't forget to override the __setitem__ method; otherwise the callback won't be triggered by self.my_dict['test_key_2'] = 'test_value_2'.
Sweet, that works now.
And yes, I had forgotten to remove the direct key/value assignment from the simplified example, but doing it analogously now works perfectly. Cheers.