Source code for oceanbolt.sdk.data.trade_flows

from oceanbolt.sdk.client import APIClient
from oceanbolt.sdk.helpers import (validate, pb_list_to_pandas, pb_timeseries_to_pandas)


class TradeFlows:
    """Accessor for the ``tradeflows/listflows`` resource.

    Holds the trade flows client obtained from an ``APIClient`` together with
    that client's request metadata, and exposes the listed trade flow records
    through :meth:`get`.
    """

    RESOURCE_NAME = "tradeflows/listflows"

    def __init__(self, client: APIClient):
        """Bind this accessor to ``client``.

        Args:
            client: SDK client that supplies the trade flows service client
                (via ``_trade_flows_client()``) and the ``metadata`` sent
                along with every request.
        """
        self.client = client._trade_flows_client()
        self.metadata = client.metadata

    def get(self, **kwargs):
        """Retrieve trade flow data as a pandas.DataFrame.

        Args:
            **kwargs: Request parameters. They are passed through
                ``validate`` and the result is sent as the request payload.

        Returns:
            The ``data`` field of the ``get_trade_flows`` response, converted
            by ``pb_list_to_pandas``.
        """
        request = validate(kwargs)
        response = self.client.get_trade_flows(
            request=request,
            metadata=self.metadata,
        )
        return pb_list_to_pandas(response.data)
class TradeFlowTimeseries:
    """Accessor for the ``tradeflows/timeseries`` resource.

    Holds the trade flows client obtained from an ``APIClient`` together with
    that client's request metadata, and exposes trade flow timeseries data
    through :meth:`get`.
    """

    RESOURCE_NAME = "tradeflows/timeseries"

    def __init__(self, client: APIClient):
        """Bind this accessor to ``client``.

        Args:
            client: SDK client that supplies the trade flows service client
                (via ``_trade_flows_client()``) and the ``metadata`` sent
                along with every request.
        """
        self.client = client._trade_flows_client()
        self.metadata = client.metadata

    def get(self, **kwargs):
        """Retrieve timeseries data as a pandas.DataFrame.

        Args:
            **kwargs: Request parameters. They are passed through
                ``validate`` and the result is sent as the request payload.

        Returns:
            The ``timeseries`` field of the ``get_trade_flow_timeseries``
            response, converted by ``pb_timeseries_to_pandas``.
        """
        request = validate(kwargs)
        response = self.client.get_trade_flow_timeseries(
            request=request,
            metadata=self.metadata,
        )
        return pb_timeseries_to_pandas(response.timeseries)