Feature request: Addition of event queries to LedgerClient
Opened this issue · 0 comments
MissingNO57 commented
Prerequisites
- I checked the documentation and made sure this feature does not already exist.
- I checked the existing issues to make sure this feature has not already been requested.
- I have read the code of conduct before creating this issue.
Problem
Currently, there is no easy way to filter events, which can make it difficult for developers to find the specific events they need. This can be frustrating and time-consuming, especially when working with complex smart contracts that generate a large number of events. By adding event queries to the LedgerClient, developers would be able to more easily and efficiently query events from their contracts, saving time and reducing frustration.
We should implement a general, high-level way of using GetTxsEventRequest with optional filters.
Feature / Solution
The following implementation might help with adding this feature:
from cosmpy.protos.cosmos.tx.v1beta1.service_pb2_grpc import ServiceStub as TxGrpcClient
import grpc
import certifi
from cosmpy.protos.cosmos.tx.v1beta1.service_pb2 import (
GetTxsEventRequest,
)
from typing import List, Optional, Dict
from cosmpy.protos.cosmos.base.query.v1beta1.pagination_pb2 import PageRequest
# Execute a query on the txs service
def _query_txs_events(txs,
                      queries: List[str],
                      limit: int = 100,
                      ) -> List:
    """Fetch every page of transactions matching the given event queries.

    Repeatedly calls ``txs.GetTxsEvent`` with offset-based pagination until
    the offset reaches the total reported by the service, or until a page
    comes back empty.

    :param txs: Tx service stub exposing ``GetTxsEvent``.
    :param queries: Event filter expressions sent as the request's ``events``.
    :param limit: Page size requested per call (defaults to 100).
    :return: The raw responses, one per page fetched, in request order.
    """
    offset = 0
    pages = []
    while True:
        page = txs.GetTxsEvent(
            GetTxsEventRequest(
                events=queries,
                pagination=PageRequest(offset=offset, limit=limit),
            )
        )
        pages.append(page)

        fetched = len(page.tx_responses)
        offset += fetched
        # An empty page means the offset can no longer advance; stop here
        # rather than re-requesting the same page forever when the reported
        # total is larger than what the service actually returns.
        if fetched == 0 or offset >= page.pagination.total:
            break
    return pages
# Query all events from a contract by filtering on contract address and wasm action name
def get_events(txs, contract_address, wasm_action: Optional[str] = None) -> List[Dict]:
    """Query the transactions that executed a contract, optionally by action.

    :param txs: Tx service stub forwarded to ``_query_txs_events``.
    :param contract_address: Address matched against
        ``execute._contract_address``.
    :param wasm_action: When not ``None``, additionally require
        ``wasm.action`` to equal this value.
    :return: Whatever ``_query_txs_events`` returns for the built filters.
    """
    address_filter = f"execute._contract_address='{contract_address}'"
    if wasm_action is None:
        return _query_txs_events(txs, [address_filter])

    action_filter = f"wasm.action='{wasm_action}'"
    return _query_txs_events(txs, [address_filter, action_filter])
def main():
    """Print the events of every transaction that executed the contract."""
    url = "grpc-dorado.fetch.ai:443"
    contract_address = "fetch16t3324s58rdm0djk5g4klkpty8xp3g5hyzjldk4sawktszg4j8rqh7rs98"

    # Wasm action to filter on, e.g. "trigger_next_round";
    # None means all events.
    wasm_action = None

    # Connect to Dorado, trusting the CA bundle located by certifi.
    with open(certifi.where(), "rb") as f:
        trusted_certs = f.read()
    credentials = grpc.ssl_channel_credentials(
        root_certificates=trusted_certs
    )

    # Use the channel as a context manager so it is closed once the queries
    # finish, even if one of them raises.
    with grpc.secure_channel(url, credentials) as grpc_client:
        txs = TxGrpcClient(grpc_client)
        resp = get_events(txs, contract_address, wasm_action)

    for tx_res in resp:
        # List of all desired txs
        for tx in tx_res.tx_responses:
            print(f"TX {tx.txhash} events:")
            # List of all events in each tx
            for event in tx.events:
                print(event)


if __name__ == "__main__":
    main()
Alternatives
No response
Additional Context
No response