# -*- coding: utf-8 -*-

# PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN:
# https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code

import ccxt.async_support
from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheByTimestamp
import hashlib
from ccxt.base.types import Any, Int, Market, OrderBook, Strings, Ticker, Tickers, FundingRate, FundingRates, Trade
from ccxt.async_support.base.ws.client import Client
from typing import List
from ccxt.base.errors import ExchangeError
from ccxt.base.errors import AuthenticationError
from ccxt.base.errors import NotSupported


class coinbaseinternational(ccxt.async_support.coinbaseinternational):

    def describe(self) -> Any:
        """
        Build the exchange description for the websocket-enabled class.

        The description of the parent class is deep-extended with the websocket
        capability flags, the websocket endpoints, the streaming options and the
        error mapping used for rejected subscriptions.

        :returns dict: the merged exchange description
        """
        # unified websocket methods and whether this class supports them
        capabilities = {
            'ws': True,
            'watchTrades': True,
            'watchTradesForSymbols': True,
            'watchOrderBook': True,
            'watchOrderBookForSymbols': True,
            'watchTicker': True,
            'watchBalance': False,
            'watchMyTrades': False,
            'watchOHLCV': True,
            'watchOHLCVForSymbols': False,
            'watchOrders': False,
            'watchOrdersForSymbols': False,
            'watchPositions': False,
            'watchTickers': True,
            'createOrderWs': False,
            'editOrderWs': False,
            'cancelOrderWs': False,
            'cancelOrdersWs': False,
            'cancelAllOrdersWs': False,
            'fetchOrderWs': False,
            'fetchOrdersWs': False,
            'fetchBalanceWs': False,
            'fetchMyTradesWs': False,
        }
        # market-data websocket endpoints(production and test)
        endpoints = {
            'api': {
                'ws': 'wss://ws-md.international.coinbase.com',
            },
            'test': {
                'ws': 'wss://ws-md.n5e2.coinbase.com',
            },
        }
        # unified timeframe -> exchange candle channel name
        candleChannels = {
            '1m': 'CANDLES_ONE_MINUTE',
            '5m': 'CANDLES_FIVE_MINUTES',
            '30m': 'CANDLES_THIRTY_MINUTES',
            '1h': 'CANDLES_ONE_HOUR',
            '2h': 'CANDLES_TWO_HOURS',
            '1d': 'CANDLES_ONE_DAY',
        }
        settings = {
            'watchTicker': {
                'channel': 'LEVEL1',  # 'INSTRUMENTS' or 'RISK'
            },
            'tradesLimit': 1000,
            'ordersLimit': 1000,
            'myTradesLimit': 1000,
            'timeframes': candleChannels,
        }
        # rejection reasons mapped to unified exception classes
        errors = {
            'exact': {
                'Unable to authenticate': AuthenticationError,
            },
        }
        parentDescription = super(coinbaseinternational, self).describe()
        return self.deep_extend(parentDescription, {
            'has': capabilities,
            'urls': endpoints,
            'options': settings,
            'exceptions': errors,
        })

    async def subscribe(self, name: str, symbols: Strings = None, params={}):
        """
        @ignore
        subscribes to a websocket channel

        https://docs.cloud.coinbase.com/intx/docs/websocket-overview#subscribe

        :param str name: the name of the channel
        :param str[] [symbols]: unified market symbols, when omitted the active symbols are used
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: subscription to a websocket channel
        """
        await self.load_markets()
        self.check_required_credentials()
        if symbols is None:
            symbols = self.get_active_symbols()
        symbolCount = len(symbols)
        singleHash = name  # used as-is when no symbol is given
        multiHashes = []
        productIds = None
        if symbolCount > 1:
            unifiedSymbols = self.market_symbols(symbols)
            productIds = self.market_ids(unifiedSymbols)
            # one message hash per symbol, e.g. 'LEVEL1::BTC/USDC:USDC'
            multiHashes = [name + '::' + unifiedSymbol for unifiedSymbol in unifiedSymbols]
        elif symbolCount == 1:
            market = self.market(symbols[0])
            singleHash = name + '::' + market['symbol']
            productIds = [market['id']]
        url = self.urls['api']['ws']
        if url is None:
            raise NotSupported(self.id + ' is not supported in sandbox environment')
        # the signed payload is: timestamp + api key + 'CBINTLMD' + passphrase
        timestamp = str(self.nonce())
        auth = ''.join([timestamp, self.apiKey, 'CBINTLMD', self.password])
        decodedSecret = self.base64_to_binary(self.secret)
        signature = self.hmac(self.encode(auth), decodedSecret, hashlib.sha256, 'base64')
        payload: dict = {
            'type': 'SUBSCRIBE',
            'channels': [name],
            'time': timestamp,
            'key': self.apiKey,
            'passphrase': self.password,
            'signature': signature,
        }
        # product_ids is only sent when at least one symbol was resolved
        if productIds is not None:
            payload['product_ids'] = productIds
        request = self.extend(payload, params)
        if symbolCount > 1:
            return await self.watch_multiple(url, multiHashes, request, multiHashes)
        return await self.watch(url, singleHash, request, singleHash)

    async def subscribe_multiple(self, name: str, symbols: Strings = None, params={}):
        """
        @ignore
        subscribes to a websocket channel using watchMultiple

        https://docs.cloud.coinbase.com/intx/docs/websocket-overview#subscribe

        :param str name: the name of the channel
        :param string|str[] [symbols]: unified market symbols, when empty all loaded symbols are used
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: subscription to a websocket channel
        """
        await self.load_markets()
        self.check_required_credentials()
        symbols = self.symbols if self.is_empty(symbols) else self.market_symbols(symbols)
        productIds = []
        messageHashes = []
        for requested in symbols:
            marketId = self.market_id(requested)
            unifiedSymbol = self.symbol(marketId)
            productIds.append(marketId)
            messageHashes.append(name + '::' + unifiedSymbol)
        url = self.urls['api']['ws']
        if url is None:
            raise NotSupported(self.id + ' is not supported in sandbox environment')
        # the signed payload is: timestamp(seconds) + api key + 'CBINTLMD' + passphrase
        timestamp = self.number_to_string(self.seconds())
        auth = ''.join([timestamp, self.apiKey, 'CBINTLMD', self.password])
        decodedSecret = self.base64_to_binary(self.secret)
        signature = self.hmac(self.encode(auth), decodedSecret, hashlib.sha256, 'base64')
        payload: dict = {
            'type': 'SUBSCRIBE',
            'time': timestamp,
            'product_ids': productIds,
            'channels': [name],
            'key': self.apiKey,
            'passphrase': self.password,
            'signature': signature,
        }
        request = self.extend(payload, params)
        return await self.watch_multiple(url, messageHashes, request, messageHashes)

    async def watch_funding_rate(self, symbol: str, params={}) -> FundingRate:
        """
        watch the current funding rate

        https://docs.cloud.coinbase.com/intx/docs/websocket-channels#funding-channel

        :param str symbol: unified market symbol
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: a `funding rate structure <https://docs.ccxt.com/#/?id=funding-rate-structure>`
        """
        await self.load_markets()
        # NOTE(review): the subscription uses the 'RISK' channel, which handle_message
        # dispatches to handle_ticker, while handle_funding_rate serves 'FUNDING' - confirm the intended channel
        requested = [symbol]
        return await self.subscribe('RISK', requested, params)

    async def watch_funding_rates(self, symbols: List[str], params={}) -> FundingRates:
        """
        watch the funding rate for multiple markets

        https://docs.cloud.coinbase.com/intx/docs/websocket-channels#funding-channel

        :param str[]|None symbols: list of unified market symbols
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: a dictionary of `funding rates structures <https://docs.ccxt.com/#/?id=funding-rates-structure>`, indexed by market symbols
        """
        await self.load_markets()
        update = await self.subscribe_multiple('RISK', symbols, params)
        if not self.newUpdates:
            # full view: every cached funding rate for the requested symbols
            return self.filter_by_array(self.fundingRates, 'symbol', symbols)
        # incremental view: only the update that has just been resolved
        return {self.safe_string(update, 'symbol'): update}

    async def watch_ticker(self, symbol: str, params={}) -> Ticker:
        """
        watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market

        https://docs.cloud.coinbase.com/intx/docs/websocket-channels#instruments-channel

        :param str [symbol]: unified symbol of the market to fetch the ticker for
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.channel]: the channel to watch, 'LEVEL1' or 'INSTRUMENTS', default is 'LEVEL1'
        :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
        """
        await self.load_markets()
        # the channel comes from params or options['watchTicker'], falling back to 'LEVEL1'
        channel, query = self.handle_option_and_params(params, 'watchTicker', 'channel', 'LEVEL1')
        return await self.subscribe(channel, [symbol], query)

    def get_active_symbols(self):
        """
        Collect the loaded symbols whose market is flagged as active.

        :returns str[]: the entries of self.symbols whose market has a truthy 'active' value, in the original order
        """
        return [symbol for symbol in self.symbols if self.markets[symbol]['active']]

    async def watch_tickers(self, symbols: Strings = None, params={}) -> Tickers:
        """
        watches price tickers, a statistical calculation with the information calculated over the past 24 hours, for several markets

        https://docs.cloud.coinbase.com/intx/docs/websocket-channels#instruments-channel

        :param str[] [symbols]: unified symbols of the markets to fetch the ticker for
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.channel]: the channel to watch, 'LEVEL1' or 'INSTRUMENTS', default is 'LEVEL1'
        :returns dict: a dictionary of `ticker structures <https://docs.ccxt.com/#/?id=ticker-structure>` indexed by symbol
        """
        await self.load_markets()
        channel, query = self.handle_option_and_params(params, 'watchTickers', 'channel', 'LEVEL1')
        ticker = await self.subscribe(channel, symbols, query)
        if not self.newUpdates:
            # full view: every cached ticker for the requested symbols
            return self.filter_by_array(self.tickers, 'symbol', symbols)
        # incremental view: only the ticker that has just been resolved
        return {ticker['symbol']: ticker}

    def handle_instrument(self, client: Client, message):
        """
        Parse an INSTRUMENTS message and resolve it for channel-wide and per-symbol watchers.

        :param Client client: the websocket client the message arrived on
        :param dict message: the raw instrument message
        """
        #
        #    {
        #        "sequence": 1,
        #        "product_id": "ETH-PERP",
        #        "instrument_type": "PERP",
        #        "base_asset_name": "ETH",
        #        "quote_asset_name": "USDC",
        #        "base_increment": "0.0001",
        #        "quote_increment": "0.01",
        #        "avg_daily_quantity": "43.0",
        #        "avg_daily_volume": "80245.2",
        #        "total_30_day_quantity":"1443.0",
        #        "total_30_day_volume":"3040449.0",
        #        "total_24_hour_quantity":"48.1",
        #        "total_24_hour_volume":"101348.3",
        #        "base_imf": "0.2",
        #        "min_quantity": "0.0001",
        #        "position_size_limit": "500",
        #        "funding_interval": "60000000000",
        #        "trading_state": "trading",
        #        "last_update_time": "2023-05-04T11:16:33.016Z",
        #        "time": "2023-05-10T14:58:47.000Z",
        #        "channel":"INSTRUMENTS",
        #        "type":"SNAPSHOT"
        #    }
        #
        parsed = self.parse_ws_instrument(message)
        channel = self.safe_string(message, 'channel')
        # first the channel-wide hash, then the symbol-specific one
        client.resolve(parsed, channel)
        symbolHash = channel + '::' + parsed['symbol']
        client.resolve(parsed, symbolHash)

    def parse_ws_instrument(self, ticker: dict, market=None):
        """
        Convert a raw INSTRUMENTS message into a unified ticker.

        Only the symbol, the time and the 24 hour base/quote volumes are filled in,
        every price field is passed to safe_ticker as None.

        :param dict ticker: the raw instrument message
        :param dict [market]: the market the message belongs to, if already known
        :returns dict: the value returned by safe_ticker
        """
        #
        #    {
        #        "sequence": 1,
        #        "product_id": "ETH-PERP",
        #        "instrument_type": "PERP",
        #        "base_asset_name": "ETH",
        #        "quote_asset_name": "USDC",
        #        "base_increment": "0.0001",
        #        "quote_increment": "0.01",
        #        "avg_daily_quantity": "43.0",
        #        "avg_daily_volume": "80245.2",
        #        "total_30_day_quantity":"1443.0",
        #        "total_30_day_volume":"3040449.0",
        #        "total_24_hour_quantity":"48.1",
        #        "total_24_hour_volume":"101348.3",
        #        "base_imf": "0.2",
        #        "min_quantity": "0.0001",
        #        "position_size_limit": "500",
        #        "funding_interval": "60000000000",
        #        "trading_state": "trading",
        #        "last_update_time": "2023-05-04T11:16:33.016Z",
        #        "time": "2023-05-10T14:58:47.000Z",
        #        "channel":"INSTRUMENTS",
        #        "type":"SNAPSHOT"
        #    }
        # instruments
        #   {
        #       sequence: 0,
        #       instrument_type: 'PERP',
        #       instrument_mode: 'standard',
        #       base_asset_name: 'BTC',
        #       quote_asset_name: 'USDC',
        #       base_increment: '0.0001',
        #       quote_increment: '0.1',
        #       avg_daily_quantity: '502.8845',
        #       avg_daily_volume: '3.1495242961566668E7',
        #       total30_day_quantity: '15086.535',
        #       total30_day_volume: '9.44857288847E8',
        #       total24_hour_quantity: '5.0',
        #       total24_hour_volume: '337016.5',
        #       base_imf: '0.1',
        #       min_quantity: '0.0001',
        #       position_size_limit: '800',
        #       funding_interval: '3600000000000',
        #       trading_state: 'trading',
        #       last_updated_time: '2024-07-30T15:00:00Z',
        #       default_initial_margin: '0.2',
        #       base_asset_multiplier: '1.0',
        #       channel: 'INSTRUMENTS',
        #       type: 'SNAPSHOT',
        #       time: '2024-07-30T15:26:56.766Z',
        #   }
        #
        isoTime = self.safe_string(ticker, 'time')
        productId = self.safe_string(ticker, 'product_id')
        unifiedSymbol = self.safe_symbol(productId, market, '-')
        # both spellings of the 24 hour volume keys appear in the samples above
        baseVolume = self.safe_string_2(ticker, 'total_24_hour_quantity', 'total24_hour_quantity')
        quoteVolume = self.safe_string_2(ticker, 'total_24_hour_volume', 'total24_hour_volume')
        parsed = {
            'info': ticker,
            'symbol': unifiedSymbol,
            'timestamp': self.parse8601(isoTime),
            'datetime': isoTime,
            'high': None,
            'low': None,
            'bid': None,
            'bidVolume': None,
            'ask': None,
            'askVolume': None,
            'vwap': None,
            'open': None,
            'close': None,
            'last': None,
            'previousClose': None,
            'change': None,
            'percentage': None,
            'average': None,
            'baseVolume': baseVolume,
            'quoteVolume': quoteVolume,
        }
        return self.safe_ticker(parsed)

    def handle_ticker(self, client: Client, message):
        """
        Parse a top-of-book message and resolve it for channel-wide and per-symbol watchers.

        :param Client client: the websocket client the message arrived on
        :param dict message: the raw ticker message
        """
        #
        # snapshot
        #    {
        #        "sequence": 0,
        #        "product_id": "BTC-PERP",
        #        "time": "2023-05-10T14:58:47.000Z",
        #        "bid_price": "28787.8",
        #        "bid_qty": "0.466",  # One side book
        #        "channel": "LEVEL1",
        #        "type": "SNAPSHOT"
        #    }
        # update
        #    {
        #       "sequence": 1,
        #       "product_id": "BTC-PERP",
        #       "time": "2023-05-10T14:58:47.547Z",
        #       "bid_price": "28787.8",
        #       "bid_qty": "0.466",
        #       "ask_price": "28788.8",
        #       "ask_qty": "1.566",
        #       "channel": "LEVEL1",
        #       "type": "UPDATE"
        #    }
        #
        parsed = self.parse_ws_ticker(message)
        channel = self.safe_string(message, 'channel')
        # first the channel-wide hash, then the symbol-specific one
        client.resolve(parsed, channel)
        symbolHash = channel + '::' + parsed['symbol']
        client.resolve(parsed, symbolHash)

    def parse_ws_ticker(self, ticker: object, market: Market = None) -> Ticker:
        """
        Convert a raw top-of-book message into a unified ticker.

        Only the symbol, the time and the best bid/ask price and size are filled in,
        all remaining fields are passed to safe_ticker as None.

        :param dict ticker: the raw ticker message
        :param dict [market]: the market the message belongs to, if already known
        :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
        """
        #
        #    {
        #       "sequence": 1,
        #       "product_id": "BTC-PERP",
        #       "time": "2023-05-10T14:58:47.547Z",
        #       "bid_price": "28787.8",
        #       "bid_qty": "0.466",
        #       "ask_price": "28788.8",
        #       "ask_qty": "1.566",
        #       "channel": "LEVEL1",
        #       "type": "UPDATE"
        #    }
        #
        isoTime = self.safe_string(ticker, 'time')
        productId = self.safe_string(ticker, 'product_id')
        unifiedSymbol = self.safe_symbol(productId, market)
        parsed = {
            'info': ticker,
            'symbol': unifiedSymbol,
            'timestamp': self.parse8601(isoTime),
            'datetime': isoTime,
            'bid': self.safe_number(ticker, 'bid_price'),
            'bidVolume': self.safe_number(ticker, 'bid_qty'),
            'ask': self.safe_number(ticker, 'ask_price'),
            'askVolume': self.safe_number(ticker, 'ask_qty'),
            'high': None,
            'low': None,
            'open': None,
            'close': None,
            'last': None,
            'change': None,
            'percentage': None,
            'average': None,
            'vwap': None,
            'baseVolume': None,
            'quoteVolume': None,
            'previousClose': None,
        }
        return self.safe_ticker(parsed)

    async def watch_ohlcv(self, symbol: str, timeframe='1m', since: Int = None, limit: Int = None, params={}) -> List[list]:
        """
        watches historical candlestick data containing the open, high, low, close price, and the volume of a market

        https://docs.cdp.coinbase.com/intx/docs/websocket-channels#candles-channel

        :param str symbol: unified symbol of the market to fetch OHLCV data for
        :param str timeframe: the length of time each candle represents
        :param int [since]: timestamp in ms of the earliest candle to fetch
        :param int [limit]: the maximum amount of candles to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns int[][]: A list of candles ordered, open, high, low, close, volume
        """
        await self.load_markets()
        unifiedSymbol = self.market(symbol)['symbol']
        # translate the unified timeframe into the candle channel name,
        # a value that is not in the options map is used as the channel name directly
        channelByTimeframe = self.safe_dict(self.options, 'timeframes', {})
        channel = self.safe_string(channelByTimeframe, timeframe, timeframe)
        candles = await self.subscribe(channel, [unifiedSymbol], params)
        if self.newUpdates:
            limit = candles.getLimit(unifiedSymbol, limit)
        return self.filter_by_since_limit(candles, since, limit, 0, True)

    def handle_ohlcv(self, client: Client, message):
        """
        Store the candles of a CANDLES_* message and resolve the per-symbol watcher.

        The candle cache is kept in self.ohlcvs[symbol][timeframe]. The unified
        timeframe is derived from the channel name through options['timeframes']
        (the same map watch_ohlcv uses to build the channel name), so that candles
        of different timeframes of one symbol end up in separate caches. When the
        channel is not found in that map, self.find_timeframe is used as a fallback.

        :param Client client: the websocket client the message arrived on
        :param dict message: the raw candles message
        """
        #
        # {
        #     "sequence": 0,
        #     "product_id": "BTC-PERP",
        #     "channel": "CANDLES_ONE_MINUTE",
        #     "type": "SNAPSHOT",
        #     "candles": [
        #       {
        #           "time": "2023-05-10T14:58:47.000Z",
        #           "low": "28787.8",
        #           "high": "28788.8",
        #           "open": "28788.8",
        #           "close": "28787.8",
        #           "volume": "0.466"
        #        },
        #     ]
        #  }
        #
        channel = self.safe_string(message, 'channel')
        marketId = self.safe_string(message, 'product_id')
        market = self.safe_market(marketId)
        symbol = market['symbol']
        # reverse lookup: exchange channel name -> unified timeframe
        channelByTimeframe = self.safe_dict(self.options, 'timeframes', {})
        timeframe = None
        for unifiedTimeframe, channelName in channelByTimeframe.items():
            if channelName == channel:
                timeframe = unifiedTimeframe
                break
        if timeframe is None:
            # channel not listed in the options, keep the previous lookup
            timeframe = self.find_timeframe(channel)
        cachesByTimeframe = self.safe_value(self.ohlcvs, symbol, {})
        self.ohlcvs[symbol] = cachesByTimeframe
        stored = self.safe_value(cachesByTimeframe, timeframe)
        if stored is None:
            limit = self.safe_integer(self.options, 'OHLCVLimit', 1000)
            stored = ArrayCacheByTimestamp(limit)
            cachesByTimeframe[timeframe] = stored
        for candle in self.safe_list(message, 'candles', []):
            stored.append(self.parse_ohlcv(candle, market))
        client.resolve(stored, channel + '::' + symbol)

    async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        get the list of most recent trades for a particular symbol

        https://docs.cloud.coinbase.com/intx/docs/websocket-channels#match-channel

        :param str symbol: unified symbol of the market to fetch trades for
        :param int [since]: timestamp in ms of the earliest trade to fetch
        :param int [limit]: the maximum amount of trades to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
        """
        # the single-symbol watcher is the multi-symbol watcher called with a one-element list
        requested = [symbol]
        return await self.watch_trades_for_symbols(requested, since, limit, params)

    async def watch_trades_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        get the list of most recent trades for a list of symbols
        :param str[] symbols: unified symbols of the markets to fetch trades for
        :param int [since]: timestamp in ms of the earliest trade to fetch
        :param int [limit]: the maximum amount of trades to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
        """
        await self.load_markets()
        unifiedSymbols = self.market_symbols(symbols, None, False, True, True)
        trades = await self.subscribe_multiple('MATCH', unifiedSymbols, params)
        if self.newUpdates:
            # the symbol of the first cached trade selects the limit to use
            firstTrade = self.safe_dict(trades, 0)
            limit = trades.getLimit(self.safe_string(firstTrade, 'symbol'), limit)
        return self.filter_by_since_limit(trades, since, limit, 'timestamp', True)

    def handle_trade(self, client, message):
        """
        Append a MATCH message to the per-symbol trades cache and resolve the watchers.

        :param Client client: the websocket client the message arrived on
        :param dict message: the raw match message
        :returns dict: the unmodified message
        """
        #
        #    {
        #       "sequence": 0,
        #       "product_id": "BTC-PERP",
        #       "time": "2023-05-10T14:58:47.002Z",
        #       "match_id": "177101110052388865",
        #       "trade_qty": "0.006",
        #       "aggressor_side": "BUY",
        #       "trade_price": "28833.1",
        #       "channel": "MATCH",
        #       "type": "UPDATE"
        #    }
        #
        parsed = self.parse_ws_trade(message)
        symbol = parsed['symbol']
        channel = self.safe_string(message, 'channel')
        if symbol not in self.trades:
            # the cache is created on the first trade of a symbol
            self.trades[symbol] = ArrayCache(self.safe_integer(self.options, 'tradesLimit', 1000))
        cache = self.trades[symbol]
        cache.append(parsed)
        self.trades[symbol] = cache
        # first the channel-wide hash, then the symbol-specific one
        client.resolve(cache, channel)
        client.resolve(cache, channel + '::' + symbol)
        return message

    def parse_ws_trade(self, trade, market=None):
        """
        Convert a raw MATCH message into a unified trade.

        The taker side is read from the 'aggressor_side' field of the message and
        lowercased('BUY' -> 'buy').

        :param dict trade: the raw match message
        :param dict [market]: the market the trade belongs to, if already known
        :returns dict: the value returned by safe_trade
        """
        #
        #    {
        #       "sequence": 0,
        #       "product_id": "BTC-PERP",
        #       "time": "2023-05-10T14:58:47.002Z",
        #       "match_id": "177101110052388865",
        #       "trade_qty": "0.006",
        #       "aggressor_side": "BUY",
        #       "trade_price": "28833.1",
        #       "channel": "MATCH",
        #       "type": "UPDATE"
        #    }
        marketId = self.safe_string_2(trade, 'symbol', 'product_id')
        datetime = self.safe_string(trade, 'time')
        return self.safe_trade({
            'info': trade,
            'id': self.safe_string(trade, 'match_id'),
            'order': None,
            'timestamp': self.parse8601(datetime),
            'datetime': datetime,
            'symbol': self.safe_symbol(marketId, market),
            'type': None,
            # the payload key is spelled 'aggressor_side'(double g)
            'side': self.safe_string_lower(trade, 'aggressor_side'),
            'takerOrMaker': None,
            'price': self.safe_string(trade, 'trade_price'),
            'amount': self.safe_string(trade, 'trade_qty'),
            'cost': None,
            'fee': None,
        })

    async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook:
        """
        watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data

        https://docs.cloud.coinbase.com/intx/docs/websocket-channels#level2-channel

        :param str symbol: unified symbol of the market to fetch the order book for
        :param int [limit]: the maximum amount of order book entries to return, forwarded to watch_order_book_for_symbols
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
        """
        # the single-symbol watcher is the multi-symbol watcher called with a one-element list
        requested = [symbol]
        return await self.watch_order_book_for_symbols(requested, limit, params)

    async def watch_order_book_for_symbols(self, symbols: List[str], limit: Int = None, params={}) -> OrderBook:
        """
        watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data

        https://docs.cloud.coinbase.com/intx/docs/websocket-channels#level2-channel

        :param str[] symbols: unified symbols of the markets to watch the order book for
        :param int [limit]: the maximum amount of order book entries to return(not used by this method)
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
        """
        await self.load_markets()
        orderbook = await self.subscribe_multiple('LEVEL2', symbols, params)
        return orderbook

    def handle_order_book(self, client, message):
        """
        Apply a LEVEL2 snapshot or update to the cached order book and resolve the per-symbol watcher.

        :param Client client: the websocket client the message arrived on
        :param dict message: the raw level2 message
        """
        #
        # snapshot
        #    {
        #       "sequence": 0,
        #       "product_id": "BTC-PERP",
        #       "time": "2023-05-10T14:58:47.000Z",
        #       "bids": [
        #           ["29100", "0.02"],
        #           ["28950", "0.01"],
        #           ["28900", "0.01"]
        #       ],
        #       "asks": [
        #           ["29267.8", "18"],
        #           ["29747.6", "18"],
        #           ["30227.4", "9"]
        #       ],
        #       "channel": "LEVEL2",
        #       "type": "SNAPSHOT",
        #    }
        # update
        #    {
        #       "sequence": 1,
        #       "product_id": "BTC-PERP",
        #       "time": "2023-05-10T14:58:47.375Z",
        #       "changes": [
        #           [
        #               "BUY",
        #               "28787.7",
        #               "6"
        #           ]
        #       ],
        #       "channel": "LEVEL2",
        #       "type": "UPDATE"
        #    }
        #
        updateType = self.safe_string(message, 'type')
        symbol = self.safe_symbol(self.safe_string(message, 'product_id'))
        isoTime = self.safe_string(message, 'time')
        channel = self.safe_string(message, 'channel')
        if symbol not in self.orderbooks:
            # the book is created on the first message of a symbol
            depth = self.safe_integer(self.options, 'watchOrderBookLimit', 1000)
            self.orderbooks[symbol] = self.order_book({}, depth)
        book = self.orderbooks[symbol]
        if updateType == 'SNAPSHOT':
            # a snapshot replaces the whole book
            snapshot = self.parse_order_book(message, symbol, None, 'bids', 'asks')
            book.reset(snapshot)
            book['symbol'] = symbol
        else:
            # anything else is treated as a list of [side, price, size] changes
            self.handle_deltas(book, self.safe_list(message, 'changes', []))
        book['nonce'] = self.safe_integer(message, 'sequence')
        book['datetime'] = isoTime
        book['timestamp'] = self.parse8601(isoTime)
        self.orderbooks[symbol] = book
        client.resolve(book, channel + '::' + symbol)

    def handle_delta(self, orderbook, delta):
        """
        Store one [side, price, size] change in the matching side of the order book.

        :param dict orderbook: the order book to update
        :param list delta: the change, the side at index 0, the price at index 1 and the size at index 2
        """
        # 'buy'(case-insensitive) goes to the bids, every other value to the asks
        isBid = self.safe_string_lower(delta, 0) == 'buy'
        bookside = orderbook['bids' if isBid else 'asks']
        bookside.store(self.safe_float(delta, 1), self.safe_float(delta, 2))

    def handle_deltas(self, orderbook, deltas):
        """
        Apply a list of changes to the order book, in the given order.

        :param dict orderbook: the order book to update
        :param list deltas: the changes, each one is passed to handle_delta
        """
        for delta in deltas:
            self.handle_delta(orderbook, delta)

    def handle_subscription_status(self, client, message):
        """
        Handle a SUBSCRIPTIONS message, no state is updated and the message is returned unchanged.

        :param Client client: the websocket client the message arrived on(unused)
        :param dict message: the raw subscriptions message
        :returns dict: the same message
        """
        #
        #    {
        #       "channels": [
        #           {
        #               "name": "MATCH",
        #               "product_ids": [
        #                   "BTC-PERP",
        #                   "ETH-PERP"
        #               ]
        #           },
        #           {
        #               "name": "INSTRUMENTS",
        #               "product_ids": [
        #                   "BTC-PERP",
        #                   "ETH-PERP"
        #               ]
        #           }
        #       ],
        #       "authenticated": True,
        #       "channel": "SUBSCRIPTIONS",
        #       "type": "SNAPSHOT",
        #       "time": "2023-05-30T16:53:46.847Z"
        #    }
        #
        return message

    def handle_funding_rate(self, client: Client, message):
        """
        Parse a FUNDING message, cache it by symbol and resolve the per-symbol watcher.

        :param Client client: the websocket client the message arrived on
        :param dict message: the raw funding message
        """
        #
        # snapshot
        #    {
        #       "sequence": 0,
        #       "product_id": "BTC-PERP",
        #       "time": "2023-05-10T14:58:47.000Z",
        #       "funding_rate": "0.001387",
        #       "is_final": True,
        #       "channel": "FUNDING",
        #       "type": "SNAPSHOT"
        #    }
        # update
        #    {
        #       "sequence": 1,
        #       "product_id": "BTC-PERP",
        #       "time": "2023-05-10T15:00:00.000Z",
        #       "funding_rate": "0.001487",
        #       "is_final": False,
        #       "channel": "FUNDING",
        #       "type": "UPDATE"
        #    }
        #
        parsed = self.parse_funding_rate(message)
        channel = self.safe_string(message, 'channel')
        symbol = parsed['symbol']
        self.fundingRates[symbol] = parsed
        client.resolve(parsed, channel + '::' + symbol)

    def handle_error_message(self, client: Client, message):
        """
        Reject the client when the message is a REJECT, using the mapped exception class.

        The 'reason' text is matched against the exact and broad exception maps,
        when nothing matches an ExchangeError is used. The error text is built only
        from the fields that are present, so a REJECT without 'message' or 'reason'
        is still reported as an exchange error instead of a TypeError.

        :param Client client: the websocket client the message arrived on
        :param dict message: the raw message
        :returns bool: True when the message was a REJECT and the client was rejected, False otherwise
        """
        #
        #    {
        #        message: 'Failed to subscribe',
        #        reason: 'Unable to authenticate',
        #        channel: 'SUBSCRIPTIONS',
        #        type: 'REJECT'
        #    }
        #
        if self.safe_string(message, 'type') != 'REJECT':
            return False
        reason = self.safe_string(message, 'reason')
        errMsg = self.safe_string(message, 'message')
        details = [part for part in (errMsg, reason) if part is not None]
        # fall back to the raw message when neither text field is present
        description = ' '.join(details) if details else str(message)
        feedback = self.id + ' ' + description
        try:
            if reason is not None:
                # the matchers need a string to search in, skip them without a reason
                self.throw_exactly_matched_exception(self.safe_dict(self.exceptions, 'exact', {}), reason, feedback)
                self.throw_broadly_matched_exception(self.safe_dict(self.exceptions, 'broad', {}), reason, feedback)
            raise ExchangeError(feedback)
        except Exception as e:
            # the error is delivered to the pending watchers instead of being raised here
            client.reject(e)
        return True

    def handle_message(self, client, message):
        """
        Dispatch an incoming websocket message to the handler of its channel.

        :param Client client: the websocket client the message arrived on
        :param dict message: the raw message
        :raises ExchangeError: when the message has the type 'error'
        """
        # REJECT messages are fully handled by handle_error_message
        if self.handle_error_message(client, message):
            return
        channel = self.safe_string(message, 'channel', '')
        handlers: dict = {
            'SUBSCRIPTIONS': self.handle_subscription_status,
            'INSTRUMENTS': self.handle_instrument,
            'LEVEL1': self.handle_ticker,
            'MATCH': self.handle_trade,
            'LEVEL2': self.handle_order_book,
            'FUNDING': self.handle_funding_rate,
            'RISK': self.handle_ticker,
        }
        if self.safe_string(message, 'type') == 'error':
            raise ExchangeError(self.safe_string(message, 'message'))
        # candle channels share one handler and are recognized by their name
        if 'CANDLES' in channel:
            self.handle_ohlcv(client, message)
        handler = self.safe_value(handlers, channel)
        if handler is None:
            return
        handler(client, message)
