Comparing sensitive data, confidential files or internal emails?

Most legal and privacy policies prohibit uploading sensitive data online. Diffchecker Desktop ensures your confidential information never leaves your computer. Work offline and compare documents securely.

client_vs_client_async

Created Diff never expires
Left (client): 178 lines, 26 removals
Right (client_async): 220 lines, 93 additions
""""
""""
Basic Dune Client Class responsible for refreshing Dune Queries
Async Dune Client Class responsible for refreshing Dune Queries
Framework built on Dune's API Documentation
Framework built on Dune's API Documentation
https://duneanalytics.notion.site/API-Documentation-1b93d16e0fa941398e15047f643e003a
https://duneanalytics.notion.site/API-Documentation-1b93d16e0fa941398e15047f643e003a
"""
"""
from __future__ import annotations
from __future__ import annotations


import time
import asyncio
from io import BytesIO
from io import BytesIO
from typing import Any
from typing import Any, Optional


import requests
from aiohttp import (
from requests import Response, JSONDecodeError
ClientSession,
ClientResponse,
ContentTypeError,
TCPConnector,
ClientTimeout,
)


from dune_client.base_client import BaseDuneClient
from dune_client.base_client import BaseDuneClient
from dune_client.interface import DuneInterface
from dune_client.models import (
from dune_client.models import (
ExecutionResponse,
ExecutionResponse,
ExecutionResultCSV,
ExecutionResultCSV,
DuneError,
DuneError,
QueryFailed,
QueryFailed,
ExecutionStatusResponse,
ExecutionStatusResponse,
ResultsResponse,
ResultsResponse,
ExecutionState,
ExecutionState,
)
)


from dune_client.query import Query
from dune_client.query import Query




class DuneClient(DuneInterface, BaseDuneClient):
# pylint: disable=duplicate-code
class AsyncDuneClient(BaseDuneClient):
"""
"""
An interface for Dune API with a few convenience methods
An asynchronous interface for Dune API with a few convenience methods
combining the use of endpoints (e.g. refresh)
combining the use of endpoints (e.g. refresh)
"""
"""


def _handle_response(
_connection_limit = 3
self,

response: Response,
def __init__(self, api_key: str, connection_limit: int = 3):
) -> Any:
"""
api_key - Dune API key
connection_limit - number of parallel requests to execute.
For non-pro accounts Dune allows only up to 3 requests but that number can be increased.
"""
super().__init__(api_key=api_key)
self._connection_limit = connection_limit
self._session: Optional[ClientSession] = None

async def _create_session(self) -> ClientSession:
conn = TCPConnector(limit=self._connection_limit)
return ClientSession(
connector=conn,
base_url=self.BASE_URL,
timeout=ClientTimeout(total=self.DEFAULT_TIMEOUT),
)

async def connect(self) -> None:
"""Opens a client session (can be used instead of async with)"""
self._session = await self._create_session()

async def disconnect(self) -> None:
"""Closes client session"""
if self._session:
await self._session.close()

async def __aenter__(self) -> AsyncDuneClient:
self._session = await self._create_session()
return self

async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
await self.disconnect()

async def _handle_response(self, response: ClientResponse) -> Any:
try:
try:
# Some responses can be decoded and converted to DuneErrors
# Some responses can be decoded and converted to DuneErrors
response_json = response.json()
response_json = await response.json()
self.logger.debug(f"received response {response_json}")
self.logger.debug(f"received response {response_json}")
return response_json
return response_json
except JSONDecodeError as err:
except ContentTypeError as err:
# Others can't. Only raise HTTP error for not decodable errors
# Others can't. Only raise HTTP error for not decodable errors
response.raise_for_status()
response.raise_for_status()
raise ValueError("Unreachable since previous line raises") from err
raise ValueError("Unreachable since previous line raises") from err


def _route_url(self, route: str) -> str:
def _route_url(self, route: str) -> str:
return f"{self.BASE_URL}{self.API_PATH}{route}"
return f"{self.API_PATH}{route}"


def _get(self, route: str, raw: bool = False) -> Any:
async def _get(self, route: str, raw: bool = False) -> Any:
url = self._route_url(route)
url = self._route_url(route)
if self._session is None:
raise ValueError("Client is not connected; call `await cl.connect()`")
self.logger.debug(f"GET received input url={url}")
self.logger.debug(f"GET received input url={url}")
response = requests.get(
response = await self._session.get(
url=url,
url=url,
headers=self.default_headers(),
headers=self.default_headers(),
timeout=self.DEFAULT_TIMEOUT,
)
)
if raw:
if raw:
return response
return response
return self._handle_response(response)
return await self._handle_response(response)


def _post(self, route: str, params: Any) -> Any:
async def _post(self, route: str, params: Any) -> Any:
url = self._route_url(route)
url = self._route_url(route)
if self._session is None:
raise ValueError("Client is not connected; call `await cl.connect()`")
self.logger.debug(f"POST received input url={url}, params={params}")
self.logger.debug(f"POST received input url={url}, params={params}")
response = requests.post(
response = await self._session.post(
url=url,
url=url,
json=params,
json=params,
headers=self.default_headers(),
headers=self.default_headers(),
timeout=self.DEFAULT_TIMEOUT,
)
)
return self._handle_response(response)
return await self._handle_response(response)


def execute(self, query: Query) -> ExecutionResponse:
async def execute(self, query: Query) -> ExecutionResponse:
"""Posts to the Dune API to execute `query`"""
"""Posts to the Dune API to execute `query`"""
response_json = self._post(
response_json = await self._post(
route=f"/query/{query.query_id}/execute", params=query.request_format()
route=f"/query/{query.query_id}/execute", params=query.request_format()
)
)
try:
try:
return ExecutionResponse.from_dict(response_json)
return ExecutionResponse.from_dict(response_json)
except KeyError as err:
except KeyError as err:
raise DuneError(response_json, "ExecutionResponse", err) from err
raise DuneError(response_json, "ExecutionResponse", err) from err


def get_status(self, job_id: str) -> ExecutionStatusResponse:
async def get_status(self, job_id: str) -> ExecutionStatusResponse:
"""GET status from Dune API for `job_id` (aka `execution_id`)"""
"""GET status from Dune API for `job_id` (aka `execution_id`)"""
response_json = self._get(route=f"/execution/{job_id}/status")
response_json = await self._get(route=f"/execution/{job_id}/status")
try:
try:
return ExecutionStatusResponse.from_dict(response_json)
return ExecutionStatusResponse.from_dict(response_json)
except KeyError as err:
except KeyError as err:
raise DuneError(response_json, "ExecutionStatusResponse", err) from err
raise DuneError(response_json, "ExecutionStatusResponse", err) from err


def get_result(self, job_id: str) -> ResultsResponse:
async def get_result(self, job_id: str) -> ResultsResponse:
"""GET results from Dune API for `job_id` (aka `execution_id`)"""
"""GET results from Dune API for `job_id` (aka `execution_id`)"""
response_json = self._get(route=f"/execution/{job_id}/results")
response_json = await self._get(route=f"/execution/{job_id}/results")
try:
try:
return ResultsResponse.from_dict(response_json)
return ResultsResponse.from_dict(response_json)
except KeyError as err:
except KeyError as err:
raise DuneError(response_json, "ResultsResponse", err) from err
raise DuneError(response_json, "ResultsResponse", err) from err


def get_result_csv(self, job_id: str) -> ExecutionResultCSV:
async def get_result_csv(self, job_id: str) -> ExecutionResultCSV:
"""
"""
GET results in CSV format from Dune API for `job_id` (aka `execution_id`)
GET results in CSV format from Dune API for `job_id` (aka `execution_id`)


this API only returns the raw data in CSV format; it is faster and more lightweight
this API only returns the raw data in CSV format; it is faster and more lightweight
use this method for large results where you want lower CPU and memory overhead
use this method for large results where you want lower CPU and memory overhead
if you need metadata information use get_result() or get_status()
if you need metadata information use get_result() or get_status()
"""
"""
route = f"/execution/{job_id}/results/csv"
route = f"/execution/{job_id}/results/csv"
url = self._route_url(f"/execution/{job_id}/results/csv")
url = self._route_url(f"/execution/{job_id}/results/csv")
self.logger.debug(f"GET CSV received input url={url}")
self.logger.debug(f"GET CSV received input url={url}")
response = self._get(route=route, raw=True)
response = await self._get(route=route, raw=True)
response.raise_for_status()
response.raise_for_status()
return ExecutionResultCSV(data=BytesIO(response.content))
return ExecutionResultCSV(data=BytesIO(await response.content.read(-1)))


def cancel_execution(self, job_id: str) -> bool:
async def cancel_execution(self, job_id: str) -> bool:
"""POST Execution Cancellation to Dune API for `job_id` (aka `execution_id`)"""
"""POST Execution Cancellation to Dune API for `job_id` (aka `execution_id`)"""
response_json = self._post(
response_json = await self._post(
route=f"/execution/{job_id}/cancel",
route=f"/execution/{job_id}/cancel",
params=None,
params=None,
)
)
try:
try:
# No need to make a dataclass for this since it's just a boolean.
# No need to make a dataclass for this since it's just a boolean.
success: bool = response_json["success"]
success: bool = response_json["success"]
return success
return success
except KeyError as err:
except KeyError as err:
raise DuneError(response_json, "CancellationResponse", err) from err
raise DuneError(response_json, "CancellationResponse", err) from err


def _refresh(self, query: Query, ping_frequency: int = 5) -> str:
async def _refresh(self, query: Query, ping_frequency: int = 5) -> str:
job_id = self.execute(query).execution_id
job_id = (await self.execute(query)).execution_id
status = self.get_status(job_id)
status = await self.get_status(job_id)
while status.state not in ExecutionState.terminal_states():
while status.state not in ExecutionState.terminal_states():
self.logger.info(
self.logger.info(
f"waiting for query execution {job_id} to complete: {status}"
f"waiting for query execution {job_id} to complete: {status}"
)
)
time.sleep(ping_frequency)
await asyncio.sleep(ping_frequency)
status = self.get_status(job_id)
status = await self.get_status(job_id)
if status.state == ExecutionState.FAILED:
if status.state == ExecutionState.FAILED:
self.logger.error(status)
self.logger.error(status)
raise QueryFailed(f"{status}. Perhaps your query took too long to run!")
raise QueryFailed(f"{status}. Perhaps your query took too long to run!")


return job_id
return job_id


def refresh(self, query: Query, ping_frequency: int = 5) -> ResultsResponse:
async def refresh(self, query: Query, ping_frequency: int = 5) -> ResultsResponse:
"""
"""
Executes a Dune `query`, waits until execution completes,
Executes a Dune `query`, waits until execution completes,
fetches and returns the results.
fetches and returns the results.
Sleeps `ping_frequency` seconds between each status request.
Sleeps `ping_frequency` seconds between each status request.
"""
"""
job_id = self._refresh(query, ping_frequency=ping_frequency)
job_id = await self._refresh(query, ping_frequency=ping_frequency)
return self.get_result(job_id)
return await self.get_result(job_id)


def refresh_csv(self, query: Query, ping_frequency: int = 5) -> ExecutionResultCSV:
async def refresh_csv(
self, query: Query, ping_frequency: int = 5
) -> ExecutionResultCSV:
"""
"""
Executes a Dune query, waits till execution completes,
Executes a Dune query, waits till execution completes,
fetches and returns the results in CSV format
fetches and returns the results in CSV format
(use it to load the data directly with pandas.read_csv() or similar frameworks)
(use it to load the data directly with pandas.read_csv() or similar frameworks)
"""
"""
job_id = self._refresh(query, ping_frequency=ping_frequency)
job_id = await self._refresh(query, ping_frequency=ping_frequency)
return self.get_result_csv(job_id)
return await self.get_result_csv(job_id)


def refresh_into_dataframe(self, query: Query) -> Any:
async def refresh_into_dataframe(self, query: Query) -> Any:
"""
"""
Execute a Dune Query, waits till execution completes,
Execute a Dune Query, waits till execution completes,
fetches and returns the result as a Pandas DataFrame
fetches and returns the result as a Pandas DataFrame


This is a convenience method that uses refresh_csv underneath
This is a convenience method that uses refresh_csv underneath
"""
"""
try:
try:
import pandas # type: ignore # pylint: disable=import-outside-toplevel
import pandas # type: ignore # pylint: disable=import-outside-toplevel
except ImportError as exc:
except ImportError as exc:
raise ImportError(
raise ImportError(
"dependency failure, pandas is required but missing"
"dependency failure, pandas is required but missing"
) from exc
) from exc
data = self.refresh_csv(query).data
data = (await self.refresh_csv(query)).data
return pandas.read_csv(data)
return pandas.read_csv(data)