Cannot abstract out a subscription without an async context manager.
ingenium21 opened this issue · 3 comments
Hi devs,
this is is more a cry for help than an actual issue but I was wondering if you could help me figure out how to get this working.
I am trying to put this in a Device class so that I can initialize different devices and concurrently subscribe to all of them with a callHistory event so that they send me their latest call info and I can send it to a log analysis tool.
This is my code; currently it does work, but as you'll see, the connect() function is doing a lot of heavy lifting, and only works as an asynchronous content manager.
#!/usr/bin/env python
import xows
import asyncio
from dotenv import load_dotenv
import os
class Device:
"""A Class to manage the device"""
def __init__(self, name, ip_address, username, password, log_path):
"""initializes the devices"""
self.name = name
self.ip_address = ip_address
self.username = username
self.password = password
self.log_path = log_path
def append_to_log(self, ce_host, logPath, output, command):
"""This function is primarily used to append the output to the log file"""
filename = f"{logPath}{ce_host}_{command}.log"
if os.path.exists(filename):
with open(filename, 'a', encoding='utf-8') as fn:
fn.write(output)
else:
with open(filename, 'w', encoding='utf-8') as fn:
fn.write(output)
async def connect(self):
"""connects to the device using websockets"""
async with xows.XoWSClient(self.ip_address,username=self.username, password=self.password) as client:
async def callback(data, id_):
if id_ == 1:
print("new call was made")
print("=============================")
print("getting latest call")
call_history = await client.xCommand(['CallHistory','Get'], Limit=1, detaillevel='full')
call_history = call_history['Entry']
call_history.reverse()
append_to_log(ce_host, log_path, call_history, 'CallHistory')
print(f'Feedback(Id {id_}): {data}')
print('Subscription volume:')
volume_id = await client.subscribe(['Status', 'Audio', 'Volume'], callback, True)
print("Subscription callHistory Event:")
callHistory_id = await client.subscribe(['Event', 'CallHistory'], callback, True)
# print("Subscription Configuration")
# configuration_id = await client.subscribe(['Configuration'], callback, True)
await client.wait_until_closed()
async def main():
load_dotenv()
name = "device1"
ip_address = os.getenv('CE_HOST')
username = os.getenv('CE_USER')
password = os.getenv('CE_PASS')
log_path = os.getenv('LOG_PATH')
dev1 = Device(name=name, ip_address=ip_address, username=username, password=password, log_path=log_path)
await dev1.connect()
if __name__ == "__main__":
asyncio.run(main())
Here is the code I would like to work. It would be cool if I could get a self.client to initialize and then run self.client.connect() to open a connection. However, it seems that after that connect() function runs, it auto closes.
Any help would be greatly appreciated!
#!/usr/bin/env python
import xows
import asyncio
from dotenv import load_dotenv
import os
class Device:
"""A Class to manage the device"""
def __init__(self, name, ip_address, username, password, log_path):
"""initializes the devices"""
self.name = name
self.ip_address = ip_address
self.username = username
self.password = password
self.log_path = log_path
self.client = xows.XoWSClient(self.ip_address,username=self.username, password=self.password)
def append_to_log(self, ce_host, logPath, output, command):
"""This function is primarily used to append the output to the log file"""
filename = f"{logPath}{ce_host}_{command}.log"
if os.path.exists(filename):
with open(filename, 'a', encoding='utf-8') as fn:
fn.write(output)
else:
with open(filename, 'w', encoding='utf-8') as fn:
fn.write(output)
def connect(self):
"""Connects to the device using websockets"""
self.client.connect()
def disconnect(self):
self.client.disconnect()
async def get_call_history(self, client, limit=1):
"""Gets the call history.
Takes a limit variable"""
call_history = await client.xCommand(['CallHistory','Get'], Limit=1, detaillevel='full')
return call_history
async def set_call_history_subscription(self, client):
"""Creates a callHistory event subscription"""
await client.subscribe(['Event', 'CallHistory'], self.callback, True)
async def set_volume_subscription(self):
volume_id = await self.subscribe(['Status', 'Audio', 'Volume'], self.callback, True)
await volume_id
async def callback(self, client, data, id_):
if id == 0:
print("new call was made")
print("=============================")
print("getting latest call")
call_history = client.get_call_history()
print(call_history)
return call_history
def main():
load_dotenv()
name = "device1"
ip_address = os.getenv('CE_HOST')
username = os.getenv('CE_USER')
password = os.getenv('CE_PASS')
log_path = os.getenv('LOG_PATH')
dev1 = Device(name=name, ip_address=ip_address, username=username, password=password, log_path=log_path)
dev1.connect()
if __name__ == "__main__":
asyncio.run(main())
Your missing await client.wait_until_closed()
is essential.
You either need to do that in your connect
or return an async up to your main
so that you can actually block somewhere while program runs.
Now, once dev1.connect()
has ben run, it will simply exit the application, as no more code is waiting to run to completion.
Thank you! I'll try to get it working this weekend and will let you know how it goes.
Is there a specific place where I should place that await client.wait_until_closed()
?
I placed it here:
async def connect(self):
"""Connects to the device using websockets"""
self.client.connect()
await self.client.wait_until_closed()
and received the same error.