tarb/betfair_data

Add streaming_update / streaming_unique_id to Market

liampauling opened this issue · 16 comments

Is it possible to add the raw streaming_update to the Market object?

    >>> market.streaming_update
    {"id":"1.196641872","rc":[{"atb":[[1.11,0]],"id":40849650},{"atb":[[1.11,0]],"id":40550684},{"atb":[[1.13,1.99]],"id":40570484}]}

Would also be nice to be able to add a unique id to the file; not sure how this should be done, or if it would be possible to set it after creation:

    market.streaming_unique_id = 123

In reference to #1, flumine uses the two fields above for optimisation in the simulation logic.

tarb commented

So market.streaming_unique_id = 123 should be easy, as it looks to be just a getter/setter for an int. I quite like the objects being immutable, so adding a field to the File constructor (or maybe a lambda that can return the value, if the value is more complicated to figure out) feels like a better approach than having a setter.
Also, in the recorded files' JSON there's a field _stream_id on the market Mc object; it's currently just ignored, but it could be used as the value if desired.

With regards to market.streaming_update, this would be a little harder. I'm assuming it's a dict of the update that was just deserialised?
Creating and using this dict on each update would, I expect, cause a pretty significant slowdown in parsing speed, as you would be creating and allocating a lot of new values that I've taken some care to avoid.
It would be interesting to know how you use this optimisation, to see if there's possibly a faster way of getting the same result without paying for all these allocations.
For example, if it's to act as a quick way to know which values have changed since the last update, we could try a different approach comparing object references: in a list of RunnerBooks, each runner object would have the same reference as in the previous update, unless it was updated, in which case a new RunnerBook object would have been created. Then each value (atb, atl, etc.) in that RunnerBook should have the same reference unless updated, and so on. It might sound like a bit of work to check, but I would expect this to be significantly faster than creating the Python objects that reflect the updated values.
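A rough, untested sketch of that idea (assuming runner objects expose a selection_id, and that the previous market book is kept around):

    # hypothetical helper: find updated runners by comparing object identity
    # between consecutive market books - a new RunnerBook is only created
    # when the runner was actually updated
    def updated_selection_ids(current, previous) -> list:
        if previous is None:
            return [r.selection_id for r in current.runners]
        prev_by_id = {r.selection_id: r for r in previous.runners}
        return [
            r.selection_id
            for r in current.runners
            if r is not prev_by_id.get(r.selection_id)  # identity, not equality
        ]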

Happy to work through the best approach here, even if that's to:

  1. implement the streaming_update field as is and see how it affects performance.
  2. if significant, switch to a faster optimisation strategy.

For the streaming_unique_id we can change things so it's not required, but it's a handy flag in flumine to know whether a strategy should receive a market update. Agreed on the object being immutable as it prevents issues; a setter would be good, although the id doesn't change once the file is processed.

This makes sense. For context, flumine uses the raw update to see which runners have been updated via the rc field; this is the code:

    @staticmethod
    def _process_streaming_update(market_book) -> list:
        # return list of runners that have been updated
        update = market_book.streaming_update
        if update.get("img") or update.get("marketDefinition"):
            return [runner.selection_id for runner in market_book.runners]
        else:
            return [runner["id"] for runner in update.get("rc", [])]

Agree with what you are saying; a flag in the RunnerBook to signal an update would be the ideal scenario.

Just looking at the ids of the objects, can you confirm that a new object is only created when there has been an update? It looks as though this is the case, which means flumine can cache the id and work off that to tell when there has been an update.

tarb commented

Yeah, new objects are only created on update, and references to the previous objects won't be dropped until after creation of the next object, so each update will have a unique id when compared to the previous one.
I don't know how Python pools or reuses objects, so I think all you could safely use it for is to detect whether an object has changed from the previous one - but that's all we'd need for this case.
You can observe it with something like this:

    from itertools import zip_longest

    # True for each runner whose object was replaced (i.e. updated) since the previous book
    print([id(r[0]) != id(r[1]) for r in zip_longest(market_book.runners, previous.runners)])

For streaming_unique_id, I would happily add an optional field to the File constructor that just propagates down to each market_book; that seems like it would be the neatest.

    bflw.File(file_name, bytes, cumulative_runner_tv=True, streaming_unique_id=123)

Would you want to do anything with the _stream_id in the recorded json files, or just keep ignoring it and use the value from the constructor?

That looks good; yeah, _stream_id isn't used for anything in bflw or flumine.

tarb commented

I just implemented #5 (simplifying Files) and was wondering how you think streaming_unique_id should be implemented for files. Am I correct that every File would have its own unique id?
I'm thinking I can either:

  1. auto-increment the id, maybe providing an initial value (sketched below), or
  2. take a lambda and delegate responsibility to the caller (could be needlessly messy if it's just auto-incremented anyway).
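For example, option 1 could work something like this (hypothetical sketch, not the implemented API):

    from itertools import count

    # each File would take the next id from an auto-incrementing counter,
    # optionally seeded with an initial value by the caller
    def paths_with_ids(paths, start: int = 0):
        ids = count(start)
        for path in paths:
            yield path, next(ids)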

Ideally yes (an id per file), although flumine wants to set it - is it easier to create a setter so it can be added to the MarketBook once created?

tarb commented

Would it be on the MarketBook object, or on the bflw.File object (or both)?

Maybe I need to clean up the code. This is how I currently have betfair_data integrated; we process one file at a time, so it looks a bit hacky - I assume I should be using File instead?

    def _read_loop(self):
        paths = [self.file_path]
        files = betfair_data.Files(paths)
        for file in files.bflw():
            for update in file:
                yield update

In which case the ideal scenario is something like this:

    def _read_loop(self):
        files = betfair_data.File(
            self.file_path, 
            cumulative_runner_tv=False, 
            streaming_unique_id=self.stream_id
        )
        for file in files.bflw():
            for update in file:
                assert update[0].streaming_unique_id == self.stream_id
                yield update

tarb commented

Is _read_loop handling multiple files or just one at a time?

For one at a time, you're probably best just using bflw.File(...), but you'll need to open and read the bytes of the file yourself and handle any decompression.
If it's reading multiple paths, then using Files could simplify things a lot, but it would lock you in to only being able to open what Files can open (gz, bz2, uncompressed, and tars/zips containing such) - which is probably too limiting.
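For the single-file case, a rough sketch of what that could look like (the extension sniffing here is just illustrative):

    import bz2
    import gzip

    from betfair_data import bflw

    def open_bflw_file(path: str, cumulative_runner_tv: bool = True) -> bflw.File:
        # read the raw bytes yourself, handling any compression
        if path.endswith(".gz"):
            with gzip.open(path, "rb") as f:
                data = f.read()
        elif path.endswith(".bz2"):
            with bz2.open(path, "rb") as f:
                data = f.read()
        else:
            with open(path, "rb") as f:
                data = f.read()
        return bflw.File(path, data, cumulative_runner_tv=cumulative_runner_tv)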

tarb commented

This is the interface I'm thinking of (still not sure about passing the stream_id into Files):

    from typing import Iterator, Optional, Sequence

    class File(Iterator[Sequence[MarketBook]]):
        file_name: str
        streaming_unique_id: Optional[int]

        def __init__(self, path: str, bytes: bytes, cumulative_runner_tv: bool = True, streaming_unique_id: Optional[int] = None) -> None: ...

    class Files(Iterator[File]):
        def __init__(self, paths: Sequence[str], cumulative_runner_tv: bool = True, streaming_unique_id: Optional[int] = None) -> None: ...

I can add the id to the MarketBook object as well if you want, but that would expose it as an accessible field in flumine (etc.), which you might not want.
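For reference, a usage sketch based purely on the stub above (the path here is hypothetical):

    paths = ["markets/1.196641872.gz"]

    files = Files(paths, cumulative_runner_tv=True, streaming_unique_id=123)
    for file in files:                # Files yields File objects
        assert file.streaming_unique_id == 123
        for market_books in file:     # each step is a Sequence[MarketBook]
            for book in market_books:
                ...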

Looks good. I see you have now added streaming_unique_id, but I don't think it's fully implemented yet?

tarb commented

Yep - didn't realise that it also goes on the MarketBook; currently it's just on the File. There's an update incoming.

tarb commented

Added streaming_unique_id onto MarketBook in ab1abe6

Thanks, sorry I wasn't very clear.

Perfect