pywinrt/python-winsdk

Question: how would i use DataReader.read_bytes()?

a-usr opened this issue · 2 comments

a-usr commented

Say have a DataReader set up from a buffer. And i want to read all bytes at once. so i feed the data reader an empty winsdk.system.array("B"). I know the buffer isnt empty. However it appears that read_bytes does not write to the array. Am I doing something wrong?

import asyncio
import PIL.Image as Image
import io
from winsdk.windows.media.control import GlobalSystemMediaTransportControlsSessionManager
from winsdk.windows.storage.streams import Buffer, InputStreamOptions, DataReader
import winsdk
from sys import argv, exit, stderr
THUMBNAIL_BUFFER_SIZE = 5 * 1024 * 1024
async def get_stream():
    print("getting sessions")
    sessions = await GlobalSystemMediaTransportControlsSessionManager.request_async()
    print("getting playback")
    playback_dict = await get_media_info(sessions.get_current_session())
    try:
        print("getting thumbnail ref")
        r_a_Stream_ref: winsdk.windows.storage.streams.IRandomAccessStreamReference
        r_a_Stream_ref = playback_dict["thumbnail"]
    except KeyError:
        print("None")
        return 1
    print("attempting to retrieve stream")
    Async_operation = r_a_Stream_ref.open_read_async()
    i = 0
    while Async_operation.status == 0 and i <500:
        if (i % 20) == 0:
            print(i)
        # I Wouldve used Async_operation.completed != True, however this throws a NotImplementedException
        i+=1
        #print(Async_operation.status) #this prints 0 indefinetly
    if i == 500:
        print("timeout")
        return 1
    print("retrieved stream")
    buffer = Buffer(THUMBNAIL_BUFFER_SIZE)
    readable_stream = Async_operation.get_results()
    print("trying to read stream..")
    await readable_stream.read_async(buffer, buffer.capacity, InputStreamOptions.READ_AHEAD)
    #print(buffer)
    buffer_reader = DataReader.from_buffer(buffer)
    arr = winsdk.system.Array("B")
    buffer_reader.read_bytes(arr)
    print(arr.__len__())
    byte_arr = bytearray(arr)
    return byte_arr
    
async def main():
    #print(byte_arr)
    img = Image.open(io.BytesIO(await get_stream()))
    img.save(".\\thumbnail.png", "PNG")
    img.close()
    return 0
    
def props_to_dict(props):
    return {
        attr: props.__getattribute__(attr) for attr in dir(props) if attr[0] != '_'
    }
async def get_media_info(current_session):

    if current_session:
        media_props = await current_session.try_get_media_properties_async()
        return {
            song_attr: media_props.__getattribute__(song_attr)
            for song_attr in dir(media_props)
            if song_attr[0] != '_'
        }
if __name__ == "__main__":
    try:
        result = asyncio.run(main())
    except Exception as e:
        if "debug" in argv:
            print(e)
        else: 
            print("err")
            exit(1)
    if result == 0:
        print("ok")
    else: exit(result)
a-usr commented

I am asking because IMO calling read_byte() for every byte i want to read takes way too long

dlech commented

It is best to avoid DataReader and use Python APIs like the struct module instead. Buffer objects can be passed directly to anything in Python that uses the Python buffer protocol. So return bytearray(buffer) would probably do what you want in this case.