Websocket Trading on Bitfinex Using Python: Part 2 - Reducing Latency When Maintaining Multiple Data Streams

in #utopian-io, 5 years ago

Repository With Sample Code

https://github.com/imwatsi/crypto-market-samples
Find the complete Python script on GitHub: bitfinex_websocket_multi.py

My Profile On GitHub

You can find code used in this tutorial as well as other tools and examples in the GitHub repositories under my profile:

https://github.com/imwatsi

What Will I Learn

  • Subscribing to multiple ticker and candle channels
  • Maintaining ticker and candle state in memory
  • Reducing websocket latency using multithreading

Requirements

  • Python 3.6+
  • Dependencies:
    • websocket-client
  • Active internet connection

Difficulty

Intermediate


Tutorial Content

To follow this tutorial, you should already know how to set up a basic websocket connection to the Bitfinex API. You can learn how to do this by following the previous tutorial in this series.

In this tutorial, you will learn how to subscribe to multiple ticker and candle channels, and maintain their state in memory, with low latency. Python's multithreading helps reduce latency by letting message processing run concurrently with the receiving of new messages.

Latency bottleneck created by on_message() function

Using the websocket-client Python module to create a connection for fast data streams to Bitfinex's API introduces a potential bottleneck that can create latency. For strategies or scripts that subscribe to a large number of data streams, for example hundreds of order books or candles, the time it takes to receive and process updates needs to be as low as possible.

All updates arrive through the on_message function, which handles one message at a time. Any delay inside that function therefore delays every message queued behind it, and the market data held in memory can fall out of date.

In this tutorial, we explore various optimizations we can use to reduce latency and improve the performance of this function as part of a larger trading system.
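To make the bottleneck concrete, here is a minimal sketch that simulates it without any network connection. The function names and the 50 ms sleep are assumptions chosen for illustration; the sleep only stands in for parsing work and is not a measurement of real Bitfinex traffic. Note that time.sleep releases Python's global interpreter lock, so CPU-heavy parsing would benefit less than this simulation suggests.

```python
import time
from threading import Thread

def handle(message, results):
    """Stand-in for parsing work; the sleep is an assumed cost."""
    time.sleep(0.05)
    results.append(message)

def receive_inline(messages):
    """Process every message inside the receive loop itself."""
    results = []
    start = time.perf_counter()
    for message in messages:
        handle(message, results)
    return time.perf_counter() - start, results

def receive_threaded(messages):
    """Hand each message to a new thread and return to the loop at once."""
    results, threads = [], []
    start = time.perf_counter()
    for message in messages:
        worker = Thread(target=handle, args=(message, results))
        worker.start()
        threads.append(worker)
    busy = time.perf_counter() - start  # time the receive loop was blocked
    for worker in threads:
        worker.join()  # wait only so the results can be inspected
    return busy, results

if __name__ == '__main__':
    msgs = list(range(5))
    inline_busy, _ = receive_inline(msgs)
    threaded_busy, _ = receive_threaded(msgs)
    print('inline: %.3fs, threaded: %.3fs' % (inline_busy, threaded_busy))
```

The first number grows with every message handled, while the second stays close to the cost of starting the threads.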

Using new threads to process messages

We use two functions to handle the parsing of candle and ticker updates, and start them in a new thread each time they are called. Like so:

chan_id = data[0]
if chan_id in channels:
    if 'ticker' in channels[chan_id]:
        # if channel is for ticker
        if data[1] == 'hb':
            # Ignore heartbeat messages
            pass
        else:
            # parse ticker and save to memory
            Thread(target=update_tickers, args=(data,)).start()
    elif 'candles' in channels[chan_id]:
        # if channel is for candles
        if data[1] == 'hb':
            # Ignore heartbeat messages
            pass
        else:
            # parse candle update and save to memory
            Thread(target=update_candles, args=(data,)).start()

This means on_message only has to start the new thread before it is free to receive more messages. Parsing and related work is handled by individual threads, all of which update the same shared variables (candles and tickers).
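Because every message runs in its own thread, two updates for the same symbol can finish in a different order than they arrived. The sketch below shows one possible guard, assuming on_message stamps each message with an increasing sequence number before starting the thread. The names latest, latest_lock and store_if_newer, and the sequence number itself, are hypothetical and not part of the tutorial's script.

```python
from threading import Lock, Thread

latest = {}            # symbol -> (sequence number, price)
latest_lock = Lock()   # guards the check-then-write below

def store_if_newer(symbol, seq, price):
    """Keep a value only if it is newer than the one already stored."""
    with latest_lock:
        current = latest.get(symbol)
        if current is None or seq > current[0]:
            latest[symbol] = (seq, price)

if __name__ == '__main__':
    # Deliver three updates out of order, each on its own thread
    threads = [
        Thread(target=store_if_newer, args=('BTCUSD', seq, 100.0 + seq))
        for seq in (3, 1, 2)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(latest['BTCUSD'])  # (3, 103.0)
```

The lock makes the comparison and the assignment a single step, so a stale update cannot overwrite a newer one between the two.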

Below is code for the two functions used in the script for this tutorial:

update_tickers()

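def update_tickers(data):
    global tickers
    # data = [channel ID, ticker array]; look up the symbol saved at subscription
    sym = channels[data[0]][1]
    ticker_raw = data[1]
    # Positions 0, 2, 6 and 7 of the ticker array hold bid, ask, last price and volume
    ticker_parsed = {
        'bid': ticker_raw[0],
        'ask': ticker_raw[2],
        'last_price': ticker_raw[6],
        'volume': ticker_raw[7],
    }
    # Replace the symbol's previous ticker in the shared dictionary
    tickers[sym] = ticker_parsed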

This function takes the input and parses ticker info from it, and then saves it to the tickers variable, with the symbol as a key.
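As a worked example of that parsing step, the sketch below runs the same index mapping on a single hand-written message. The numbers are illustrative only, not real market data, and the field order in the comment reflects my understanding of Bitfinex's v2 ticker layout; parse_ticker is a hypothetical helper name.

```python
def parse_ticker(ticker_raw):
    """Pick the fields used in this tutorial out of a raw ticker array."""
    return {
        'bid': ticker_raw[0],
        'ask': ticker_raw[2],
        'last_price': ticker_raw[6],
        'volume': ticker_raw[7],
    }

# Assumed layout: [BID, BID_SIZE, ASK, ASK_SIZE, DAILY_CHANGE,
#                  DAILY_CHANGE_RELATIVE, LAST_PRICE, VOLUME, HIGH, LOW]
sample_message = [17, [6500.0, 12.5, 6500.1, 8.3, -25.0,
                       -0.0038, 6500.1, 1234.5, 6600.0, 6400.0]]

channels = {17: ['ticker', 'BTCUSD']}  # as saved on the 'subscribed' event
tickers = {}

sym = channels[sample_message[0]][1]
tickers[sym] = parse_ticker(sample_message[1])
print(tickers)
```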

update_candles()

def update_candles(data):
    global candles
    def truncate_market(str_data):
        # Get market symbol from channel key, e.g. 'trade:15m:tBTCUSD' -> 'BTCUSD'
        col1 = str_data.find(':t')
        return str_data[col1+2:]
    def parse_candle(lst_data):
        # Get candle dictionary from list
        return {
            'mts': lst_data[0],
            'open': lst_data[1],
            'close': lst_data[2],
            'high': lst_data[3],
            'low': lst_data[4],
            'vol': lst_data[5]
        }

    market = truncate_market(channels[data[0]][1])
    payload = data[1]
    # Identify snapshot (list of lists=snapshot, flat list=update)
    if not payload or isinstance(payload[0], list):
        candles[market] = [parse_candle(raw_candle) for raw_candle in payload]
    else:
        candle = parse_candle(payload)
        # An update can be handled before the snapshot, because every
        # message runs in its own thread; start from an empty list then
        lst_candles = candles.setdefault(market, [])
        if not lst_candles or candle['mts'] > lst_candles[0]['mts']:
            # Insert new (latest) candle
            lst_candles.insert(0, candle)
        elif candle['mts'] == lst_candles[0]['mts']:
            # Update latest candle
            lst_candles[0] = candle

This function takes a message from the candles channel, parses it and does one of the following:

  • Initial Snapshot: save the whole list of candles
  • Update to latest candle: save the new values to memory
  • New candle addition: create a new candle entry and insert it in memory

Two nested functions, truncate_market and parse_candle, retrieve the market symbol and parse individual candles into a format that is easy to work with.
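The three cases can be traced with a standalone variant of this logic. In the sketch below, apply_candle_message and its return labels are hypothetical additions for illustration, the candle values are made up, and timestamps are spaced 900000 ms apart to mimic 15-minute candles.

```python
def parse_candle(raw):
    """Turn [mts, open, close, high, low, vol] into a dictionary."""
    keys = ('mts', 'open', 'close', 'high', 'low', 'vol')
    return dict(zip(keys, raw))

def apply_candle_message(candles, market, payload):
    """Apply one candles payload and report which case was taken."""
    if not payload or isinstance(payload[0], list):
        # Snapshot: a list of candles, newest first
        candles[market] = [parse_candle(raw) for raw in payload]
        return 'snapshot'
    candle = parse_candle(payload)
    series = candles.setdefault(market, [])
    if not series or candle['mts'] > series[0]['mts']:
        series.insert(0, candle)   # a new candle has opened
        return 'inserted'
    if candle['mts'] == series[0]['mts']:
        series[0] = candle         # the latest candle changed
        return 'updated'
    return 'ignored'               # older than the latest candle

if __name__ == '__main__':
    store = {}
    snapshot = [[1800000, 10, 11, 12, 9, 5.0], [900000, 9, 10, 11, 8, 4.0]]
    print(apply_candle_message(store, 'BTCUSD', snapshot))
    print(apply_candle_message(store, 'BTCUSD', [1800000, 10, 12, 13, 9, 6.5]))
    print(apply_candle_message(store, 'BTCUSD', [2700000, 12, 12, 12, 12, 0.1]))
    print(len(store['BTCUSD']), store['BTCUSD'][0]['mts'])
```

Returning a label is only a convenience for seeing which branch ran; the tutorial's function updates the shared candles variable and returns nothing.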

Less code is desirable

Here is how the whole on_message() function looks in our tutorial's script:

def on_message(ws, message):
    global channels
    data = json.loads(message)
    # Handle events
    if 'event' in data:
        if data['event'] == 'info':
            pass # ignore info messages
        elif data['event'] == 'auth':
            if data['status'] == 'OK':
                print('API authentication successful')
            else:
                print(data['status'])
        # Capture all subscribed channels
        elif data['event'] == 'subscribed':
            if data['channel'] == 'ticker':
                channels[data['chanId']] = [data['channel'], data['pair']]
            elif data['channel'] == 'candles':
                channels[data['chanId']] = [data['channel'], data['key']]
    # Handle channel data
    else:
        chan_id = data[0]
        if chan_id in channels:
            if 'ticker' in channels[chan_id]:
                # if channel is for ticker
                if data[1] == 'hb':
                    # Ignore heartbeat messages
                    pass
                else:
                    # parse ticker and save to memory
                    Thread(target=update_tickers, args=(data,)).start()
            elif 'candles' in channels[chan_id]:
                # if channel is for candles
                if data[1] == 'hb':
                    # Ignore heartbeat messages
                    pass
                else:
                    # parse candle update and save to memory
                    Thread(target=update_candles, args=(data,)).start()

Moving work such as parsing candles and tickers out of this function lets the script get back to receiving messages quickly. The general guideline is to keep on_message() as lightweight as possible. Any other calculations, such as technical analysis or strategy logic, should run in separate threads that read the shared variables (tickers and candles in this tutorial), which are updated in real time by the threads spawned for each websocket message.

You can further optimize the code in this tutorial by delegating most of the code found in the on_message() function to another function running in a thread of its own. Tasks such as checking a message's type and extracting channel IDs can be handled by that separate function. We did not implement that in this example, for simplicity's sake, but it is possible.
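One way to sketch that delegation is a queue with a single long-lived worker thread, which is a different technique from the thread-per-message approach used in the script. Here on_message only enqueues the raw text, and the worker decodes and routes it. The names inbox and worker, the channel ID 17 and the sample values are assumptions for illustration, and only ticker data is handled to keep the example short.

```python
import json
import queue
from threading import Thread

inbox = queue.Queue()                   # raw messages waiting to be parsed
tickers = {}
channels = {17: ['ticker', 'BTCUSD']}   # hypothetical subscribed channel

def on_message(ws, message):
    # Do nothing but enqueue, so the receive loop is freed immediately
    inbox.put(message)

def worker():
    """Decode and route queued messages in arrival order."""
    while True:
        message = inbox.get()
        try:
            if message is None:         # sentinel: stop the worker
                return
            data = json.loads(message)
            if isinstance(data, list) and data[0] in channels and data[1] != 'hb':
                kind, sym = channels[data[0]]
                if kind == 'ticker':
                    raw = data[1]
                    tickers[sym] = {'bid': raw[0], 'ask': raw[2],
                                    'last_price': raw[6], 'volume': raw[7]}
        finally:
            inbox.task_done()

if __name__ == '__main__':
    Thread(target=worker, daemon=True).start()
    on_message(None, json.dumps([17, 'hb']))
    on_message(None, json.dumps([17, [6500.0, 12.5, 6500.1, 8.3, -25.0,
                                      -0.0038, 6500.1, 1234.5, 6600.0, 6400.0]]))
    inbox.join()                        # wait until both messages are handled
    print(tickers['BTCUSD'])
    inbox.put(None)
```

A single consumer also processes messages in the order they arrived, which avoids out-of-order updates at the cost of parsing them one at a time.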

Subscribe to multiple ticker channels

The rest of the code that makes up this tutorial's script includes the following:

def on_open(ws):
    print('API connected')
    for sym in symbols:
        sub_tickers = {
            'event': 'subscribe',
            'channel': 'ticker',
            'symbol': sym
        }
        ws.send(json.dumps(sub_tickers))
        sub_candles = {
            'event': 'subscribe',
            'channel': 'candles',
            'key': 'trade:15m:t' + sym
        }
        ws.send(json.dumps(sub_candles))
    # start the interactive tool in its own thread
    Thread(target=print_details).start()

This subscribes to the ticker and candle channels for all chosen symbols. As an example, the script attached to this tutorial gets a list of all symbols available on the Bitfinex exchange and then filters the results to take only USD pairs. The code for this is below:

import json
import requests

symbols = []

# load USD tickers
res = requests.get("https://api.bitfinex.com/v1/symbols")
res.raise_for_status()  # stop early if the request failed
all_sym = json.loads(res.content)
for x in all_sym:
    if "usd" in x:
        symbols.append(x.upper())
print('Found (%s) USD symbols' %(len(symbols)))
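The filtering and subscription steps can be tried offline with a short sketch. The helper names usd_symbols and build_subscriptions and the sample symbol list are hypothetical. The filter here uses endswith('usd'), a stricter variation on the substring test above that keeps only pairs quoted in USD.

```python
import json

def usd_symbols(all_symbols):
    """Keep pairs quoted in USD and upper-case them."""
    return [s.upper() for s in all_symbols if s.lower().endswith('usd')]

def build_subscriptions(symbols, timeframe='15m'):
    """Build the ticker and candle subscribe messages for each symbol."""
    messages = []
    for sym in symbols:
        messages.append(json.dumps({
            'event': 'subscribe',
            'channel': 'ticker',
            'symbol': sym
        }))
        messages.append(json.dumps({
            'event': 'subscribe',
            'channel': 'candles',
            'key': 'trade:%s:t%s' % (timeframe, sym)
        }))
    return messages

if __name__ == '__main__':
    sample = ['btcusd', 'ethbtc', 'ltcusd']   # made-up stand-in for the API response
    for msg in build_subscriptions(usd_symbols(sample)):
        print(msg)
```

Each string returned by build_subscriptions is what on_open passes to ws.send().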

The interactive tool

An interactive function runs in the background, which you can use to query a symbol's latest ticker info and candle data. Here's the code:

def print_details():
    # interactive function to view tickers and candles
    while len(tickers) == 0 or len(candles) == 0:
        # wait for tickers and candles to populate
        time.sleep(1)
    print('Tickers and candles loaded. You may query a symbol now.')
    while True:
        symbol = input().strip().upper()
        if symbol not in symbols:
            print('%s not in list of symbols.' %(symbol))
            continue
        if not tickers.get(symbol) or not candles.get(symbol):
            # subscribed, but no data has arrived for this symbol yet
            print('%s has no data yet. Try again shortly.' %(symbol))
            continue
        details = tickers[symbol]
        print('%s:  Bid: %s, Ask: %s, Last Price: %s, Volume: %s'
            %(symbol, details['bid'], details['ask'],
            details['last_price'], details['volume']))
        print('%s: currently has (%s) candles, latest candle: %s'
            %(symbol, len(candles[symbol]), str(candles[symbol][0])))

To initialize the script, this is added at the end:

# initialize api connection
connect_api()

Running the script

The script is interactive because of the print_details() function. Once all data is loaded, you can type in a symbol name, e.g. BTCUSD, and it will print the current ticker and latest candle data for it, as shown in the screencast below.

[Screencast of the interactive tool: screencast(1).gif]

Using the interactive tool

  • After running the script, wait for it to print a statement saying Tickers and candles loaded. You may query a symbol now.
  • Type in a symbol name and press ENTER, e.g. btcusd (it is not case-sensitive) or ltcusd
  • The current ticker details as well as the latest candle will be printed

