# -*- coding: utf-8 -*-

# PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN:
# https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code

import ccxt.async_support
from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById, ArrayCacheBySymbolBySide, ArrayCacheByTimestamp
import hashlib
from ccxt.base.types import Any, Balances, Int, Liquidation, Num, Order, OrderBook, OrderSide, OrderType, Position, Str, Strings, Ticker, Tickers, FundingRate, FundingRates, Trade
from ccxt.async_support.base.ws.client import Client
from typing import List
from ccxt.base.errors import ExchangeError
from ccxt.base.errors import AuthenticationError
from ccxt.base.errors import ArgumentsRequired
from ccxt.base.errors import BadRequest
from ccxt.base.errors import InvalidNonce
from ccxt.base.errors import ChecksumError


class okx(ccxt.async_support.okx):

    def describe(self) -> Any:
        """
        assemble the exchange description for the websocket-enabled okx class

        the websocket capabilities, endpoints, default options and streaming settings declared here
        are handed to deep_extend together with the description produced by the parent class
        :returns dict: the value returned by deep_extend(parent description, websocket overrides)
        """
        parentDescription = super(okx, self).describe()
        # unified websocket methods exposed by self class
        capabilities: dict = {
            'ws': True,
            'watchTicker': True,
            'watchMarkPrice': True,
            'watchMarkPrices': True,
            'watchTickers': True,
            'watchBidsAsks': True,
            'watchOrderBook': True,
            'watchTrades': True,
            'watchTradesForSymbols': True,
            'watchOrderBookForSymbols': True,
            'watchBalance': True,
            'watchLiquidations': 'emulated',
            'watchLiquidationsForSymbols': True,
            'watchMyLiquidations': 'emulated',
            'watchMyLiquidationsForSymbols': True,
            'watchOHLCV': True,
            'watchOHLCVForSymbols': True,
            'watchOrders': True,
            'watchMyTrades': True,
            'watchPositions': True,
            'watchFundingRate': True,
            'watchFundingRates': True,
            'createOrderWs': True,
            'editOrderWs': True,
            'cancelOrderWs': True,
            'cancelOrdersWs': True,
            'cancelAllOrdersWs': True,
        }
        # base websocket urls, get_url() appends the /public, /private or /business path
        endpoints: dict = {
            'api': {
                'ws': 'wss://ws.okx.com:8443/ws/v5',
            },
            'test': {
                'ws': 'wss://wspap.okx.com:8443/ws/v5',
            },
        }
        orderBookDefaults: dict = {
            'checksum': True,
            #
            # bbo-tbt
            # 1. Newly added channel that sends tick-by-tick Level 1 data
            # 2. All API users can subscribe
            # 3. Public depth channel, verification not required
            #
            # books-l2-tbt
            # 1. Only users who're VIP5 and above can subscribe
            # 2. Identity verification required before subscription
            #
            # books50-l2-tbt
            # 1. Only users who're VIP4 and above can subscribe
            # 2. Identity verification required before subscription
            #
            # books
            # 1. All API users can subscribe
            # 2. Public depth channel, verification not required
            #
            # books5
            # 1. All API users can subscribe
            # 2. Public depth channel, verification not required
            # 3. Data feeds will be delivered every 100ms(vs. every 200ms now)
            #
            'depth': 'books',
        }
        defaults: dict = {
            'watchOrderBook': orderBookDefaults,
            'watchBalance': 'spot',  # margin, futures, swap
            'watchTicker': {
                'channel': 'tickers',  # tickers, sprd-tickers, index-tickers, block-tickers
            },
            'watchTickers': {
                'channel': 'tickers',  # tickers, sprd-tickers, index-tickers, block-tickers
            },
            'watchOrders': {
                'type': 'ANY',  # SPOT, MARGIN, SWAP, FUTURES, OPTION, ANY
            },
            'watchMyTrades': {
                'type': 'ANY',  # SPOT, MARGIN, SWAP, FUTURES, OPTION, ANY
            },
            'createOrderWs': {
                'op': 'batch-orders',  # order, batch-orders
            },
            'editOrderWs': {
                'op': 'amend-order',  # amend-order, batch-amend-orders
            },
            'ws': {
                # 'inflate': True,
            },
        }
        streamingSettings: dict = {
            # okex does not support built-in ws protocol-level ping-pong
            # instead it requires a custom text-based ping-pong
            'ping': self.ping,
            'keepAlive': 18000,
        }
        overrides: dict = {
            'has': capabilities,
            'urls': endpoints,
            'options': defaults,
            'streaming': streamingSettings,
        }
        return self.deep_extend(parentDescription, overrides)

    def get_url(self, channel: str, access='public'):
        """
        pick the websocket endpoint a channel has to be subscribed on

        for context: https://www.okx.com/help-center/changes-to-v5-api-websocket-subscription-parameter-and-url

        :param str channel: exchange channel name, candle channels and 'orders-algo' always go to the business endpoint
        :param str [access]: 'public', 'business', anything else is treated as private
        :returns str: base websocket url + '/business', '/public' or '/private', followed by '?brokerId=9999' when options['sandboxMode'] is truthy
        """
        suffix = '?brokerId=9999' if self.options['sandboxMode'] else ''
        baseUrl = self.urls['api']['ws']
        # the business endpoint wins over the requested access level
        wantsBusiness = (access == 'business') or (channel.find('candle') >= 0) or (channel == 'orders-algo')
        if wantsBusiness:
            path = '/business'
        elif access == 'public':
            path = '/public'
        else:
            path = '/private'
        return baseUrl + path + suffix

    async def subscribe_multiple(self, access, channel, symbols: Strings = None, params={}):
        """
        subscribe to one channel for several symbols with a single request
        :param str access: access level forwarded to get_url
        :param str channel: exchange channel name
        :param str[] [symbols]: unified symbols, self.symbols is used when omitted
        :param dict [params]: extra fields merged into every subscription argument
        :returns: whatever watch_multiple resolves with for the message hashes '<channel>::<symbol>'
        """
        await self.load_markets()
        requested = self.symbols if symbols is None else symbols
        requested = self.market_symbols(requested)
        endpoint = self.get_url(channel, access)
        hashes = []
        subscriptions = []
        for unifiedSymbol in requested:
            topic: dict = {
                'channel': channel,
                'instId': self.market_id(unifiedSymbol),
            }
            subscriptions.append(self.extend(topic, params))
            hashes.append(channel + '::' + unifiedSymbol)
        payload: dict = {
            'op': 'subscribe',
            'args': subscriptions,
        }
        # the message hashes double as the subscription hashes
        return await self.watch_multiple(endpoint, hashes, payload, hashes)

    async def subscribe(self, access, messageHash, channel, symbol, params={}):
        """
        subscribe to a single channel, optionally bound to one market
        :param str access: access level forwarded to get_url
        :param str messageHash: base message hash, ':' + market id is appended when a symbol is given
        :param str channel: exchange channel name
        :param str symbol: unified symbol or None for channels that are not market specific
        :param dict [params]: extra fields deep-merged into the subscription argument
        :returns: whatever watch resolves with for the final message hash
        """
        await self.load_markets()
        endpoint = self.get_url(channel, access)
        topic: dict = {
            'channel': channel,
        }
        if symbol is not None:
            marketId = self.market(symbol)['id']
            messageHash = messageHash + ':' + marketId
            topic['instId'] = marketId
        payload: dict = {
            'op': 'subscribe',
            'args': [
                self.deep_extend(topic, params),
            ],
        }
        return await self.watch(endpoint, messageHash, payload, messageHash)

    async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        watch the most recent trades of one market, delegates to watch_trades_for_symbols with a one-element list
        :param str symbol: unified symbol of the market to watch trades for
        :param int [since]: timestamp in ms of the earliest trade to return
        :param int [limit]: the maximum amount of trades to return
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
        """
        singleSymbol = [symbol]
        trades = await self.watch_trades_for_symbols(singleSymbol, since, limit, params)
        return trades

    async def watch_trades_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        watch the most recent trades of several markets over the public 'trades' channel
        :param str[] symbols: unified symbols of the markets to watch trades for, must not be empty
        :param int [since]: timestamp in ms of the earliest trade to return
        :param int [limit]: the maximum amount of trades to return
        :param dict [params]: extra parameters specific to the exchange API endpoint(not forwarded by self implementation)
        :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
        :raises ArgumentsRequired: if symbols is an empty list
        """
        if len(symbols) == 0:
            raise ArgumentsRequired(self.id + ' watchTradesForSymbols() requires a non-empty array of symbols')
        await self.load_markets()
        symbols = self.market_symbols(symbols)
        channel = 'trades'
        subscriptions = []
        hashes = []
        for unifiedSymbol in symbols:
            hashes.append(channel + ':' + unifiedSymbol)
            subscriptions.append({
                'channel': channel,
                'instId': self.market_id(unifiedSymbol),
            })
        payload: dict = {
            'op': 'subscribe',
            'args': subscriptions,
        }
        endpoint = self.get_url(channel, 'public')
        trades = await self.watch_multiple(endpoint, hashes, payload, hashes)
        if self.newUpdates:
            # the cache reports how many entries are new for the symbol of the first trade
            firstTrade = self.safe_value(trades, 0)
            limit = trades.getLimit(self.safe_string(firstTrade, 'symbol'), limit)
        return self.filter_by_since_limit(trades, since, limit, 'timestamp', True)

    async def un_watch_trades_for_symbols(self, symbols: List[str], params={}) -> Any:
        """
        unsubscribe from the public 'trades' channel for several markets
        :param str[] symbols: unified symbols of the markets to stop watching
        :param dict [params]: extra parameters specific to the exchange API endpoint(not forwarded by self implementation)
        :returns: whatever watch_multiple resolves with for the 'unsubscribe:trades:<symbol>' message hashes
        """
        await self.load_markets()
        symbols = self.market_symbols(symbols, None, False)
        channel = 'trades'
        subscriptions = []
        hashes = []
        for unifiedSymbol in symbols:
            hashes.append('unsubscribe:trades:' + unifiedSymbol)
            subscriptions.append({
                'channel': channel,
                'instId': self.market_id(unifiedSymbol),
            })
        payload: dict = {
            'op': 'unsubscribe',
            'args': subscriptions,
        }
        endpoint = self.get_url(channel, 'public')
        return await self.watch_multiple(endpoint, hashes, payload, hashes)

    async def un_watch_trades(self, symbol: str, params={}) -> Any:
        """
        unsubscribe from the trades stream of one market, delegates to un_watch_trades_for_symbols with a one-element list
        :param str symbol: unified symbol of the market to stop watching
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns: the value returned by un_watch_trades_for_symbols
        """
        singleSymbol = [symbol]
        outcome = await self.un_watch_trades_for_symbols(singleSymbol, params)
        return outcome

    def handle_trades(self, client: Client, message):
        """
        store incoming public trades in the per-symbol cache and resolve the '<channel>:<symbol>' message hash once per trade
        :param Client client: the websocket client the message arrived on
        :param dict message: the raw 'trades' channel message
        """
        #
        #     {
        #         "arg": {channel: "trades", instId: "BTC-USDT"},
        #         "data": [
        #             {
        #                 "instId": "BTC-USDT",
        #                 "tradeId": "216970876",
        #                 "px": "31684.5",
        #                 "sz": "0.00001186",
        #                 "side": "buy",
        #                 "ts": "1626531038288"
        #             }
        #         ]
        #     }
        #
        subscription = self.safe_value(message, 'arg', {})
        channel = self.safe_string(subscription, 'channel')
        symbol = self.safe_symbol(self.safe_string(subscription, 'instId'))
        rawTrades = self.safe_value(message, 'data', [])
        cacheSize = self.safe_integer(self.options, 'tradesLimit', 1000)
        for rawTrade in rawTrades:
            parsed = self.parse_trade(rawTrade)
            messageHash = channel + ':' + symbol
            cache = self.safe_value(self.trades, symbol)
            if cache is None:
                # first trade for self symbol, create its bounded cache
                cache = ArrayCache(cacheSize)
                self.trades[symbol] = cache
            cache.append(parsed)
            client.resolve(cache, messageHash)

    async def watch_funding_rate(self, symbol: str, params={}) -> FundingRate:
        """
        watch the current funding rate

        https://www.okx.com/docs-v5/en/#public-data-websocket-funding-rate-channel

        :param str symbol: unified market symbol
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: a `funding rate structure <https://docs.ccxt.com/#/?id=funding-rate-structure>`
        """
        # self.symbol() looks the market up, so the markets have to be loaded first,
        # otherwise the very first call on a fresh instance fails before subscribing
        await self.load_markets()
        symbol = self.symbol(symbol)
        fr = await self.watch_funding_rates([symbol], params)
        return fr[symbol]

    async def watch_funding_rates(self, symbols: List[str], params={}) -> FundingRates:
        """
        watch the funding rate for multiple markets

        https://www.okx.com/docs-v5/en/#public-data-websocket-funding-rate-channel

        :param str[] symbols: list of unified market symbols
        :param dict [params]: extra parameters specific to the exchange API endpoint(not forwarded by self implementation)
        :returns dict: a dictionary of `funding rates structures <https://docs.ccxt.com/#/?id=funding-rates-structure>`, indexed by market symbols
        """
        await self.load_markets()
        symbols = self.market_symbols(symbols)
        channel = 'funding-rate'
        subscriptions = []
        hashes = []
        for unifiedSymbol in symbols:
            hashes.append(channel + ':' + unifiedSymbol)
            subscriptions.append({
                'channel': channel,
                'instId': self.market_id(unifiedSymbol),
            })
        payload: dict = {
            'op': 'subscribe',
            'args': subscriptions,
        }
        endpoint = self.get_url(channel, 'public')
        update = await self.watch_multiple(endpoint, hashes, payload, hashes)
        if not self.newUpdates:
            return self.filter_by_array(self.fundingRates, 'symbol', symbols)
        # in newUpdates mode only the funding rate that just arrived is returned, keyed by its symbol
        latest: dict = {}
        latest[self.safe_string(update, 'symbol')] = update
        return latest

    def handle_funding_rate(self, client: Client, message):
        """
        parse every funding rate of the message, remember it in self.fundingRates and resolve 'funding-rate:<symbol>'
        :param Client client: the websocket client the message arrived on
        :param dict message: the raw 'funding-rate' channel message
        """
        #
        # "data":[
        #     {
        #        "fundingRate":"0.0001875391284828",
        #        "fundingTime":"1700726400000",
        #        "instId":"BTC-USD-SWAP",
        #        "instType":"SWAP",
        #        "method": "next_period",
        #        "maxFundingRate":"0.00375",
        #        "minFundingRate":"-0.00375",
        #        "nextFundingRate":"0.0002608059239328",
        #        "nextFundingTime":"1700755200000",
        #        "premium": "0.0001233824646391",
        #        "settFundingRate":"0.0001699799259033",
        #        "settState":"settled",
        #        "ts":"1700724675402"
        #     }
        # ]
        #
        for rawRate in self.safe_list(message, 'data', []):
            parsed = self.parse_funding_rate(rawRate)
            self.fundingRates[parsed['symbol']] = parsed
            client.resolve(parsed, 'funding-rate' + ':' + parsed['symbol'])

    async def watch_ticker(self, symbol: str, params={}) -> Ticker:
        """

        https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel

        watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market
        :param str symbol: unified symbol of the market to fetch the ticker for
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.channel]: the channel to subscribe to, tickers by default. Can be tickers, sprd-tickers, index-tickers, block-tickers
        :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
        """
        # self.market() below needs the markets, so load them before anything else
        await self.load_markets()
        channel = None
        channel, params = self.handle_option_and_params(params, 'watchTicker', 'channel', 'tickers')
        # build a fresh dict instead of assigning params['channel']: handle_option_and_params may hand
        # back the very dict that was passed in(including the shared default {}), and writing into it
        # would leak the channel to the caller and to later calls
        params = self.extend(params, {'channel': channel})
        market = self.market(symbol)
        symbol = market['symbol']
        ticker = await self.watch_tickers([symbol], params)
        return self.safe_value(ticker, symbol)

    async def un_watch_ticker(self, symbol: str, params={}) -> Any:
        """

        https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel

        stops watching the price ticker of a specific market, delegates to un_watch_tickers with a one-element list
        :param str symbol: unified symbol of the market to stop watching the ticker for
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.channel]: the channel to unsubscribe from, tickers by default. Can be tickers, sprd-tickers, index-tickers, block-tickers
        :returns: the value returned by un_watch_tickers
        """
        singleSymbol = [symbol]
        outcome = await self.un_watch_tickers(singleSymbol, params)
        return outcome

    async def watch_tickers(self, symbols: Strings = None, params={}) -> Tickers:
        """

        https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel

        watches price tickers, a statistical calculation with the information calculated over the past 24 hours, for a list of markets
        :param str[] [symbols]: unified symbols of the markets to watch the tickers for
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.channel]: the channel to subscribe to, tickers by default. Can be tickers, sprd-tickers, index-tickers, block-tickers
        :returns dict: a dictionary of `ticker structures <https://docs.ccxt.com/#/?id=ticker-structure>`
        """
        await self.load_markets()
        symbols = self.market_symbols(symbols, None, False)
        channelAndParams = self.handle_option_and_params(params, 'watchTickers', 'channel', 'tickers')
        channel, params = channelAndParams
        freshTickers = await self.subscribe_multiple('public', channel, symbols, params)
        if not self.newUpdates:
            # return everything cached so far for the requested symbols
            return self.filter_by_array(self.tickers, 'symbol', symbols)
        return freshTickers

    async def watch_mark_price(self, symbol: str, params={}) -> Ticker:
        """

        https://www.okx.com/docs-v5/en/#public-data-websocket-mark-price-channel

        watches a mark price
        :param str symbol: unified symbol of the market to fetch the ticker for
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.channel]: the channel to subscribe to, mark-price by default
        :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
        """
        # self.market() below needs the markets, so load them before anything else
        await self.load_markets()
        channel = None
        channel, params = self.handle_option_and_params(params, 'watchMarkPrice', 'channel', 'mark-price')
        # build a fresh dict instead of assigning params['channel']: handle_option_and_params may hand
        # back the very dict that was passed in(including the shared default {}), and writing into it
        # would leak the channel to the caller and to later calls
        params = self.extend(params, {'channel': channel})
        market = self.market(symbol)
        symbol = market['symbol']
        ticker = await self.watch_mark_prices([symbol], params)
        return ticker[symbol]

    async def watch_mark_prices(self, symbols: Strings = None, params={}) -> Tickers:
        """

        https://www.okx.com/docs-v5/en/#public-data-websocket-mark-price-channel

        watches mark prices for a list of markets
        :param str[] [symbols]: unified symbols of the markets to watch the mark price for
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.channel]: the channel to subscribe to, mark-price by default
        :returns dict: a dictionary of `ticker structures <https://docs.ccxt.com/#/?id=ticker-structure>`
        """
        await self.load_markets()
        symbols = self.market_symbols(symbols, None, False)
        channelAndParams = self.handle_option_and_params(params, 'watchMarkPrices', 'channel', 'mark-price')
        channel, params = channelAndParams
        freshTickers = await self.subscribe_multiple('public', channel, symbols, params)
        if not self.newUpdates:
            # return everything cached so far for the requested symbols
            return self.filter_by_array(self.tickers, 'symbol', symbols)
        return freshTickers

    async def un_watch_tickers(self, symbols: Strings = None, params={}) -> Any:
        """

        https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel

        stops watching the price tickers of a list of markets
        :param str[] [symbols]: unified symbols of the markets to stop watching the tickers for
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.channel]: the channel to unsubscribe from, tickers by default. Can be tickers, sprd-tickers, index-tickers, block-tickers
        :returns: whatever watch_multiple resolves with for the 'unsubscribe:ticker:<symbol>' message hashes
        """
        await self.load_markets()
        symbols = self.market_symbols(symbols, None, False)
        # the channel is looked up under the 'watchTickers' options, the same key the subscription uses
        channelAndParams = self.handle_option_and_params(params, 'watchTickers', 'channel', 'tickers')
        channel, params = channelAndParams
        subscriptions = []
        hashes = []
        for unifiedSymbol in symbols:
            hashes.append('unsubscribe:ticker:' + unifiedSymbol)
            subscriptions.append({
                'channel': channel,
                'instId': self.market_id(unifiedSymbol),
            })
        payload: dict = {
            'op': 'unsubscribe',
            'args': subscriptions,
        }
        endpoint = self.get_url(channel, 'public')
        return await self.watch_multiple(endpoint, hashes, payload, hashes)

    def handle_ticker(self, client: Client, message):
        """
        parse a ticker message, update self.tickers and resolve the '<channel>::<symbol>' message hash

        the message is first passed to handle_bid_ask so that bid/ask watchers are served from the same stream
        :param Client client: the websocket client the message arrived on
        :param dict message: the raw ticker channel message
        """
        #
        #     {
        #         "arg": {channel: "tickers", instId: "BTC-USDT"},
        #         "data": [
        #             {
        #                 "instType": "SPOT",
        #                 "instId": "BTC-USDT",
        #                 "last": "31500.1",
        #                 "lastSz": "0.00001754",
        #                 "askPx": "31500.1",
        #                 "askSz": "0.00998144",
        #                 "bidPx": "31500",
        #                 "bidSz": "3.05652439",
        #                 "open24h": "31697",
        #                 "high24h": "32248",
        #                 "low24h": "31165.6",
        #                 "sodUtc0": "31385.5",
        #                 "sodUtc8": "32134.9",
        #                 "volCcy24h": "503403597.38138519",
        #                 "vol24h": "15937.10781721",
        #                 "ts": "1626526618762"
        #             }
        #         ]
        #     }
        #
        self.handle_bid_ask(client, message)
        subscription = self.safe_value(message, 'arg', {})
        marketId = self.safe_string(subscription, 'instId')
        symbol = self.safe_market(marketId, None, '-')['symbol']
        channel = self.safe_string(subscription, 'channel')
        updates: dict = {}
        for rawTicker in self.safe_value(message, 'data', []):
            parsed = self.parse_ticker(rawTicker)
            # every entry is stored under the symbol taken from the subscription argument
            self.tickers[symbol] = parsed
            updates[symbol] = parsed
        client.resolve(updates, channel + '::' + symbol)

    async def watch_bids_asks(self, symbols: Strings = None, params={}) -> Tickers:
        """

        https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel

        watches best bid & ask for symbols
        :param str[] symbols: unified symbols of the markets to watch the best bid & ask for
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: a dictionary of `ticker structures <https://docs.ccxt.com/#/?id=ticker-structure>`
        """
        await self.load_markets()
        symbols = self.market_symbols(symbols, None, False)
        channelAndParams = self.handle_option_and_params(params, 'watchBidsAsks', 'channel', 'tickers')
        channel, params = channelAndParams
        endpoint = self.get_url(channel, 'public')
        hashes = []
        subscriptions = []
        for unifiedSymbol in symbols:
            topic: dict = {
                'channel': channel,
                'instId': self.market_id(unifiedSymbol),
            }
            subscriptions.append(self.extend(topic, params))
            hashes.append('bidask::' + unifiedSymbol)
        payload: dict = {
            'op': 'subscribe',
            'args': subscriptions,
        }
        update = await self.watch_multiple(endpoint, hashes, payload, hashes)
        if not self.newUpdates:
            return self.filter_by_array(self.bidsasks, 'symbol', symbols)
        # in newUpdates mode only the bid/ask that just arrived is returned, keyed by its symbol
        latest: dict = {}
        latest[update['symbol']] = update
        return latest

    def handle_bid_ask(self, client: Client, message):
        """
        extract the best bid & ask from the first entry of a ticker message, store it in self.bidsasks and resolve 'bidask::<symbol>'
        :param Client client: the websocket client the message arrived on
        :param dict message: the raw ticker channel message
        """
        #
        #     {
        #         "arg": {channel: "tickers", instId: "BTC-USDT"},
        #         "data": [
        #             {
        #                 "instType": "SPOT",
        #                 "instId": "BTC-USDT",
        #                 "last": "31500.1",
        #                 "lastSz": "0.00001754",
        #                 "askPx": "31500.1",
        #                 "askSz": "0.00998144",
        #                 "bidPx": "31500",
        #                 "bidSz": "3.05652439",
        #                 "open24h": "31697",
        #                 "high24h": "32248",
        #                 "low24h": "31165.6",
        #                 "sodUtc0": "31385.5",
        #                 "sodUtc8": "32134.9",
        #                 "volCcy24h": "503403597.38138519",
        #                 "vol24h": "15937.10781721",
        #                 "ts": "1626526618762"
        #             }
        #         ]
        #     }
        #
        entries = self.safe_list(message, 'data', [])
        # only the first entry of the message is considered
        bidAsk = self.parse_ws_bid_ask(self.safe_dict(entries, 0, {}))
        symbol = bidAsk['symbol']
        self.bidsasks[symbol] = bidAsk
        client.resolve(bidAsk, 'bidask::' + symbol)

    def parse_ws_bid_ask(self, ticker, market=None):
        """
        convert a raw ticker entry into a ticker that carries only the best bid & ask
        :param dict ticker: raw entry with instId, ts, askPx, askSz, bidPx and bidSz
        :param dict [market]: market passed to safe_market together with the entry's instId
        :returns dict: the value returned by safe_ticker for the symbol, timestamp, bid/ask prices and sizes
        """
        market = self.safe_market(self.safe_string(ticker, 'instId'), market)
        timestamp = self.safe_integer(ticker, 'ts')
        bidAsk: dict = {
            'symbol': self.safe_string(market, 'symbol'),
            'timestamp': timestamp,
            'datetime': self.iso8601(timestamp),
            'ask': self.safe_string(ticker, 'askPx'),
            'askVolume': self.safe_string(ticker, 'askSz'),
            'bid': self.safe_string(ticker, 'bidPx'),
            'bidVolume': self.safe_string(ticker, 'bidSz'),
            'info': ticker,
        }
        return self.safe_ticker(bidAsk, market)

    async def watch_liquidations_for_symbols(self, symbols: List[str] = None, since: Int = None, limit: Int = None, params={}) -> List[Liquidation]:
        """
        watch the public liquidations of a trading pair

        https://www.okx.com/docs-v5/en/#public-data-websocket-liquidation-orders-channel

        :param str[] [symbols]: list of unified market symbols, omit to receive the liquidations of every market of the instrument type
        :param int [since]: the earliest time in ms to fetch liquidations for
        :param int [limit]: the maximum number of liquidation structures to retrieve
        :param dict [params]: exchange specific parameters for the okx api endpoint
        :returns dict: an array of `liquidation structures <https://github.com/ccxt/ccxt/wiki/Manual#liquidation-structure>`
        """
        await self.load_markets()
        symbols = self.market_symbols(symbols, None, True, True)
        messageHash = 'liquidations'
        if symbols is not None:
            messageHashes = [messageHash + '::' + symbol for symbol in symbols]
        else:
            # without symbols every liquidation of the instrument type resolves the plain hash
            messageHashes = [messageHash]
        market = self.get_market_from_symbols(symbols)
        marketType = None
        # the method name is spelled in the same camelCase form as the 'has' entry
        marketType, params = self.handle_market_type_and_params('watchLiquidationsForSymbols', market, params)
        channel = 'liquidation-orders'
        # the subscription is made per instrument type: spot is mapped to SWAP and
        # the unified 'future' to the exchange-specific 'futures'
        if marketType == 'spot':
            marketType = 'SWAP'
        elif marketType == 'future':
            marketType = 'futures'
        uppercaseType = marketType.upper()
        request = {
            'op': 'subscribe',
            'args': [
                {
                    'channel': channel,
                    'instType': uppercaseType,
                },
            ],
        }
        url = self.get_url(channel, 'public')
        newLiquidations = await self.watch_multiple(url, messageHashes, request, messageHashes)
        if self.newUpdates:
            return newLiquidations
        return self.filter_by_symbols_since_limit(self.liquidations, symbols, since, limit, True)

    def handle_liquidation(self, client: Client, message):
        """
        parse public liquidations, append them to the per-symbol cache in self.liquidations and resolve both the 'liquidations' and the 'liquidations::<symbol>' message hashes
        :param Client client: the websocket client the message arrived on
        :param dict message: the raw 'liquidation-orders' channel message
        """
        #
        #    {
        #        "arg": {
        #            "channel": "liquidation-orders",
        #            "instType": "SWAP"
        #        },
        #        "data": [
        #            {
        #                "details": [
        #                    {
        #                        "bkLoss": "0",
        #                        "bkPx": "0.007831",
        #                        "ccy": "",
        #                        "posSide": "short",
        #                        "side": "buy",
        #                        "sz": "13",
        #                        "ts": "1692266434010"
        #                    }
        #                ],
        #                "instFamily": "IOST-USDT",
        #                "instId": "IOST-USDT-SWAP",
        #                "instType": "SWAP",
        #                "uly": "IOST-USDT"
        #            }
        #        ]
        #    }
        #
        for rawEntry in self.safe_list(message, 'data', []):
            parsed = self.parse_ws_liquidation(rawEntry)
            symbol = self.safe_string(parsed, 'symbol')
            cache = self.safe_value(self.liquidations, symbol)
            if cache is None:
                # first liquidation for self symbol, create its bounded cache
                cache = ArrayCache(self.safe_integer(self.options, 'liquidationsLimit', 1000))
            cache.append(parsed)
            self.liquidations[symbol] = cache
            # watchers receive a one-element list with the new liquidation, not the whole cache
            client.resolve([parsed], 'liquidations')
            client.resolve([parsed], 'liquidations::' + symbol)

    async def watch_my_liquidations_for_symbols(self, symbols: List[str] = None, since: Int = None, limit: Int = None, params={}) -> List[Liquidation]:
        """
        watch the private liquidations of a trading pair

        https://www.okx.com/docs-v5/en/#trading-account-websocket-balance-and-position-channel

        :param str[] [symbols]: list of unified market symbols, omit to receive the liquidations of every market
        :param int [since]: the earliest time in ms to fetch liquidations for
        :param int [limit]: the maximum number of liquidation structures to retrieve
        :param dict [params]: exchange specific parameters for the okx api endpoint
        :param bool [params.stop]: when truthy, authenticate is called with access 'business' instead of 'private'
        :param bool [params.trigger]: alias read when params.stop is absent
        :returns dict: an array of `liquidation structures <https://github.com/ccxt/ccxt/wiki/Manual#liquidation-structure>`
        """
        await self.load_markets()
        triggerRequested = self.safe_value_2(params, 'stop', 'trigger', False)
        params = self.omit(params, ['stop', 'trigger'])
        # NOTE(review): authentication may target 'business' while the subscription url below is
        # always built for 'private' - confirm that combination is intended
        credentials = {'access': 'business' if triggerRequested else 'private'}
        await self.authenticate(credentials)
        symbols = self.market_symbols(symbols, None, True, True)
        baseHash = 'myLiquidations'
        if symbols is not None:
            hashes = [baseHash + '::' + unifiedSymbol for unifiedSymbol in symbols]
        else:
            hashes = [baseHash]
        channel = 'balance_and_position'
        payload: dict = {
            'op': 'subscribe',
            'args': [
                {
                    'channel': channel,
                },
            ],
        }
        endpoint = self.get_url(channel, 'private')
        # the remaining params are deep-merged into the request itself
        update = await self.watch_multiple(endpoint, hashes, self.deep_extend(payload, params), hashes)
        if not self.newUpdates:
            return self.filter_by_symbols_since_limit(self.liquidations, symbols, since, limit, True)
        return update

    def handle_my_liquidation(self, client: Client, message):
        """
        handle a balance_and_position push and publish every liquidation event it carries

        Entries whose eventType is not 'liquidation' are skipped; the remaining
        entries of the same message are still processed. Each liquidation is
        parsed, appended to the per-symbol cache stored in self.liquidations and
        resolved on both the 'myLiquidations' and 'myLiquidations::<symbol>' hashes.

        :param Client client: websocket client whose pending futures get resolved
        :param dict message: raw message received on the balance_and_position channel
        """
        #
        #    {
        #        "arg": {
        #            "channel": "balance_and_position",
        #            "uid": "77982378738415879"
        #        },
        #        "data": [{
        #            "pTime": "1597026383085",
        #            "eventType": "snapshot",
        #            "balData": [{
        #                "ccy": "BTC",
        #                "cashBal": "1",
        #                "uTime": "1597026383085"
        #            }],
        #            "posData": [{
        #                "posId": "1111111111",
        #                "tradeId": "2",
        #                "instId": "BTC-USD-191018",
        #                "instType": "FUTURES",
        #                "mgnMode": "cross",
        #                "posSide": "long",
        #                "pos": "10",
        #                "ccy": "BTC",
        #                "posCcy": "",
        #                "avgPx": "3320",
        #                "uTIme": "1597026383085"
        #            }],
        #            "trades": [{
        #                "instId": "BTC-USD-191018",
        #                "tradeId": "2",
        #            }]
        #        }]
        #    }
        #
        for rawLiquidation in self.safe_list(message, 'data', []):
            eventType = self.safe_string(rawLiquidation, 'eventType')
            if eventType != 'liquidation':
                # an unrelated event must not hide liquidations later in the same batch
                continue
            liquidation = self.parse_ws_my_liquidation(rawLiquidation)
            symbol = self.safe_string(liquidation, 'symbol')
            cache = self.safe_value(self.liquidations, symbol)
            if cache is None:
                limit = self.safe_integer(self.options, 'myLiquidationsLimit', 1000)
                cache = ArrayCache(limit)
            cache.append(liquidation)
            self.liquidations[symbol] = cache
            client.resolve([liquidation], 'myLiquidations')
            client.resolve([liquidation], 'myLiquidations::' + symbol)

    def parse_ws_my_liquidation(self, liquidation, market=None):
        """
        parse one entry of a balance_and_position push into a unified liquidation structure

        Only the first element of posData is used: it supplies the market id,
        the position size, the average price and the update time.

        :param dict liquidation: one element of the message's data array
        :param dict [market]: market to fall back on when the id cannot be resolved
        :returns dict: the value returned by safe_liquidation for the parsed fields
        """
        #
        #    {
        #        "pTime": "1597026383085",
        #        "eventType": "snapshot",
        #        "balData": [{
        #            "ccy": "BTC",
        #            "cashBal": "1",
        #            "uTime": "1597026383085"
        #        }],
        #        "posData": [{
        #            "posId": "1111111111",
        #            "tradeId": "2",
        #            "instId": "BTC-USD-191018",
        #            "instType": "FUTURES",
        #            "mgnMode": "cross",
        #            "posSide": "long",
        #            "pos": "10",
        #            "ccy": "BTC",
        #            "posCcy": "",
        #            "avgPx": "3320",
        #            "uTIme": "1597026383085"
        #        }],
        #        "trades": [{
        #            "instId": "BTC-USD-191018",
        #            "tradeId": "2",
        #        }]
        #    }
        #
        posData = self.safe_list(liquidation, 'posData', [])
        firstPosData = self.safe_dict(posData, 0, {})
        marketId = self.safe_string(firstPosData, 'instId')
        market = self.safe_market(marketId, market)
        # the sample above spells the key "uTIme"; accept the regular "uTime" spelling first
        timestamp = self.safe_integer_2(firstPosData, 'uTime', 'uTIme')
        return self.safe_liquidation({
            'info': liquidation,
            'symbol': self.safe_symbol(marketId, market),
            'contracts': self.safe_number(firstPosData, 'pos'),
            'contractSize': self.safe_number(market, 'contractSize'),
            # avgPx is a field of the position entry, not of the enclosing event
            'price': self.safe_number(firstPosData, 'avgPx'),
            'baseValue': None,
            'quoteValue': None,
            'timestamp': timestamp,
            'datetime': self.iso8601(timestamp),
        })

    def parse_ws_liquidation(self, liquidation, market=None):
        """
        parse one public liquidation entry into a unified liquidation structure

        Size, bankruptcy price and timestamp come from the first element of
        details; the market id comes from the top-level instId.

        :param dict liquidation: one element of the public liquidation message's data array
        :param dict [market]: market to fall back on when the id cannot be resolved
        :returns dict: the value returned by safe_liquidation for the parsed fields
        """
        #
        # public liquidation
        #    {
        #        "details": [
        #            {
        #                "bkLoss": "0",
        #                "bkPx": "0.007831",
        #                "ccy": "",
        #                "posSide": "short",
        #                "side": "buy",
        #                "sz": "13",
        #                "ts": "1692266434010"
        #            }
        #        ],
        #        "instFamily": "IOST-USDT",
        #        "instId": "IOST-USDT-SWAP",
        #        "instType": "SWAP",
        #        "uly": "IOST-USDT"
        #    }
        #
        detailList = self.safe_list(liquidation, 'details', [])
        firstDetail = self.safe_dict(detailList, 0, {})
        marketId = self.safe_string(liquidation, 'instId')
        market = self.safe_market(marketId, market)
        occurredAt = self.safe_integer(firstDetail, 'ts')
        parsed = {
            'info': liquidation,
            'symbol': self.safe_symbol(marketId, market),
            'contracts': self.safe_number(firstDetail, 'sz'),
            'contractSize': self.safe_number(market, 'contractSize'),
            'price': self.safe_number(firstDetail, 'bkPx'),
            'baseValue': None,
            'quoteValue': None,
            'timestamp': occurredAt,
            'datetime': self.iso8601(occurredAt),
        }
        return self.safe_liquidation(parsed)

    async def watch_ohlcv(self, symbol: str, timeframe='1m', since: Int = None, limit: Int = None, params={}) -> List[list]:
        """
        subscribe to the candle channel of a single market and return the cached candles
        :param str symbol: unified symbol of the market to fetch OHLCV data for
        :param str timeframe: the length of time each candle represents
        :param int [since]: timestamp in ms of the earliest candle to fetch
        :param int [limit]: the maximum amount of candles to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns int[][]: A list of candles ordered, open, high, low, close, volume
        """
        await self.load_markets()
        symbol = self.symbol(symbol)
        # unknown timeframes are passed through unchanged
        channelName = 'candle' + self.safe_string(self.timeframes, timeframe, timeframe)
        candles = await self.subscribe('public', channelName, channelName, symbol, params)
        if self.newUpdates:
            limit = candles.getLimit(symbol, limit)
        return self.filter_by_since_limit(candles, since, limit, 0, True)

    async def un_watch_ohlcv(self, symbol: str, timeframe='1m', params={}) -> Any:
        """
        unsubscribe from the candle channel of a single market
        :param str symbol: unified symbol of the market to stop receiving OHLCV data for
        :param str timeframe: the length of time each candle represents
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns: whatever un_watch_ohlcv_for_symbols returns for the single [symbol, timeframe] pair
        """
        await self.load_markets()
        pair = [symbol, timeframe]
        return await self.un_watch_ohlcv_for_symbols([pair], params)

    async def watch_ohlcv_for_symbols(self, symbolsAndTimeframes: List[List[str]], since: Int = None, limit: Int = None, params={}):
        """
        subscribe to the candle channels of several [symbol, timeframe] pairs in a single request
        :param str[][] symbolsAndTimeframes: array of arrays containing unified symbols and timeframes to fetch OHLCV data for, example [['BTC/USDT', '1m'], ['LTC/USDT', '5m']]
        :param int [since]: timestamp in ms of the earliest candle to fetch
        :param int [limit]: the maximum amount of candles to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: the object built by create_ohlcv_object from the symbol, timeframe and filtered candles of whichever subscription fired
        :raises ArgumentsRequired: when the argument is empty or its first element is not a list
        """
        if len(symbolsAndTimeframes) == 0 or not isinstance(symbolsAndTimeframes[0], list):
            raise ArgumentsRequired(self.id + " watchOHLCVForSymbols() requires a an array of symbols and timeframes, like  [['BTC/USDT', '1m'], ['LTC/USDT', '5m']]")
        await self.load_markets()
        subscriptionArgs = []
        hashes = []
        for pair in symbolsAndTimeframes:
            unifiedSymbol = pair[0]
            unifiedTimeframe = pair[1]
            instrumentId = self.market_id(unifiedSymbol)
            channel = 'candle' + self.safe_string(self.timeframes, unifiedTimeframe, unifiedTimeframe)
            subscriptionArgs.append({
                'channel': channel,
                'instId': instrumentId,
            })
            hashes.append('multi:' + channel + ':' + unifiedSymbol)
        request: dict = {
            'op': 'subscribe',
            'args': subscriptionArgs,
        }
        url = self.get_url('candle', 'public')
        # the multi hash resolves with a [symbol, timeframe, cache] triple
        symbol, timeframe, candles = await self.watch_multiple(url, hashes, request, hashes)
        if self.newUpdates:
            limit = candles.getLimit(symbol, limit)
        selected = self.filter_by_since_limit(candles, since, limit, 0, True)
        return self.create_ohlcv_object(symbol, timeframe, selected)

    async def un_watch_ohlcv_for_symbols(self, symbolsAndTimeframes: List[List[str]], params={}) -> Any:
        """
        unsubscribe from the candle channels of several [symbol, timeframe] pairs in a single request
        :param str[][] symbolsAndTimeframes: array of arrays containing unified symbols and timeframes to stop watching, example [['BTC/USDT', '1m'], ['LTC/USDT', '5m']]
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns: the value watch_multiple resolves with for the 'unsubscribe:multi:<channel>:<symbol>' hashes
        :raises ArgumentsRequired: when the argument is empty or its first element is not a list
        """
        symbolsLength = len(symbolsAndTimeframes)
        if symbolsLength == 0 or not isinstance(symbolsAndTimeframes[0], list):
            # name the method the caller actually invoked, not its watch counterpart
            raise ArgumentsRequired(self.id + " unWatchOHLCVForSymbols() requires a an array of symbols and timeframes, like  [['BTC/USDT', '1m'], ['LTC/USDT', '5m']]")
        await self.load_markets()
        topics = []
        messageHashes = []
        for symbolAndTimeframe in symbolsAndTimeframes:
            sym = symbolAndTimeframe[0]
            tf = symbolAndTimeframe[1]
            marketId = self.market_id(sym)
            interval = self.safe_string(self.timeframes, tf, tf)
            channel = 'candle' + interval
            topics.append({
                'channel': channel,
                'instId': marketId,
            })
            messageHashes.append('unsubscribe:multi:' + channel + ':' + sym)
        request: dict = {
            'op': 'unsubscribe',
            'args': topics,
        }
        url = self.get_url('candle', 'public')
        return await self.watch_multiple(url, messageHashes, request, messageHashes)

    def handle_ohlcv(self, client: Client, message):
        """
        store incoming candles and resolve both the single-symbol and the multi-symbol subscriptions

        :param Client client: websocket client whose pending futures get resolved
        :param dict message: raw candle message; the channel name carries the interval
        """
        #
        #     {
        #         "arg": {channel: "candle1m", instId: "BTC-USDT"},
        #         "data": [
        #             [
        #                 "1626690720000",
        #                 "31334",
        #                 "31334",
        #                 "31334",
        #                 "31334",
        #                 "0.0077",
        #                 "241.2718"
        #             ]
        #         ]
        #     }
        #
        arg = self.safe_value(message, 'arg', {})
        channel = self.safe_string(arg, 'channel')
        rawCandles = self.safe_value(message, 'data', [])
        market = self.safe_market(self.safe_string(arg, 'instId'))
        symbol = market['symbol']
        # use a reverse lookup in a static map instead
        timeframe = self.find_timeframe(channel.replace('candle', ''))
        for rawCandle in rawCandles:
            candle = self.parse_ohlcv(rawCandle, market)
            self.ohlcvs[symbol] = self.safe_value(self.ohlcvs, symbol, {})
            cache = self.safe_value(self.ohlcvs[symbol], timeframe)
            if cache is None:
                cache = ArrayCacheByTimestamp(self.safe_integer(self.options, 'OHLCVLimit', 1000))
                self.ohlcvs[symbol][timeframe] = cache
            cache.append(candle)
            client.resolve(cache, channel + ':' + market['id'])
            # for multiOHLCV we need special object, as opposed to other "multi"
            # methods, because OHLCV response item does not contain symbol
            # or timeframe, thus otherwise it would be unrecognizable
            client.resolve([symbol, timeframe, cache], 'multi:' + channel + ':' + symbol)

    async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook:
        """
        watch the order book of a single market; delegates to watch_order_book_for_symbols

        https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-order-book-channel

        :param str symbol: unified symbol of the market to fetch the order book for
        :param int [limit]: the maximum amount of order book entries to return
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.depth]: okx order book depth, can be books, books5, books-l2-tbt, books50-l2-tbt, bbo-tbt
        :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
        """
        #
        # bbo-tbt
        # 1. Newly added channel that sends tick-by-tick Level 1 data
        # 2. All API users can subscribe
        # 3. Public depth channel, verification not required
        #
        # books-l2-tbt
        # 1. Only users who're VIP5 and above can subscribe
        # 2. Identity verification required before subscription
        #
        # books50-l2-tbt
        # 1. Only users who're VIP4 and above can subscribe
        # 2. Identity verification required before subscription
        #
        # books
        # 1. All API users can subscribe
        # 2. Public depth channel, verification not required
        #
        # books5
        # 1. All API users can subscribe
        # 2. Public depth channel, verification not required
        # 3. Data feeds will be delivered every 100ms(vs. every 200ms now)
        #
        singleSymbol = [symbol]
        book = await self.watch_order_book_for_symbols(singleSymbol, limit, params)
        return book

    async def watch_order_book_for_symbols(self, symbols: List[str], limit: Int = None, params={}) -> OrderBook:
        """
        watch the order books of several markets through one subscription request

        https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-order-book-channel

        :param str[] symbols: unified array of symbols
        :param int [limit]: 1,5, 400, 50(l2-tbt, vip4+) or 40000(vip5+) the maximum amount of order book entries to return
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.depth]: okx order book depth, can be books, books5, books-l2-tbt, books50-l2-tbt, bbo-tbt
        :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
        :raises AuthenticationError: when an l2-tbt depth is selected and credentials are missing
        """
        await self.load_markets()
        symbols = self.market_symbols(symbols)
        depth, params = self.handle_option_and_params(params, 'watchOrderBook', 'depth', 'books')
        # an explicit limit overrides the configured depth; other limits leave it untouched
        if limit is not None:
            if limit == 1:
                depth = 'bbo-tbt'
            elif 1 < limit <= 5:
                depth = 'books5'
            elif limit == 50:
                depth = 'books50-l2-tbt'  # Make sure you have VIP4 and above
            elif limit == 400:
                depth = 'books'
        if depth in ('books-l2-tbt', 'books50-l2-tbt'):
            if not self.check_required_credentials(False):
                raise AuthenticationError(self.id + ' watchOrderBook/watchOrderBookForSymbols requires authentication for self depth. Add credentials or change the depth option to books or books5')
            await self.authenticate({'access': 'public'})
        hashes = [depth + ':' + unifiedSymbol for unifiedSymbol in symbols]
        subscriptionArgs = [
            {
                'channel': depth,
                'instId': self.market_id(unifiedSymbol),
            }
            for unifiedSymbol in symbols
        ]
        request: dict = {
            'op': 'subscribe',
            'args': subscriptionArgs,
        }
        url = self.get_url(depth, 'public')
        book = await self.watch_multiple(url, hashes, request, hashes)
        return book.limit()

    async def un_watch_order_book_for_symbols(self, symbols: List[str], params={}) -> Any:
        """
        unsubscribe from the order book channel of several markets

        https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-order-book-channel

        :param str[] symbols: unified array of symbols
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param int [params.limit]: the limit used when subscribing; selects the depth channel the same way watch_order_book_for_symbols does
        :param str [params.depth]: okx order book depth, can be books, books5, books-l2-tbt, books50-l2-tbt, bbo-tbt
        :returns: the value watch_multiple resolves with for the 'unsubscribe:orderbook:<symbol>' hashes
        """
        await self.load_markets()
        symbols = self.market_symbols(symbols, None, False)
        depth, params = self.handle_option_and_params(params, 'watchOrderBook', 'depth', 'books')
        limit = self.safe_integer(params, 'limit')
        if limit is not None:
            if limit == 1:
                depth = 'bbo-tbt'
            elif 1 < limit <= 5:
                depth = 'books5'
            elif limit == 50:
                depth = 'books50-l2-tbt'  # Make sure you have VIP4 and above
            elif limit == 400:
                depth = 'books'
        unsubscribeArgs = []
        # NOTE(review): subMessageHashes is built but not passed to watch_multiple below
        subMessageHashes = []
        hashes = []
        for unifiedSymbol in symbols:
            subMessageHashes.append(depth + ':' + unifiedSymbol)
            hashes.append('unsubscribe:orderbook:' + unifiedSymbol)
            unsubscribeArgs.append({
                'channel': depth,
                'instId': self.market_id(unifiedSymbol),
            })
        request: dict = {
            'op': 'unsubscribe',
            'args': unsubscribeArgs,
        }
        url = self.get_url(depth, 'public')
        return await self.watch_multiple(url, hashes, request, hashes)

    async def un_watch_order_book(self, symbol: str, params={}) -> Any:
        """
        unsubscribe from the order book channel of a single market; delegates to un_watch_order_book_for_symbols

        https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-order-book-channel

        :param str symbol: unified symbol of the market to stop watching
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param int [params.limit]: the maximum amount of order book entries to return
        :param str [params.depth]: okx order book depth, can be books, books5, books-l2-tbt, books50-l2-tbt, bbo-tbt
        :returns: whatever un_watch_order_book_for_symbols returns for the single symbol
        """
        singleSymbol = [symbol]
        outcome = await self.un_watch_order_book_for_symbols(singleSymbol, params)
        return outcome

    def handle_delta(self, bookside, delta):
        """
        apply a single price level update to one side of an order book

        :param bookside: book side object exposing store(price, amount)
        :param list delta: raw level; index 0 is the price and index 1 the amount
        """
        #
        #     [
        #         "31685",  # price
        #         "0.78069158",  # amount
        #         "0",  # liquidated orders
        #         "17"  # orders
        #     ]
        #
        bookside.store(self.safe_float(delta, 0), self.safe_float(delta, 1))

    def handle_deltas(self, bookside, deltas):
        """
        apply every raw level in deltas to bookside, in order, via handle_delta
        """
        for delta in deltas:
            self.handle_delta(bookside, delta)

    def handle_order_book_message(self, client: Client, message, orderbook, messageHash, market=None):
        """
        apply one snapshot or update payload to an order book and optionally verify sequence and checksum

        When the 'checksum' option of watchOrderBook is enabled, the payload's
        prevSeqId is compared with the book's nonce and a CRC32 over up to 25
        interleaved bid/ask levels is compared with the payload's checksum. On a
        mismatch the subscription and the cached book are removed and the error
        is rejected on messageHash. The book's nonce, timestamp and datetime are
        updated in every case.

        :param Client client: websocket client owning the subscription
        :param dict message: one element of the order book message's data array
        :param orderbook: the order book object to mutate
        :param str messageHash: hash used to reject the pending future on error
        :param dict [market]: market used to resolve the symbol
        :returns: the same orderbook object
        """
        #
        #     {
        #         "asks": [
        #             ['31738.3', '0.05973179', "0", "3"],
        #             ['31738.5', '0.11035404', "0", "2"],
        #             ['31739.6', '0.01', "0", "1"],
        #         ],
        #         "bids": [
        #             ['31738.2', '0.67557666', "0", "9"],
        #             ['31738', '0.02466947', "0", "2"],
        #             ['31736.3', '0.01705046', "0", "2"],
        #         ],
        #         "instId": "BTC-USDT",
        #         "ts": "1626537446491"
        #         "checksum": -855196043,
        #         "prevSeqId": 123456,
        #         "seqId": 123457
        #     }
        #
        askDeltas = self.safe_value(message, 'asks', [])
        bidDeltas = self.safe_value(message, 'bids', [])
        askSide = orderbook['asks']
        bidSide = orderbook['bids']
        self.handle_deltas(askSide, askDeltas)
        self.handle_deltas(bidSide, bidDeltas)
        marketId = self.safe_string(message, 'instId')
        symbol = self.safe_symbol(marketId, market)
        verify = self.handle_option('watchOrderBook', 'checksum', True)
        seqId = self.safe_integer(message, 'seqId')
        if verify:
            prevSeqId = self.safe_integer(message, 'prevSeqId')
            nonce = orderbook['nonce']
            askCount = len(askSide)
            bidCount = len(bidSide)
            # payload is bid price, bid size, ask price, ask size per level, for the top 25 levels
            parts = []
            for level in range(0, 25):
                if level < bidCount:
                    bid = bidSide[level]
                    parts.append(self.number_to_string(bid[0]))
                    parts.append(self.number_to_string(bid[1]))
                if level < askCount:
                    ask = askSide[level]
                    parts.append(self.number_to_string(ask[0]))
                    parts.append(self.number_to_string(ask[1]))
            payload = ':'.join(parts)
            expected = self.safe_integer(message, 'checksum')
            computed = self.crc32(payload, True)
            failure = None
            if prevSeqId != -1 and nonce != prevSeqId:
                failure = InvalidNonce(self.id + ' watchOrderBook received invalid nonce')
            # a checksum mismatch takes precedence over a sequence gap
            if expected != computed:
                failure = ChecksumError(self.id + ' ' + self.orderbook_checksum_message(symbol))
            if failure is not None:
                del client.subscriptions[messageHash]
                del self.orderbooks[symbol]
                client.reject(failure, messageHash)
        timestamp = self.safe_integer(message, 'ts')
        orderbook['nonce'] = seqId
        orderbook['timestamp'] = timestamp
        orderbook['datetime'] = self.iso8601(timestamp)
        return orderbook

    def handle_order_book(self, client: Client, message):
        """
        route an order book message to the right handling path and resolve the '<channel>:<symbol>' hash

        Messages with action 'snapshot' create a fresh book, messages with action
        'update' patch the cached book (ignored when none is cached), and
        books5 / bbo-tbt messages, which carry no action, replace the cached
        book's contents with each payload.

        :param Client client: websocket client whose pending futures get resolved
        :param dict message: raw order book message
        :returns dict: the message that was passed in
        """
        #
        # snapshot
        #
        #     {
        #         "arg": {channel: 'books-l2-tbt', instId: "BTC-USDT"},
        #         "action": "snapshot",
        #         "data": [
        #             {
        #                 "asks": [
        #                     ['31685', '0.78069158', "0", "17"],
        #                     ['31685.1', '0.0001', "0", "1"],
        #                     ['31685.6', '0.04543165', "0", "1"],
        #                 ],
        #                 "bids": [
        #                     ['31684.9', '0.01', "0", "1"],
        #                     ['31682.9', '0.0001', "0", "1"],
        #                     ['31680.7', '0.01', "0", "1"],
        #                 ],
        #                 "ts": "1626532416403",
        #                 "checksum": -1023440116
        #             }
        #         ]
        #     }
        #
        # update
        #
        #     {
        #         "arg": {channel: 'books-l2-tbt', instId: "BTC-USDT"},
        #         "action": "update",
        #         "data": [
        #             {
        #                 "asks": [
        #                     ['31657.7', '0', "0", "0"],
        #                     ['31659.7', '0.01', "0", "1"],
        #                     ['31987.3', '0.01', "0", "1"]
        #                 ],
        #                 "bids": [
        #                     ['31642.9', '0.50296385', "0", "4"],
        #                     ['31639.9', '0', "0", "0"],
        #                     ['31638.7', '0.01', "0", "1"],
        #                 ],
        #                 "ts": "1626535709008",
        #                 "checksum": 830931827
        #             }
        #         ]
        #     }
        #
        # books5
        #
        #     {
        #         "arg": {channel: "books5", instId: "BTC-USDT"},
        #         "data": [
        #             {
        #                 "asks": [
        #                     ['31738.3', '0.05973179', "0", "3"],
        #                     ['31738.5', '0.11035404', "0", "2"],
        #                     ['31739.6', '0.01', "0", "1"],
        #                 ],
        #                 "bids": [
        #                     ['31738.2', '0.67557666', "0", "9"],
        #                     ['31738', '0.02466947', "0", "2"],
        #                     ['31736.3', '0.01705046', "0", "2"],
        #                 ],
        #                 "instId": "BTC-USDT",
        #                 "ts": "1626537446491"
        #             }
        #         ]
        #     }
        #
        # bbo-tbt
        #
        #     {
        #         "arg":{
        #             "channel":"bbo-tbt",
        #             "instId":"BTC-USDT"
        #         },
        #         "data":[
        #             {
        #                 "asks":[["36232.2","1.8826134","0","17"]],
        #                 "bids":[["36232.1","0.00572212","0","2"]],
        #                 "ts":"1651826598363"
        #             }
        #         ]
        #     }
        #
        arg = self.safe_dict(message, 'arg', {})
        channel = self.safe_string(arg, 'channel')
        action = self.safe_string(message, 'action')
        payloads = self.safe_list(message, 'data', [])
        market = self.safe_market(self.safe_string(arg, 'instId'))
        symbol = market['symbol']
        # depth of the local book for each channel
        channelDepths: dict = {
            'bbo-tbt': 1,
            'books': 400,
            'books5': 5,
            'books-l2-tbt': 400,
            'books50-l2-tbt': 50,
        }
        limit = self.safe_integer(channelDepths, channel)
        messageHash = channel + ':' + symbol
        if action == 'snapshot':
            for payload in payloads:
                book = self.order_book({}, limit)
                self.orderbooks[symbol] = book
                book['symbol'] = symbol
                self.handle_order_book_message(client, payload, book, messageHash)
                client.resolve(book, messageHash)
            return message
        if action == 'update':
            # updates for a book that is not cached are ignored
            if symbol in self.orderbooks:
                book = self.orderbooks[symbol]
                for payload in payloads:
                    self.handle_order_book_message(client, payload, book, messageHash, market)
                    client.resolve(book, messageHash)
            return message
        if channel in ('books5', 'bbo-tbt'):
            if symbol not in self.orderbooks:
                self.orderbooks[symbol] = self.order_book({}, limit)
            book = self.orderbooks[symbol]
            for payload in payloads:
                timestamp = self.safe_integer(payload, 'ts')
                snapshot = self.parse_order_book(payload, symbol, timestamp, 'bids', 'asks', 0, 1)
                book.reset(snapshot)
                client.resolve(book, messageHash)
        return message

    async def authenticate(self, params={}):
        """
        log in on the websocket endpoint selected by params['access'] and wait for the 'authenticated' future

        The login request is only sent when the client has no 'authenticated'
        entry in its subscriptions; otherwise the existing future is awaited.

        :param dict [params]: extra parameters
        :param str [params.access]: endpoint access type passed to get_url, defaults to 'private'
        :returns: the result of the client's 'authenticated' future
        """
        self.check_required_credentials()
        access = self.safe_string(params, 'access', 'private')
        params = self.omit(params, ['access'])
        url = self.get_url('users', access)
        messageHash = 'authenticated'
        client = self.client(url)
        future = client.future(messageHash)
        alreadyRequested = self.safe_value(client.subscriptions, messageHash)
        if alreadyRequested is None:
            timestamp = str(self.seconds())
            # signed string is timestamp + HTTP method + request path
            auth = timestamp + 'GET' + '/users/self/verify'
            signature = self.hmac(self.encode(auth), self.encode(self.secret), hashlib.sha256, 'base64')
            credentials = {
                'apiKey': self.apiKey,
                'passphrase': self.password,
                'timestamp': timestamp,
                'sign': signature,
            }
            request: dict = {
                'op': 'login',
                'args': [credentials],
            }
            # Only params['access'] is ever copied, so custom parameters are not sent.
            # NOTE(review): 'access' was omitted from params above, so this branch looks unreachable - confirm
            if 'access' in params:
                request['access'] = params['access']
            self.watch(url, messageHash, request, messageHash)
        return await future

    async def watch_balance(self, params={}) -> Balances:
        """
        authenticate and subscribe to the private 'account' channel to receive balance updates
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: a `balance structure <https://docs.ccxt.com/#/?id=balance-structure>`
        """
        await self.load_markets()
        await self.authenticate()
        channel = 'account'
        balance = await self.subscribe('private', channel, channel, None, params)
        return balance

    def handle_balance_and_position(self, client: Client, message):
        """
        forward the message to handle_my_liquidation, the only consumer called here

        :param Client client: websocket client passed through unchanged
        :param dict message: raw message passed through unchanged
        """
        self.handle_my_liquidation(client, message)

    def handle_balance(self, client: Client, message):
        """
        merge an account-channel update into the cached balance and resolve its subscribers
        :param Client client: the websocket client the message arrived on
        :param dict message: the decoded account-channel message
        """
        #
        #     {
        #         "arg": {channel: "account"},
        #         "data": [
        #             {
        #                 "adjEq": '',
        #                 "details": [
        #                     {
        #                         "availBal": '',
        #                         "availEq": "8.21009913",
        #                         "cashBal": "8.21009913",
        #                         "ccy": "USDT",
        #                         "coinUsdPrice": "0.99994",
        #                         "crossLiab": '',
        #                         "disEq": "8.2096065240522",
        #                         "eq": "8.21009913",
        #                         "eqUsd": "8.2096065240522",
        #                         "frozenBal": "0",
        #                         "interest": '',
        #                         "isoEq": "0",
        #                         "isoLiab": '',
        #                         "liab": '',
        #                         "maxLoan": '',
        #                         "mgnRatio": '',
        #                         "notionalLever": "0",
        #                         "ordFrozen": "0",
        #                         "twap": "0",
        #                         "uTime": "1621927314996",
        #                         "upl": "0"
        #                     },
        #                 ],
        #                 "imr": '',
        #                 "isoEq": "0",
        #                 "mgnRatio": '',
        #                 "mmr": '',
        #                 "notionalUsd": '',
        #                 "ordFroz": '',
        #                 "totalEq": "22.1930992296832",
        #                 "uTime": "1626692120916"
        #             }
        #         ]
        #     }
        #
        # every update is stored under the same 'spot' key of the balance cache
        accountType = 'spot'
        channelName = self.safe_string(self.safe_value(message, 'arg', {}), 'channel')
        parsedBalance = self.parseTradingBalance(message)
        previousBalance = self.safe_value(self.balance, accountType, {})
        # overlay the fresh values on top of what was cached before
        self.balance[accountType] = self.safe_balance(self.deep_extend(previousBalance, parsedBalance))
        client.resolve(self.balance[accountType], channelName)

    def order_to_trade(self, order, market=None):
        """
        build a unified trade structure out of a parsed order, using the fill fields of its raw payload
        :param dict order: a parsed order whose 'info' holds the raw exchange order
        :param dict [market]: market passed through to safe_trade
        :returns dict: the result of safe_trade for the last fill reported on the order
        """
        rawOrder = self.safe_value(order, 'info', {})
        fillTimestamp = self.safe_integer(rawOrder, 'fillTime')
        feeCurrencyId = self.safe_string(rawOrder, 'fillFeeCcy')
        # execType 'T' is reported as taker, anything else(including a missing value) as maker
        if self.safe_string(rawOrder, 'execType', '') == 'T':
            takerOrMaker = 'taker'
        else:
            takerOrMaker = 'maker'
        trade: dict = {
            'info': rawOrder,
            'timestamp': fillTimestamp,
            'datetime': self.iso8601(fillTimestamp),
            'symbol': self.safe_string(order, 'symbol'),
            'id': self.safe_string(rawOrder, 'tradeId'),
            'order': self.safe_string(order, 'id'),
            'type': self.safe_string(order, 'type'),
            'takerOrMaker': takerOrMaker,
            'side': self.safe_string(order, 'side'),
            'price': self.safe_number(rawOrder, 'fillPx'),
            'amount': self.safe_number(rawOrder, 'fillSz'),
            'cost': self.safe_number(order, 'cost'),
            'fee': {
                'cost': self.safe_number(rawOrder, 'fillFee'),
                'currency': self.safe_currency_code(feeCurrencyId),
            },
        }
        return self.safe_trade(trade, market)

    async def watch_my_trades(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        watches information on multiple trades made by the user

        https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-order-channel

        :param str [symbol]: unified market symbol of the market trades were made in
        :param int [since]: the earliest time in ms to fetch trades for
        :param int [limit]: the maximum number of trade structures to retrieve
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param bool [params.trigger]: True if fetching trigger or conditional trades
        :param str [params.type]: 'spot', 'swap', 'future', 'option', 'ANY', 'SPOT', 'MARGIN', 'SWAP', 'FUTURES' or 'OPTION'
        :param str [params.marginMode]: 'cross' or 'isolated', for automatically setting the type to spot margin
        :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=trade-structure>`
        """
        methodName = 'watchMyTrades'
        # without an explicit type, updates for any instrument type are requested
        instType, params = self.handle_option_and_params(params, methodName, 'type', 'ANY')
        triggerOnly = self.safe_bool_2(params, 'trigger', 'stop', False)
        params = self.omit(params, ['trigger', 'stop'])
        await self.load_markets()
        access = 'business' if triggerOnly else 'private'
        await self.authenticate({'access': access})
        channel = 'orders-algo' if triggerOnly else 'orders'
        messageHash = channel + '::myTrades'
        if symbol is not None:
            # a concrete market overrides the requested type and narrows the message hash
            market = self.market(symbol)
            symbol = market['symbol']
            instType = market['type']
            messageHash += '::' + symbol
        instType = ('futures' if instType == 'future' else instType).upper()
        marginMode, params = self.handle_margin_mode_and_params(methodName, params)
        if (instType == 'SPOT') and (marginMode is not None):
            instType = 'MARGIN'
        subscriptionArgs = self.extend({'instType': instType}, params)
        trades = await self.subscribe('private', messageHash, channel, None, subscriptionArgs)
        if self.newUpdates:
            limit = trades.getLimit(symbol, limit)
        return self.filter_by_symbol_since_limit(trades, symbol, since, limit, True)

    async def watch_positions(self, symbols: Strings = None, since: Int = None, limit: Int = None, params={}) -> List[Position]:
        """
        watch all open positions

        https://www.okx.com/docs-v5/en/#trading-account-websocket-positions-channel

        :param str[]|None symbols: list of unified market symbols
        :param int [since]: passed to the since/limit filter of the cached positions when self.newUpdates is falsy
        :param int [limit]: passed to the since/limit filter of the cached positions when self.newUpdates is falsy
        :param dict params: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `position structure <https://docs.ccxt.com/en/latest/manual.html#position-structure>`
        """
        await self.load_markets()
        await self.authenticate(params)
        symbols = self.market_symbols(symbols)
        channel = 'positions'
        if symbols is not None:
            # one subscription per requested symbol
            perSymbolArgs = self.extend({'instType': 'ANY'}, params)
            latest = await self.subscribe_multiple('private', channel, symbols, perSymbolArgs)
        else:
            # no symbols: a single subscription covering every instrument type
            channelArg: dict = {
                'channel': 'positions',
                'instType': 'ANY',
            }
            subscribeMessage: dict = {
                'op': 'subscribe',
                'args': [self.extend(channelArg, params)],
            }
            url = self.get_url(channel, 'private')
            latest = await self.watch(url, channel, subscribeMessage, channel)
        if self.newUpdates:
            return latest
        return self.filter_by_symbols_since_limit(self.positions, symbols, since, limit, True)

    def handle_positions(self, client, message):
        """
        parse a positions-channel update, store it in the positions cache and resolve its subscribers
        :param Client client: the websocket client the message arrived on
        :param dict message: the decoded positions-channel message
        """
        #
        #    {
        #        arg: {
        #            channel: 'positions',
        #            instType: 'ANY',
        #            instId: 'XRP-USDT-SWAP',
        #            uid: '464737184507959869'
        #        },
        #        data: [{
        #            adl: '1',
        #            availPos: '',
        #            avgPx: '0.52668',
        #            baseBal: '',
        #            baseBorrowed: '',
        #            baseInterest: '',
        #            bizRefId: '',
        #            bizRefType: '',
        #            cTime: '1693151444408',
        #            ccy: 'USDT',
        #            closeOrderAlgo: [],
        #            deltaBS: '',
        #            deltaPA: '',
        #            gammaBS: '',
        #            gammaPA: '',
        #            idxPx: '0.52683',
        #            imr: '17.564000000000004',
        #            instId: 'XRP-USDT-SWAP',
        #            instType: 'SWAP',
        #            interest: '',
        #            last: '0.52691',
        #            lever: '3',
        #            liab: '',
        #            liabCcy: '',
        #            liqPx: '0.3287514731020614',
        #            margin: '',
        #            markPx: '0.52692',
        #            mgnMode: 'cross',
        #            mgnRatio: '69.00363001456147',
        #            mmr: '0.26346',
        #            notionalUsd: '52.68620388000001',
        #            optVal: '',
        #            pTime: '1693151906023',
        #            pendingCloseOrdLiabVal: '',
        #            pos: '1',
        #            posCcy: '',
        #            posId: '616057041198907393',
        #            posSide: 'net',
        #            quoteBal: '',
        #            quoteBorrowed: '',
        #            quoteInterest: '',
        #            spotInUseAmt: '',
        #            spotInUseCcy: '',
        #            thetaBS: '',
        #            thetaPA: '',
        #            tradeId: '138745402',
        #            uTime: '1693151444408',
        #            upl: '0.0240000000000018',
        #            uplLastPx: '0.0229999999999952',
        #            uplRatio: '0.0013670539986328',
        #            uplRatioLastPx: '0.001310093415356',
        #            usdPx: '',
        #            vegaBS: '',
        #            vegaPA: ''
        #        }]
        #    }
        #
        arg = self.safe_value(message, 'arg', {})
        market = self.safe_market(self.safe_string(arg, 'instId'), None, '-')
        symbol = market['symbol']
        channel = self.safe_string(arg, 'channel', '')
        rawPositions = self.safe_value(message, 'data', [])
        if self.positions is None:
            self.positions = ArrayCacheBySymbolBySide()
        cache = self.positions
        updates = []
        for rawPosition in rawPositions:
            position = self.parse_position(rawPosition)
            if position['contracts'] == 0:
                # a position with zero contracts is stored for both sides:
                # the parsed entry is marked long and a copy of it is marked short
                position['side'] = 'long'
                shortCopy = self.clone(position)
                shortCopy['side'] = 'short'
                cache.append(shortCopy)
                updates.append(shortCopy)
            updates.append(position)
            cache.append(position)
        # subscribers of a single symbol listen on a symbol-specific hash
        messageHash = channel if symbol is None else channel + '::' + symbol
        client.resolve(updates, messageHash)

    async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]:
        """
        watches information on multiple orders made by the user

        https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-order-channel

        :param str [symbol]: unified market symbol of the market the orders were made in
        :param int [since]: the earliest time in ms to fetch orders for
        :param int [limit]: the maximum number of order structures to retrieve
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param bool [params.trigger]: True if fetching trigger or conditional orders
        :param str [params.type]: 'spot', 'swap', 'future', 'option', 'ANY', 'SPOT', 'MARGIN', 'SWAP', 'FUTURES' or 'OPTION'
        :param str [params.marginMode]: 'cross' or 'isolated', for automatically setting the type to spot margin
        :returns dict[]: a list of `order structures <https://docs.ccxt.com/#/?id=order-structure>`
        """
        methodName = 'watchOrders'
        # without an explicit type, updates for any instrument type are requested
        instType, params = self.handle_option_and_params(params, methodName, 'type', 'ANY')
        triggerOnly = self.safe_value_2(params, 'stop', 'trigger', False)
        params = self.omit(params, ['stop', 'trigger'])
        await self.load_markets()
        access = 'business' if triggerOnly else 'private'
        await self.authenticate({'access': access})
        if symbol is not None:
            # a concrete market overrides the requested type
            market = self.market(symbol)
            symbol = market['symbol']
            instType = market['type']
        instType = ('futures' if instType == 'future' else instType).upper()
        marginMode, params = self.handle_margin_mode_and_params(methodName, params)
        if (instType == 'SPOT') and (marginMode is not None):
            instType = 'MARGIN'
        subscriptionArgs = self.extend({'instType': instType}, params)
        channel = 'orders-algo' if triggerOnly else 'orders'
        orders = await self.subscribe('private', channel, channel, symbol, subscriptionArgs)
        if self.newUpdates:
            limit = orders.getLimit(symbol, limit)
        return self.filter_by_symbol_since_limit(orders, symbol, since, limit, True)

    def handle_orders(self, client: Client, message, subscription=None):
        """
        store an orders / orders-algo channel update in the matching cache and resolve its subscribers
        :param Client client: the websocket client the message arrived on
        :param dict message: the decoded orders-channel message
        :param subscription: not used by this handler
        """
        #
        #     {
        #         "arg":{
        #             "channel":"orders",
        #             "instType":"SPOT"
        #         },
        #         "data":[
        #             {
        #                 "accFillSz":"0",
        #                 "amendResult":"",
        #                 "avgPx":"",
        #                 "cTime":"1634548275191",
        #                 "category":"normal",
        #                 "ccy":"",
        #                 "clOrdId":"e847386590ce4dBC330547db94a08ba0",
        #                 "code":"0",
        #                 "execType":"",
        #                 "fee":"0",
        #                 "feeCcy":"USDT",
        #                 "fillFee":"0",
        #                 "fillFeeCcy":"",
        #                 "fillNotionalUsd":"",
        #                 "fillPx":"",
        #                 "fillSz":"0",
        #                 "fillTime":"",
        #                 "instId":"ETH-USDT",
        #                 "instType":"SPOT",
        #                 "lever":"",
        #                 "msg":"",
        #                 "notionalUsd":"451.4516256",
        #                 "ordId":"370257534141235201",
        #                 "ordType":"limit",
        #                 "pnl":"0",
        #                 "posSide":"",
        #                 "px":"60000",
        #                 "rebate":"0",
        #                 "rebateCcy":"ETH",
        #                 "reqId":"",
        #                 "side":"sell",
        #                 "slOrdPx":"",
        #                 "slTriggerPx":"",
        #                 "state":"live",
        #                 "sz":"0.007526",
        #                 "tag":"",
        #                 "tdMode":"cash",
        #                 "tgtCcy":"",
        #                 "tpOrdPx":"",
        #                 "tpTriggerPx":"",
        #                 "tradeId":"",
        #                 "uTime":"1634548275191"
        #             }
        #         ]
        #     }
        #
        # the same message also feeds the myTrades subscribers
        self.handle_my_trades(client, message)
        arg = self.safe_value(message, 'arg', {})
        channel = self.safe_string(arg, 'channel')
        rawOrders = self.safe_value(message, 'data', [])
        if len(rawOrders) <= 0:
            return
        cacheLimit = self.safe_integer(self.options, 'ordersLimit', 1000)
        if self.orders is None:
            # both caches are created together on the first update
            self.orders = ArrayCacheBySymbolById(cacheLimit)
            self.triggerOrders = ArrayCacheBySymbolById(cacheLimit)
        stored = self.triggerOrders if (channel == 'orders-algo') else self.orders
        touchedMarketIds = []
        for order in self.parse_orders(rawOrders):
            stored.append(order)
            touchedMarketIds.append(self.market(order['symbol'])['id'])
        # wake the channel-wide subscribers first, then the per-market ones
        client.resolve(stored, channel)
        for marketId in touchedMarketIds:
            client.resolve(stored, channel + ':' + marketId)

    def handle_my_trades(self, client: Client, message):
        """
        derive user trades from an orders-channel update and resolve the myTrades subscribers
        :param Client client: the websocket client the message arrived on
        :param dict message: the decoded orders-channel message
        """
        #
        #     {
        #         "arg":{
        #             "channel":"orders",
        #             "instType":"SPOT"
        #         },
        #         "data":[
        #             {
        #                 "accFillSz":"0",
        #                 "amendResult":"",
        #                 "avgPx":"",
        #                 "cTime":"1634548275191",
        #                 "category":"normal",
        #                 "ccy":"",
        #                 "clOrdId":"e847386590ce4dBC330547db94a08ba0",
        #                 "code":"0",
        #                 "execType":"",
        #                 "fee":"0",
        #                 "feeCcy":"USDT",
        #                 "fillFee":"0",
        #                 "fillFeeCcy":"",
        #                 "fillNotionalUsd":"",
        #                 "fillPx":"",
        #                 "fillSz":"0",
        #                 "fillTime":"",
        #                 "instId":"ETH-USDT",
        #                 "instType":"SPOT",
        #                 "lever":"",
        #                 "msg":"",
        #                 "notionalUsd":"451.4516256",
        #                 "ordId":"370257534141235201",
        #                 "ordType":"limit",
        #                 "pnl":"0",
        #                 "posSide":"",
        #                 "px":"60000",
        #                 "rebate":"0",
        #                 "rebateCcy":"ETH",
        #                 "reqId":"",
        #                 "side":"sell",
        #                 "slOrdPx":"",
        #                 "slTriggerPx":"",
        #                 "state":"live",
        #                 "sz":"0.007526",
        #                 "tag":"",
        #                 "tdMode":"cash",
        #                 "tgtCcy":"",
        #                 "tpOrdPx":"",
        #                 "tpTriggerPx":"",
        #                 "tradeId":"",
        #                 "uTime":"1634548275191"
        #             }
        #         ]
        #     }
        #
        arg = self.safe_value(message, 'arg', {})
        channel = self.safe_string(arg, 'channel')
        rawOrders = self.safe_value(message, 'data', [])
        # only orders that carry a trade id describe a fill
        ordersWithFill = [
            self.parse_order(rawOrder)
            for rawOrder in rawOrders
            if len(self.safe_string(rawOrder, 'tradeId', '')) > 0
        ]
        if len(ordersWithFill) == 0:
            return
        if self.myTrades is None:
            cacheLimit = self.safe_integer(self.options, 'tradesLimit', 1000)
            self.myTrades = ArrayCacheBySymbolById(cacheLimit)
        tradesCache = self.myTrades
        touchedSymbols: dict = {}
        for parsedOrder in ordersWithFill:
            trade = self.order_to_trade(parsedOrder)
            tradesCache.append(trade)
            touchedSymbols[trade['symbol']] = True
        # wake the channel-wide subscribers first, then one hash per symbol seen in this update
        messageHash = channel + '::myTrades'
        client.resolve(self.myTrades, messageHash)
        for tradeSymbol in list(touchedSymbols.keys()):
            client.resolve(self.myTrades, messageHash + '::' + tradeSymbol)

    def request_id(self):
        """
        build an id for a websocket request: the current millisecond timestamp followed by a random number
        :returns str: the timestamp string concatenated with the string form of rand_number(4)
        """
        return str(self.milliseconds()) + str(self.rand_number(4))

    async def create_order_ws(self, symbol: str, type: OrderType, side: OrderSide, amount: float, price: Num = None, params={}) -> Order:
        """

        https://www.okx.com/docs-v5/en/#websocket-api-trade-place-order

        create a trade order
        :param str symbol: unified symbol of the market to create an order in
        :param str type: 'market' or 'limit'
        :param str side: 'buy' or 'sell'
        :param float amount: how much of currency you want to trade in units of base currency
        :param float|None [price]: the price at which the order is to be fulfilled, in units of the quote currency, ignored in market orders
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param boolean params['test']: test order, default False
        :param str [params.op]: websocket operation, 'order' or 'batch-orders'(default 'batch-orders')
        :returns dict: an `order structure <https://docs.ccxt.com/#/?id=order-structure>`
        :raises BadRequest: for algo order types, or when op is neither 'order' nor 'batch-orders'
        """
        await self.load_markets()
        await self.authenticate()
        url = self.get_url('private', 'private')
        # the request id doubles as the message hash the response is resolved on
        messageHash = self.request_id()
        op = None
        op, params = self.handle_option_and_params(params, 'createOrderWs', 'op', 'batch-orders')
        args = self.create_order_request(symbol, type, side, amount, price, params)
        ordType = self.safe_string(args, 'ordType')
        if (ordType == 'trigger') or (ordType == 'conditional') or (type == 'oco') or (type == 'move_order_stop') or (type == 'iceberg') or (type == 'twap'):
            raise BadRequest(self.id + ' createOrderWs() does not support algo trading. self.options["createOrderWs"]["op"] must be either order or batch-order')
        if (op != 'order') and (op != 'batch-orders'):
            # report the actual problem: an unsupported op value, naming the two accepted ones
            raise BadRequest(self.id + ' createOrderWs() self.options["createOrderWs"]["op"] must be either order or batch-orders')
        request: dict = {
            'id': messageHash,
            'op': op,
            'args': [args],
        }
        return await self.watch(url, messageHash, request, messageHash)

    def handle_place_orders(self, client: Client, message):
        """
        resolve the future of an order / batch-orders / cancel-order request with the first successful entry
        :param Client client: the websocket client the message arrived on
        :param dict message: the decoded response, its 'id' is the message hash of the pending request
        """
        #
        #  batch-orders/order/cancel-order
        #    {
        #        "id": "1689281055",
        #        "op": "batch-orders",
        #        "code": "0",
        #        "msg": '',
        #        "data": [{
        #            "tag": "e847386590ce4dBC",
        #            "ordId": "599823446566084608",
        #            "clOrdId": "e847386590ce4dBCb939511604f394b0",
        #            "sCode": "0",
        #            "sMsg": "Order successfully placed."
        #        },
        #        ...
        #        ]
        #    }
        #
        requestId = self.safe_string(message, 'id')
        # keep only the entries that succeeded(sCode "0"), dropping partial errors
        succeeded = self.filter_by(self.safe_value(message, 'data', []), 'sCode', '0')
        if self.is_empty(succeeded):
            # nothing succeeded: hand the whole response to the error handler
            operation = self.safe_string(message, 'op')
            body = self.json(message)
            self.handle_errors(None, None, client.url, operation, None, body, message, None, None)
        parsedOrders = self.parse_orders(succeeded, None, None, None)
        client.resolve(self.safe_dict(parsedOrders, 0, {}), requestId)

    async def edit_order_ws(self, id: str, symbol: str, type: OrderType, side: OrderSide, amount: Num = None, price: Num = None, params={}) -> Order:
        """
        edit a trade order

        https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-amend-order
        https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-amend-multiple-orders

        :param str id: order id
        :param str symbol: unified symbol of the market to create an order in
        :param str type: 'market' or 'limit'
        :param str side: 'buy' or 'sell'
        :param float amount: how much of the currency you want to trade in units of the base currency
        :param float|None [price]: the price at which the order is to be fulfilled, in units of the quote currency, ignored in market orders
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.op]: websocket operation to send, default 'amend-order'
        :returns dict: an `order structure <https://docs.ccxt.com/#/?id=order-structure>`
        """
        await self.load_markets()
        await self.authenticate()
        url = self.get_url('private', 'private')
        # the request id doubles as the message hash the response is resolved on
        requestId = self.request_id()
        op, params = self.handle_option_and_params(params, 'editOrderWs', 'op', 'amend-order')
        amendArgs = self.edit_order_request(id, symbol, type, side, amount, price, params)
        envelope: dict = {
            'id': requestId,
            'op': op,
            'args': [amendArgs],
        }
        # the remaining params are merged into the top level of the outgoing message
        payload = self.extend(envelope, params)
        return await self.watch(url, requestId, payload, requestId)

    async def cancel_order_ws(self, id: str, symbol: Str = None, params={}) -> Order:
        """
        cancel an order

        https://okx-docs.github.io/apidocs/websocket_api/en/#cancel-order-trade

        :param str id: order id, used only when no client order id is supplied in params
        :param str symbol: unified market symbol, required
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.clOrdId]: client order id
        :returns dict: an `order structure <https://docs.ccxt.com/#/?id=order-structure>`
        :raises BadRequest: when symbol is None
        """
        if symbol is None:
            raise BadRequest(self.id + ' cancelOrderWs() requires a symbol argument')
        await self.load_markets()
        await self.authenticate()
        url = self.get_url('private', 'private')
        # the request id doubles as the message hash the response is resolved on
        requestId = self.request_id()
        clientOrderId = self.safe_string_2(params, 'clOrdId', 'clientOrderId')
        params = self.omit(params, ['clientOrderId', 'clOrdId'])
        cancelArg: dict = {
            'instId': self.market_id(symbol),
        }
        # the order is identified by the client order id when given, otherwise by the exchange order id
        if clientOrderId is None:
            cancelArg['ordId'] = id
        else:
            cancelArg['clOrdId'] = clientOrderId
        payload: dict = {
            'id': requestId,
            'op': 'cancel-order',
            'args': [self.extend(cancelArg, params)],
        }
        return await self.watch(url, requestId, payload, requestId)

    async def cancel_orders_ws(self, ids: List[str], symbol: Str = None, params={}):
        """

        https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-mass-cancel-order

        cancel multiple orders
        :param str[] ids: order ids, at most 20 per call
        :param str symbol: unified market symbol, required
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: an list of `order structures <https://docs.ccxt.com/#/?id=order-structure>`
        :raises BadRequest: when more than 20 ids are given or symbol is None
        """
        # annotated with int: the previous annotation named an identifier that does not exist in Python
        idsLength: int = len(ids)
        if idsLength > 20:
            raise BadRequest(self.id + ' cancelOrdersWs() accepts up to 20 ids at a time')
        if symbol is None:
            raise BadRequest(self.id + ' cancelOrdersWs() requires a symbol argument')
        await self.load_markets()
        await self.authenticate()
        url = self.get_url('private', 'private')
        # the request id doubles as the message hash the response is resolved on
        messageHash = self.request_id()
        # one cancel argument per order id, all on the same instrument
        args = [
            {
                'instId': self.market_id(symbol),
                'ordId': orderId,
            }
            for orderId in ids
        ]
        request: dict = {
            'id': messageHash,
            'op': 'batch-cancel-orders',
            'args': args,
        }
        return await self.watch(url, messageHash, self.deep_extend(request, params), messageHash)

    async def cancel_all_orders_ws(self, symbol: Str = None, params={}):
        """

        https://docs.okx.com/websockets/#message-cancelAll

        cancel all open orders of a type. Only applicable to Option in Portfolio Margin mode, and MMP privilege is required.
        :param str symbol: unified market symbol of an option market, required
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `order structures <https://docs.ccxt.com/#/?id=order-structure>`
        :raises BadRequest: when symbol is None or the market is not an option market
        """
        if symbol is None:
            raise BadRequest(self.id + ' cancelAllOrdersWs() requires a symbol argument')
        await self.load_markets()
        await self.authenticate()
        market = self.market(symbol)
        isOption = market['type'] == 'option'
        if not isOption:
            raise BadRequest(self.id + ' cancelAllOrdersWs is only applicable to Option in Portfolio Margin mode, and MMP privilege is required.')
        url = self.get_url('private', 'private')
        # the request id doubles as the message hash the response is resolved on
        requestId = self.request_id()
        massCancelArg: dict = {
            'instType': 'OPTION',
            'instFamily': market['id'],
        }
        payload: dict = {
            'id': requestId,
            'op': 'mass-cancel',
            'args': [self.extend(massCancelArg, params)],
        }
        return await self.watch(url, requestId, payload, requestId)

    def handle_cancel_all_orders(self, client: Client, message):
        """
        resolve the pending mass-cancel request with the raw 'data' list of the response
        :param Client client: the websocket client the message arrived on
        :param dict message: the decoded response, its 'id' is the message hash of the pending request
        """
        #
        #    {
        #        "id": "1512",
        #        "op": "mass-cancel",
        #        "data": [
        #            {
        #                "result": True
        #            }
        #        ],
        #        "code": "0",
        #        "msg": ""
        #    }
        #
        requestId = self.safe_string(message, 'id')
        result = self.safe_value(message, 'data', [])
        client.resolve(result, requestId)

    def handle_subscription_status(self, client: Client, message):
        """
        handle a subscription acknowledgement; the message is returned unchanged and nothing is stored
        :param Client client: the websocket client the message arrived on
        :param dict message: the decoded acknowledgement
        :returns dict: the same message
        """
        #
        #     {event: 'subscribe', arg: {channel: "tickers", instId: "BTC-USDT"}}
        #
        # channel = self.safe_string(message, "channel")
        # client.subscriptions[channel] = message
        return message

    def handle_authenticate(self, client: Client, message):
        """
        mark the connection as logged in by resolving the 'authenticated' future of the client with True
        :param Client client: the websocket client the login response arrived on
        :param dict message: the decoded login response, not inspected here
        """
        #
        #     {event: "login", success: True}
        #
        self.safe_value(client.futures, 'authenticated').resolve(True)

    def ping(self, client: Client):
        """
        produce the keep-alive payload for the connection
        :param Client client: the websocket client, not used here
        :returns str: the literal text 'ping'
        """
        # OKX does not support the built-in WebSocket protocol-level ping-pong.
        # Instead, it requires a custom text-based ping-pong mechanism.
        return 'ping'

    def handle_pong(self, client: Client, message):
        """
        record the time a pong was received on the client
        :param Client client: the websocket client the pong arrived on
        :param message: the pong message, returned unchanged
        :returns: the same message
        """
        # timestamp in milliseconds of the latest pong seen on this connection
        client.lastPong = self.milliseconds()
        return message

    def handle_error_message(self, client: Client, message):
        """
        check a websocket message for an exchange error and route the resulting exception to the waiting future(s)
        :param Client client: the websocket client the message arrived on
        :param dict message: the decoded message
        :returns: the message itself when it carries no error, False once an error has been passed to client.reject
        """
        #
        #     {event: 'error', msg: "Illegal request: {"op":"subscribe","args":["spot/ticker:BTC-USDT"]}", code: "60012"}
        #     {event: 'error", msg: "channel:ticker,instId:BTC-USDT doesn"t exist", code: "60018"}
        #     {"event":"error","msg":"Illegal request: {\\"id\\":\\"17321173472466905\\",\\"op\\":\\"amend-order\\",\\"args\\":[{\\"instId\\":\\"ETH-USDC\\",\\"ordId\\":\\"2000345622407479296\\",\\"newSz\\":\\"0.050857\\",\\"newPx\\":\\"2949.4\\",\\"postOnly\\":true}],\\"postOnly\\":true}","code":"60012","connId":"0808af6c"}
        #
        errorCode = self.safe_string(message, 'code')
        try:
            if errorCode and errorCode != '0':
                feedback = self.id + ' ' + self.json(message)
                # NOTE(review): code '1' is skipped for exact matching, presumably because the specific code is reported per entry in data - confirm
                if errorCode != '1':
                    self.throw_exactly_matched_exception(self.exceptions['exact'], errorCode, feedback)
                messageString = self.safe_value(message, 'msg')
                if messageString is not None:
                    self.throw_broadly_matched_exception(self.exceptions['broad'], messageString, feedback)
                else:
                    # no top-level msg: fall back to the per-entry sCode / sMsg inside data
                    data = self.safe_list(message, 'data', [])
                    for i in range(0, len(data)):
                        d = data[i]
                        errorCode = self.safe_string(d, 'sCode')
                        if errorCode is not None:
                            self.throw_exactly_matched_exception(self.exceptions['exact'], errorCode, feedback)
                        # sMsg sits on the entry next to sCode, so it is read from the entry and not from the outer message
                        messageString = self.safe_value(d, 'sMsg')
                        if messageString is not None:
                            self.throw_broadly_matched_exception(self.exceptions['broad'], messageString, feedback)
                raise ExchangeError(feedback)
        except Exception as e:
            # if the message contains an id, it means it is a response to a request
            # so we only reject that promise, instead of deleting all futures, destroying the authentication future
            id = self.safe_string(message, 'id')
            if id is None:
                # try to parse it from the stringified json inside msg
                msg = self.safe_string(message, 'msg')
                if msg is not None and msg.startswith('Illegal request: {'):
                    stringifiedJson = msg.replace('Illegal request: ', '')
                    parsedJson = self.parse_json(stringifiedJson)
                    id = self.safe_string(parsedJson, 'id')
            if id is not None:
                client.reject(e, id)
                return False
            # no request id available: reject every pending future on this client
            client.reject(e)
            return False
        return message

    def handle_message(self, client: Client, message):
        """
        Entry point for every incoming websocket message: route it to its handler.

        Routing order:
        1. messages for which handle_error_message returns a falsy value are dropped
        2. the text 'pong' is passed to handle_pong
        3. messages with an 'event' or 'op' field are dispatched by that value
        4. all other messages are dispatched by ``arg.channel``; channels starting
           with 'candle' go to handle_ohlcv
        Messages that match no handler are ignored.

        :param Client client: the websocket client the message arrived on
        :param message: the decoded message(a dict, or the text 'pong')
        """
        if not self.handle_error_message(client, message):
            return
        #
        #     {event: 'subscribe', arg: {channel: "tickers", instId: "BTC-USDT"}}
        #     {event: 'login', msg: '', code: "0"}
        #
        #     {
        #         "arg": {channel: "tickers", instId: "BTC-USDT"},
        #         "data": [
        #             {
        #                 "instType": "SPOT",
        #                 "instId": "BTC-USDT",
        #                 "last": "31500.1",
        #                 "lastSz": "0.00001754",
        #                 "askPx": "31500.1",
        #                 "askSz": "0.00998144",
        #                 "bidPx": "31500",
        #                 "bidSz": "3.05652439",
        #                 "open24h": "31697",
        #                 "high24h": "32248",
        #                 "low24h": "31165.6",
        #                 "sodUtc0": "31385.5",
        #                 "sodUtc8": "32134.9",
        #                 "volCcy24h": "503403597.38138519",
        #                 "vol24h": "15937.10781721",
        #                 "ts": "1626526618762"
        #             }
        #         ]
        #     }
        #
        #     {event: 'error', msg: "Illegal request: {"op":"subscribe","args":["spot/ticker:BTC-USDT"]}", code: "60012"}
        #     {event: 'error', msg: "channel:ticker,instId:BTC-USDT doesn't exist", code: "60018"}
        #     {event: 'error', msg: "Invalid OK_ACCESS_KEY", code: "60005"}
        #     {
        #         "event": "error",
        #         "msg": "Illegal request: {"op":"login","args":["de89b035-b233-44b2-9a13-0ccdd00bda0e","7KUcc8YzQhnxBE3K","1626691289","H57N99mBt5NvW8U19FITrPdOxycAERFMaapQWRqLaSE="]}",
        #         "code": "60012"
        #     }
        #
        if message == 'pong':
            self.handle_pong(client, message)
            return
        # table = self.safe_string(message, 'table')
        # if table is None:
        event = self.safe_string_2(message, 'event', 'op')
        if event is not None:
            # control messages and request responses, keyed by 'event' or 'op'
            methods: dict = {
                # 'info': self.handleSystemStatus,
                # 'book': 'handleOrderBook',
                'login': self.handle_authenticate,
                'subscribe': self.handle_subscription_status,
                'unsubscribe': self.handle_unsubscription,
                'order': self.handle_place_orders,
                'batch-orders': self.handle_place_orders,
                'amend-order': self.handle_place_orders,
                'batch-amend-orders': self.handle_place_orders,
                'cancel-order': self.handle_place_orders,
                'mass-cancel': self.handle_cancel_all_orders,
            }
            method = self.safe_value(methods, event)
            if method is not None:
                method(client, message)
        else:
            # data pushes, keyed by the channel named in 'arg'
            arg = self.safe_value(message, 'arg', {})
            channel = self.safe_string(arg, 'channel')
            methods: dict = {
                'bbo-tbt': self.handle_order_book,  # newly added channel that sends tick-by-tick Level 1 data, all API users can subscribe, public depth channel, verification not required
                'books': self.handle_order_book,  # all API users can subscribe, public depth channel, verification not required
                'books5': self.handle_order_book,  # all API users can subscribe, public depth channel, verification not required, data feeds will be delivered every 100ms(vs. every 200ms now)
                'books50-l2-tbt': self.handle_order_book,  # only users who're VIP4 and above can subscribe, identity verification required before subscription
                'books-l2-tbt': self.handle_order_book,  # only users who're VIP5 and above can subscribe, identity verification required before subscription
                'tickers': self.handle_ticker,
                'mark-price': self.handle_ticker,
                'positions': self.handle_positions,
                'index-tickers': self.handle_ticker,
                'sprd-tickers': self.handle_ticker,
                'block-tickers': self.handle_ticker,
                'trades': self.handle_trades,
                'account': self.handle_balance,
                'funding-rate': self.handle_funding_rate,
                # 'margin_account': self.handle_balance,
                'orders': self.handle_orders,
                'orders-algo': self.handle_orders,
                'liquidation-orders': self.handle_liquidation,
                'balance_and_position': self.handle_balance_and_position,
            }
            method = self.safe_value(methods, channel)
            if method is not None:
                method(client, message)
            elif channel is not None and channel.startswith('candle'):
                # candle channels carry the timeframe in their name, so they are matched by prefix;
                # the None check keeps messages without a channel from raising AttributeError
                self.handle_ohlcv(client, message)

    def handle_un_subscription_trades(self, client: Client, symbol: str):
        """
        Clean up after an unsubscription from the trades channel of one symbol.

        :param Client client: the websocket client that was subscribed
        :param str symbol: unified symbol whose trades subscription ended
        """
        self.clean_unsubscription(client, 'trades:' + symbol, 'unsubscribe:trades:' + symbol)
        # forget the cached trades for this symbol, if any were stored
        self.trades.pop(symbol, None)

    def handle_unsubscription_order_book(self, client: Client, symbol: str, channel: str):
        """
        Clean up after an unsubscription from an order book channel of one symbol.

        :param Client client: the websocket client that was subscribed
        :param str symbol: unified symbol whose order book subscription ended
        :param str channel: name of the order book channel that was unsubscribed
        """
        self.clean_unsubscription(client, channel + ':' + symbol, 'unsubscribe:orderbook:' + symbol)
        # drop the locally stored order book for this symbol, if present
        self.orderbooks.pop(symbol, None)

    def handle_unsubscription_ohlcv(self, client: Client, symbol: str, channel: str):
        """
        Clean up after an unsubscription from a candle channel of one symbol.

        :param Client client: the websocket client that was subscribed
        :param str symbol: unified symbol whose candle subscription ended
        :param str channel: candle channel name, i.e. 'candle' followed by the exchange timeframe
        """
        # the channel name embeds the exchange timeframe after the 'candle' prefix
        tf = channel.replace('candle', '')
        timeframe = self.find_timeframe(tf)
        subMessageHash = 'multi:' + channel + ':' + symbol
        messageHash = 'unsubscribe:' + subMessageHash
        self.clean_unsubscription(client, subMessageHash, messageHash)
        # the symbol may have no cached candles at all, so check it before indexing
        if symbol in self.ohlcvs and timeframe in self.ohlcvs[symbol]:
            del self.ohlcvs[symbol][timeframe]

    def handle_unsubscription_ticker(self, client: Client, symbol: str, channel):
        """
        Clean up after an unsubscription from a ticker channel of one symbol.

        :param Client client: the websocket client that was subscribed
        :param str symbol: unified symbol whose ticker subscription ended
        :param channel: name of the ticker channel that was unsubscribed
        """
        # the subscription hash uses a double colon between channel and symbol
        self.clean_unsubscription(client, channel + '::' + symbol, 'unsubscribe:ticker:' + symbol)
        # remove the cached ticker for this symbol, if present
        self.tickers.pop(symbol, None)

    def handle_unsubscription(self, client: Client, message):
        """
        Route an unsubscribe acknowledgement to the cleanup handler for its channel.

        The channel is matched in this order: exactly 'trades', names starting with
        'bbo' or 'book', names containing 'tickers', names starting with 'candle'.
        Any other channel is ignored.

        :param Client client: the websocket client the message arrived on
        :param dict message: the unsubscribe message
        """
        #
        #     {
        #         "event": "unsubscribe",
        #         "arg": {
        #             "channel": "tickers",
        #             "instId": "LTC-USD-200327"
        #         },
        #         "connId": "a4d3ae55"
        #     }
        #
        # NOTE(review): the original comment says arg might be an array or list;
        # only the dict form is read here - confirm whether other shapes occur
        subscription = self.safe_dict(message, 'arg', {})
        channelName = self.safe_string(subscription, 'channel', '')
        exchangeMarketId = self.safe_string(subscription, 'instId')
        unifiedSymbol = self.safe_symbol(exchangeMarketId)
        if channelName == 'trades':
            self.handle_un_subscription_trades(client, unifiedSymbol)
            return
        if channelName.startswith(('bbo', 'book')):
            self.handle_unsubscription_order_book(client, unifiedSymbol, channelName)
            return
        if 'tickers' in channelName:
            self.handle_unsubscription_ticker(client, unifiedSymbol, channelName)
            return
        if channelName.startswith('candle'):
            self.handle_unsubscription_ohlcv(client, unifiedSymbol, channelName)
