An evening with Python, Multi-threading, STEEM and Beem


Imagine you have a bot that operates on the STEEM blockchain. It listens to all comments, and if any comment includes the text "hey, upvote me", it upvotes that comment.

This looks like a pretty straightforward requirement. Let's create a simple workflow:

  • Listen to all new blocks
  • Filter comment operations
  • Check whether the comment includes "hey, upvote me"
  • Upvote the comment
  • Repeat

Let's implement this workflow with the Beem library.

import logging

from beem.blockchain import Blockchain
from beem.steem import Steem
from beem.comment import Comment


TARGET_TEXT = "hey, upvote me"
BOT_ACCOUNT = "beemtutorials"
BOT_POSTING_KEY = "<posting_wif>"

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logging.basicConfig()


def listener(steem_instance):
    blockchain = Blockchain(steem_instance=steem_instance)
    for op in blockchain.stream(opNames=["comment"]):
        if TARGET_TEXT not in op.get("body"):
            # The bot is only interested in comments with the TARGET_TEXT.
            continue

        comment = Comment(op, steem_instance=steem_instance)
        logger.info("Found a comment: %s" % comment.identifier)
        upvote(comment)


def upvote(comment):
    comment.vote(+50, account=BOT_ACCOUNT)
    logger.info("Upvoted: %s" % comment.identifier)


def main():
    s = Steem(
        node=["https://rpc.buildteam.io"],
        keys=[BOT_POSTING_KEY, ]
    )
    listener(s)


if __name__ == '__main__':
    main()

After running it for a while and posting a comment with "hey, upvote me", I see this output:

INFO:__main__:Found a comment: @emrebeyler/re-beemtutorials-re-test-case-for-beem-upvote-20180710t102831z-20180710t103044632z
INFO:__main__:Upvoted: @emrebeyler/re-beemtutorials-re-test-case-for-beem-upvote-20180710t102831z-20180710t103044632z

Works like a charm. But, what if I spam the bot with these comments from multiple accounts? Any educated guess?

Let's write a simple script to push more comments from different accounts in the same block:


from beem.steem import Steem
from beem.comment import Comment


TARGET_TEXT = "hey, upvote me"
BOT_ACCOUNT = "beemtutorials"
ACCOUNT_NAMES = ["beemtutorials", "espoemfacts"]
PRIVATE_POSTING_KEYS = [
    "<posting_wif_1>",
    "<posting_wif_2>"
]


def comment(steem_instance):
    main_comment = Comment(
        "@beemtutorials/test-case-for-beem-upvote",
        steem_instance=steem_instance)
    for account in ACCOUNT_NAMES:
        main_comment.reply("hey, upvote me please", author=account)


def main():
    s = Steem(
        node=["https://rpc.buildteam.io"],
        keys=PRIVATE_POSTING_KEYS,
    )
    comment(s)


if __name__ == '__main__':
    main()

The resulting block includes two comments, both asking for an upvote.

Let's see what happens if our main upvoter script sees two suitable comments to upvote in one block:


INFO:__main__:Found a comment: @beemtutorials/re-test-case-for-beem-upvote-20180710t104441z
INFO:__main__:Upvoted: @beemtutorials/re-test-case-for-beem-upvote-20180710t104441z
INFO:__main__:Found a comment: @espoemfacts/re-test-case-for-beem-upvote-20180710t104443z
Traceback (most recent call last):
  File "/Users/emre/Environments/beemtutorials/lib/python3.6/site-packages/beemapi/steemnoderpc.py", line 65, in rpcexec
    reply = super(SteemNodeRPC, self).rpcexec(payload)
  File "/Users/emre/Environments/beemtutorials/lib/python3.6/site-packages/beemapi/graphenerpc.py", line 401, in rpcexec
    raise RPCError(ret['error']['message'])
beemapi.exceptions.RPCError: 10 assert_exception: Assert Exception
elapsed_seconds >= STEEMIT_MIN_VOTE_INTERVAL_SEC: Can only vote once every 3 seconds.
    {}
    th_a  steem_evaluator.cpp:1180 do_apply

    {"o":{"voter":"beemtutorials","author":"espoemfacts","permlink":"re-test-case-for-beem-upvote-20180710t104443z","weight":5000}}
    th_a  steem_evaluator.cpp:1521 do_apply

    {"op":["vote",{"voter":"beemtutorials","author":"espoemfacts","permlink":"re-test-case-for-beem-upvote-20180710t104443z","weight":5000}]}
    th_a  database.cpp:2912 _apply_transaction

    {"trx":{"ref_block_num":65013,"ref_block_prefix":4070809687,"expiration":"2018-07-10T10:46:18","operations":[["vote",{"voter":"beemtutorials","author":"espoemfacts","permlink":"re-test-case-for-beem-upvote-20180710t104443z","weight":5000}]],"extensions":[],"signatures":["1f0c60bda9944bd68dc1a986a3276d7566eebad1af06d64362d9301adc3050f5ad4ac24e1b7295c434c49ed508a3afa1e5bda80f1340fb6495efef1c966d0b3d3f"]}}
    th_a  database.cpp:2916 _apply_transaction

    {"trx":{"ref_block_num":65013,"ref_block_prefix":4070809687,"expiration":"2018-07-10T10:46:18","operations":[["vote",{"voter":"beemtutorials","author":"espoemfacts","permlink":"re-test-case-for-beem-upvote-20180710t104443z","weight":5000}]],"extensions":[],"signatures":["1f0c60bda9944bd68dc1a986a3276d7566eebad1af06d64362d9301adc3050f5ad4ac24e1b7295c434c49ed508a3afa1e5bda80f1340fb6495efef1c966d0b3d3f"]}}
    th_a  database.cpp:672 push_transaction

    {"call.method":"call","call.params":["network_broadcast_api","broadcast_transaction",[{"expiration":"2018-07-10T10:46:18","ref_block_num":65013,"ref_block_prefix":4070809687,"operations":[["vote",{"voter":"beemtutorials","author":"espoemfacts","permlink":"re-test-case-for-beem-upvote-20180710t104443z","weight":5000}]],"extensions":[],"signatures":["1f0c60bda9944bd68dc1a986a3276d7566eebad1af06d64362d9301adc3050f5ad4ac24e1b7295c434c49ed508a3afa1e5bda80f1340fb6495efef1c966d0b3d3f"]}]]}
    th_a  websocket_api.cpp:124 on_message

The RPC node returned an error: an account can only vote once every 3 seconds.

So, what's the solution here? The first thing that comes to mind is to put a time.sleep(3) into the voter script, so we never vote more than once in a 3-second window.

That may work well in most cases. However, it does not work for:

  • multi-threaded or multi-process environments (time.sleep blocks only the thread or process that calls it).

  • scripts that do I/O- or CPU-intensive work. Blocking the whole process for 3 seconds may cause your
    script to lag, since it can't keep up with the new blocks.
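
To see the first point in isolation, here is a minimal sketch that uses only the standard library. The worker function, the run_in_pool helper and the shortened DELAY are placeholders of mine, not part of the bot.

```python
import concurrent.futures
import time

DELAY = 0.3  # stand-in for the 3-second wait, shortened for the demo


def worker(n):
    # time.sleep pauses only the thread that calls it.
    time.sleep(DELAY)
    return n


def run_in_pool(jobs=4):
    # Run `jobs` sleeping workers at once and measure the wall-clock time.
    start = time.monotonic()
    with concurrent.futures.ThreadPoolExecutor(max_workers=jobs) as pool:
        results = list(pool.map(worker, range(jobs)))
    elapsed = time.monotonic() - start
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = run_in_pool()
    # The four sleeps overlap, so the total is close to one DELAY, not four.
    print(results, "finished in %.2f seconds" % elapsed)
```

Because the sleeps overlap, a sleep inside one worker does nothing to slow down the others.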

In case you're not aware, I developed @abusereports for a bounty. It does a lot of checks, such as:

  • listening to every Vote operation
  • checking whether the vote is a last-minute vote
  • commenting on or flagging the post if certain conditions are met.

Considering the number of Vote operations, it was impossible to do this in a single thread. I had to take advantage of multiple threads.

Let's make our upvoter script multi-threaded:

import logging
import concurrent.futures

from beem.blockchain import Blockchain
from beem.steem import Steem
from beem.comment import Comment


TARGET_TEXT = "hey, upvote me"
BOT_ACCOUNT = "account_name"
BOT_POSTING_KEY = "<posting_wif>"

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logging.basicConfig(level=logging.INFO,
                    format='%(relativeCreated)6d %(threadName)s %(message)s')


def listener(steem_instance, thread_pool):
    blockchain = Blockchain(steem_instance=steem_instance)
    for op in blockchain.stream(opNames=["comment"]):
        if TARGET_TEXT not in op.get("body"):
            # The bot is only interested in comments with the TARGET_TEXT.
            continue
        comment = Comment(op, steem_instance=steem_instance)
        logger.info("Found a comment: %s" % comment.identifier)
        thread_pool.submit(upvote, comment)


def upvote(comment):
    try:
        comment.vote(+1, account=BOT_ACCOUNT)
        logger.info("Upvoted: %s" % comment.identifier)
    except Exception as error:
        logger.error(error)


def main():
    s = Steem(
        node=["https://rpc.buildteam.io"],
        keys=[BOT_POSTING_KEY, ]
    )
    thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    listener(s, thread_pool)


if __name__ == '__main__':
    main()

I used a simple thread pool, and every time I find a suitable comment, I delegate the upvote function to one of its threads.

It works much faster now since I no longer block the main thread (the listener). However, time.sleep is not a solution anymore.

Every thread sleeps in its own scope, so multiple threads may still try to upvote different comments within the same 3-second window. The errors come back whenever the script sees several matching comments in one block.

Note that this behavior may be hard to reproduce. If the bot also replies to each comment after upvoting, it becomes much easier to reproduce, since the re-comment window is 20 seconds. You may want to add a reply feature to make things easier if you try this at home.
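
The problem can be reproduced offline with a rough simulation. Everything here is an assumption made for illustration: FakeNode is a hypothetical stand-in for the RPC node, the interval is shortened to half a second, and a plain RuntimeError replaces the real RPCError.

```python
import concurrent.futures
import threading
import time

MIN_VOTE_INTERVAL = 0.5  # stand-in for the chain's 3-second rule


class FakeNode:
    """Hypothetical stand-in for the RPC node; rejects votes that arrive too fast."""

    def __init__(self):
        self._lock = threading.Lock()
        self._last_vote = None

    def vote(self):
        with self._lock:
            now = time.monotonic()
            too_soon = (self._last_vote is not None
                        and now - self._last_vote < MIN_VOTE_INTERVAL)
            if too_soon:
                raise RuntimeError("Can only vote once per interval.")
            self._last_vote = now


def upvote(node):
    try:
        node.vote()
        result = "accepted"
    except RuntimeError:
        result = "rejected"
    # Each worker sleeps on its own, which does not hold back the others.
    time.sleep(MIN_VOTE_INTERVAL)
    return result


def simulate(votes=4):
    node = FakeNode()
    with concurrent.futures.ThreadPoolExecutor(max_workers=votes) as pool:
        futures = [pool.submit(upvote, node) for _ in range(votes)]
        return [future.result() for future in futures]


if __name__ == "__main__":
    print(simulate())
```

With four workers started at the same moment, only one vote should get through and the rest should be rejected, even though every worker dutifully sleeps afterwards.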

Critical Section


In concurrent programming, concurrent accesses to shared resources can lead to unexpected or erroneous behavior, so parts of the program where the shared resource is accessed are protected. This protected section is the critical section or critical region. It cannot be executed by more than one process at a time.

Our critical section in the script is the upvote function, and threads should wait for each other to use that resource. That way, a time.sleep(3) will work fine. However, how can I guarantee that multiple threads use the shared resource one at a time?

Mutual Exclusion with Semaphores

I will use a Semaphore, which is a simple threading primitive. It's basically a counter, and its default value is 1. Every time a thread acquires the semaphore, the counter is decremented; with the default value, it drops to 0. Any other thread that tries to acquire it then waits until the holder releases it and the value returns to 1, and only then continues.

Let's add a simple semaphore to our script:
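
The counter behavior is easy to check on its own. This small sketch uses non-blocking acquires so that a failed attempt returns False instead of waiting; the variable names are only for illustration.

```python
import threading

sem = threading.Semaphore()  # internal counter starts at 1

first = sem.acquire(blocking=False)   # counter 1 -> 0, succeeds
second = sem.acquire(blocking=False)  # counter is 0, so this fails instead of waiting
sem.release()                         # counter 0 -> 1
third = sem.acquire(blocking=False)   # succeeds again
sem.release()

print(first, second, third)  # True False True
```

In the bot, the acquire is a blocking one, so the second thread waits at that point rather than getting False back.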

import logging
import concurrent.futures
import threading
import time

from beem.blockchain import Blockchain
from beem.steem import Steem
from beem.comment import Comment


TARGET_TEXT = "hey, upvote me"
BOT_ACCOUNT = "account"
BOT_POSTING_KEY = "<posting_wif>"

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logging.basicConfig(level=logging.INFO,
                    format='%(created)f %(threadName)s %(message)s')

mutex = threading.Semaphore()


def listener(steem_instance, thread_pool):
    blockchain = Blockchain(steem_instance=steem_instance)
    for op in blockchain.stream(opNames=["comment"]):
        if TARGET_TEXT not in op.get("body"):
            # The bot is only interested in comments with the TARGET_TEXT.
            continue
        comment = Comment(op, steem_instance=steem_instance)
        logger.info("Found a comment: %s" % comment.identifier)
        thread_pool.submit(upvote, comment)


def upvote(comment):
    # Only one thread at a time may run the vote-and-wait section.
    mutex.acquire()
    logger.info('Upvote Mutex acquired.')

    try:
        comment.vote(+1, account=BOT_ACCOUNT)
        logger.info("Upvoted: %s. Sleeping for 3 seconds." % comment.identifier)
        time.sleep(3)
    except Exception as error:
        logger.error(error)
    finally:
        # Always release, even if the vote fails, to avoid a deadlock.
        mutex.release()
        logger.info("Mutex released")



def main():
    s = Steem(
        node=["https://rpc.buildteam.io"],
        keys=[BOT_POSTING_KEY, ]
    )
    thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    listener(s, thread_pool)


if __name__ == '__main__':
    main()

Output:

1531223276.082151 MainThread Found a comment: @beemtutorials/re-test-case-for-beem-upvote-20180710t114656z
1531223276.082833 ThreadPoolExecutor-0_0 Upvote Mutex acquired.
1531223277.025912 MainThread Found a comment: @espoemfacts/re-test-case-for-beem-upvote-20180710t114658z
1531223277.763806 ThreadPoolExecutor-0_0 Upvoted: @beemtutorials/re-test-case-for-beem-upvote-20180710t114656z. Sleeping for 3 seconds.
1531223277.976112 MainThread Found a comment: @emrebeyler/re-emrebeyler-re-emrebeyler-re-turbot-beem-tutorials-20180710t114702800z
1531223280.766959 ThreadPoolExecutor-0_0 Mutex released
1531223280.767166 ThreadPoolExecutor-0_1 Upvote Mutex acquired.
1531223282.574555 ThreadPoolExecutor-0_1 Upvoted: @espoemfacts/re-test-case-for-beem-upvote-20180710t114658z. Sleeping for 3 seconds.
1531223285.579141 ThreadPoolExecutor-0_1 Mutex released
1531223285.579317 ThreadPoolExecutor-0_2 Upvote Mutex acquired.
1531223287.070003 ThreadPoolExecutor-0_2 Upvoted: @emrebeyler/re-emrebeyler-re-emrebeyler-re-turbot-beem-tutorials-20180710t114702800z. Sleeping for 3 seconds.
1531223290.071216 ThreadPoolExecutor-0_2 Mutex released

Cool stuff. The threads now wait for each other to use the critical resource. Just make sure you release the acquired semaphore no matter what (try/except everything!), otherwise you may end up with a deadlock :).
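
One way to make the release hard to forget is the with statement, which threading.Semaphore supports. This is a minimal sketch rather than a change to the bot; risky_vote and is_free are hypothetical helpers, and the failure is simulated.

```python
import threading

mutex = threading.Semaphore()


def risky_vote(should_fail):
    # "with" acquires the semaphore and releases it on the way out,
    # whether the body returns normally or raises.
    with mutex:
        if should_fail:
            raise ValueError("simulated vote failure")
        return "voted"


def is_free():
    # Non-blocking probe: True means nobody is holding the semaphore.
    if mutex.acquire(blocking=False):
        mutex.release()
        return True
    return False


if __name__ == "__main__":
    try:
        risky_vote(should_fail=True)
    except ValueError as error:
        print("caught:", error)
    print("semaphore free again:", is_free())
```

Even though the simulated vote raises, the semaphore is available again afterwards, so the next thread is not left waiting forever.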


This also seems like a good place where something simpler than a spin lock could be useful: a simple queue.

After all, the posting of comments is a single conceptual entity that must remain sequential because of the wait. There is no need to parallelize it because it can't function in parallel and it doesn't involve any processing which would take longer than three real-world seconds.

So instead of having things spin on a mutex, it's conceptually simpler to have the streaming monitor (which does do enough computation to require its own thread) be able to throw comment IDs up on a "blackboard" list from which a relatively straightforward comment poster thread function takes the first, up votes, makes its log entry, waits three seconds, and then does it all over again forever. If the blackboard is empty, do nothing and go straight to the wait.

This has the main advantage of not requiring a thread pool with a predefined number of workers at all. The maximum amount of concurrency this flow can reasonably have is two threads, so there's no real sense in having more than that.
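
The queue idea described above could be sketched roughly as follows, using the standard library's queue.Queue as the "blackboard". The names voter, run_demo and STOP are hypothetical, the vote is replaced by a function that only records a timestamp, and the wait is shortened. Unlike the description above, this variant blocks on an empty queue instead of polling it.

```python
import queue
import threading
import time

VOTE_INTERVAL = 0.1  # stand-in for the 3-second wait, shortened for the demo
STOP = object()      # sentinel that tells the voter thread to exit


def voter(work_queue, vote_func, voted):
    # Single consumer: votes are sequential by construction, so no mutex is needed.
    while True:
        item = work_queue.get()  # blocks while the queue is empty
        if item is STOP:
            break
        try:
            vote_func(item)
            voted.append(item)
        except Exception as error:
            print("vote failed:", error)
        time.sleep(VOTE_INTERVAL)


def run_demo(identifiers):
    work_queue = queue.Queue()
    voted = []
    stamps = []
    thread = threading.Thread(
        target=voter,
        args=(work_queue, lambda item: stamps.append(time.monotonic()), voted),
    )
    thread.start()
    # In the bot, the listener would call work_queue.put(...) for each match.
    for identifier in identifiers:
        work_queue.put(identifier)
    work_queue.put(STOP)
    thread.join()
    return voted, stamps


if __name__ == "__main__":
    voted, stamps = run_demo(["first", "second", "third"])
    print(voted)
```

The listener stays free to keep up with new blocks, while the single voter thread spaces the votes out on its own.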

(I'm pretty sure that concurrent has some built-in magic for shared variables probably using a mutex under the hood, but there's no need to reimplement the wheel, really. Then again, I may have been spoiled by years of writing in Erlang for concurrency, where passing information between independent threads is built into the base of the language via a straightforward message passing system.)

I'm pretty sure there are libraries to implement a similar messaging system in Python.

It's basically the same thing as a web spider, and I'm pretty sure I heard it mentioned when reading a tutorial on writing web spiders. They might have even mentioned Erlang...but I can't remember for sure. Maybe I should try implementing one of these...but the only use cases I have in mind are a bit large of an undertaking.

Every large problem is an accumulation of interesting small problems, especially if you are looking for something to write for an audience.

Why not think about some sort of more ambitious undertaking and show how it's done?

Wow, you are fast! This looks really cool, thanks a lot!

haha, nice read, been there as well :) I must admit I took the quick & dirty way with

retries = 0
while retries < 3:
  try:
    c.vote()
    break  # stop retrying once the vote goes through
  except Exception:
    time.sleep(3)
    retries += 1

Mutexes are definitely the cleaner solution!

I had to create a mix :) (recursion on retry mechanism, though.)


Thanks for sharing this knowledge and taking us through problems and solutions!

You probably should have come up with a better example than a spam comment. XD
