# -*- coding: utf-8 -*-

# PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN:
# https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code

import ccxt.async_support
from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById, ArrayCacheBySymbolBySide, ArrayCacheByTimestamp
import asyncio
import hashlib
import json
from ccxt.base.types import Any, Int, Order, OrderBook, Position, Str, Strings, Ticker, Tickers, Trade
from ccxt.async_support.base.ws.client import Client
from typing import List
from ccxt.base.errors import ExchangeError
from ccxt.base.errors import AuthenticationError
from ccxt.base.errors import ArgumentsRequired


class apex(ccxt.async_support.apex):

    def describe(self) -> Any:
        """
        Build the exchange description with the websocket settings layered on top of the parent description.

        :returns dict: the parent ``describe()`` result deep-extended with the websocket capability flags, urls, options and streaming settings
        """
        parentDescription = super(apex, self).describe()
        # capability flags advertised for the websocket API
        capabilities = {
            'ws': True,
            'watchTicker': True,
            'watchTickers': True,
            'watchOrderBook': True,
            'watchOrders': True,
            'watchTrades': True,
            'watchTradesForSymbols': False,
            'watchPositions': True,
            'watchMyTrades': True,
            'watchBalance': False,
            'watchOHLCV': True,
        }
        # production websocket endpoints
        liveSockets = {
            'public': 'wss://quote.omni.apex.exchange/realtime_public?v=2',
            'private': 'wss://quote.omni.apex.exchange/realtime_private?v=2',
        }
        # endpoints listed under the 'test' urls
        testSockets = {
            'public': 'wss://qa-quote.omni.apex.exchange/realtime_public?v=2',
            'private': 'wss://qa-quote.omni.apex.exchange/realtime_private?v=2',
        }
        endpoints = {
            'logo': 'https://omni.apex.exchange/assets/logo_content-CY9uyFbz.svg',
            'api': {
                'ws': liveSockets,
            },
            'test': {
                'ws': testSockets,
            },
            'www': 'https://apex.exchange/',
            'doc': 'https://api-docs.pro.apex.exchange',
            'fees': 'https://apex-pro.gitbook.io/apex-pro/apex-omni-live-now/trading-perpetual-contracts/trading-fees',
            'referral': 'https://omni.apex.exchange/trade',
        }
        streaming = {
            'ping': self.ping,
            'keepAlive': 18000,
        }
        overrides = {
            'has': capabilities,
            'urls': endpoints,
            'options': {},
            'streaming': streaming,
        }
        return self.deep_extend(parentDescription, overrides)

    async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        stream the public trades of a single market

        https://api-docs.pro.apex.exchange/#websocket-v3-for-omni-websocket-endpoint

        :param str symbol: unified market symbol of the market trades were made in
        :param int [since]: the earliest time in ms to fetch trades for
        :param int [limit]: the maximum number of trade structures to retrieve
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=trade-structure>`
        """
        # the single-symbol variant is just the multi-symbol watcher called with a one-element list
        singleSymbol = [symbol]
        trades = await self.watch_trades_for_symbols(singleSymbol, since, limit, params)
        return trades

    async def watch_trades_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        stream the public trades of several markets over one subscription

        https://api-docs.pro.apex.exchange/#websocket-v3-for-omni-websocket-endpoint

        :param str[] symbols: unified symbols of the markets to watch trades for
        :param int [since]: timestamp in ms of the earliest trade to return
        :param int [limit]: the maximum amount of trades to return
        :param dict [params]: extra parameters merged into the subscribe request
        :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
        :raises ArgumentsRequired: when the list of symbols is empty
        """
        await self.load_markets()
        symbols = self.market_symbols(symbols)
        if len(symbols) == 0:
            raise ArgumentsRequired(self.id + ' watchTradesForSymbols() requires a non-empty array of symbols')
        url = self.urls['api']['ws']['public'] + '&timestamp=' + str(self.milliseconds())
        topics = []
        messageHashes = []
        for unifiedSymbol in symbols:
            market = self.market(unifiedSymbol)
            # the topic carries the exchange-side 'id2', the message hash the unified symbol
            topics.append('recentlyTrade.H.' + market['id2'])
            messageHashes.append('trade:' + unifiedSymbol)
        trades = await self.watch_topics(url, messageHashes, topics, params)
        if self.newUpdates:
            # the limit is taken from the cache for the symbol of the first stored trade
            firstTrade = self.safe_value(trades, 0)
            limit = trades.getLimit(self.safe_string(firstTrade, 'symbol'), limit)
        return self.filter_by_since_limit(trades, since, limit, 'timestamp', True)

    def handle_trades(self, client: Client, message):
        """
        Store the trades of a public trade message and resolve the matching ``trade:<symbol>`` hash.

        :param Client client: the websocket client the message arrived on
        :param dict message: the raw message, shaped like the sample below
        """
        #
        #     {
        #         "topic": "recentlyTrade.H.BTCUSDT",
        #         "type": "snapshot",
        #         "ts": 1672304486868,
        #         "data": [
        #             {
        #                 "T": 1672304486865,
        #                 "s": "BTCUSDT",
        #                 "S": "Buy",
        #                 "v": "0.001",
        #                 "p": "16578.50",
        #                 "L": "PlusTick",
        #                 "i": "20f43950-d8dd-5b31-9112-a178eb6023af",
        #                 "BT": False
        #             }
        #         ]
        #     }
        #
        topic = self.safe_string(message, 'topic')
        rawTrades = self.safe_value(message, 'data', {})
        # the market id is the third dot-separated part of the topic
        topicParts = topic.split('.')
        market = self.safe_market(self.safe_string(topicParts, 2), None, None)
        symbol = market['symbol']
        cache = self.safe_value(self.trades, symbol)
        if cache is None:
            # first message for this symbol: create its cache
            cache = ArrayCache(self.safe_integer(self.options, 'tradesLimit', 1000))
            self.trades[symbol] = cache
        for rawTrade in rawTrades:
            cache.append(self.parse_ws_trade(rawTrade, market))
        client.resolve(cache, 'trade' + ':' + symbol)

    def parse_ws_trade(self, trade, market=None):
        """
        Convert a raw websocket trade (public or private) into a unified trade structure.

        :param dict trade: the raw trade, shaped like one of the samples below
        :param dict [market]: the market the trade belongs to, used when the payload's own market id cannot be resolved
        :returns dict: a `trade structure <https://docs.ccxt.com/#/?id=trade-structure>`
        """
        #
        # public
        #    {
        #         "T": 1672304486865,
        #         "s": "BTCUSDT",
        #         "S": "Buy",
        #         "v": "0.001",
        #         "p": "16578.50",
        #         "L": "PlusTick",
        #         "i": "20f43950-d8dd-5b31-9112-a178eb6023af",
        #         "BT": False
        #     }
        #
        # private(myTrades)
        #    {
        #         "symbol": "ETH-USDT",
        #         "side": "BUY",
        #         "orderId": "2048046080",
        #         "fee": "0.625000",
        #         "liquidity": "TAKER",
        #         "accountId": "1024000",
        #         "createdAt": 1652185521361,
        #         "isOpen": True,
        #         "size": "0.500",
        #         "price": "2500.0",
        #         "quoteAmount": "1250.0000",
        #         "id": "2048000182272",
        #         "updatedAt": 1652185678345
        #     }
        #
        # "v" is the trade size(it is read into the amount below), so it must not be used as an id fallback
        tradeId = self.safe_string_n(trade, ['i', 'id'])
        marketId = self.safe_string_n(trade, ['s', 'symbol'])
        market = self.safe_market(marketId, market, None)
        symbol = market['symbol']
        timestamp = self.safe_integer_n(trade, ['t', 'T', 'createdAt'])
        side = self.safe_string_lower_n(trade, ['S', 'side'])
        price = self.safe_string_n(trade, ['p', 'price'])
        amount = self.safe_string_n(trade, ['q', 'v', 'size'])
        # only present in private trades; stays None for public ones
        orderId = self.safe_string(trade, 'orderId')
        takerOrMaker = self.safe_string_lower(trade, 'liquidity')
        if (takerOrMaker != 'taker') and (takerOrMaker != 'maker'):
            takerOrMaker = None
        return self.safe_trade({
            'id': tradeId,
            'info': trade,
            'timestamp': timestamp,
            'datetime': self.iso8601(timestamp),
            'symbol': symbol,
            'order': orderId,
            'type': None,
            'side': side,
            'takerOrMaker': takerOrMaker,
            'price': price,
            'amount': amount,
            'cost': None,
            'fee': None,
        }, market)

    async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook:
        """
        stream the order book of a single market

        https://api-docs.pro.apex.exchange/#websocket-v3-for-omni-websocket-endpoint

        :param str symbol: unified symbol of the market to watch the order book for
        :param int [limit]: the order book depth, used to build the subscription topic
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: an `order book structure <https://docs.ccxt.com/#/?id=order-book-structure>`
        """
        # delegate to the multi-symbol watcher with a one-element list
        singleSymbol = [symbol]
        orderbook = await self.watch_order_book_for_symbols(singleSymbol, limit, params)
        return orderbook

    async def watch_order_book_for_symbols(self, symbols: List[str], limit: Int = None, params={}) -> OrderBook:
        """
        stream the order books of several markets over one subscription

        https://api-docs.pro.apex.exchange/#websocket-v3-for-omni-websocket-endpoint

        :param str[] symbols: unified array of symbols
        :param int [limit]: the order book depth used in the topic name, 25 when omitted
        :param dict [params]: extra parameters merged into the subscribe request
        :returns dict: the `order book structure <https://docs.ccxt.com/#/?id=order-book-structure>` of the market that received an update
        :raises ArgumentsRequired: when the list of symbols is empty
        """
        await self.load_markets()
        if len(symbols) == 0:
            raise ArgumentsRequired(self.id + ' watchOrderBookForSymbols() requires a non-empty array of symbols')
        symbols = self.market_symbols(symbols)
        url = self.urls['api']['ws']['public'] + '&timestamp=' + str(self.milliseconds())
        # the depth is part of every topic name, so resolve the default once
        if limit is None:
            limit = 25
        topicPrefix = 'orderBook' + str(limit) + '.H.'
        topics = []
        messageHashes = []
        for unifiedSymbol in symbols:
            market = self.market(unifiedSymbol)
            topics.append(topicPrefix + market['id2'])
            messageHashes.append('orderbook:' + unifiedSymbol)
        orderbook = await self.watch_topics(url, messageHashes, topics, params)
        return orderbook.limit()

    async def watch_topics(self, url, messageHashes, topics, params={}):
        """
        Send a subscribe request for the given topics and wait on the given message hashes.

        :param str url: the websocket url to subscribe on
        :param str[] messageHashes: hashes to wait on; they are also passed as the subscribe hashes
        :param str[] topics: exchange topic names placed in the request's ``args``
        :param dict [params]: extra fields merged into the subscribe request
        :returns: whatever ``watch_multiple`` returns for these hashes
        """
        subscription = self.extend({
            'op': 'subscribe',
            'args': topics,
        }, params)
        return await self.watch_multiple(url, messageHashes, subscription, messageHashes)

    def handle_order_book(self, client: Client, message):
        """
        Apply an order book snapshot or delta to the stored book and resolve ``orderbook:<symbol>``.

        :param Client client: the websocket client the message arrived on
        :param dict message: the raw message, shaped like the sample below
        """
        #
        #     {
        #         "topic": "orderbook25.H.BTCUSDT",
        #         "type": "snapshot",
        #         "ts": 1672304484978,
        #         "data": {
        #             "s": "BTCUSDT",
        #             "b": [
        #                 ...,
        #                 [
        #                     "16493.50",
        #                     "0.006"
        #                 ],
        #                 [
        #                     "16493.00",
        #                     "0.100"
        #                 ]
        #             ],
        #             "a": [
        #                 [
        #                     "16611.00",
        #                     "0.029"
        #                 ],
        #                 [
        #                     "16612.00",
        #                     "0.213"
        #                 ],
        #             ],
        #             "u": 18521288,
        #             "seq": 7961638724
        #         }
        #     }
        #
        data = self.safe_dict(message, 'data', {})
        market = self.safe_market(self.safe_string(data, 's'), None, None)
        symbol = market['symbol']
        # NOTE(review): the sample above shows a millisecond "ts", while the code scales "ts" by 0.001 - confirm the unit the exchange actually sends
        timestamp = self.safe_integer_product(message, 'ts', 0.001)
        if symbol not in self.orderbooks:
            self.orderbooks[symbol] = self.order_book()
        book = self.orderbooks[symbol]
        if self.safe_string(message, 'type') == 'snapshot':
            # a snapshot replaces the whole book
            book.reset(self.parse_order_book(data, symbol, timestamp, 'b', 'a'))
        else:
            # a delta is applied level by level, asks first, then bids
            self.handle_deltas(book['asks'], self.safe_list(data, 'a', []))
            self.handle_deltas(book['bids'], self.safe_list(data, 'b', []))
            book['timestamp'] = timestamp
            book['datetime'] = self.iso8601(timestamp)
        self.orderbooks[symbol] = book
        client.resolve(book, 'orderbook' + ':' + symbol)

    def handle_delta(self, bookside, delta):
        """
        Parse one raw level (index 0 and 1 of the delta) and store it in the given book side.

        :param bookside: the asks or bids side of an order book
        :param list delta: one raw level from the update
        """
        bookside.storeArray(self.parse_bid_ask(delta, 0, 1))

    def handle_deltas(self, bookside, deltas):
        """
        Store every raw level of an update in the given book side, in order.

        :param bookside: the asks or bids side of an order book
        :param list deltas: the raw levels of the update
        """
        for delta in deltas:
            self.handle_delta(bookside, delta)

    async def watch_ticker(self, symbol: str, params={}) -> Ticker:
        """
        stream the price ticker of a single market

        https://api-docs.pro.apex.exchange/#websocket-v3-for-omni-websocket-endpoint

        :param str symbol: unified symbol of the market to watch the ticker for
        :param dict [params]: extra parameters merged into the subscribe request
        :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
        """
        await self.load_markets()
        market = self.market(symbol)
        url = self.urls['api']['ws']['public'] + '&timestamp=' + str(self.milliseconds())
        # the topic carries the exchange-side 'id2', the message hash the unified symbol
        topics = ['instrumentInfo' + '.H.' + market['id2']]
        messageHashes = ['ticker:' + market['symbol']]
        return await self.watch_topics(url, messageHashes, topics, params)

    async def watch_tickers(self, symbols: Strings = None, params={}) -> Tickers:
        """
        stream the price tickers of several markets over one subscription

        https://api-docs.pro.apex.exchange/#websocket-v3-for-omni-websocket-endpoint

        :param str[] symbols: unified symbols of the markets to watch the tickers for
        :param dict [params]: extra parameters merged into the subscribe request
        :returns dict: `ticker structures <https://docs.ccxt.com/#/?id=ticker-structure>` keyed by symbol; with newUpdates enabled only the ticker that just changed
        """
        await self.load_markets()
        symbols = self.market_symbols(symbols, None, False)
        url = self.urls['api']['ws']['public'] + '&timestamp=' + str(self.milliseconds())
        topics = []
        messageHashes = []
        for unifiedSymbol in symbols:
            market = self.market(unifiedSymbol)
            topics.append('instrumentInfo' + '.H.' + market['id2'])
            messageHashes.append('ticker:' + unifiedSymbol)
        ticker = await self.watch_topics(url, messageHashes, topics, params)
        if not self.newUpdates:
            # return every stored ticker of the requested symbols
            return self.filter_by_array(self.tickers, 'symbol', symbols)
        # return just the ticker that was updated
        updated: dict = {}
        updated[ticker['symbol']] = ticker
        return updated

    def handle_ticker(self, client: Client, message):
        """
        Store the ticker carried by a snapshot or delta message and resolve ``ticker:<symbol>``.

        A snapshot is parsed as is. A delta is merged over the raw ``info`` of the
        ticker already stored for the symbol named by the last part of the topic.
        Messages of any other type, or without a resolvable symbol, are ignored
        instead of raising inside the message handler.

        :param Client client: the websocket client the message arrived on
        :param dict message: the raw message, shaped like the sample below
        """
        # {
        #     "topic":"instrumentInfo.H.BTCUSDT",
        #     "type":"snapshot",
        #     "data":{
        #         "symbol":"BTCUSDT",
        #         "lastPrice":"21572.5",
        #         "price24hPcnt":"-0.0194318181818182",
        #         "highPrice24h":"25306.5",
        #         "lowPrice24h":"17001.5",
        #         "turnover24h":"1334891.4545",
        #         "volume24h":"64.896",
        #         "nextFundingTime":"2022-08-26T08:00:00Z",
        #         "oraclePrice":"21412.060000000002752512",
        #         "indexPrice":"21409.82",
        #         "openInterest":"49.598",
        #         "tradeCount":"0",
        #         "fundingRate":"0.0000125",
        #         "predictedFundingRate":"0.0000125"
        #     },
        #     "cs":44939063,
        #     "ts":1661500091955487
        # }
        topic = self.safe_string(message, 'topic', '')
        updateType = self.safe_string(message, 'type', '')
        data = self.safe_dict(message, 'data', {})
        symbol = None
        parsed = None
        if updateType == 'snapshot':
            parsed = self.parse_ticker(data)
            symbol = parsed['symbol']
        elif updateType == 'delta':
            topicParts = topic.split('.')
            marketId = self.safe_string(topicParts, len(topicParts) - 1)
            market = self.safe_market(marketId, None, None)
            symbol = market['symbol']
            # a delta only carries the changed fields, so merge it over the last raw ticker
            previous = self.safe_dict(self.tickers, symbol, {})
            merged = self.extend(self.safe_dict(previous, 'info', {}), data)
            parsed = self.parse_ticker(merged)
        if (parsed is None) or (symbol is None):
            # unknown update type or unresolvable symbol: there is nothing to store or resolve
            return
        # the sample "ts" has 16 digits; scaling by 0.001 brings it down to 13
        timestamp = self.safe_integer_product(message, 'ts', 0.001)
        parsed['timestamp'] = timestamp
        parsed['datetime'] = self.iso8601(timestamp)
        self.tickers[symbol] = parsed
        messageHash = 'ticker:' + symbol
        client.resolve(self.tickers[symbol], messageHash)

    async def watch_ohlcv(self, symbol: str, timeframe='1m', since: Int = None, limit: Int = None, params={}) -> List[list]:
        """
        watches historical candlestick data containing the open, high, low, and close price, and the volume of a market

        https://api-docs.pro.apex.exchange/#websocket-v3-for-omni-websocket-endpoint

        :param str symbol: unified symbol of the market to fetch OHLCV data for
        :param str timeframe: the length of time each candle represents
        :param int [since]: timestamp in ms of the earliest candle to fetch
        :param int [limit]: the maximum amount of candles to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns int[][]: A list of candles ordered, open, high, low, close, volume
        """
        # build a new dict instead of assigning into params: writing to it would mutate
        # the caller's dict and the shared default argument across calls
        request = self.extend(params, {'callerMethodName': 'watchOHLCV'})
        result = await self.watch_ohlcv_for_symbols([[symbol, timeframe]], since, limit, request)
        return result[symbol][timeframe]

    async def watch_ohlcv_for_symbols(self, symbolsAndTimeframes: List[List[str]], since: Int = None, limit: Int = None, params={}):
        """
        watches historical candlestick data containing the open, high, low, and close price, and the volume of a market

        https://api-docs.pro.apex.exchange/#websocket-v3-for-omni-websocket-endpoint

        :param str[][] symbolsAndTimeframes: array of arrays containing unified symbols and timeframes to fetch OHLCV data for, example [['BTC/USDT', '1m'], ['LTC/USDT', '5m']]
        :param int [since]: timestamp in ms of the earliest candle to fetch
        :param int [limit]: the maximum amount of candles to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: the candles(ordered, open, high, low, close, volume) of the updated market, nested by symbol and timeframe
        """
        await self.load_markets()
        timeStamp = str(self.milliseconds())
        url = self.urls['api']['ws']['public'] + '&timestamp=' + timeStamp
        topics = []
        messageHashes = []
        for i in range(0, len(symbolsAndTimeframes)):
            entry = symbolsAndTimeframes[i]
            market = self.market(self.safe_string(entry, 0))
            unifiedTimeframe = self.safe_string(entry, 1, '1')
            timeframeId = self.safe_string(self.timeframes, unifiedTimeframe, unifiedTimeframe)
            # the exchange topic is built from the exchange-side 'id2'
            topics.append('candle.' + timeframeId + '.' + market['id2'])
            # the message hash must use the unified symbol: handle_ohlcv resolves
            # 'ohlcv::' + market['symbol'] + '::' + timeframe, so a hash built from
            # 'id2' would not match the hash that gets resolved
            messageHashes.append('ohlcv::' + market['symbol'] + '::' + unifiedTimeframe)
        symbol, timeframe, stored = await self.watch_topics(url, messageHashes, topics, params)
        if self.newUpdates:
            limit = stored.getLimit(symbol, limit)
        filtered = self.filter_by_since_limit(stored, since, limit, 0, True)
        return self.create_ohlcv_object(symbol, timeframe, filtered)

    def handle_ohlcv(self, client: Client, message):
        """
        Store the candles of a candle message and resolve ``ohlcv::<symbol>::<timeframe>``.

        The hash is resolved with a ``[symbol, timeframe, cache]`` list.

        :param Client client: the websocket client the message arrived on
        :param dict message: the raw message, shaped like the sample below
        """
        #
        #     {
        #         "topic": "candle.5.BTCUSDT",
        #         "data": [
        #             {
        #                 "start": 1672324800000,
        #                 "end": 1672325099999,
        #                 "interval": "5",
        #                 "open": "16649.5",
        #                 "close": "16677",
        #                 "high": "16677",
        #                 "low": "16608",
        #                 "volume": "2.081",
        #                 "turnover": "34666.4005",
        #                 "confirm": False,
        #                 "timestamp": 1672324988882
        #             }
        #         ],
        #         "ts": 1672324988882,
        #         "type": "snapshot"
        #     }
        #
        candles = self.safe_value(message, 'data', {})
        topicParts = self.safe_string(message, 'topic').split('.')
        # topic layout: candle.<timeframe id>.<market id>
        timeframe = self.find_timeframe(self.safe_string(topicParts, 1))
        marketId = self.safe_string(topicParts, len(topicParts) - 1)
        marketType = 'spot' if client.url.find('spot') > -1 else 'contract'
        market = self.safe_market(marketId, None, None, marketType)
        symbol = market['symbol']
        byTimeframe = self.safe_value(self.ohlcvs, symbol)
        if byTimeframe is None:
            self.ohlcvs[symbol] = {}
        if self.safe_value(byTimeframe, timeframe) is None:
            cacheSize = self.safe_integer(self.options, 'OHLCVLimit', 1000)
            self.ohlcvs[symbol][timeframe] = ArrayCacheByTimestamp(cacheSize)
        cache = self.ohlcvs[symbol][timeframe]
        for candle in candles:
            cache.append(self.parse_ws_ohlcv(candle))
        client.resolve([symbol, timeframe, cache], 'ohlcv::' + symbol + '::' + timeframe)

    def parse_ws_ohlcv(self, ohlcv, market=None) -> list:
        """
        Convert a raw websocket candle into a unified ``[timestamp, open, high, low, close, volume]`` list.

        :param dict ohlcv: the raw candle, shaped like the sample below
        :param dict [market]: not used by this parser
        :returns list: the candle start time followed by open, high, low, close and volume
        """
        #
        #     {
        #         "start": 1670363160000,
        #         "end": 1670363219999,
        #         "interval": "1",
        #         "open": "16987.5",
        #         "close": "16987.5",
        #         "high": "16988",
        #         "low": "16987.5",
        #         "volume": "23.511",
        #         "turnover": "399396.344",
        #         "confirm": False,
        #         "timestamp": 1670363219614
        #     }
        #
        openTime = self.safe_integer(ohlcv, 'start')
        openPrice = self.safe_number(ohlcv, 'open')
        highPrice = self.safe_number(ohlcv, 'high')
        lowPrice = self.safe_number(ohlcv, 'low')
        closePrice = self.safe_number(ohlcv, 'close')
        # 'volume' is read first, 'turnover' is the second key handed to safe_number_2
        volume = self.safe_number_2(ohlcv, 'volume', 'turnover')
        return [openTime, openPrice, highPrice, lowPrice, closePrice, volume]

    async def watch_my_trades(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        stream the trades made by the user

        https://api-docs.pro.apex.exchange/#private-websocket

        :param str [symbol]: unified market symbol to restrict the trades to
        :param int [since]: the earliest time in ms to return trades for
        :param int [limit]: the maximum number of trade structures to return
        :param dict [params]: extra parameters merged into the subscribe request
        :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=trade-structure>`
        """
        await self.load_markets()
        messageHash = 'myTrades'
        if symbol is not None:
            # a symbol-specific hash is resolved only by trades of that symbol
            symbol = self.symbol(symbol)
            messageHash = messageHash + ':' + symbol
        url = self.urls['api']['ws']['private'] + '&timestamp=' + str(self.milliseconds())
        await self.authenticate(url)
        trades = await self.watch_topics(url, [messageHash], ['myTrades'], params)
        if self.newUpdates:
            limit = trades.getLimit(symbol, limit)
        return self.filter_by_symbol_since_limit(trades, symbol, since, limit, True)

    async def watch_positions(self, symbols: Strings = None, since: Int = None, limit: Int = None, params={}) -> List[Position]:
        """
        stream the open positions of the account

        https://api-docs.pro.apex.exchange/#private-websocket

        :param str[] [symbols]: list of unified market symbols
        :param int [since]: the earliest time in ms to return positions for
        :param int [limit]: the maximum number of positions to return
        :param dict params: extra parameters merged into the subscribe request
        :returns dict[]: a list of `position structure <https://docs.ccxt.com/en/latest/manual.html#position-structure>`
        """
        await self.load_markets()
        hashSuffix = ''
        if not self.is_empty(symbols):
            symbols = self.market_symbols(symbols)
            hashSuffix = '::' + ','.join(symbols)
        url = self.urls['api']['ws']['private'] + '&timestamp=' + str(self.milliseconds())
        messageHash = 'positions' + hashSuffix
        client = self.client(url)
        await self.authenticate(url)
        self.set_positions_cache(client, symbols)
        cache = self.positions
        if cache is None:
            # no cache yet: wait for the snapshot future and answer from it
            snapshot = await client.future('fetchPositionsSnapshot')
            return self.filter_by_symbols_since_limit(snapshot, symbols, since, limit, True)
        updates = await self.watch_topics(url, [messageHash], ['positions'], params)
        if self.newUpdates:
            return updates
        return self.filter_by_symbols_since_limit(cache, symbols, since, limit, True)

    async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]:
        """
        stream the orders made by the user

        https://api-docs.pro.apex.exchange/#private-websocket

        :param str [symbol]: unified market symbol to restrict the orders to
        :param int [since]: the earliest time in ms to return orders for
        :param int [limit]: the maximum number of order structures to return
        :param dict [params]: extra parameters merged into the subscribe request
        :returns dict[]: a list of `order structures <https://docs.ccxt.com/#/?id=order-structure>`
        """
        await self.load_markets()
        messageHash = 'orders'
        if symbol is not None:
            # a symbol-specific hash is resolved only by orders of that symbol
            symbol = self.symbol(symbol)
            messageHash = messageHash + ':' + symbol
        url = self.urls['api']['ws']['private'] + '&timestamp=' + str(self.milliseconds())
        await self.authenticate(url)
        orders = await self.watch_topics(url, [messageHash], ['orders'], params)
        if self.newUpdates:
            limit = orders.getLimit(symbol, limit)
        return self.filter_by_symbol_since_limit(orders, symbol, since, limit, True)

    def handle_my_trades(self, client: Client, lists):
        """
        Store a batch of private trades and resolve the ``myTrades`` hashes.

        Each symbol seen in the batch resolves ``myTrades:<symbol>``, then the
        symbol-less ``myTrades`` hash is resolved.

        :param Client client: the websocket client the message arrived on
        :param list lists: raw trades, shaped like the sample below
        """
        # [
        #     {
        #         "symbol":"ETH-USDT",
        #         "side":"BUY",
        #         "orderId":"2048046080",
        #         "fee":"0.625000",
        #         "liquidity":"TAKER",
        #         "accountId":"1024000",
        #         "createdAt":1652185521361,
        #         "isOpen":true,
        #         "size":"0.500",
        #         "price":"2500.0",
        #         "quoteAmount":"1250.0000",
        #         "id":"2048000182272",
        #         "updatedAt":1652185678345
        #     }
        # ]
        if self.myTrades is None:
            self.myTrades = ArrayCacheBySymbolById(self.safe_integer(self.options, 'tradesLimit', 1000))
        cache = self.myTrades
        # dict used as an insertion-ordered set of the symbols in this batch
        seenSymbols: dict = {}
        for rawTrade in lists:
            trade = self.parse_ws_trade(rawTrade)
            seenSymbols[trade['symbol']] = True
            cache.append(trade)
        for tradedSymbol in seenSymbols:
            client.resolve(cache, 'myTrades:' + tradedSymbol)
        # non-symbol specific
        client.resolve(cache, 'myTrades')

    def handle_order(self, client: Client, lists):
        """
        Store a batch of private order updates and resolve the ``orders`` hashes.

        Each symbol seen in the batch resolves ``orders:<symbol>``, then the
        symbol-less ``orders`` hash is resolved.

        :param Client client: the websocket client the message arrived on
        :param list lists: raw orders, shaped like the sample below
        """
        # [
        #     {
        #         "symbol":"ETH-USDT",
        #         "cumSuccessFillFee":"0.625000",
        #         "trailingPercent":"0",
        #         "type":"LIMIT",
        #         "unfillableAt":1654779600000,
        #         "isDeleverage":false,
        #         "createdAt":1652185521339,
        #         "price":"2500.0",
        #         "cumSuccessFillValue":"0",
        #         "id":"2048046080",
        #         "cancelReason":"",
        #         "timeInForce":1,
        #         "updatedAt":1652185521392,
        #         "limitFee":"0.625000",
        #         "side":"BUY",
        #         "clientOrderId":"522843990",
        #         "triggerPrice":"",
        #         "expiresAt":1654779600000,
        #         "cumSuccessFillSize":"0",
        #         "accountId":"1024000",
        #         "size":"0.500",
        #         "reduceOnly":false,
        #         "isLiquidate":false,
        #         "remainingSize":"0.000",
        #         "status":"PENDING"
        #     }
        # ]
        if self.orders is None:
            self.orders = ArrayCacheBySymbolById(self.safe_integer(self.options, 'ordersLimit', 1000))
        cache = self.orders
        # dict used as an insertion-ordered set of the symbols in this batch
        seenSymbols: dict = {}
        for rawOrder in lists:
            order = self.parse_order(rawOrder)
            seenSymbols[order['symbol']] = True
            cache.append(order)
        for orderSymbol in seenSymbols:
            client.resolve(cache, 'orders:' + orderSymbol)
        client.resolve(cache, 'orders')

    def set_positions_cache(self, client: Client, symbols: Strings = None):
        """
        Start loading the positions snapshot once, if no positions cache exists yet.

        :param Client client: the websocket client that owns the snapshot future
        :param str[] [symbols]: not used by this method
        """
        if self.positions is not None:
            # a cache already exists, nothing to load
            return
        snapshotHash = 'fetchPositionsSnapshot'
        if snapshotHash in client.futures:
            # the future is already registered, so a load was started earlier
            return
        client.future(snapshotHash)
        self.spawn(self.load_positions_snapshot, client, snapshotHash)

    async def load_positions_snapshot(self, client, messageHash):
        """
        Fetch all positions, fill the positions cache with them and settle the snapshot future.

        This is started in the background by set_positions_cache. If fetching fails,
        the error is passed to ``client.reject`` for the snapshot hash so whoever is
        awaiting that future receives it instead of waiting on a future nobody settles.

        :param Client client: the websocket client that owns the snapshot future
        :param str messageHash: the key of the snapshot future in ``client.futures``
        """
        # one ws channel gives positions for all types, for snapshot must load all positions
        fetchFunctions = [
            self.fetch_positions(None),
        ]
        try:
            results = await asyncio.gather(*fetchFunctions)
        except Exception as e:
            # background-task boundary: hand the failure to the waiter rather than losing it
            client.reject(e, messageHash)
            return
        cache = ArrayCacheBySymbolBySide()
        self.positions = cache
        for positions in results:
            for position in positions:
                cache.append(position)
        # don't remove the future from the .futures cache
        future = client.futures[messageHash]
        future.resolve(cache)
        client.resolve(cache, 'positions')

    def handle_positions(self, client, lists):
        """
        Parse raw position updates, store them in the positions cache and notify watchers.

        :param Client client: websocket client whose pending futures are resolved
        :param dict[] lists: raw position entries, shaped like the example below
        """
        #
        # [
        #     {
        #         "symbol":"ETH-USDT",
        #         "exitPrice":"0",
        #         "side":"LONG",
        #         "maxSize":"2820.000",
        #         "sumOpen":"1.820",
        #         "sumClose":"0.000",
        #         "netFunding":"0.000000",
        #         "entryPrice":"2500.000000000000000000",
        #         "accountId":"1024000",
        #         "createdAt":1652179377769,
        #         "size":"1.820",
        #         "realizedPnl":"0",
        #         "closedAt":1652185521392,
        #         "updatedAt":1652185521392
        #     }
        # ]
        #
        # each account is connected to a different endpoint
        # and has exactly one subscriptionhash which is the account type
        if self.positions is None:
            self.positions = ArrayCacheBySymbolBySide()
        positionsCache = self.positions
        updates = []
        for rawEntry in lists:
            parsed = self.parse_position(rawEntry)
            parsedSide = self.safe_string(parsed, 'side')
            updates.append(parsed)
            if parsedSide is not None and parsedSide != '':
                # regular update
                positionsCache.append(parsed)
                continue
            # hacky handling of a closing update: the closed side is unknown, so the
            # same entry is appended once per side to "reset" both of them
            for resetSide in ('long', 'short'):
                parsed['side'] = resetSide
                positionsCache.append(parsed)
            parsed['side'] = None
        # hashes look like 'positions::SYMBOL1,SYMBOL2' - notify only matching subscriptions
        for subscriptionHash in self.find_message_hashes(client, 'positions::'):
            requestedSymbols = subscriptionHash.split('::')[1].split(',')
            matching = self.filter_by_array(updates, 'symbol', requestedSymbols, False)
            if not self.is_empty(matching):
                client.resolve(matching, subscriptionHash)
        client.resolve(updates, 'positions')

    async def authenticate(self, url, params={}):
        """
        Send a signed login on the websocket at ``url`` (unless one is already registered) and await the 'authenticated' future.

        :param str url: websocket endpoint to authenticate on
        :param dict params: accepted for signature compatibility, not used by this method
        :returns: the value the 'authenticated' future is resolved with
        """
        self.check_required_credentials()
        httpMethod = 'GET'
        requestPath = '/ws/accounts'
        timestamp = str(self.milliseconds())
        # the signed payload is timestamp + method + path, keyed with the base64-encoded secret
        encodedPayload = self.encode(''.join([timestamp, httpMethod, requestPath]))
        encodedSecret = self.encode(self.string_to_base64(self.secret))
        signature = self.hmac(encodedPayload, encodedSecret, hashlib.sha256, 'base64')
        messageHash = 'authenticated'
        client = self.client(url)
        future = client.future(messageHash)
        if self.safe_value(client.subscriptions, messageHash) is not None:
            # an 'authenticated' subscription already exists, no new login is sent
            return await future
        # auth sign
        loginRequest = {
            'type': 'login',
            'topics': ['ws_zk_accounts_v3'],
            'httpMethod': httpMethod,
            'requestPath': requestPath,
            'apiKey': self.apiKey,
            'passphrase': self.password,
            'timestamp': timestamp,
            'signature': signature,
        }
        loginMessage = {
            'op': 'login',
            'args': [json.dumps(loginRequest)],
        }
        self.watch(url, messageHash, loginMessage, messageHash)
        return await future

    def handle_error_message(self, client: Client, message):
        """
        Detect an error payload and reject the matching pending future with it.

        A message is an error when it carries a non-zero ``code`` / ``ret_code`` / ``retCode``
        or when ``success`` is present and falsy. Failures of an ``auth`` or ``login`` request
        (``login`` is the op sent by ``authenticate``) reject the 'authenticated' future and
        drop the 'authenticated' subscription; any other error rejects the future keyed by
        the message's ``reqId``.

        :param Client client: websocket client whose futures are rejected
        :param dict message: decoded message received from the exchange
        :returns bool: True when the message was an error and has been handled, False otherwise
        """
        #
        #   {
        #       "success": False,
        #       "ret_msg": "error:invalid op",
        #       "conn_id": "5e079fdd-9c7f-404d-9dbf-969d650838b5",
        #       "request": {op: '', args: null}
        #   }
        #
        # auth error
        #
        #   {
        #       "success": False,
        #       "ret_msg": "error:USVC1111",
        #       "conn_id": "e73770fb-a0dc-45bd-8028-140e20958090",
        #       "request": {
        #         "op": "auth",
        #         "args": [
        #           "9rFT6uR4uz9Imkw4Wx",
        #           "1653405853543",
        #           "542e71bd85597b4db0290f0ce2d13ed1fd4bb5df3188716c1e9cc69a879f7889"
        #         ]
        #   }
        #
        #   {code: '-10009', desc: "Invalid period!"}
        #
        #   {
        #       "reqId":"1",
        #       "retCode":170131,
        #       "retMsg":"Insufficient balance.",
        #       "op":"order.create",
        #       "data":{
        #
        #       },
        #       "header":{
        #           "X-Bapi-Limit":"20",
        #           "X-Bapi-Limit-Status":"19",
        #           "X-Bapi-Limit-Reset-Timestamp":"1714236608944",
        #           "Traceid":"3d7168a137bf32a947b7e5e6a575ac7f",
        #           "Timenow":"1714236608946"
        #       },
        #       "connId":"cojifin88smerbj9t560-406"
        #   }
        #
        code = self.safe_string_n(message, ['code', 'ret_code', 'retCode'])
        try:
            if code is not None and code != '0':
                feedback = self.id + ' ' + self.json(message)
                self.throw_exactly_matched_exception(self.exceptions['exact'], code, feedback)
                msg = self.safe_string_2(message, 'retMsg', 'ret_msg')
                self.throw_broadly_matched_exception(self.exceptions['broad'], msg, feedback)
                raise ExchangeError(feedback)
            success = self.safe_value(message, 'success')
            if success is not None and not success:
                # default to '' so a missing ret_msg cannot turn the report into a TypeError
                ret_msg = self.safe_string(message, 'ret_msg', '')
                request = self.safe_value(message, 'request', {})
                op = self.safe_string(request, 'op')
                # authenticate() sends op 'login', so a failed login is an auth failure too
                if op == 'auth' or op == 'login':
                    raise AuthenticationError('Authentication failed: ' + ret_msg)
                else:
                    raise ExchangeError(self.id + ' ' + ret_msg)
            return False
        except Exception as error:
            if isinstance(error, AuthenticationError):
                messageHash = 'authenticated'
                client.reject(error, messageHash)
                # forget the subscription so a later authenticate() sends a new login
                if messageHash in client.subscriptions:
                    del client.subscriptions[messageHash]
            else:
                messageHash = self.safe_string(message, 'reqId')
                client.reject(error, messageHash)
            return True

    def handle_message(self, client: Client, message):
        """
        Route an incoming websocket message to the handler registered for its topic.

        Error payloads are consumed by ``handle_error_message`` first. The topic is read from
        ``topic`` (falling back to ``op``); an exact match wins, otherwise the first registered
        key contained in the topic is used. Messages without a match are treated as an
        authentication acknowledgement when their ``type`` is 'AUTH_RESP'.

        :param Client client: websocket client the message arrived on
        :param dict message: decoded message received from the exchange
        """
        if self.handle_error_message(client, message):
            return
        topic = self.safe_string_2(message, 'topic', 'op', '')
        handlers: dict = {
            'ws_zk_accounts_v3': self.handle_account,
            'orderBook': self.handle_order_book,
            'depth': self.handle_order_book,
            'candle': self.handle_ohlcv,
            'kline': self.handle_ohlcv,
            'ticker': self.handle_ticker,
            'instrumentInfo': self.handle_ticker,
            'trade': self.handle_trades,
            'recentlyTrade': self.handle_trades,
            'pong': self.handle_pong,
            'auth': self.handle_authenticate,
        }
        handler = self.safe_value(handlers, topic)
        if handler is None:
            # no exact match: take the first key (in registration order) contained in the topic
            for fragment, candidate in handlers.items():
                if fragment in topic:
                    handler = candidate
                    break
        if handler is not None:
            handler(client, message)
            return
        # unified auth acknowledgement
        messageType = self.safe_string(message, 'type')
        if messageType == 'AUTH_RESP':
            self.handle_authenticate(client, message)

    def ping(self, client: Client):
        """
        Build the ping message, carrying the current millisecond timestamp as its only argument.

        :param Client client: websocket client the ping is built for (not read here)
        :returns dict: message with ``args`` (the stringified timestamp) and ``op`` 'ping'
        """
        return {
            'args': [str(self.milliseconds())],
            'op': 'ping',
        }

    def handle_pong(self, client: Client, message):
        """
        Store the integer ``pong`` value of the message on ``client.lastPong``.

        :param Client client: websocket client whose ``lastPong`` is updated
        :param dict message: pong message received from the exchange
        :returns dict: the unchanged message
        """
        #
        #   {
        #       "success": True,
        #       "ret_msg": "pong",
        #       "conn_id": "db3158a0-8960-44b9-a9de-ac350ee13158",
        #       "request": {op: "ping", args: null}
        #   }
        #
        #   {pong: 1653296711335}
        #
        pongTimestamp = self.safe_integer(message, 'pong')
        client.lastPong = pongTimestamp
        return message

    def handle_account(self, client: Client, message):
        """
        Fan an account message out to the trade, position and order handlers.

        Only the sections actually present in ``contents`` are forwarded, so a message
        that carries no ``fills`` / ``positions`` / ``orders`` list does not invoke the
        corresponding handler.

        :param Client client: websocket client the message arrived on
        :param dict message: account message; its ``contents`` dict may hold ``fills``, ``positions`` and ``orders`` lists
        """
        contents = self.safe_dict(message, 'contents', {})
        # no list default here: a missing section must stay None so that its handler is skipped
        fills = self.safe_list(contents, 'fills')
        if fills is not None:
            self.handle_my_trades(client, fills)
        positions = self.safe_list(contents, 'positions')
        if positions is not None:
            self.handle_positions(client, positions)
        orders = self.safe_list(contents, 'orders')
        if orders is not None:
            self.handle_order(client, orders)

    def handle_authenticate(self, client: Client, message):
        """
        Settle the 'authenticated' future according to an authentication acknowledgement.

        The acknowledgement counts as successful when ``success`` is truthy or ``retCode``
        is 0. On failure the future is rejected with AuthenticationError and the
        'authenticated' subscription is removed.

        :param Client client: websocket client that owns the 'authenticated' future and subscription
        :param dict message: acknowledgement received from the exchange
        :returns dict: the unchanged message
        """
        #
        #    {
        #        "success": True,
        #        "ret_msg": '',
        #        "op": "auth",
        #        "conn_id": "ce3dpomvha7dha97tvp0-2xh"
        #    }
        #
        success = self.safe_value(message, 'success')
        code = self.safe_integer(message, 'retCode')
        messageHash = 'authenticated'
        if success or code == 0:
            future = self.safe_value(client.futures, messageHash)
            # an acknowledgement may arrive while no 'authenticated' future is registered;
            # there is nothing to resolve then and calling resolve on None would raise
            if future is not None:
                future.resolve(True)
        else:
            error = AuthenticationError(self.id + ' ' + self.json(message))
            client.reject(error, messageHash)
            if messageHash in client.subscriptions:
                del client.subscriptions[messageHash]
        return message

    def handle_subscription_status(self, client: Client, message):
        """
        Pass a subscription status message through without acting on it.

        :param Client client: websocket client the message arrived on (not read here)
        :param dict message: subscription status message, shaped like the example below
        :returns dict: the unchanged message
        """
        #
        #    {
        #        "topic": "kline",
        #        "event": "sub",
        #        "params": {
        #          "symbol": "LTCUSDT",
        #          "binary": "false",
        #          "klineType": "1m",
        #          "symbolName": "LTCUSDT"
        #        },
        #        "code": "0",
        #        "msg": "Success"
        #    }
        #
        return message
