Question: how would i use DataReader.read_bytes()?
a-usr opened this issue · 2 comments
a-usr commented
Say have a DataReader set up from a buffer. And i want to read all bytes at once. so i feed the data reader an empty winsdk.system.array("B"). I know the buffer isnt empty. However it appears that read_bytes does not write to the array. Am I doing something wrong?
import asyncio
import PIL.Image as Image
import io
from winsdk.windows.media.control import GlobalSystemMediaTransportControlsSessionManager
from winsdk.windows.storage.streams import Buffer, InputStreamOptions, DataReader
import winsdk
from sys import argv, exit, stderr
THUMBNAIL_BUFFER_SIZE = 5 * 1024 * 1024
async def get_stream():
print("getting sessions")
sessions = await GlobalSystemMediaTransportControlsSessionManager.request_async()
print("getting playback")
playback_dict = await get_media_info(sessions.get_current_session())
try:
print("getting thumbnail ref")
r_a_Stream_ref: winsdk.windows.storage.streams.IRandomAccessStreamReference
r_a_Stream_ref = playback_dict["thumbnail"]
except KeyError:
print("None")
return 1
print("attempting to retrieve stream")
Async_operation = r_a_Stream_ref.open_read_async()
i = 0
while Async_operation.status == 0 and i <500:
if (i % 20) == 0:
print(i)
# I Wouldve used Async_operation.completed != True, however this throws a NotImplementedException
i+=1
#print(Async_operation.status) #this prints 0 indefinetly
if i == 500:
print("timeout")
return 1
print("retrieved stream")
buffer = Buffer(THUMBNAIL_BUFFER_SIZE)
readable_stream = Async_operation.get_results()
print("trying to read stream..")
await readable_stream.read_async(buffer, buffer.capacity, InputStreamOptions.READ_AHEAD)
#print(buffer)
buffer_reader = DataReader.from_buffer(buffer)
arr = winsdk.system.Array("B")
buffer_reader.read_bytes(arr)
print(arr.__len__())
byte_arr = bytearray(arr)
return byte_arr
async def main():
#print(byte_arr)
img = Image.open(io.BytesIO(await get_stream()))
img.save(".\\thumbnail.png", "PNG")
img.close()
return 0
def props_to_dict(props):
return {
attr: props.__getattribute__(attr) for attr in dir(props) if attr[0] != '_'
}
async def get_media_info(current_session):
if current_session:
media_props = await current_session.try_get_media_properties_async()
return {
song_attr: media_props.__getattribute__(song_attr)
for song_attr in dir(media_props)
if song_attr[0] != '_'
}
if __name__ == "__main__":
try:
result = asyncio.run(main())
except Exception as e:
if "debug" in argv:
print(e)
else:
print("err")
exit(1)
if result == 0:
print("ok")
else: exit(result)
a-usr commented
I am asking because IMO calling read_byte() for every byte i want to read takes way too long
dlech commented
It is best to avoid DataReader
and use Python APIs like the struct
module instead. Buffer
objects can be passed directly to anything in Python that uses the Python buffer protocol. So return bytearray(buffer)
would probably do what you want in this case.