# -*- coding: utf-8 -*-

# PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN:
# https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code

import ccxt.async_support
from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById, ArrayCacheBySymbolBySide, ArrayCacheByTimestamp
from ccxt.base.types import Any, Balances, Int, Market, Order, OrderBook, Position, Str, Strings, Ticker, Tickers, Trade
from ccxt.async_support.base.ws.client import Client
from typing import List


class xt(ccxt.async_support.xt):

    def describe(self) -> Any:
        return self.deep_extend(super(xt, self).describe(), {
            'has': {
                'ws': True,
                'watchOHLCV': True,
                'watchOrderBook': True,
                'watchTicker': True,
                'watchTickers': True,
                'watchTrades': True,
                'watchTradesForSymbols': False,
                'watchBalance': True,
                'watchOrders': True,
                'watchMyTrades': True,
                'watchPositions': True,
            },
            'urls': {
                'api': {
                    'ws': {
                        'spot': 'wss://stream.xt.com',
                        'contract': 'wss://fstream.xt.com/ws',
                    },
                },
            },
            'options': {
                'tradesLimit': 1000,
                'ordersLimit': 1000,
                'OHLCVLimit': 1000,
                'watchTicker': {
                    'method': 'ticker',  # agg_ticker(contract only)
                },
                'watchTickers': {
                    'method': 'tickers',  # agg_tickers(contract only)
                },
                'watchPositions': {
                    'type': 'swap',
                    'fetchPositionsSnapshot': True,
                    'awaitPositionsSnapshot': True,
                },
            },
            'streaming': {
                'keepAlive': 20000,
                'ping': self.ping,
            },
            'token': None,
        })

    async def get_listen_key(self, isContract: bool):
        """
 @ignore
        required for private endpoints
        :param str isContract: True for contract trades

        https://doc.xt.com/#websocket_privategetToken
        https://doc.xt.com/#futures_user_websocket_v2base

        :returns str: listen key / access token
        """
        self.check_required_credentials()
        tradeType = 'contract' if isContract else 'spot'
        url = self.urls['api']['ws'][tradeType]
        if not isContract:
            url = url + '/private'
        client = self.client(url)
        token = self.safe_string(client.subscriptions, 'token')
        if token is None:
            if isContract:
                response = await self.privateLinearGetFutureUserV1UserListenKey()
                #
                #    {
                #        returnCode: '0',
                #        msgInfo: 'success',
                #        error: null,
                #        result: '3BC1D71D6CF96DA3458FC35B05B633351684511731128'
                #    }
                #
                client.subscriptions['token'] = self.safe_string(response, 'result')
            else:
                response = await self.privateSpotPostWsToken()
                #
                #    {
                #        "rc": 0,
                #        "mc": "SUCCESS",
                #        "ma": [],
                #        "result": {
                #            "token": "eyJhbqGciOiJSUzI1NiJ9.eyJhY2NvdW50SWQiOiIyMTQ2Mjg1MzIyNTU5Iiwic3ViIjoibGh4dDRfMDAwMUBzbmFwbWFpbC5jYyIsInNjb3BlIjoiYXV0aCIsImlzcyI6Inh0LmNvbSIsImxhc3RBdXRoVGltZSI6MTY2MzgxMzY5MDk1NSwic2lnblR5cGUiOiJBSyIsInVzZXJOYW1lIjoibGh4dDRfMDAwMUBzbmFwbWFpbC5jYyIsImV4cCI6MTY2NjQwNTY5MCwiZGV2aWNlIjoidW5rbm93biIsInVzZXJJZCI6MjE0NjI4NTMyMjU1OX0.h3zJlJBQrK2x1HvUxsKivnn6PlSrSDXXXJ7WqHAYSrN2CG5XPTKc4zKnTVoYFbg6fTS0u1fT8wH7wXqcLWXX71vm0YuP8PCvdPAkUIq4-HyzltbPr5uDYd0UByx0FPQtq1exvsQGe7evXQuDXx3SEJXxEqUbq_DNlXPTq_JyScI",
                #            "refreshToken": "eyJhbGciOiqJSUzI1NiJ9.eyJhY2NvdW50SWQiOiIyMTQ2Mjg1MzIyNTU5Iiwic3ViIjoibGh4dDRfMDAwMUBzbmFwbWFpbC5jYyIsInNjb3BlIjoicmVmcmVzaCIsImlzcyI6Inh0LmNvbSIsImxhc3RBdXRoVGltZSI6MTY2MzgxMzY5MDk1NSwic2lnblR5cGUiOiJBSyIsInVzZXJOYW1lIjoibGh4dDRfMDAwMUBzbmFwbWFpbC5jYyIsImV4cCI6MTY2NjQwNTY5MCwiZGV2aWNlIjoidW5rbm93biIsInVzZXJJZCI6MjE0NjI4NTMyMjU1OX0.Fs3YVm5YrEOzzYOSQYETSmt9iwxUHBovh2u73liv1hLUec683WGfktA_s28gMk4NCpZKFeQWFii623FvdfNoteXR0v1yZ2519uNvNndtuZICDdv3BQ4wzW1wIHZa1skxFfqvsDnGdXpjqu9UFSbtHwxprxeYfnxChNk4ssei430"
                #        }
                #    }
                #
                result = self.safe_dict(response, 'result')
                client.subscriptions['token'] = self.safe_string(result, 'accessToken')
        return client.subscriptions['token']

    def get_cache_index(self, orderbook, cache):
        # return the first index of the cache that can be applied to the orderbook or -1 if not possible
        nonce = self.safe_integer(orderbook, 'nonce')
        firstDelta = self.safe_value(cache, 0)
        firstDeltaNonce = self.safe_integer_2(firstDelta, 'i', 'u')
        if nonce < firstDeltaNonce - 1:
            return -1
        for i in range(0, len(cache)):
            delta = cache[i]
            deltaNonce = self.safe_integer_2(delta, 'i', 'u')
            if deltaNonce >= nonce:
                return i
        return len(cache)

    def handle_delta(self, orderbook, delta):
        orderbook['nonce'] = self.safe_integer_2(delta, 'i', 'u')
        obAsks = self.safe_list(delta, 'a', [])
        obBids = self.safe_list(delta, 'b', [])
        bids = orderbook['bids']
        asks = orderbook['asks']
        for i in range(0, len(obBids)):
            bid = obBids[i]
            price = self.safe_number(bid, 0)
            quantity = self.safe_number(bid, 1)
            bids.store(price, quantity)
        for i in range(0, len(obAsks)):
            ask = obAsks[i]
            price = self.safe_number(ask, 0)
            quantity = self.safe_number(ask, 1)
            asks.store(price, quantity)
        # self.handleBidAsks(storedBids, bids)
        # self.handleBidAsks(storedAsks, asks)

    async def subscribe(self, name: str, access: str, methodName: str, market: Market = None, symbols: List[str] = None, params={}):
        """
 @ignore
        Connects to a websocket channel

        https://doc.xt.com/#websocket_privaterequestFormat
        https://doc.xt.com/#futures_market_websocket_v2base

        :param str name: name of the channel
        :param str access: public or private
        :param str methodName: the name of the CCXT class method
        :param dict [market]: CCXT market
        :param str[] [symbols]: unified market symbols
        :param dict params: extra parameters specific to the xt api
        :returns dict: data from the websocket stream
        """
        privateAccess = access == 'private'
        type = None
        type, params = self.handle_market_type_and_params(methodName, market, params)
        isContract = (type != 'spot')
        subscribe = {
            'method': 'SUBSCRIBE' if isContract else 'subscribe',
            'id': self.number_to_string(self.milliseconds()) + name,  # call back ID
        }
        if privateAccess:
            if not isContract:
                subscribe['params'] = [name]
                subscribe['listenKey'] = await self.get_listen_key(isContract)
            else:
                listenKey = await self.get_listen_key(isContract)
                param = name + '@' + listenKey
                subscribe['params'] = [param]
        else:
            subscribe['params'] = [name]
        tradeType = 'contract' if isContract else 'spot'
        messageHash = name + '::' + tradeType
        if symbols is not None:
            messageHash = messageHash + '::' + ','.join(symbols)
        request = self.extend(subscribe, params)
        tail = access
        if isContract:
            tail = 'user' if privateAccess else 'market'
        url = self.urls['api']['ws'][tradeType] + '/' + tail
        return await self.watch(url, messageHash, request, messageHash)

    async def watch_ticker(self, symbol: str, params={}) -> Ticker:
        """
        watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market

        https://doc.xt.com/#websocket_publictickerRealTime
        https://doc.xt.com/#futures_market_websocket_v2tickerRealTime
        https://doc.xt.com/#futures_market_websocket_v2aggTickerRealTime

        :param str symbol: unified symbol of the market to fetch the ticker for
        :param dict params: extra parameters specific to the xt api endpoint
        :param str [params.method]: 'agg_ticker'(contract only) or 'ticker', default = 'ticker' - the endpoint that will be streamed
        :returns dict: a `ticker structure <https://docs.ccxt.com/en/latest/manual.html#ticker-structure>`
        """
        await self.load_markets()
        market = self.market(symbol)
        options = self.safe_dict(self.options, 'watchTicker')
        defaultMethod = self.safe_string(options, 'method', 'ticker')
        method = self.safe_string(params, 'method', defaultMethod)
        name = method + '@' + market['id']
        return await self.subscribe(name, 'public', 'watchTicker', market, None, params)

    async def watch_tickers(self, symbols: Strings = None, params={}) -> Tickers:
        """
        watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market

        https://doc.xt.com/#websocket_publicallTicker
        https://doc.xt.com/#futures_market_websocket_v2allTicker
        https://doc.xt.com/#futures_market_websocket_v2allAggTicker

        :param str [symbols]: unified market symbols
        :param dict params: extra parameters specific to the xt api endpoint
        :param str [params.method]: 'agg_tickers'(contract only) or 'tickers', default = 'tickers' - the endpoint that will be streamed
        :returns dict: a `ticker structure <https://docs.ccxt.com/en/latest/manual.html#ticker-structure>`
        """
        await self.load_markets()
        options = self.safe_dict(self.options, 'watchTickers')
        defaultMethod = self.safe_string(options, 'method', 'tickers')
        name = self.safe_string(params, 'method', defaultMethod)
        market = None
        if symbols is not None:
            market = self.market(symbols[0])
        tickers = await self.subscribe(name, 'public', 'watchTickers', market, symbols, params)
        if self.newUpdates:
            return tickers
        return self.filter_by_array(self.tickers, 'symbol', symbols)

    async def watch_ohlcv(self, symbol: str, timeframe='1m', since: Int = None, limit: Int = None, params={}) -> List[list]:
        """
        watches historical candlestick data containing the open, high, low, and close price, and the volume of a market

        https://doc.xt.com/#websocket_publicsymbolKline
        https://doc.xt.com/#futures_market_websocket_v2symbolKline

        :param str symbol: unified symbol of the market to fetch OHLCV data for
        :param str timeframe: 1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h, 1d, 3d, 1w, or 1M
        :param int [since]: not used by xt watchOHLCV
        :param int [limit]: not used by xt watchOHLCV
        :param dict params: extra parameters specific to the xt api endpoint
        :returns int[][]: A list of candles ordered, open, high, low, close, volume
        """
        await self.load_markets()
        market = self.market(symbol)
        name = 'kline@' + market['id'] + ',' + timeframe
        ohlcv = await self.subscribe(name, 'public', 'watchOHLCV', market, None, params)
        if self.newUpdates:
            limit = ohlcv.getLimit(symbol, limit)
        return self.filter_by_since_limit(ohlcv, since, limit, 0, True)

    async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        get the list of most recent trades for a particular symbol

        https://doc.xt.com/#websocket_publicdealRecord
        https://doc.xt.com/#futures_market_websocket_v2dealRecord

        :param str symbol: unified symbol of the market to fetch trades for
        :param int [since]: timestamp in ms of the earliest trade to fetch
        :param int [limit]: the maximum amount of trades to fetch
        :param dict params: extra parameters specific to the xt api endpoint
        :returns dict[]: a list of `trade structures <https://docs.ccxt.com/en/latest/manual.html?#public-trades>`
        """
        await self.load_markets()
        market = self.market(symbol)
        name = 'trade@' + market['id']
        trades = await self.subscribe(name, 'public', 'watchTrades', market, None, params)
        if self.newUpdates:
            limit = trades.getLimit(symbol, limit)
        return self.filter_by_since_limit(trades, since, limit, 'timestamp')

    async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook:
        """
        watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data

        https://doc.xt.com/#websocket_publiclimitDepth
        https://doc.xt.com/#websocket_publicincreDepth
        https://doc.xt.com/#futures_market_websocket_v2limitDepth
        https://doc.xt.com/#futures_market_websocket_v2increDepth

        :param str symbol: unified symbol of the market to fetch the order book for
        :param int [limit]: not used by xt watchOrderBook
        :param dict params: extra parameters specific to the xt api endpoint
        :param int [params.levels]: 5, 10, 20, or 50
        :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/en/latest/manual.html#order-book-structure>` indexed by market symbols
        """
        await self.load_markets()
        market = self.market(symbol)
        levels = self.safe_string(params, 'levels')
        params = self.omit(params, 'levels')
        name = 'depth_update@' + market['id']
        if levels is not None:
            name = 'depth@' + market['id'] + ',' + levels
        orderbook = await self.subscribe(name, 'public', 'watchOrderBook', market, None, params)
        return orderbook.limit()

    async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]:
        """
        watches information on multiple orders made by the user

        https://doc.xt.com/#websocket_privateorderChange
        https://doc.xt.com/#futures_user_websocket_v2order

        :param str [symbol]: unified market symbol
        :param int [since]: not used by xt watchOrders
        :param int [limit]: the maximum number of orders to return
        :param dict params: extra parameters specific to the xt api endpoint
        :returns dict[]: a list of `order structures <https://docs.ccxt.com/en/latest/manual.html#order-structure>`
        """
        await self.load_markets()
        name = 'order'
        market = None
        if symbol is not None:
            market = self.market(symbol)
        orders = await self.subscribe(name, 'private', 'watchOrders', market, None, params)
        if self.newUpdates:
            limit = orders.getLimit(symbol, limit)
        return self.filter_by_since_limit(orders, since, limit, 'timestamp')

    async def watch_my_trades(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        watches information on multiple trades made by the user

        https://doc.xt.com/#websocket_privateorderDeal
        https://doc.xt.com/#futures_user_websocket_v2trade

        :param str symbol: unified market symbol of the market orders were made in
        :param int [since]: the earliest time in ms to fetch orders for
        :param int [limit]: the maximum number of  orde structures to retrieve
        :param dict params: extra parameters specific to the kucoin api endpoint
        :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=trade-structure>`
        """
        await self.load_markets()
        name = 'trade'
        market = None
        if symbol is not None:
            market = self.market(symbol)
        trades = await self.subscribe(name, 'private', 'watchMyTrades', market, None, params)
        if self.newUpdates:
            limit = trades.getLimit(symbol, limit)
        return self.filter_by_since_limit(trades, since, limit, 'timestamp')

    async def watch_balance(self, params={}) -> Balances:
        """
        watches information on multiple orders made by the user

        https://doc.xt.com/#websocket_privatebalanceChange
        https://doc.xt.com/#futures_user_websocket_v2balance

        :param dict params: extra parameters specific to the xt api endpoint
        :returns dict[]: a list of `balance structures <https://docs.ccxt.com/#/?id=balance-structure>`
        """
        await self.load_markets()
        name = 'balance'
        return await self.subscribe(name, 'private', 'watchBalance', None, None, params)

    async def watch_positions(self, symbols: Strings = None, since: Int = None, limit: Int = None, params={}) -> List[Position]:
        """

        https://doc.xt.com/#futures_user_websocket_v2position

        watch all open positions
        :param str[]|None symbols: list of unified market symbols
        :param number [since]: since timestamp
        :param number [limit]: limit
        :param dict params: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `position structure <https://docs.ccxt.com/en/latest/manual.html#position-structure>`
        """
        await self.load_markets()
        url = self.urls['api']['ws']['contract'] + '/' + 'user'
        client = self.client(url)
        self.set_positions_cache(client)
        fetchPositionsSnapshot = self.handle_option('watchPositions', 'fetchPositionsSnapshot', True)
        awaitPositionsSnapshot = self.handle_option('watchPositions', 'awaitPositionsSnapshot', True)
        cache = self.positions
        if fetchPositionsSnapshot and awaitPositionsSnapshot and self.is_empty(cache):
            snapshot = await client.future('fetchPositionsSnapshot')
            return self.filter_by_symbols_since_limit(snapshot, symbols, since, limit, True)
        name = 'position'
        newPositions = await self.subscribe(name, 'private', 'watchPositions', None, None, params)
        if self.newUpdates:
            return newPositions
        return self.filter_by_symbols_since_limit(cache, symbols, since, limit, True)

    def set_positions_cache(self, client: Client):
        if self.positions is None:
            self.positions = ArrayCacheBySymbolBySide()
        fetchPositionsSnapshot = self.handle_option('watchPositions', 'fetchPositionsSnapshot')
        if fetchPositionsSnapshot:
            messageHash = 'fetchPositionsSnapshot'
            if not (messageHash in client.futures):
                client.future(messageHash)
                self.spawn(self.load_positions_snapshot, client, messageHash)

    async def load_positions_snapshot(self, client, messageHash):
        positions = await self.fetch_positions(None)
        self.positions = ArrayCacheBySymbolBySide()
        cache = self.positions
        for i in range(0, len(positions)):
            position = positions[i]
            contracts = self.safe_number(position, 'contracts', 0)
            if contracts > 0:
                cache.append(position)
        # don't remove the future from the .futures cache
        future = client.futures[messageHash]
        future.resolve(cache)
        client.resolve(cache, 'position::contract')

    def handle_position(self, client, message):
        #
        #    {
        #      topic: 'position',
        #      event: 'position',
        #      data: {
        #        accountId: 245296,
        #        accountType: 0,
        #        symbol: 'eth_usdt',
        #        contractType: 'PERPETUAL',
        #        positionType: 'CROSSED',
        #        positionSide: 'LONG',
        #        positionSize: '1',
        #        closeOrderSize: '0',
        #        availableCloseSize: '1',
        #        realizedProfit: '-0.0121',
        #        entryPrice: '2637.87',
        #        openOrderSize: '1',
        #        isolatedMargin: '2.63787',
        #        openOrderMarginFrozen: '2.78832014',
        #        underlyingType: 'U_BASED',
        #        leverage: 10,
        #        welfareAccount: False,
        #        profitFixedLatest: {},
        #        closeProfit: '0.0000',
        #        totalFee: '-0.0158',
        #        totalFundFee: '0.0037',
        #        markPrice: '2690.96'
        #      }
        #    }
        #
        if self.positions is None:
            self.positions = ArrayCacheBySymbolBySide()
        cache = self.positions
        data = self.safe_dict(message, 'data', {})
        position = self.parse_position(data)
        cache.append(position)
        messageHashes = self.find_message_hashes(client, 'position::contract')
        for i in range(0, len(messageHashes)):
            messageHash = messageHashes[i]
            parts = messageHash.split('::')
            symbolsString = parts[1]
            symbols = symbolsString.split(',')
            positions = self.filter_by_array([position], 'symbol', symbols, False)
            if not self.is_empty(positions):
                client.resolve(positions, messageHash)
        client.resolve([position], 'position::contract')

    def handle_ticker(self, client: Client, message: dict):
        #
        # spot
        #
        #    {
        #        topic: 'ticker',
        #        event: 'ticker@btc_usdt',
        #        data: {
        #           s: 'btc_usdt',            # symbol
        #           t: 1683501935877,         # time(Last transaction time)
        #           cv: '-82.67',             # priceChangeValue(24 hour price change)
        #           cr: '-0.0028',            # priceChangeRate 24-hour price change(percentage)
        #           o: '28823.87',            # open price
        #           c: '28741.20',            # close price
        #           h: '29137.64',            # highest price
        #           l: '28660.93',            # lowest price
        #           q: '6372.601573',         # quantity
        #           v: '184086075.2772391'    # volume
        #        }
        #    }
        #
        # contract
        #
        #    {
        #        "topic": "ticker",
        #        "event": "ticker@btc_usdt",
        #        "data": {
        #            "s": "btc_index",  # trading pair
        #            "o": "49000",      # opening price
        #            "c": "50000",      # closing price
        #            "h": "0.1",        # highest price
        #            "l": "0.1",        # lowest price
        #            "a": "0.1",        # volume
        #            "v": "0.1",        # turnover
        #            "ch": "0.21",      # quote change
        #            "t": 123124124     # timestamp
        #       }
        #    }
        #
        # agg_ticker(contract)
        #
        #    {
        #        "topic": "agg_ticker",
        #        "event": "agg_ticker@btc_usdt",
        #        "data": {
        #            "s": "btc_index",          # trading pair
        #            "o": "49000",              # opening price
        #            "c": "50000",              # closing price
        #            "h": "0.1",                # highest price
        #            "l": "0.1",                # lowest price
        #            "a": "0.1",                # volume
        #            "v": "0.1",                # turnover
        #            "ch": "0.21",              # quote change
        #            "i": "0.21" ,              # index price
        #            "m": "0.21",               # mark price
        #            "bp": "0.21",              # bid price
        #            "ap": "0.21" ,             # ask price
        #            "t": 123124124             # timestamp
        #       }
        #    }
        #
        data = self.safe_dict(message, 'data')
        marketId = self.safe_string(data, 's')
        if marketId is not None:
            cv = self.safe_string(data, 'cv')
            isSpot = cv is not None
            ticker = self.parse_ticker(data)
            symbol = ticker['symbol']
            self.tickers[symbol] = ticker
            event = self.safe_string(message, 'event')
            messageHashTail = 'spot' if isSpot else 'contract'
            messageHash = event + '::' + messageHashTail
            client.resolve(ticker, messageHash)
        return message

    def handle_tickers(self, client: Client, message: dict):
        #
        # spot
        #
        #    {
        #        topic: 'tickers',
        #        event: 'tickers',
        #        data: [
        #            {
        #                s: 'elon_usdt',
        #                t: 1683502958381,
        #                cv: '-0.0000000125',
        #                cr: '-0.0495',
        #                o: '0.0000002522',
        #                c: '0.0000002397',
        #                h: '0.0000002690',
        #                l: '0.0000002371',
        #                q: '3803783034.0000000000',
        #                v: '955.3260820022'
        #            },
        #            ...
        #        ]
        #    }
        #
        # contract
        #
        #    {
        #        "topic": "tickers",
        #        "event": "tickers",
        #        "data": [
        #            {
        #                "s": "btc_index",  # trading pair
        #                "o": "49000",      # opening price
        #                "c": "50000",      # closing price
        #                "h": "0.1",        # highest price
        #                "l": "0.1",        # lowest price
        #                "a": "0.1",        # volume
        #                "v": "0.1",        # turnover
        #                "ch": "0.21",      # quote change
        #                "t": 123124124     # timestamp
        #            }
        #        ]
        #    }
        #
        # agg_ticker(contract)
        #
        #    {
        #        "topic": "agg_tickers",
        #        "event": "agg_tickers",
        #        "data": [
        #            {
        #                "s": "btc_index",          # trading pair
        #                "o": "49000",              # opening price
        #                "c": "50000",              # closing price
        #                "h": "0.1",                # highest price
        #                "l": "0.1",                # lowest price
        #                "a": "0.1",                # volume
        #                "v": "0.1",                # turnover
        #                "ch": "0.21",              # quote change
        #                "i": "0.21" ,              # index price
        #                "m": "0.21",               # mark price
        #                "bp": "0.21",              # bid price
        #                "ap": "0.21" ,             # ask price
        #                "t": 123124124             # timestamp
        #            }
        #        ]
        #    }
        #
        data = self.safe_list(message, 'data', [])
        firstTicker = self.safe_dict(data, 0)
        spotTest = self.safe_string_2(firstTicker, 'cv', 'aq')
        tradeType = 'spot' if (spotTest is not None) else 'contract'
        newTickers = []
        for i in range(0, len(data)):
            tickerData = data[i]
            ticker = self.parse_ticker(tickerData)
            symbol = ticker['symbol']
            self.tickers[symbol] = ticker
            newTickers.append(ticker)
        messageHashStart = self.safe_string(message, 'topic') + '::' + tradeType
        messageHashes = self.find_message_hashes(client, messageHashStart + '::')
        for i in range(0, len(messageHashes)):
            messageHash = messageHashes[i]
            parts = messageHash.split('::')
            symbolsString = parts[2]
            symbols = symbolsString.split(',')
            tickers = self.filter_by_array(newTickers, 'symbol', symbols)
            tickersSymbols = list(tickers.keys())
            numTickers = len(tickersSymbols)
            if numTickers > 0:
                client.resolve(tickers, messageHash)
        client.resolve(self.tickers, messageHashStart)
        return message

    def handle_ohlcv(self, client: Client, message: dict):
        #
        # spot
        #
        #    {
        #        "topic": "kline",
        #        "event": "kline@btc_usdt,5m",
        #        "data": {
        #            "s": "btc_usdt",        # symbol
        #            "t": 1656043200000,     # time
        #            "i": "5m",              # interval
        #            "o": "44000",           # open price
        #            "c": "50000",           # close price
        #            "h": "52000",           # highest price
        #            "l": "36000",           # lowest price
        #            "q": "34.2",            # qty(quantity)
        #            "v": "230000"           # volume
        #        }
        #    }
        #
        # contract
        #
        #    {
        #        "topic": "kline",
        #        "event": "kline@btc_usdt,5m",
        #        "data": {
        #            "s": "btc_index",      # trading pair
        #            "o": "49000",          # opening price
        #            "c": "50000",          # closing price
        #            "h": "0.1",            # highest price
        #            "l": "0.1",            # lowest price
        #            "a": "0.1",            # volume
        #            "v": "0.1",            # turnover
        #            "ch": "0.21",          # quote change
        #            "t": 123124124         # timestamp
        #        }
        #    }
        #
        data = self.safe_dict(message, 'data', {})
        marketId = self.safe_string(data, 's')
        if marketId is not None:
            timeframe = self.safe_string(data, 'i')
            tradeType = 'spot' if ('q' in data) else 'contract'
            market = self.safe_market(marketId, None, None, tradeType)
            symbol = market['symbol']
            parsed = self.parse_ohlcv(data, market)
            self.ohlcvs[symbol] = self.safe_dict(self.ohlcvs, symbol, {})
            stored = self.safe_value(self.ohlcvs[symbol], timeframe)
            if stored is None:
                limit = self.safe_integer(self.options, 'OHLCVLimit', 1000)
                stored = ArrayCacheByTimestamp(limit)
                self.ohlcvs[symbol][timeframe] = stored
            stored.append(parsed)
            event = self.safe_string(message, 'event')
            messageHash = event + '::' + tradeType
            client.resolve(stored, messageHash)
        return message

    def handle_trade(self, client: Client, message: dict):
        #
        # spot
        #
        #    {
        #        topic: 'trade',
        #        event: 'trade@btc_usdt',
        #        data: {
        #            s: 'btc_usdt',
        #            i: '228825383103928709',
        #            t: 1684258222702,
        #            p: '27003.65',
        #            q: '0.000796',
        #            b: True
        #        }
        #    }
        #
        # contract
        #
        #    {
        #        "topic": "trade",
        #        "event": "trade@btc_usdt",
        #        "data": {
        #            "s": "btc_index",  # trading pair
        #            "p": "50000",      # price
        #            "a": "0.1"         # Quantity
        #            "m": "BID"         # Deal side  BID:Buy ASK:Sell
        #            "t": 123124124     # timestamp
        #        }
        #    }
        #
        data = self.safe_dict(message, 'data')
        marketId = self.safe_string_lower(data, 's')
        if marketId is not None:
            trade = self.parse_trade(data)
            i = self.safe_string(data, 'i')
            tradeType = 'spot' if (i is not None) else 'contract'
            market = self.safe_market(marketId, None, None, tradeType)
            symbol = market['symbol']
            event = self.safe_string(message, 'event')
            tradesArray = self.safe_value(self.trades, symbol)
            if tradesArray is None:
                tradesLimit = self.safe_integer(self.options, 'tradesLimit', 1000)
                tradesArray = ArrayCache(tradesLimit)
                self.trades[symbol] = tradesArray
            tradesArray.append(trade)
            messageHash = event + '::' + tradeType
            client.resolve(tradesArray, messageHash)
        return message

    def handle_order_book(self, client: Client, message: dict):
        #
        # spot
        #
        #    {
        #        "topic": "depth",
        #        "event": "depth@btc_usdt,20",
        #        "data": {
        #            "s": "btc_usdt",        # symbol
        #            "fi": 1681433733351,    # firstUpdateId = previous lastUpdateId + 1
        #            "i": 1681433733371,     # updateId
        #            "a": [                 # asks(sell order)
        #                [                  # [0]price, [1]quantity
        #                    "34000",        # price
        #                    "1.2"           # quantity
        #                ],
        #                [
        #                    "34001",
        #                    "2.3"
        #                ]
        #            ],
        #            "b": [                  # bids(buy order)
        #                [
        #                    "32000",
        #                    "0.2"
        #                ],
        #                [
        #                    "31000",
        #                    "0.5"
        #                ]
        #            ]
        #        }
        #    }
        #
        # contract
        #
        #    {
        #        "topic": "depth",
        #        "event": "depth@btc_usdt,20",
        #        "data": {
        #            s: "btc_usdt",
        #            pu: "548111455664",
        #            fu: "548111455665",
        #            u: "548111455667",
        #            a: [
        #                [
        #                    "26841.5",
        #                    "50210",
        #                ],
        #            ],
        #            b: [
        #                [
        #                    "26841",
        #                    "67075",
        #                ],
        #            ],
        #            t: 1684530667083,
        #        }
        #    }
        #
        data = self.safe_dict(message, 'data')
        marketId = self.safe_string(data, 's')
        if marketId is not None:
            event = self.safe_string(message, 'event')
            splitEvent = event.split(',')
            event = self.safe_string(splitEvent, 0)
            tradeType = 'contract' if ('fu' in data) else 'spot'
            market = self.safe_market(marketId, None, None, tradeType)
            symbol = market['symbol']
            obAsks = self.safe_list(data, 'a')
            obBids = self.safe_list(data, 'b')
            messageHash = event + '::' + tradeType
            if not (symbol in self.orderbooks):
                subscription = self.safe_dict(client.subscriptions, messageHash, {})
                limit = self.safe_integer(subscription, 'limit')
                self.orderbooks[symbol] = self.order_book({}, limit)
            orderbook = self.orderbooks[symbol]
            nonce = self.safe_integer(orderbook, 'nonce')
            if nonce is None:
                cacheLength = len(orderbook.cache)
                snapshotDelay = self.handle_option('watchOrderBook', 'snapshotDelay', 25)
                if cacheLength == snapshotDelay:
                    self.spawn(self.load_order_book, client, messageHash, symbol)
                orderbook.cache.append(data)
                return
            if obAsks is not None:
                asks = orderbook['asks']
                for i in range(0, len(obAsks)):
                    ask = obAsks[i]
                    price = self.safe_number(ask, 0)
                    quantity = self.safe_number(ask, 1)
                    asks.store(price, quantity)
            if obBids is not None:
                bids = orderbook['bids']
                for i in range(0, len(obBids)):
                    bid = obBids[i]
                    price = self.safe_number(bid, 0)
                    quantity = self.safe_number(bid, 1)
                    bids.store(price, quantity)
            timestamp = self.safe_integer(data, 't')
            orderbook['nonce'] = self.safe_integer_2(data, 'i', 'u')
            orderbook['timestamp'] = timestamp
            orderbook['datetime'] = self.iso8601(timestamp)
            orderbook['symbol'] = symbol
            client.resolve(orderbook, messageHash)

    def parse_ws_order_trade(self, trade: dict, market: Market = None):
        #
        #    {
        #        "s": "btc_usdt",                         # symbol
        #        "t": 1656043204763,                      # time happened time
        #        "i": "6216559590087220004",              # orderId,
        #        "ci": "test123",                         # clientOrderId
        #        "st": "PARTIALLY_FILLED",                # state
        #        "sd": "BUY",                             # side BUY/SELL
        #        "eq": "2",                               # executedQty executed quantity
        #        "ap": "30000",                           # avg price
        #        "f": "0.002"                             # fee
        #    }
        #
        # contract
        #
        #    {
        #        "symbol": "btc_usdt",                    # Trading pair
        #        "orderId": "1234",                       # Order Id
        #        "origQty": "34244",                      # Original Quantity
        #        "avgPrice": "123",                       # Quantity
        #        "price": "1111",                         # Average price
        #        "executedQty": "34244",                  # Volume(Cont)
        #        "orderSide": "BUY",                      # BUY, SELL
        #        "positionSide": "LONG",                  # LONG, SHORT
        #        "marginFrozen": "123",                   # Occupied margin
        #        "sourceType": "default",                 # DEFAULT:normal order,ENTRUST:plan commission,PROFIR:Take Profit and Stop Loss
        #        "sourceId" : "1231231",                  # Triggering conditions ID
        #        "state": "",                             # state:NEW：New order(unfilled);PARTIALLY_FILLED:Partial deal;PARTIALLY_CANCELED:Partial revocation;FILLED:Filled;CANCELED:Cancled;REJECTED:Order failed;EXPIRED：Expired
        #        "createTime": 1731231231,                # CreateTime
        #        "clientOrderId": "204788317630342726"
        #    }
        #
        marketId = self.safe_string(trade, 's')
        tradeType = 'contract' if ('symbol' in trade) else 'spot'
        market = self.safe_market(marketId, market, None, tradeType)
        timestamp = self.safe_string(trade, 't')
        return self.safe_trade({
            'info': trade,
            'id': None,
            'timestamp': timestamp,
            'datetime': self.iso8601(timestamp),
            'symbol': market['symbol'],
            'order': self.safe_string(trade, 'i', 'orderId'),
            'type': self.parse_order_status(self.safe_string(trade, 'st', 'state')),
            'side': self.safe_string_lower(trade, 'sd', 'orderSide'),
            'takerOrMaker': None,
            'price': self.safe_number(trade, 'price'),
            'amount': self.safe_string(trade, 'origQty'),
            'cost': None,
            'fee': {
                'currency': None,
                'cost': self.safe_number(trade, 'f'),
                'rate': None,
            },
        }, market)

    def parse_ws_order(self, order: dict, market: Market = None):
        #
        # spot
        #
        #    {
        #        "s": "btc_usdt",                # symbol
        #        "bc": "btc",                    # base currency
        #        "qc": "usdt",                   # quotation currency
        #        "t": 1656043204763,             # happened time
        #        "ct": 1656043204663,            # create time
        #        "i": "6216559590087220004",     # order id,
        #        "ci": "test123",                # client order id
        #        "st": "PARTIALLY_FILLED",       # state NEW/PARTIALLY_FILLED/FILLED/CANCELED/REJECTED/EXPIRED
        #        "sd": "BUY",                    # side BUY/SELL
        #        "tp": "LIMIT",                  # type LIMIT/MARKET
        #        "oq":  "4"                      # original quantity
        #        "oqq":  48000,                  # original quotation quantity
        #        "eq": "2",                      # executed quantity
        #        "lq": "2",                      # remaining quantity
        #        "p": "4000",                    # price
        #        "ap": "30000",                  # avg price
        #        "f":"0.002"                     # fee
        #    }
        #
        # contract
        #
        #    {
        #        "symbol": "btc_usdt",                    # Trading pair
        #        "orderId": "1234",                       # Order Id
        #        "origQty": "34244",                      # Original Quantity
        #        "avgPrice": "123",                       # Quantity
        #        "price": "1111",                         # Average price
        #        "executedQty": "34244",                  # Volume(Cont)
        #        "orderSide": "BUY",                      # BUY, SELL
        #        "positionSide": "LONG",                  # LONG, SHORT
        #        "marginFrozen": "123",                   # Occupied margin
        #        "sourceType": "default",                 # DEFAULT:normal order,ENTRUST:plan commission,PROFIR:Take Profit and Stop Loss
        #        "sourceId" : "1231231",                  # Triggering conditions ID
        #        "state": "",                             # state:NEW：New order(unfilled);PARTIALLY_FILLED:Partial deal;PARTIALLY_CANCELED:Partial revocation;FILLED:Filled;CANCELED:Cancled;REJECTED:Order failed;EXPIRED：Expired
        #        "createTime": 1731231231,                # CreateTime
        #        "clientOrderId": "204788317630342726"
        #    }
        #
        marketId = self.safe_string_2(order, 's', 'symbol')
        tradeType = 'contract' if ('symbol' in order) else 'spot'
        market = self.safe_market(marketId, market, None, tradeType)
        timestamp = self.safe_integer_2(order, 'ct', 'createTime')
        return self.safe_order({
            'info': order,
            'id': self.safe_string_2(order, 'i', 'orderId'),
            'clientOrderId': self.safe_string_2(order, 'ci', 'clientOrderId'),
            'timestamp': timestamp,
            'datetime': self.iso8601(timestamp),
            'lastTradeTimestamp': None,
            'symbol': market['symbol'],
            'type': market['type'],
            'timeInForce': None,
            'postOnly': None,
            'side': self.safe_string_lower_2(order, 'sd', 'orderSide'),
            'price': self.safe_number_2(order, 'p', 'price'),
            'stopPrice': None,
            'stopLoss': None,
            'takeProfit': None,
            'amount': self.safe_string_2(order, 'oq', 'origQty'),
            'filled': self.safe_string_2(order, 'eq', 'executedQty'),
            'remaining': self.safe_string(order, 'lq'),
            'cost': None,
            'average': self.safe_string_2(order, 'ap', 'avgPrice'),
            'status': self.parse_order_status(self.safe_string(order, 'st', 'state')),
            'fee': {
                'currency': None,
                'cost': self.safe_number(order, 'f'),
            },
            'trades': None,
        }, market)

    def handle_order(self, client: Client, message: dict):
        #
        # spot
        #
        #    {
        #        "topic": "order",
        #        "event": "order",
        #        "data": {
        #            "s": "btc_usdt",                # symbol
        #            "t": 1656043204763,             # time happened time
        #            "i": "6216559590087220004",     # orderId,
        #            "ci": "test123",                # clientOrderId
        #            "st": "PARTIALLY_FILLED",       # state
        #            "sd": "BUY",                    # side BUY/SELL
        #            "eq": "2",                      # executedQty executed quantity
        #            "ap": "30000",                  # avg price
        #            "f": "0.002"                    # fee
        #        }
        #    }
        #
        # contract
        #
        #    {
        #        "topic": "order",
        #        "event": "order@123456",
        #        "data": {
        #             "symbol": "btc_usdt",                    # Trading pair
        #             "orderId": "1234",                       # Order Id
        #             "origQty": "34244",                      # Original Quantity
        #             "avgPrice": "123",                       # Quantity
        #             "price": "1111",                         # Average price
        #             "executedQty": "34244",                  # Volume(Cont)
        #             "orderSide": "BUY",                      # BUY, SELL
        #             "positionSide": "LONG",                  # LONG, SHORT
        #             "marginFrozen": "123",                   # Occupied margin
        #             "sourceType": "default",                 # DEFAULT:normal order,ENTRUST:plan commission,PROFIR:Take Profit and Stop Loss
        #             "sourceId" : "1231231",                  # Triggering conditions ID
        #             "state": "",                             # state:NEW：New order(unfilled);PARTIALLY_FILLED:Partial deal;PARTIALLY_CANCELED:Partial revocation;FILLED:Filled;CANCELED:Cancled;REJECTED:Order failed;EXPIRED：Expired
        #             "createTime": 1731231231,                # CreateTime
        #             "clientOrderId": "204788317630342726"
        #           }
        #    }
        #
        orders = self.orders
        if orders is None:
            limit = self.safe_integer(self.options, 'ordersLimit')
            orders = ArrayCacheBySymbolById(limit)
            self.orders = orders
        order = self.safe_dict(message, 'data', {})
        marketId = self.safe_string_2(order, 's', 'symbol')
        if marketId is not None:
            tradeType = 'contract' if ('symbol' in order) else 'spot'
            market = self.safe_market(marketId, None, None, tradeType)
            parsed = self.parse_ws_order(order, market)
            orders.append(parsed)
            client.resolve(orders, 'order::' + tradeType)
        return message

    def handle_balance(self, client: Client, message: dict):
        #
        # spot
        #
        #    {
        #        topic: 'balance',
        #        event: 'balance',
        #        data: {
        #            a: 3513677381884,
        #            t: 1684250056775,
        #            c: 'usdt',
        #            b: '7.71000000',
        #            f: '0.00000000',
        #            z: 'SPOT'
        #        }
        #    }
        #
        # contract
        #
        #    {
        #        "topic": "balance",
        #        "event": "balance@123456",
        #        "data": {
        #            "coin": "usdt",
        #            "underlyingType": 1,                          # 1:Coin-M,2:USDT-M
        #            "walletBalance": "123",                       # Balance
        #            "openOrderMarginFrozen": "123",               # Frozen order
        #            "isolatedMargin": "213",                      # Isolated Margin
        #            "crossedMargin": "0"                          # Crossed Margin
        #            "availableBalance": '2.256114450000000000',
        #            "coupon": '0',
        #            "bonus": '0'
        #        }
        #    }
        #
        data = self.safe_dict(message, 'data', {})
        currencyId = self.safe_string_2(data, 'c', 'coin')
        code = self.safe_currency_code(currencyId)
        account = self.account()
        account['free'] = self.safe_string(data, 'availableBalance')
        account['used'] = self.safe_string(data, 'f')
        account['total'] = self.safe_string_2(data, 'b', 'walletBalance')
        self.balance[code] = account
        self.balance = self.safe_balance(self.balance)
        tradeType = 'contract' if ('coin' in data) else 'spot'
        client.resolve(self.balance, 'balance::' + tradeType)

    def handle_my_trades(self, client: Client, message: dict):
        #
        # spot
        #
        #    {
        #        "topic": "trade",
        #        "event": "trade",
        #        "data": {
        #            "s": "btc_usdt",                # symbol
        #            "t": 1656043204763,             # time
        #            "i": "6316559590087251233",     # tradeId
        #            "oi": "6216559590087220004",    # orderId
        #            "p": "30000",                   # trade price
        #            "q": "3",                       # qty quantity
        #            "v": "90000"                    # volume trade amount
        #        }
        #    }
        #
        # contract
        #
        #    {
        #       "topic": "trade",
        #       "event": "trade@123456",
        #       "data": {
        #            "symbol": 'btc_usdt',
        #            "orderSide": 'SELL',
        #            "positionSide": 'LONG',
        #            "orderId": '231485367663419328',
        #            "price": '27152.7',
        #            "quantity": '33',
        #            "marginUnfrozen": '2.85318000',
        #            "timestamp": 1684892412565
        #        }
        #    }
        #
        data = self.safe_dict(message, 'data', {})
        stored = self.myTrades
        if stored is None:
            limit = self.safe_integer(self.options, 'tradesLimit', 1000)
            stored = ArrayCacheBySymbolById(limit)
            self.myTrades = stored
        parsedTrade = self.parse_trade(data)
        market = self.market(parsedTrade['symbol'])
        stored.append(parsedTrade)
        tradeType = 'contract' if market['contract'] else 'spot'
        client.resolve(stored, 'trade::' + tradeType)

    def handle_message(self, client: Client, message):
        event = self.safe_string(message, 'event')
        if event == 'pong':
            client.onPong()
        elif event is not None:
            topic = self.safe_string(message, 'topic')
            methods = {
                'kline': self.handle_ohlcv,
                'depth': self.handle_order_book,
                'depth_update': self.handle_order_book,
                'ticker': self.handle_ticker,
                'agg_ticker': self.handle_ticker,
                'tickers': self.handle_tickers,
                'agg_tickers': self.handle_tickers,
                'balance': self.handle_balance,
                'order': self.handle_order,
                'position': self.handle_position,
            }
            method = self.safe_value(methods, topic)
            if topic == 'trade':
                data = self.safe_dict(message, 'data')
                if ('oi' in data) or ('orderId' in data):
                    method = self.handle_my_trades
                else:
                    method = self.handle_trade
            if method is not None:
                method(client, message)

    def ping(self, client: Client):
        client.lastPong = self.milliseconds()
        return 'ping'

    def handle_error_message(self, client: Client, message: dict):
        #
        #    {
        #        "id": "123",
        #        "code": 401,
        #        "msg": "token expire"
        #    }
        #
        msg = self.safe_string(message, 'msg')
        if (msg == 'invalid_listen_key') or (msg == 'token expire'):
            client.subscriptions['token'] = None
            self.get_listen_key(True)
            return
        client.reject(message)
