# -*- coding: utf-8 -*-

# PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN:
# https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code

import ccxt.async_support
from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById, ArrayCacheByTimestamp
from ccxt.base.types import Any, Balances, Int, Order, OrderBook, Position, Str, Strings, Ticker, Tickers, Trade
from ccxt.async_support.base.ws.client import Client
from typing import List
from ccxt.base.errors import ExchangeError
from ccxt.base.errors import ArgumentsRequired


class kucoinfutures(ccxt.async_support.kucoinfutures):

    def describe(self) -> Any:
        """
        extend the description inherited from the REST class with the websocket capabilities, options and streaming settings
        :returns dict: the result of deep-extending the parent description with the values below
        """
        return self.deep_extend(super(kucoinfutures, self).describe(), {
            'has': {
                'ws': True,
                'watchLiquidations': False,
                # the key used to be misspelled('watchLiquidatinsForSymbols') and therefore never described the real capability
                'watchLiquidationsForSymbols': False,
                'watchMyLiquidations': None,
                'watchMyLiquidationsForSymbols': None,
                'watchTicker': True,
                'watchTickers': True,
                'watchBidsAsks': True,
                'watchTrades': True,
                'watchOHLCV': True,
                'watchOrderBook': True,
                'watchOrders': True,
                'watchBalance': True,
                'watchPosition': True,
                'watchPositions': False,
                'watchPositionForSymbols': False,
                'watchTradesForSymbols': True,
                'watchOrderBookForSymbols': True,
            },
            'options': {
                # unified timeframe -> exchange-specific candle interval id
                'timeframes': {
                    '1m': '1min',
                    # NOTE(review): '3m' shares the '1min' id with '1m' - confirm whether the exchange offers a 3 minute candle stream
                    '3m': '1min',
                    '5m': '5min',
                    '15m': '15min',
                    '30m': '30min',
                    '1h': '1hour',
                    '2h': '2hour',
                    '4h': '4hour',
                    '8h': '8hour',
                    '12h': '12hour',
                    '1d': '1day',
                    '1w': '1week',
                    '1M': '1month',
                },
                'accountsByType': {
                    'swap': 'future',
                    'cross': 'margin',
                    # 'spot': ,
                    # 'margin': ,
                    # 'main': ,
                    # 'funding': ,
                    # 'future': ,
                    # 'mining': ,
                    # 'trade': ,
                    # 'contract': ,
                    # 'pool': ,
                },
                'tradesLimit': 1000,
                'watchOrderBook': {
                    'snapshotDelay': 20,
                    'snapshotMaxRetries': 3,
                },
                'watchPosition': {
                    'fetchPositionSnapshot': True,  # or False
                    'awaitPositionSnapshot': True,  # whether to wait for the position snapshot before providing updates
                },
            },
            'streaming': {
                # kucoin does not support built-in ws protocol-level ping-pong
                # instead it requires a custom json-based text ping-pong
                # https://docs.kucoin.com/#ping
                'ping': self.ping,
            },
        })

    async def negotiate(self, privateChannel, params={}):
        """
        return the websocket url for the public or the private channel, starting the negotiation only once per channel type
        :param bool privateChannel: truthy for the private endpoint, falsy for the public one
        :param dict [params]: extra parameters passed on to negotiate_helper
        :returns: whatever the negotiation awaitable stored under self.options['urls'] resolves with
        """
        channelKey = 'public'
        if privateChannel:
            channelKey = 'private'
        negotiated = self.safe_value(self.options, 'urls', {})
        pending = self.safe_value(negotiated, channelKey)
        if pending is None:
            # the awaitable itself is stored(not its result), so that concurrent
            # callers share one negotiation instead of fetching different urls
            # and overwriting each other
            pending = self.spawn(self.negotiate_helper, privateChannel, params)
            negotiated[channelKey] = pending
            self.options['urls'] = negotiated
        return await pending

    async def negotiate_helper(self, privateChannel, params={}):
        """
        request a connection token and build the websocket url from the first instance server of the response
        :param bool privateChannel: truthy to use the private bullet endpoint, falsy for the public one
        :param dict [params]: extra parameters sent with the token request
        :returns str|None: the url with its query string, or None after a failure has been passed to the stored negotiation future
        """
        channelKey = 'private' if privateChannel else 'public'
        try:
            requestToken = self.futuresPrivatePostBulletPrivate if privateChannel else self.futuresPublicPostBulletPublic
            reply = await requestToken(params)
            #
            # private reply
            #
            #     {
            #         "code": "200000",
            #         "data": {
            #             "instanceServers": [
            #                 {
            #                     "pingInterval":  50000,
            #                     "endpoint": "wss://push-private.kucoin.com/endpoint",
            #                     "protocol": "websocket",
            #                     "encrypt": True,
            #                     "pingTimeout": 10000
            #                 }
            #             ],
            #             "token": "2neAiuYvAU61ZDXANAGAsiL4-iAExhsBXZxftpOeh_55i3Ysy2q2LEsEWU64mdzUOPusi34M_wGoSf7iNyEWJ1UQy47YbpY4zVdzilNP-Bj3iXzrjjGlWtiYB9J6i9GjsxUuhPw3BlrzazF6ghq4Lzf7scStOz3KkxjwpsOBCH4=.WNQmhZQeUKIkh97KYgU0Lg=="
            #         }
            #     }
            #
            payload = self.safe_value(reply, 'data', {})
            token = self.safe_string(payload, 'token')
            servers = self.safe_value(payload, 'instanceServers', [])
            server = self.safe_value(servers, 0)
            baseUrl = self.safe_string(server, 'endpoint')
            keepAlive = self.safe_integer(server, 'pingInterval')
            url = baseUrl + '?' + self.urlencode({
                'token': token,
                'privateChannel': privateChannel,
                'connectId': channelKey,
            })
            # the ping interval announced by the server becomes the keep-alive of that connection
            connection = self.client(url)
            connection.keepAlive = keepAlive
            return url
        except Exception as e:
            # hand the failure to whoever awaits the stored negotiation and forget it,
            # so that a later negotiate() call starts from scratch
            pending = self.safe_value(self.options['urls'], channelKey)
            pending.reject(e)
            del self.options['urls'][channelKey]
        return None

    def request_id(self):
        """
        increment the request counter kept in self.options['requestId'] and return the new value
        :returns int: the previous counter(0 when absent) plus one
        """
        previous = self.safe_integer(self.options, 'requestId', 0)
        current = self.sum(previous, 1)
        self.options['requestId'] = current
        return current

    async def subscribe(self, url, messageHash, subscriptionHash, subscription, params={}):
        """
        send a subscribe request for a single topic and wait on its message hash
        :param str url: websocket url to use
        :param str messageHash: hash the awaited update is resolved under
        :param str subscriptionHash: the topic, also used as the subscription hash
        :param dict|None subscription: extra fields to store with the subscription
        :param dict [params]: extra fields merged into the outgoing request
        :returns: the value produced by self.watch
        """
        nonce = str(self.request_id())
        payload = self.extend({
            'id': nonce,
            'type': 'subscribe',
            'topic': subscriptionHash,
            'response': True,
        }, params)
        # the stored subscription always carries the id of the request that created it
        tracked: dict = {
            'id': nonce,
        }
        if subscription is not None:
            tracked = self.extend(tracked, subscription)
        return await self.watch(url, messageHash, payload, subscriptionHash, tracked)

    async def subscribe_multiple(self, url, messageHashes, topic, subscriptionHashes, subscriptionArgs, params={}):
        """
        send one subscribe request for a topic and wait on several message hashes
        :param str url: websocket url to use
        :param str[] messageHashes: hashes the awaited updates are resolved under
        :param str topic: the topic string sent to the exchange
        :param str[] subscriptionHashes: hashes identifying the subscriptions
        :param subscriptionArgs: passed to self.watch_multiple as the subscription
        :param dict [params]: extra fields merged into the outgoing request
        :returns: the value produced by self.watch_multiple
        """
        nonce = str(self.request_id())
        payload = self.extend({
            'id': nonce,
            'type': 'subscribe',
            'topic': topic,
            'response': True,
        }, params)
        return await self.watch_multiple(url, messageHashes, payload, subscriptionHashes, subscriptionArgs)

    async def un_subscribe_multiple(self, url, messageHashes, topic, subscriptionHashes, params={}, subscription: dict = None):
        """
        send one unsubscribe request for a topic and wait on several message hashes
        :param str url: websocket url to use
        :param str[] messageHashes: hashes the confirmation is resolved under
        :param str topic: the topic string sent to the exchange
        :param str[] subscriptionHashes: hashes identifying the unsubscribe requests
        :param dict [params]: extra fields merged into the outgoing request
        :param dict [subscription]: subscription details, the request id is written into it when given
        :returns: the value produced by self.watch_multiple
        """
        nonce = str(self.request_id())
        payload = self.extend({
            'id': nonce,
            'type': 'unsubscribe',
            'topic': topic,
            'response': True,
        }, params)
        if subscription is not None:
            subscription[nonce] = nonce
        client = self.client(url)
        for pendingHash in subscriptionHashes:
            if pendingHash in client.subscriptions:
                continue
            # hashes that are not registered yet are recorded under the request id
            client.subscriptions[nonce] = pendingHash
        return await self.watch_multiple(url, messageHashes, payload, subscriptionHashes, subscription)

    async def watch_ticker(self, symbol: str, params={}) -> Ticker:
        """
        watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market

        https://www.kucoin.com/docs/websocket/futures-trading/public-channels/get-ticker

        :param str symbol: unified symbol of the market to fetch the ticker for
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
        """
        await self.load_markets()
        market = self.market(symbol)
        symbol = market['symbol']
        # build a new dict instead of assigning into `params`: the assignment used to leak
        # 'callerMethodName' into the caller's dict and into the shared default argument
        request = self.extend(params, {
            'callerMethodName': 'watchTicker',
        })
        tickers = await self.watch_tickers([symbol], request)
        return tickers[symbol]

    async def watch_tickers(self, symbols: Strings = None, params={}) -> Tickers:
        """
        watches price tickers for a list of markets through the '/contractMarket/ticker:' channel
        :param str[] symbols: unified symbols of the markets to watch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: the updated ticker keyed by its symbol when self.newUpdates is set, otherwise the cached tickers filtered by symbols
        """
        await self.load_markets()
        update = await self.watch_multi_request('watchTickers', '/contractMarket/ticker:', symbols, params)
        if not self.newUpdates:
            return self.filter_by_array(self.tickers, 'symbol', symbols)
        latest: dict = {
            update['symbol']: update,
        }
        return latest

    def handle_ticker(self, client: Client, message):
        #
        # parses a ticker(v1) push, stores it in self.tickers and resolves the matching 'ticker' message hash
        #
        #    {
        #     "subject": "ticker",
        #     "topic": "/contractMarket/ticker:XBTUSDM",
        #     "data": {
        #         "symbol": "XBTUSDM",  # Market of the symbol
        #         "sequence": 45,  # Sequence number which is used to judge the continuity of the pushed messages
        #         "side": "sell",  # Transaction side of the last traded taker order
        #         "price": "3600.0",  # Filled price
        #         "size": 16,  # Filled quantity
        #         "tradeId": "5c9dcf4170744d6f5a3d32fb",  # Order ID
        #         "bestBidSize": 795,  # Best bid size
        #         "bestBidPrice": "3200.0",  # Best bid
        #         "bestAskPrice": "3600.0",  # Best ask
        #         "bestAskSize": 284,  # Best ask size
        #         "ts": 1553846081210004941  # Filled time - nanosecond
        #     }
        #    }
        #
        payload = self.safe_value(message, 'data', {})
        market = self.safe_market(self.safe_value(payload, 'symbol'), None, '-')
        parsed = self.parse_ticker(payload, market)
        unifiedSymbol = market['symbol']
        self.tickers[unifiedSymbol] = parsed
        client.resolve(parsed, self.get_message_hash('ticker', unifiedSymbol))

    async def watch_bids_asks(self, symbols: Strings = None, params={}) -> Tickers:
        """
        watches best bid & ask for symbols through the '/contractMarket/tickerV2:' channel

        https://www.kucoin.com/docs/websocket/futures-trading/public-channels/get-ticker-v2

        :param str[] symbols: unified symbols of the markets to watch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: the updated entry keyed by its symbol when self.newUpdates is set, otherwise the cached bids/asks filtered by symbols
        """
        update = await self.watch_multi_request('watchBidsAsks', '/contractMarket/tickerV2:', symbols, params)
        if not self.newUpdates:
            return self.filter_by_array(self.bidsasks, 'symbol', symbols)
        latest: dict = {
            update['symbol']: update,
        }
        return latest

    async def watch_multi_request(self, methodName, channelName: str, symbols: Strings = None, params={}):
        """
        subscribe to one public channel for up to 100 symbols with a single request
        :param str methodName: name of the calling method, overridden by params['callerMethodName'] when present
        :param str channelName: topic prefix, the comma-joined market ids are appended to it
        :param str[] symbols: unified symbols to subscribe to
        :param dict [params]: extra fields merged into the outgoing request
        :returns: the value produced by self.watch_multiple
        :raises ArgumentsRequired: when more than 100 symbols are requested
        """
        await self.load_markets()
        methodName, params = self.handle_param_string(params, 'callerMethodName', methodName)
        symbols = self.market_symbols(symbols, None, False, True, False)
        if len(symbols) > 100:
            raise ArgumentsRequired(self.id + ' ' + methodName + '() accepts a maximum of 100 symbols')
        # bids/asks and tickers are resolved under different message hash prefixes
        hashPrefix = 'ticker'
        if methodName == 'watchBidsAsks':
            hashPrefix = 'bidask'
        messageHashes = [
            self.get_message_hash(hashPrefix, self.market(entry)['symbol'])
            for entry in symbols
        ]
        url = await self.negotiate(False)
        topic = channelName + ','.join(self.market_ids(symbols))
        nonce = str(self.request_id())
        payload = self.extend({
            'id': nonce,
            'type': 'subscribe',
            'topic': topic,
            'response': True,
        }, params)
        tracked: dict = {
            'id': nonce,
        }
        # the message hashes double as subscription hashes
        return await self.watch_multiple(url, messageHashes, payload, messageHashes, tracked)

    def handle_bid_ask(self, client: Client, message):
        #
        # parses a tickerV2 push(one symbol per message), stores it in self.bidsasks
        # and resolves the matching 'bidask' message hash
        #
        # {
        #   "subject": "tickerV2",
        #   "topic": "/contractMarket/tickerV2:XBTUSDM",
        #   "data": {
        #     "symbol": "XBTUSDM",  # Market of the symbol
        #     "bestBidSize": 795,  # Best bid size
        #     "bestBidPrice": 3200.0,  # Best bid
        #     "bestAskPrice": 3600.0,  # Best ask
        #     "bestAskSize": 284,  # Best ask size
        #     "ts": 1553846081210004941  # Filled time - nanosecond
        #   }
        # }
        #
        quote = self.parse_ws_bid_ask(message)
        unifiedSymbol = quote['symbol']
        self.bidsasks[unifiedSymbol] = quote
        messageHash = self.get_message_hash('bidask', unifiedSymbol)
        client.resolve(quote, messageHash)

    def parse_ws_bid_ask(self, ticker, market=None):
        """
        convert a tickerV2 message into a ticker structure holding only the best bid and ask
        :param dict ticker: the raw websocket message, its 'data' field carries the quote
        :param dict [market]: market used when the message carries no resolvable symbol
        :returns dict: the result of self.safe_ticker, with the raw message under 'info'
        """
        payload = self.safe_dict(ticker, 'data', {})
        market = self.safe_market(self.safe_string(payload, 'symbol'), market)
        # 'ts' is scaled by 1e-6 before being used as the ticker timestamp
        milliseconds = self.safe_integer_product(payload, 'ts', 0.000001)
        quote = {
            'symbol': self.safe_string(market, 'symbol'),
            'timestamp': milliseconds,
            'datetime': self.iso8601(milliseconds),
            'ask': self.safe_number(payload, 'bestAskPrice'),
            'askVolume': self.safe_number(payload, 'bestAskSize'),
            'bid': self.safe_number(payload, 'bestBidPrice'),
            'bidVolume': self.safe_number(payload, 'bestBidSize'),
            'info': ticker,
        }
        return self.safe_ticker(quote, market)

    async def watch_position(self, symbol: Str = None, params={}) -> Position:
        """
        watch open positions for a specific symbol

        https://docs.kucoin.com/futures/#position-change-events

        :param str|None symbol: unified market symbol
        :param dict params: extra parameters specific to the exchange API endpoint
        :returns dict: a `position structure <https://docs.ccxt.com/en/latest/manual.html#position-structure>`
        :raises ArgumentsRequired: when no symbol is given
        """
        if symbol is None:
            raise ArgumentsRequired(self.id + ' watchPosition() requires a symbol argument')
        await self.load_markets()
        url = await self.negotiate(True)
        market = self.market(symbol)
        # use the market's own symbol everywhere below, so that the snapshot key, the cache lookup
        # and the message hash agree even when the caller passed another spelling of the market
        symbol = market['symbol']
        topic = '/contract/position:' + market['id']
        request: dict = {
            'privateChannel': True,
        }
        messageHash = 'position:' + symbol
        client = self.client(url)
        self.set_position_cache(client, symbol)
        fetchPositionSnapshot = self.handle_option('watchPosition', 'fetchPositionSnapshot', True)
        awaitPositionSnapshot = self.handle_option('watchPosition', 'awaitPositionSnapshot', True)
        currentPosition = self.get_current_position(symbol)
        if fetchPositionSnapshot and awaitPositionSnapshot and currentPosition is None:
            # nothing is cached for this symbol yet: hand out the REST snapshot first
            snapshot = await client.future('fetchPositionSnapshot:' + symbol)
            return snapshot
        return await self.subscribe(url, messageHash, topic, None, self.extend(request, params))

    def get_current_position(self, symbol):
        """
        look up the first cached position of a symbol
        :param str symbol: key into the hashmap of the positions cache
        :returns dict|None: the first value stored for the symbol, None when there is no cache or no entry
        """
        positions = self.positions
        if positions is None:
            return None
        entries = self.safe_value(positions.hashmap, symbol, {})
        return self.safe_value(list(entries.values()), 0)

    def set_position_cache(self, client: Client, symbol: str):
        """
        start loading the position snapshot of a symbol unless it was already requested on this client
        :param Client client: the websocket client holding the snapshot future
        :param str symbol: symbol the snapshot is loaded for
        """
        if not self.handle_option('watchPosition', 'fetchPositionSnapshot', False):
            return
        snapshotHash = 'fetchPositionSnapshot:' + symbol
        if snapshotHash in client.futures:
            # a snapshot for this symbol was already requested
            return
        client.future(snapshotHash)
        self.spawn(self.load_position_snapshot, client, snapshotHash, symbol)

    async def load_position_snapshot(self, client, messageHash, symbol):
        """
        fetch the position of a symbol over REST, cache it and resolve the waiting futures with it
        :param Client client: the websocket client holding the snapshot future
        :param str messageHash: key of the snapshot future in client.futures
        :param str symbol: symbol to fetch the position for
        """
        position = await self.fetch_position(symbol)
        # create the cache only once: replacing it on every snapshot used to drop the
        # positions that were cached earlier for other symbols
        if self.positions is None:
            self.positions = ArrayCacheBySymbolById()
        cache = self.positions
        cache.append(position)
        # don't remove the future from the .futures cache
        future = client.futures[messageHash]
        # watch_position returns the value of this future and is declared to return a single
        # position, so it is resolved with the position rather than with the whole cache
        future.resolve(position)
        client.resolve(position, 'position:' + symbol)

    def handle_position(self, client: Client, message):
        #
        # merges a position push into the cached position of the symbol taken from the topic,
        # stores the result and resolves 'position:<symbol>' with it
        #
        # Position Changes Caused Operations
        #    {
        #        "type": "message",
        #        "userId": "5c32d69203aa676ce4b543c7",  # Deprecated, will delete later
        #        "channelType": "private",
        #        "topic": "/contract/position:XBTUSDM",
        #        "subject": "position.change",
        #        "data": {
        #            "realisedGrossPnl": 0E-8,  # Accumulated realised profit and loss
        #            "symbol": "XBTUSDM",  # Symbol
        #            "crossMode": False,  # Cross mode or not
        #            "liquidationPrice": 1000000.0,  # Liquidation price
        #            "posLoss": 0E-8,  # Manually added margin amount
        #            "avgEntryPrice": 7508.22,  # Average entry price
        #            "unrealisedPnl": -0.00014735,  # Unrealised profit and loss
        #            "markPrice": 7947.83,  # Mark price
        #            "posMargin": 0.00266779,  # Position margin
        #            "autoDeposit": False,  # Auto deposit margin or not
        #            "riskLimit": 100000,  # Risk limit
        #            "unrealisedCost": 0.00266375,  # Unrealised value
        #            "posComm": 0.00000392,  # Bankruptcy cost
        #            "posMaint": 0.00001724,  # Maintenance margin
        #            "posCost": 0.00266375,  # Position value
        #            "maintMarginReq": 0.005,  # Maintenance margin rate
        #            "bankruptPrice": 1000000.0,  # Bankruptcy price
        #            "realisedCost": 0.00000271,  # Currently accumulated realised position value
        #            "markValue": 0.00251640,  # Mark value
        #            "posInit": 0.00266375,  # Position margin
        #            "realisedPnl": -0.00000253,  # Realised profit and loss
        #            "maintMargin": 0.00252044,  # Position margin
        #            "realLeverage": 1.06,  # Leverage of the order
        #            "changeReason": "positionChange",  # changeReason: marginChange, positionChange, liquidation, autoAppendMarginStatusChange, adl
        #            "currentCost": 0.00266375,  # Current position value
        #            "openingTimestamp": 1558433191000,  # Open time
        #            "currentQty": -20,  # Current position
        #            "delevPercentage": 0.52,  # ADL ranking percentile
        #            "currentComm": 0.00000271,  # Current commission
        #            "realisedGrossCost": 0E-8,  # Accumulated realised gross profit value
        #            "isOpen": True,  # Opened position or not
        #            "posCross": 1.2E-7,  # Manually added margin
        #            "currentTimestamp": 1558506060394,  # Current timestamp
        #            "unrealisedRoePcnt": -0.0553,  # Rate of return on investment
        #            "unrealisedPnlPcnt": -0.0553,  # Position profit and loss ratio
        #            "settleCurrency": "XBT"  # Currency used to clear and settle the trades
        #        }
        #    }
        # Position Changes Caused by Mark Price
        #    {
        #        "userId": "5cd3f1a7b7ebc19ae9558591",  # Deprecated, will delete later
        #        "topic": "/contract/position:XBTUSDM",
        #        "subject": "position.change",
        #          "data": {
        #              "markPrice": 7947.83,                   # Mark price
        #              "markValue": 0.00251640,                 # Mark value
        #              "maintMargin": 0.00252044,              # Position margin
        #              "realLeverage": 10.06,                   # Leverage of the order
        #              "unrealisedPnl": -0.00014735,           # Unrealised profit and loss
        #              "unrealisedRoePcnt": -0.0553,           # Rate of return on investment
        #              "unrealisedPnlPcnt": -0.0553,            # Position profit and loss ratio
        #              "delevPercentage": 0.52,             # ADL ranking percentile
        #              "currentTimestamp": 1558087175068,      # Current timestamp
        #              "settleCurrency": "XBT"                 # Currency used to clear and settle the trades
        #          }
        #    }
        #  Funding Settlement
        #    {
        #        "userId": "xbc453tg732eba53a88ggyt8c",  # Deprecated, will delete later
        #        "topic": "/contract/position:XBTUSDM",
        #        "subject": "position.settlement",
        #        "data": {
        #            "fundingTime": 1551770400000,          # Funding time
        #            "qty": 100,                            # Position size
        #            "markPrice": 3610.85,                 # Settlement price
        #            "fundingRate": -0.002966,             # Funding rate
        #            "fundingFee": -296,                   # Funding fees
        #            "ts": 1547697294838004923,             # Current time(nanosecond)
        #            "settleCurrency": "XBT"                # Currency used to clear and settle the trades
        #        }
        #    }
        # Adjustment result of risk limit level
        #     {
        #         "userId": "xbc453tg732eba53a88ggyt8c",
        #         "topic": "/contract/position:ADAUSDTM",
        #         "subject": "position.adjustRiskLimit",
        #         "data": {
        #           "success": True,  # Successful or not
        #           "riskLimitLevel": 1,  # Current risk limit level
        #           "msg": ""  # Failure reason
        #         }
        #     }
        #
        topic = self.safe_string(message, 'topic', '')
        parts = topic.split(':')
        marketId = self.safe_string(parts, 1)
        symbol = self.safe_symbol(marketId, None, '')
        if self.positions is None:
            # no REST snapshot has created the cache(e.g. fetchPositionSnapshot is disabled),
            # create it here instead of failing on a None cache below
            self.positions = ArrayCacheBySymbolById()
        cache = self.positions
        currentPosition = self.get_current_position(symbol)
        messageHash = 'position:' + symbol
        data = self.safe_value(message, 'data', {})
        newPosition = self.parse_position(data)
        if currentPosition is None:
            # nothing to merge into: keep the parsed structure as it is, unset fields included
            # NOTE(review): assumes parse_position returns every key the cache indexes by - confirm
            position = newPosition
            if self.safe_string(position, 'symbol') is None:
                # partial pushes carry no symbol in their data, fall back to the one from the topic
                position['symbol'] = symbol
        else:
            # a push may carry only some fields, the unset ones must not overwrite cached values
            updates = {key: value for key, value in newPosition.items() if value is not None}
            position = self.extend(currentPosition, updates)
        cache.append(position)
        client.resolve(position, messageHash)

    async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        watch the most recent trades of a single symbol, delegates to watch_trades_for_symbols

        https://docs.kucoin.com/futures/#execution-data

        :param str symbol: unified symbol of the market to fetch trades for
        :param int [since]: timestamp in ms of the earliest trade to fetch
        :param int [limit]: the maximum amount of trades to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
        """
        singleSymbol = [symbol]
        trades = await self.watch_trades_for_symbols(singleSymbol, since, limit, params)
        return trades

    async def watch_trades_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        get the list of most recent trades for a list of symbols
        :param str[] symbols: unified symbols of the markets to fetch trades for
        :param int [since]: timestamp in ms of the earliest trade to fetch
        :param int [limit]: the maximum amount of trades to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
        :raises ArgumentsRequired: when symbols is missing or empty
        """
        # a missing list is rejected the same way as an empty one, instead of failing inside len()
        if not symbols:
            raise ArgumentsRequired(self.id + ' watchTradesForSymbols() requires a non-empty array of symbols')
        await self.load_markets()
        symbols = self.market_symbols(symbols)
        url = await self.negotiate(False)
        marketIds = self.market_ids(symbols)
        channel = '/contractMarket/execution:'
        # a single request subscribes to all the markets, updates arrive per market
        topic = channel + ','.join(marketIds)
        messageHashes = ['trades:' + symbol for symbol in symbols]
        subscriptionHashes = [channel + marketId for marketId in marketIds]
        trades = await self.subscribe_multiple(url, messageHashes, topic, subscriptionHashes, None, params)
        if self.newUpdates:
            first = self.safe_value(trades, 0)
            tradeSymbol = self.safe_string(first, 'symbol')
            limit = trades.getLimit(tradeSymbol, limit)
        return self.filter_by_since_limit(trades, since, limit, 'timestamp', True)

    async def un_watch_trades(self, symbol: str, params={}) -> Any:
        """
        unWatches the trades stream of a single symbol, delegates to un_watch_trades_for_symbols

        https://docs.kucoin.com/futures/#execution-data

        :param str symbol: unified symbol of the market to stop watching
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns: the value returned by un_watch_trades_for_symbols
        """
        singleSymbol = [symbol]
        outcome = await self.un_watch_trades_for_symbols(singleSymbol, params)
        return outcome

    async def un_watch_trades_for_symbols(self, symbols: List[str], params={}) -> Any:
        """
        unWatches the trades stream of a list of symbols
        :param str[] symbols: unified symbols of the markets to stop watching
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns: the value returned by un_subscribe_multiple
        """
        await self.load_markets()
        # the symbols are normalised once, the second identical normalisation was redundant
        symbols = self.market_symbols(symbols, None, False)
        url = await self.negotiate(False)
        marketIds = self.market_ids(symbols)
        topic = '/contractMarket/execution:' + ','.join(marketIds)
        messageHashes = ['unsubscribe:trades:' + symbol for symbol in symbols]
        # hashes of the subscriptions that are being cancelled
        subscriptionHashes = ['trades:' + symbol for symbol in symbols]
        subscription = {
            'messageHashes': messageHashes,
            'subMessageHashes': subscriptionHashes,
            'topic': 'trades',
            'unsubscribe': True,
            'symbols': symbols,
        }
        # the unsubscribe message hashes are also used as the hashes of the unsubscribe request itself
        return await self.un_subscribe_multiple(url, messageHashes, topic, messageHashes, params, subscription)

    def handle_trade(self, client: Client, message):
        #
        # parses an execution push, appends it to the trades cache of its symbol
        # and resolves 'trades:<symbol>' with that cache
        #
        #    {
        #        "type": "message",
        #        "topic": "/contractMarket/execution:ADAUSDTM",
        #        "subject": "match",
        #        "data": {
        #            "makerUserId": "62286a4d720edf0001e81961",
        #            "symbol": "ADAUSDTM",
        #            "sequence": 41320766,
        #            "side": "sell",
        #            "size": 2,
        #            "price": 0.35904,
        #            "takerOrderId": "636dd9da9857ba00010cfa44",
        #            "makerOrderId": "636dd9c8df149d0001e62bc8",
        #            "takerUserId": "6180be22b6ab210001fa3371",
        #            "tradeId": "636dd9da0000d400d477eca7",
        #            "ts": 1668143578987357700
        #        }
        #    }
        #
        parsed = self.parse_trade(self.safe_value(message, 'data', {}))
        symbol = parsed['symbol']
        cached = self.safe_value(self.trades, symbol)
        if cached is None:
            # first trade of this symbol: create its cache, sized by the 'tradesLimit' option
            cached = ArrayCache(self.safe_integer(self.options, 'tradesLimit', 1000))
            self.trades[symbol] = cached
        cached.append(parsed)
        client.resolve(cached, 'trades:' + symbol)
        return message

    async def watch_ohlcv(self, symbol: str, timeframe='1m', since: Int = None, limit: Int = None, params={}) -> List[list]:
        """
        watches historical candlestick data containing the open, high, low, and close price, and the volume of a market

        https://www.kucoin.com/docs/websocket/futures-trading/public-channels/klines

        :param str symbol: unified symbol of the market to fetch OHLCV data for
        :param str timeframe: the length of time each candle represents
        :param int [since]: timestamp in ms of the earliest candle to fetch
        :param int [limit]: the maximum amount of candles to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns int[][]: A list of candles ordered, open, high, low, close, volume
        """
        await self.load_markets()
        symbol = self.symbol(symbol)
        url = await self.negotiate(False)
        # timeframes missing from the options are sent to the exchange unchanged
        intervals = self.safe_dict(self.options, 'timeframes')
        interval = self.safe_string(intervals, timeframe, timeframe)
        topic = '/contractMarket/limitCandle:' + self.market_id(symbol) + '_' + interval
        candles = await self.subscribe(url, 'ohlcv::' + symbol + '_' + timeframe, topic, None, params)
        if self.newUpdates:
            limit = candles.getLimit(symbol, limit)
        return self.filter_by_since_limit(candles, since, limit, 0, True)

    def handle_ohlcv(self, client: Client, message):
        #
        # parses a candle push, appends it to the candle cache of its symbol and timeframe
        # and resolves 'ohlcv::<symbol>_<timeframe>' with that cache
        #
        #    {
        #        "topic":"/contractMarket/limitCandle:LTCUSDTM_1min",
        #        "type":"message",
        #        "data":{
        #            "symbol":"LTCUSDTM",
        #            "candles":[
        #                "1715470980",
        #                "81.38",
        #                "81.38",
        #                "81.38",
        #                "81.38",
        #                "61.0",
        #                "61"
        #            ],
        #            "time":1715470994801
        #        },
        #        "subject":"candle.stick"
        #    }
        #
        topic = self.safe_string(message, 'topic')
        # the interval id is the part of the topic that follows the underscore
        parts = topic.split('_')
        timeframeId = self.safe_string(parts, 1)
        data = self.safe_dict(message, 'data')
        timeframes = self.safe_dict(self.options, 'timeframes')
        timeframe = self.find_timeframe(timeframeId, timeframes)
        marketId = self.safe_string(data, 'symbol')
        symbol = self.safe_symbol(marketId)
        messageHash = 'ohlcv::' + symbol + '_' + timeframe
        ohlcv = self.safe_list(data, 'candles')
        parsed = [
            # the candle start time arrives in seconds(see the sample above), while watch_ohlcv
            # filters by a `since` given in milliseconds, so it is converted to milliseconds here
            self.safe_timestamp(ohlcv, 0),
            self.safe_number(ohlcv, 1),
            self.safe_number(ohlcv, 2),
            self.safe_number(ohlcv, 3),
            self.safe_number(ohlcv, 4),
            self.safe_number(ohlcv, 6),  # Note value 5 is incorrect and will be fixed in subsequent versions of kucoin
        ]
        self.ohlcvs[symbol] = self.safe_dict(self.ohlcvs, symbol, {})
        if not (timeframe in self.ohlcvs[symbol]):
            # first candle of this symbol and timeframe: create its cache, sized by the 'OHLCVLimit' option
            limit = self.safe_integer(self.options, 'OHLCVLimit', 1000)
            self.ohlcvs[symbol][timeframe] = ArrayCacheByTimestamp(limit)
        stored = self.ohlcvs[symbol][timeframe]
        stored.append(parsed)
        client.resolve(stored, messageHash)

    async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook:
        """
        watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data

        Single-symbol convenience wrapper: delegates to watch_order_book_for_symbols with a one-element list.

        Order book maintenance procedure described by the exchange:

        1. After receiving the websocket Level 2 data flow, cache the data.
        2. Initiate a REST request to get the snapshot data of Level 2 order book.
        3. Playback the cached Level 2 data flow.
        4. Apply the new Level 2 data flow to the local snapshot to ensure that the sequence of the new Level 2 update lines up with the sequence of the previous Level 2 data. Discard all the message prior to that sequence, and then playback the change to snapshot.
        5. Update the level2 full data based on sequence according to the size. If the price is 0, ignore the messages and update the sequence. If the size=0, update the sequence and remove the price of which the size is 0 out of level 2. For other cases, please update the price.
        6. If the sequence of the newly pushed message does not line up to the sequence of the last message, you could pull through REST Level 2 message request to get the updated messages. Please note that the difference between the start and end parameters cannot exceed 500.

        https://docs.kucoin.com/futures/#level-2-market-data

        :param str symbol: unified symbol of the market to fetch the order book for
        :param int [limit]: the maximum amount of order book entries to return
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
        """
        singleSymbol = [symbol]
        orderbook = await self.watch_order_book_for_symbols(singleSymbol, limit, params)
        return orderbook

    async def watch_order_book_for_symbols(self, symbols: List[str], limit: Int = None, params={}) -> OrderBook:
        """
        watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data

        All requested markets share one comma-joined level2 topic; each market
        gets its own 'orderbook:<symbol>' message hash and its own per-market
        subscription hash.

        https://docs.kucoin.com/futures/#level-2-market-data

        :param str[] symbols: unified array of symbols
        :param int [limit]: the maximum amount of order book entries to return
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
        :raises ArgumentsRequired: when symbols is empty
        :raises ExchangeError: when limit is given and is neither 20 nor 100
        """
        if len(symbols) == 0:
            raise ArgumentsRequired(self.id + ' watchOrderBookForSymbols() requires a non-empty array of symbols')
        if (limit is not None) and (limit not in (20, 100)):
            raise ExchangeError(self.id + " watchOrderBook 'limit' argument must be None, 20 or 100")
        await self.load_markets()
        symbols = self.market_symbols(symbols)
        marketIds = self.market_ids(symbols)
        url = await self.negotiate(False)
        channelPrefix = '/contractMarket/level2:'
        topic = channelPrefix + ','.join(marketIds)
        # the requested limit travels with the subscription so handle_order_book can read it back
        subscriptionArgs: dict = {
            'limit': limit,
        }
        messageHashes = ['orderbook:' + unifiedSymbol for unifiedSymbol in symbols]
        subscriptionHashes = [channelPrefix + exchangeId for exchangeId in marketIds]
        book = await self.subscribe_multiple(url, messageHashes, topic, subscriptionHashes, subscriptionArgs, params)
        return book.limit()

    async def un_watch_order_book(self, symbol: str, params={}) -> Any:
        """
        unWatches information on open orders with bid(buy) and ask(sell) prices, volumes and other data

        Single-symbol convenience wrapper: delegates to un_watch_order_book_for_symbols with a one-element list.

        https://docs.kucoin.com/futures/#level-2-market-data

        :param str symbol: unified symbol of the market to fetch the order book for
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
        """
        singleSymbol = [symbol]
        result = await self.un_watch_order_book_for_symbols(singleSymbol, params)
        return result

    async def un_watch_order_book_for_symbols(self, symbols: List[str], params={}) -> Any:
        """
        unWatches information on open orders with bid(buy) and ask(sell) prices, volumes and other data

        Builds one 'unsubscribe:orderbook:<symbol>' hash per market and hands
        the matching 'orderbook:<symbol>' hashes to the unsubscription record
        under 'subMessageHashes'.

        :param str[] symbols: unified array of symbols
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
        """
        await self.load_markets()
        symbols = self.market_symbols(symbols)
        marketIds = self.market_ids(symbols)
        url = await self.negotiate(False)
        topic = '/contractMarket/level2:' + ','.join(marketIds)
        messageHashes = ['unsubscribe:orderbook:' + unifiedSymbol for unifiedSymbol in symbols]
        subscriptionHashes = ['orderbook:' + unifiedSymbol for unifiedSymbol in symbols]
        # read back by handle_subscription_status once the exchange acknowledges the request
        subscription = {
            'messageHashes': messageHashes,
            'symbols': symbols,
            'unsubscribe': True,
            'topic': 'orderbook',
            'subMessageHashes': subscriptionHashes,
        }
        # NOTE(review): messageHashes is passed for both hash arguments; subscriptionHashes
        # only travels inside `subscription` - confirm against un_subscribe_multiple
        return await self.un_subscribe_multiple(url, messageHashes, topic, messageHashes, params, subscription)

    def handle_delta(self, orderbook, delta):
        """
        apply a single level2 change to the locally stored order book

        The nonce, timestamp and datetime of the order book are always
        refreshed from the delta. The price level itself is only written when
        the delta carries a 'change' entry.

        :param dict orderbook: the stored order book, updated in place
        :param dict delta: the 'data' part of a level2 message, with 'sequence', 'change' and 'timestamp'
        """
        orderbook['nonce'] = self.safe_integer(delta, 'sequence')
        timestamp = self.safe_integer(delta, 'timestamp')
        orderbook['timestamp'] = timestamp
        orderbook['datetime'] = self.iso8601(timestamp)
        # 'change' is a comma separated "price,side,quantity" string, e.g. "0.34172,sell,456"
        change = self.safe_string(delta, 'change')
        if change is None:
            # no price level to apply; a dict default here used to fail on .split()
            return
        splitChange = change.split(',')
        price = self.safe_number(splitChange, 0)
        side = self.safe_string(splitChange, 1)
        quantity = self.safe_number(splitChange, 2)
        # 'buy' updates the bids, every other side value updates the asks
        bookside = orderbook['bids'] if (side == 'buy') else orderbook['asks']
        bookside.storeArray([price, quantity])

    def handle_deltas(self, bookside, deltas):
        """
        apply a sequence of deltas in order by forwarding each one to handle_delta

        :param bookside: forwarded unchanged as the first argument of handle_delta
        :param list deltas: the deltas to apply, in order
        """
        for delta in deltas:
            self.handle_delta(bookside, delta)

    def handle_order_book(self, client: Client, message):
        #
        # initial snapshot is fetched with ccxt's fetchOrderBook
        # the feed does not include a snapshot, just the deltas
        #
        #    {
        #        "type": "message",
        #        "topic": "/contractMarket/level2:ADAUSDTM",
        #        "subject": "level2",
        #        "data": {
        #            "sequence": 1668059586457,
        #            "change": "0.34172,sell,456",  # type, side, quantity
        #            "timestamp": 1668573023223
        #        }
        #    }
        #
        payload = self.safe_value(message, 'data')
        topic = self.safe_string(message, 'topic')
        # "<channel>:<marketId>"
        topicPieces = topic.split(':')
        symbol = self.safe_symbol(self.safe_string(topicPieces, 1), None, '-')
        messageHash = 'orderbook:' + symbol
        if symbol not in self.orderbooks:
            # first message for this market: create an empty book with the subscribed limit
            subscribedArgs = self.safe_dict(client.subscriptions, topic, {})
            self.orderbooks[symbol] = self.order_book({}, self.safe_integer(subscribedArgs, 'limit'))
        book = self.orderbooks[symbol]
        nonce = self.safe_integer(book, 'nonce')
        if nonce is None:
            # the book has no nonce yet: buffer the delta instead of applying it
            bufferedCount = len(book.cache)
            topicSymbol = self.safe_string(topicPieces, 1)
            topicChannel = self.safe_string(topicPieces, 0)
            # first subscription whose key mentions both the market id and the channel
            matched = None
            for key in list(client.subscriptions.keys()):
                if (topicSymbol in key) and (topicChannel in key):
                    matched = client.subscriptions[key]
                    break
            limit = self.safe_integer(matched, 'limit')
            snapshotDelay = self.handle_option('watchOrderBook', 'snapshotDelay', 5)
            # load_order_book is spawned exactly when the buffer already holds snapshotDelay deltas
            if bufferedCount == snapshotDelay:
                self.spawn(self.load_order_book, client, messageHash, symbol, limit, {})
            book.cache.append(payload)
            return
        if nonce >= self.safe_integer(payload, 'sequence'):
            # sequence is not newer than the stored nonce: ignore
            return
        self.handle_delta(book, payload)
        client.resolve(book, messageHash)

    def get_cache_index(self, orderbook, cache):
        """
        locate the position in the cached deltas relative to the order book nonce

        :param dict orderbook: order book whose 'nonce' is compared against the cache
        :param list cache: cached deltas, each carrying a 'sequence'
        :returns int: -1 when the nonce is below the first cached sequence minus one, otherwise the index of the first delta whose sequence minus one exceeds the nonce, or len(cache) when there is none
        """
        nonce = self.safe_integer(orderbook, 'nonce')
        oldestSequence = self.safe_integer(self.safe_value(cache, 0), 'sequence')
        if nonce < oldestSequence - 1:
            return -1
        for position, delta in enumerate(cache):
            if nonce < self.safe_integer(delta, 'sequence') - 1:
                return position
        return len(cache)

    def handle_system_status(self, client: Client, message):
        """
        handler registered for "welcome" messages in handle_message; performs no processing

        :param Client client: the websocket client the message arrived on(unused)
        :param dict message: the raw message
        :returns dict: the message, unchanged
        """
        #
        # todo: answer the question whether handleSystemStatus should be renamed
        # and unified for any usage pattern that
        # involves system status and maintenance updates
        #
        #     {
        #         "id": "1578090234088",  # connectId
        #         "type": "welcome",
        #     }
        #
        return message

    async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]:
        """
        watches information on multiple orders made by the user

        Subscribes to the private trade orders topic. The message hash is
        'orders', or 'orders:<symbol>' when a symbol is given.

        https://docs.kucoin.com/futures/#trade-orders-according-to-the-market

        :param str symbol: unified market symbol of the market orders were made in
        :param int [since]: the earliest time in ms to fetch orders for
        :param int [limit]: the maximum number of order structures to retrieve
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `order structures <https://docs.ccxt.com/#/?id=order-structure>`
        """
        await self.load_markets()
        url = await self.negotiate(True)
        messageHash = 'orders'
        if symbol is not None:
            # normalize to the unified symbol of the resolved market
            symbol = self.market(symbol)['symbol']
            messageHash = messageHash + ':' + symbol
        privateRequest: dict = {
            'privateChannel': True,
        }
        orders = await self.subscribe(url, messageHash, '/contractMarket/tradeOrders', None, self.extend(privateRequest, params))
        if self.newUpdates:
            limit = orders.getLimit(symbol, limit)
        return self.filter_by_symbol_since_limit(orders, symbol, since, limit, True)

    def parse_ws_order_status(self, status):
        """
        translate the raw order event type into a unified order status

        :param str status: raw value taken from the order message
        :returns str: 'open', 'closed' or 'canceled' for known values, otherwise the input unchanged
        """
        unifiedByRaw: dict = {
            'open': 'open',
            'filled': 'closed',
            'match': 'open',
            'update': 'open',
            'canceled': 'canceled',
        }
        # unknown values fall through as-is
        return self.safe_string(unifiedByRaw, status, status)

    def parse_ws_order(self, order, market=None):
        #
        #     {
        #         "symbol": "XCAD-USDT",
        #         "orderType": "limit",
        #         "side": "buy",
        #         "orderId": "6249167327218b000135e749",
        #         "type": "canceled",
        #         "orderTime": 1648957043065280224,
        #         "size": "100.452",
        #         "filledSize": "0",
        #         "price": "2.9635",
        #         "clientOid": "buy-XCAD-USDT-1648957043010159",
        #         "remainSize": "0",
        #         "status": "done",
        #         "ts": 1648957054031001037
        #     }
        #
        # the unified status comes from the "type" field, not from "status"
        status = self.parse_ws_order_status(self.safe_string(order, 'type'))
        # orderTime is scaled by 0.000001 before being used as the timestamp
        timestamp = self.safe_integer_product(order, 'orderTime', 0.000001)
        market = self.safe_market(self.safe_string(order, 'symbol'), market)
        unified = {
            'info': order,
            'symbol': market['symbol'],
            'id': self.safe_string(order, 'orderId'),
            'clientOrderId': self.safe_string(order, 'clientOid'),
            'timestamp': timestamp,
            'datetime': self.iso8601(timestamp),
            'lastTradeTimestamp': None,
            'type': self.safe_string_lower(order, 'orderType'),
            'timeInForce': None,
            'postOnly': None,
            'side': self.safe_string_lower(order, 'side'),
            'price': self.safe_string(order, 'price'),
            'stopPrice': None,
            'amount': self.safe_string(order, 'size'),
            'cost': None,
            'average': None,
            'filled': self.safe_string(order, 'filledSize'),
            'remaining': None,
            'status': status,
            'fee': None,
            'trades': None,
        }
        return self.safe_order(unified, market)

    def handle_order(self, client: Client, message):
        """
        parse a pushed order update, store it in the orders cache and resolve the waiting futures

        Resolves both the generic 'orders' hash and the symbol specific
        'orders:<symbol>' hash. Updates whose parsed order has no symbol are ignored.

        :param Client client: the websocket client the message arrived on
        :param dict message: the raw message; the order is read from its 'data' field
        """
        parsed = self.parse_ws_order(self.safe_value(message, 'data'))
        symbol = self.safe_string(parsed, 'symbol')
        orderId = self.safe_string(parsed, 'id')
        if symbol is None:
            return
        if self.orders is None:
            # lazily create the cache on the first order update
            self.orders = ArrayCacheBySymbolById(self.safe_integer(self.options, 'ordersLimit', 1000))
        cache = self.orders
        previous = self.safe_value(self.safe_value(cache.hashmap, symbol, {}), orderId)
        if previous is not None:
            # todo add others to calculate average etc
            # carry over the stop price from the previously cached version of this order
            knownStopPrice = self.safe_value(previous, 'stopPrice')
            if knownStopPrice is not None:
                parsed['stopPrice'] = knownStopPrice
            # an order already cached as closed stays closed
            if previous['status'] == 'closed':
                parsed['status'] = 'closed'
        cache.append(parsed)
        client.resolve(self.orders, 'orders')
        client.resolve(self.orders, 'orders' + ':' + symbol)

    async def watch_balance(self, params={}) -> Balances:
        """
        watch balance and get the amount of funds available for trading or funds locked in orders

        Subscribes to the private wallet topic and registers
        handle_balance_subscription as the 'method' of the subscription.

        https://docs.kucoin.com/futures/#account-balance-events

        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: a `balance structure <https://docs.ccxt.com/#/?id=balance-structure>`
        """
        await self.load_markets()
        url = await self.negotiate(True)
        privateRequest: dict = {
            'privateChannel': True,
        }
        onSubscribed: dict = {
            'method': self.handle_balance_subscription,
        }
        query = self.extend(privateRequest, params)
        return await self.subscribe(url, 'balance', '/contractAccount/wallet', onSubscribed, query)

    def handle_balance(self, client: Client, message):
        #
        # wallet update for a single currency:
        #
        #    {
        #        "id": "6375553193027a0001f6566f",
        #        "type": "message",
        #        "topic": "/contractAccount/wallet",
        #        "userId": "613a896885d8660006151f01",
        #        "channelType": "private",
        #        "subject": "availableBalance.change",
        #        "data": {
        #            "currency": "USDT",
        #            "holdBalance": "0.0000000000",
        #            "availableBalance": "14.0350281903",
        #            "timestamp": "1668633905657"
        #        }
        #    }
        #
        update = self.safe_value(message, 'data', {})
        self.balance['info'] = update
        code = self.safe_currency_code(self.safe_string(update, 'currency'))
        # availableBalance -> free, holdBalance -> used
        entry = self.account()
        entry['free'] = self.safe_string(update, 'availableBalance')
        entry['used'] = self.safe_string(update, 'holdBalance')
        self.balance[code] = entry
        self.balance = self.safe_balance(self.balance)
        client.resolve(self.balance, 'balance')

    def handle_balance_subscription(self, client: Client, message, subscription):
        """
        callback registered by watch_balance as the 'method' of the balance subscription

        Spawns fetch_balance_snapshot with the client and message; the
        subscription argument is not used.
        """
        self.spawn(self.fetch_balance_snapshot, client, message)

    async def fetch_balance_snapshot(self, client, message):
        """
        fetch the balance with fetch_balance, copy its per-currency entries into self.balance and resolve 'balance'

        :param Client client: the websocket client whose 'balance' future is resolved
        :param dict message: not used by this method
        """
        await self.load_markets()
        self.check_required_credentials()
        selectedType = self.safe_string_2(self.options, 'watchBalance', 'defaultType', 'swap')  # spot, margin, main, funding, future, mining, trade, contract, pool
        snapshot = await self.fetch_balance({
            'type': selectedType,
        })
        #
        # shape of the fetched snapshot:
        #
        #    {
        #        "info": {
        #            "code": "200000",
        #            "data": {
        #                "accountEquity": 0.0350281903,
        #                "unrealisedPNL": 0,
        #                "marginBalance": 0.0350281903,
        #                "positionMargin": 0,
        #                "orderMargin": 0,
        #                "frozenFunds": 0,
        #                "availableBalance": 0.0350281903,
        #                "currency": "USDT"
        #            }
        #        },
        #        "timestamp": None,
        #        "datetime": None,
        #        "USDT": {
        #            "free": 0.0350281903,
        #            "used": 0,
        #            "total": 0.0350281903
        #        },
        #        "free": {
        #            "USDT": 0.0350281903
        #        },
        #        "used": {
        #            "USDT": 0
        #        },
        #        "total": {
        #            "USDT": 0.0350281903
        #        }
        #    }
        #
        # every key except these structural ones is treated as a currency code
        structuralKeys = ('free', 'used', 'total', 'timestamp', 'datetime', 'info')
        for code in list(snapshot.keys()):
            if code not in structuralKeys:
                self.balance[code] = snapshot[code]
        self.balance['info'] = self.safe_value(snapshot, 'info', {})
        client.resolve(self.balance, 'balance')

    def handle_subject(self, client: Client, message):
        #
        # routes a "message" frame to the handler registered for its "subject";
        # frames with an unknown subject are ignored
        #
        #    {
        #        "type": "message",
        #        "topic": "/contractMarket/level2:ADAUSDTM",
        #        "subject": "level2",
        #        "data": {
        #            "sequence": 1668059586457,
        #            "change": "0.34172,sell,456",  # type, side, quantity
        #            "timestamp": 1668573023223
        #        }
        #    }
        #
        handlersBySubject: dict = {
            'level2': self.handle_order_book,
            'ticker': self.handle_ticker,
            'candle.stick': self.handle_ohlcv,
            'tickerV2': self.handle_bid_ask,
            'availableBalance.change': self.handle_balance,
            'match': self.handle_trade,
            'orderChange': self.handle_order,
            'orderUpdated': self.handle_order,
            'position.change': self.handle_position,
            'position.settlement': self.handle_position,
            'position.adjustRiskLimit': self.handle_position,
        }
        handler = self.safe_value(handlersBySubject, self.safe_string(message, 'subject'))
        if handler is None:
            return
        handler(client, message)

    def get_message_hash(self, elementName: str, symbol: Str = None):
        """
        build the message hash for an element, either per symbol or for all symbols

        :param str elementName: element name, e.g. 'ticker' or 'bidask'
        :param str [symbol]: when given, the hash is '<elementName>@<symbol>'
        :returns str: '<elementName>@<symbol>', or '<elementName>s@all' when symbol is None
        """
        suffix = 's@all' if symbol is None else '@' + symbol
        return elementName + suffix

    def ping(self, client: Client):
        """
        build the json ping payload

        kucoin does not support built-in ws protocol-level ping-pong
        instead it requires a custom json-based text ping-pong
        https://docs.kucoin.com/#ping

        :param Client client: the websocket client(unused)
        :returns dict: a payload with a fresh request id as string and the type 'ping'
        """
        return {
            'id': str(self.request_id()),
            'type': 'ping',
        }

    def handle_pong(self, client: Client, message):
        """
        record the arrival time of a pong on the client and return the message unchanged

        https://docs.kucoin.com/#ping
        """
        receivedAt = self.milliseconds()
        client.lastPong = receivedAt
        return message

    def handle_error_message(self, client: Client, message):
        #
        #    {
        #        "id": "64d8732c856851144bded10d",
        #        "type": "error",
        #        "code": 401,
        #        "data": "token is expired"
        #    }
        #
        reason = self.safe_string(message, 'data', '')
        if reason == 'token is expired':
            # clear the stored url of the affected connection type
            # (presumably so that a new one gets negotiated - confirm against negotiate)
            isPrivate = 'connectId=private' in client.url
            urlKey = 'private' if isPrivate else 'public'
            self.options['urls'][urlKey] = None
        self.handle_errors(None, None, client.url, None, None, reason, message, None, None)

    def handle_subscription_status(self, client: Client, message):
        #
        # acknowledgement of a subscribe / unsubscribe request:
        #
        #     {
        #         "id": "1578090438322",
        #         "type": "ack"
        #     }
        #
        requestId = self.safe_string(message, 'id')
        if requestId not in client.subscriptions:
            return
        # the request id points to a subscription hash, which points to the subscription itself
        subscriptionHash = self.safe_string(client.subscriptions, requestId)
        subscription = self.safe_value(client.subscriptions, subscriptionHash)
        del client.subscriptions[requestId]
        callback = self.safe_value(subscription, 'method')
        if callback is not None:
            callback(client, message, subscription)
        if not self.safe_bool(subscription, 'unsubscribe', False):
            return
        # acknowledged unsubscription: clean every hash pair, then the cache
        messageHashes = self.safe_list(subscription, 'messageHashes', [])
        subMessageHashes = self.safe_list(subscription, 'subMessageHashes', [])
        for index, messageHash in enumerate(messageHashes):
            self.clean_unsubscription(client, subMessageHashes[index], messageHash)
        self.clean_cache(subscription)

    def handle_message(self, client: Client, message):
        """
        entry point for incoming frames: dispatch on the 'type' field, ignoring unknown types

        :param Client client: the websocket client the message arrived on
        :param dict message: the raw message
        """
        handlersByType: dict = {
            # 'heartbeat': self.handleHeartbeat,
            'welcome': self.handle_system_status,
            'message': self.handle_subject,
            'pong': self.handle_pong,
            'error': self.handle_error_message,
            'ack': self.handle_subscription_status,
        }
        handler = self.safe_value(handlersByType, self.safe_string(message, 'type'))
        if handler is None:
            return
        handler(client, message)
