Request: Support stream commands (play, pause, next, etc.)
Chreece opened this issue · 1 comments
Chreece commented
Is it possible to add support for these functions?
Chreece commented
I'm no programmer, don't kill me please...
I just wanted to see how good ChatGPT is:
stream.py:
"""Snapcast stream."""
import requests
import json
class Snapstream():
    """Represent a Snapcast stream and send playback commands for it.

    The stream description handed in by the server is stored as a plain
    dict and exposed through read-only properties.  The command methods
    (``play``, ``pause``, ``next``, ...) post JSON-RPC requests to
    ``http://<host>:<port>/jsonrpc`` and return the decoded JSON reply.

    Playback commands use Snapcast's ``Stream.Control`` method and the
    shuffle/repeat switches use ``Stream.SetProperty``; whether a command
    has any effect depends on the stream source supporting it.
    """

    # Seconds to wait for the server; without a timeout ``requests`` would
    # block indefinitely on an unreachable or stalled server.
    REQUEST_TIMEOUT = 10

    # Names accepted by repeat() mapped to Snapcast ``loopStatus`` values.
    # Values not listed here are passed through unchanged.
    _REPEAT_MODES = {'off': 'none', 'one': 'track', 'all': 'playlist'}

    def __init__(self, data, host, port=1705):
        """Initialize.

        Args:
            data: dict describing the stream as reported by the server.
            host: hostname or address of the Snapcast server.
            port: port of the server's HTTP JSON-RPC endpoint.
        """
        self.update(data)
        self._callback_func = None
        self.host = host
        # NOTE(review): 1705 is kept as the default for compatibility, but
        # snapserver normally serves HTTP JSON-RPC on 1780 (1705 being the
        # raw TCP control port) -- confirm which port callers should pass.
        self.port = port
        self.url = f"http://{self.host}:{self.port}/jsonrpc"

    @property
    def identifier(self):
        """Get stream id."""
        return self._stream.get('id')

    @property
    def status(self):
        """Get stream status."""
        return self._stream.get('status')

    @property
    def name(self):
        """Get stream name, or None when the stream has no uri/query/name."""
        # ``or {}`` keeps the lookup from raising AttributeError when the
        # 'uri' or 'query' entry is missing or None.
        uri = self._stream.get('uri') or {}
        query = uri.get('query') or {}
        return query.get('name')

    @property
    def friendly_name(self):
        """Get friendly name: the stream name, or the id when it is empty or missing."""
        name = self.name
        return name if name else self.identifier

    @property
    def metadata(self):
        """Get metadata, preferring ``properties.metadata`` over ``meta``."""
        if 'properties' in self._stream:
            return self._stream['properties'].get('metadata')
        return self._stream.get('meta')

    @property
    def meta(self):
        """Get metadata. Deprecated."""
        return self.metadata

    @property
    def properties(self):
        """Get properties."""
        return self._stream.get('properties')

    @property
    def path(self):
        """Get stream path, or None when the stream has no uri/path."""
        uri = self._stream.get('uri') or {}
        return uri.get('path')

    def update(self, data):
        """Update stream by replacing the stored description with ``data``."""
        self._stream = data

    def update_meta(self, data):
        """Update stream metadata. Alias of update_metadata()."""
        self.update_metadata(data)

    def update_metadata(self, data):
        """Update stream metadata in both the new and the legacy location."""
        if 'properties' in self._stream:
            self._stream['properties']['metadata'] = data
        self._stream['meta'] = data

    def update_properties(self, data):
        """Update stream properties."""
        self._stream['properties'] = data

    def __repr__(self):
        """Return string representation."""
        return f'Snapstream ({self.name})'

    def callback(self):
        """Run callback, passing this stream, if a callable one is set."""
        if self._callback_func and callable(self._callback_func):
            self._callback_func(self)

    def set_callback(self, func):
        """Set callback."""
        self._callback_func = func

    def _build_payload(self, method, params=None):
        """Return the JSON-RPC 2.0 request body for ``method``."""
        return {
            "jsonrpc": "2.0",
            "method": method,
            "params": params if params else [],
            "id": 1,
        }

    def _control_params(self, command):
        """Return the ``Stream.Control`` params for ``command`` on this stream."""
        return {"id": self.identifier, "command": command, "params": {}}

    def _property_params(self, prop, value):
        """Return the ``Stream.SetProperty`` params setting ``prop`` to ``value``."""
        return {"id": self.identifier, "property": prop, "value": value}

    def _send_command(self, method, params=None):
        """Post a JSON-RPC request and return the decoded JSON response.

        Raises whatever ``requests`` raises on connection failure or
        timeout, and on a response body that is not valid JSON.
        """
        headers = {'Content-Type': 'application/json'}
        payload = self._build_payload(method, params)
        response = requests.post(
            self.url,
            headers=headers,
            data=json.dumps(payload),
            timeout=self.REQUEST_TIMEOUT,
        )
        return response.json()

    def play(self):
        """Play the stream."""
        return self._send_command("Stream.Control", self._control_params("play"))

    def pause(self):
        """Pause the stream."""
        return self._send_command("Stream.Control", self._control_params("pause"))

    def next(self):
        """Next track in the stream."""
        return self._send_command("Stream.Control", self._control_params("next"))

    def previous(self):
        """Previous track in the stream."""
        return self._send_command("Stream.Control", self._control_params("previous"))

    def shuffle(self, enable=True):
        """Enable or disable shuffle."""
        return self._send_command(
            "Stream.SetProperty", self._property_params("shuffle", enable))

    def repeat(self, mode):
        """Set repeat mode.

        ``mode`` may be 'off', 'one' or 'all'; these are translated to the
        Snapcast ``loopStatus`` values 'none', 'track' and 'playlist'.  Any
        other value is sent as given.
        """
        loop_status = self._REPEAT_MODES.get(mode, mode)
        return self._send_command(
            "Stream.SetProperty", self._property_params("loopStatus", loop_status))
if __name__ == "__main__":
    # Demo: build a stream from hand-written data and fire every command once
    # at a server on localhost, printing each reply as it arrives.
    host = 'localhost'
    sample_data = {
        'id': 'example_stream_id',
        'status': 'playing',
        'uri': {
            'query': {
                'name': 'Example Stream'
            },
            'path': '/example/path'
        },
        'properties': {
            'metadata': 'Example Metadata'
        },
        'meta': 'Example Meta'
    }
    stream = Snapstream(sample_data, host)

    # (label, zero-argument call) pairs, executed strictly in this order.
    demo_calls = (
        ("Play response:", stream.play),
        ("Pause response:", stream.pause),
        ("Next response:", stream.next),
        ("Previous response:", stream.previous),
        # Enable shuffle
        ("Shuffle response:", lambda: stream.shuffle(True)),
        # Set repeat mode to 'all'
        ("Repeat response:", lambda: stream.repeat('all')),
    )
    for label, call in demo_calls:
        print(label, call())
__init__.py:
"""Snapcast control for Snapcast 0.11.1."""
from snapcast.control.server import Snapserver, CONTROL_PORT
from snapcast.control.stream import Snapstream # Ensure you import the updated Snapstream class
async def create_server(loop, host, port=CONTROL_PORT, reconnect=False):
    """Construct a Snapserver, await its start(), and return it.

    The four arguments are forwarded unchanged to the Snapserver
    constructor.
    """
    snapserver = Snapserver(loop, host, port, reconnect)
    await snapserver.start()
    return snapserver
class ExtendedSnapserver(Snapserver):
    """Snapserver subclass that forwards playback commands to its streams.

    Streams are kept in ``self.streams`` keyed by stream id.  Every
    ``*_stream`` helper returns the stream's reply, or None when the id is
    not registered.
    """

    def __init__(self, loop, host, port=CONTROL_PORT, reconnect=False):
        """Initialise the base server and start with no registered streams."""
        super().__init__(loop, host, port, reconnect)
        # NOTE(review): assumes the base class allows ``streams`` to be
        # assigned and exposes ``host``/``port`` attributes -- confirm
        # against Snapserver.
        self.streams = {}

    async def start(self):
        """Start the base server, then wrap each stored entry in a Snapstream."""
        await super().start()
        # Only existing keys are reassigned, so the dict keeps its size
        # while it is being iterated.
        for key, raw in self.streams.items():
            self.streams[key] = Snapstream(raw, self.host, self.port)

    def add_stream(self, stream_data):
        """Register a stream built from ``stream_data`` under its 'id'."""
        key = stream_data['id']
        self.streams[key] = Snapstream(stream_data, self.host, self.port)

    def play_stream(self, stream_id):
        """Play the given stream; None if the id is unknown."""
        if stream_id not in self.streams:
            return None
        return self.streams[stream_id].play()

    def pause_stream(self, stream_id):
        """Pause the given stream; None if the id is unknown."""
        if stream_id not in self.streams:
            return None
        return self.streams[stream_id].pause()

    def next_stream(self, stream_id):
        """Skip to the next track of the given stream; None if the id is unknown."""
        if stream_id not in self.streams:
            return None
        return self.streams[stream_id].next()

    def previous_stream(self, stream_id):
        """Go back one track on the given stream; None if the id is unknown."""
        if stream_id not in self.streams:
            return None
        return self.streams[stream_id].previous()

    def shuffle_stream(self, stream_id, enable=True):
        """Switch shuffle on or off for the given stream; None if the id is unknown."""
        if stream_id not in self.streams:
            return None
        return self.streams[stream_id].shuffle(enable)

    def repeat_stream(self, stream_id, mode):
        """Set the repeat mode of the given stream; None if the id is unknown."""
        if stream_id not in self.streams:
            return None
        return self.streams[stream_id].repeat(mode)
# Example usage of ExtendedSnapserver
async def main():
    """Demonstrate ExtendedSnapserver by issuing each stream command once.

    Connects to a server on ``localhost``, registers one hand-written
    sample stream, then calls every ``*_stream`` helper in turn and prints
    the reply.  The helpers are plain synchronous calls, so each one blocks
    until its reply has arrived.
    """
    # Imported locally: the module binds ``asyncio`` only inside its
    # ``if __name__ == "__main__"`` guard, so without this import main()
    # raises NameError whenever the module is imported rather than run.
    import asyncio

    # Inside a coroutine the running loop is the one to hand to the server;
    # get_running_loop() returns it without get_event_loop()'s legacy
    # loop-creation fallback.
    loop = asyncio.get_running_loop()
    server = ExtendedSnapserver(loop, 'localhost')
    await server.start()

    # Add a sample stream
    sample_stream_data = {
        'id': 'example_stream_id',
        'status': 'playing',
        'uri': {
            'query': {
                'name': 'Example Stream'
            },
            'path': '/example/path'
        },
        'properties': {
            'metadata': 'Example Metadata'
        },
        'meta': 'Example Meta'
    }
    server.add_stream(sample_stream_data)

    # Play the stream
    play_response = server.play_stream('example_stream_id')
    print("Play response:", play_response)

    # Pause the stream
    pause_response = server.pause_stream('example_stream_id')
    print("Pause response:", pause_response)

    # Next track
    next_response = server.next_stream('example_stream_id')
    print("Next response:", next_response)

    # Previous track
    previous_response = server.previous_stream('example_stream_id')
    print("Previous response:", previous_response)

    # Enable shuffle
    shuffle_response = server.shuffle_stream('example_stream_id', True)
    print("Shuffle response:", shuffle_response)

    # Set repeat mode to 'all'
    repeat_response = server.repeat_stream('example_stream_id', 'all')
    print("Repeat response:", repeat_response)
if __name__ == "__main__":
    # Script entry point: run the demo coroutine on a fresh event loop.
    # Keep this a plain ``import asyncio`` -- it binds the module-level name
    # ``asyncio`` before main() executes.
    import asyncio

    asyncio.run(main())
Can someone use them?