Concurrent.futures and Class object


I'm trying to create a class (Processor) that performs a series of steps to get results from an API, state by state (STATE):

1- READ data from a table.
2- SEND a request for the API to process.
3- WAIT for the jobs to complete.
4- DOWNLOAD the results when complete.

I would like to implement concurrency so that I can run multiple states at the same time. I was trying something similar to what Luciano Ramalho does with the concurrent.futures library in Fluent Python (Chapter 17, Concurrency with Futures), but I can't get it to work.

ERROR:
    res = executor.map(run_single(), sorted(states))
    TypeError: run_single() missing 1 required positional argument: 'state'

But when I pass state to run_single, it only runs sequentially, one state at a time.

I'm using Python 3.5. Part of my code is below. Thanks for any guidance.

        import datetime
        import os
        import time
        import xml.etree.ElementTree as ET
        import psycopg2
        import psycopg2.extras
        import requests
        from concurrent import futures


        class Processor(object):    
            database = 'db'
            user = 'user'
            password = 'password'

            def __init__(self, state):
                self.base_url = 'api.com'
                self.state = state
                self.status = None
                self.fetch_size = 1000000
                self.job_id = ''

                self.send_requests(self.state)

            def send_requests(self, state):
                payload = dict(params)

                # connection to postgres db table , fetch data.
                conn = psycopg2.connect(
                    "dbname='%s' user='%s' host='host' password='%s'" % (self.database, self.user, self.password))
                cursor = conn.cursor('%s' % state, cursor_factory=psycopg2.extras.DictCursor)
                # let psycopg2 quote the value instead of formatting it into the SQL string
                cursor.execute("select * from table where state = %s limit 1", (state,))

                try:
                    # function to build/send requests fetching data by chunks of fetch_size limited.
                    while True:
                        fetchs = cursor.fetchmany(self.fetch_size)
                        if len(fetchs) != 0:
                            chunk = ''
                            for fetch in fetchs:
                                try:
                                    row = fetch[0] + '|' + fetch[1] + '|' + fetch[2] + '\n'
                                    chunk += row
                                except (TypeError, IndexError):
                                    # skip rows with NULL or missing columns
                                    print('>ERROR ->', fetch[0])
                            header = 'header\n'
                            row = requests.post(self.base_url, params=payload, data=header + chunk)
                            response = row.text
                            print('-> %s: response job_xml: %s' % (state, response))
                            root = ET.fromstring(response)
                            self.job_id = root.find('Response/MetaInfo/RequestId').text
                            print('-> %s: response job_id: %s' % (state, self.job_id))
                            self.check_jobs(state)
                        else:
                            break
                except Exception as e:
                    print(e)
                    pass

            # Check the status of job_id: if completed, download the results; otherwise wait and retry.
            def check_jobs(self, state):
                print('->>> %s: Checking job %s  <<<-' % (state, self.job_id))
                # poll in a loop instead of recursing, so a long-running job cannot hit the recursion limit
                while True:
                    status = self.get_status(self.job_id)
                    if status == 'completed':
                        break
                    time.sleep(4)  # 480 for large (million-row) requests
                print('-> %s: status: %s, job_id: %s  ' % (state, status, self.job_id))
                self.download_results(self.job_id)

            # Function to return the status of job_id
            def get_status(self, job_id):
                url_status = 'url that get status of job_id'
                req_status = requests.get(url_status)
                root = ET.fromstring(req_status.text)
                status = root.find('Response/Status').text
                return status

            # Function to download the results
            def download_results(self, job_id):
                url_download = 'url to download job_id'
                print('-> %s: downloading job_id: %s @ URL [%s]' % (self.state, job_id, url_download))
                r = requests.get(url_download, stream=True)

                # create folder for state if not exists
                download = os.path.join(self.responses_folder, self.state)
                if not os.path.exists(download):
                    os.makedirs(download)

                # Save result to folder
                save_as = os.path.join(download, str(job_id + '.zip'))
                with open(save_as, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=1024):
                        if chunk:
                            f.write(chunk)

                print('-> %s: downloaded job_id: %s @ folder [ %s ] ' % (self.state, job_id, save_as))
                self.delete_results(job_id)


        if __name__ == "__main__":
            states = ['AK', 'AL', 'AR']
            workers = 20

            def run_single(state):
                Processor(state)

            for state in states:
                with futures.ThreadPoolExecutor(workers) as executor:
                    res = executor.map(run_single(), sorted(states))
    
asked by anonymous 02.10.2016 / 23:30

1 answer


Your problem is that, when creating the tasks for your Executor, you are calling the function: that is what the expression run_single() does. (The question is not very well posed, or only includes fragments of your code, so it is hard to see the whole problem.) With the parentheses, the program calls the function immediately, and its return value is what would be passed to executor.map. That is where your error comes from: the function requires a state parameter, which is not passed.
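A minimal, self-contained sketch of the difference, using a stand-in run_single that just returns its argument (an assumption; the real one builds a Processor). The bare name is only a reference to the function object, while adding parentheses calls it on the spot, which fails because no state is supplied:

```python
def run_single(state):
    # Stand-in for the question's run_single: returns the state it was given.
    return state


# Without parentheses: a reference to the function object; nothing runs yet.
func = run_single
print(callable(func))  # True

# With parentheses: the function is called right now, with no arguments.
error_message = None
try:
    run_single()
except TypeError as exc:
    error_message = str(exc)

print(error_message)  # run_single() missing 1 required positional argument: 'state'
```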

What you should do is pass the function itself as an argument to the Executor, which then takes care of calling it in a separate thread. To do that, write the function name without parentheses: in Python, a function name without parentheses is treated like any other object and is simply passed along as an argument.
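To see that the executor, not your own code, makes the calls, here is a sketch in which a hypothetical run_single records which thread ran it. It also suggests why writing run_single(state) yourself ran sequentially: those calls happen one after another in the main thread, before the executor is ever involved.

```python
import threading
from concurrent import futures

main_name = threading.current_thread().name


def run_single(state):
    # Hypothetical stand-in: report the state and the thread that handled it.
    return state, threading.current_thread().name


states = ['AK', 'AL', 'AR']

# Calling it yourself: every call runs in the main thread, one after another.
direct = [run_single(state) for state in states]

# Passing the function object: the executor calls it inside its worker threads.
with futures.ThreadPoolExecutor(max_workers=3) as executor:
    pooled = list(executor.map(run_single, states))

print(direct)
print(pooled)
```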

In short, just rewrite your executor's calling line as:

    res = executor.map(run_single, states)
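A sketch of the whole corrected `__main__` block, with a stub Processor standing in for the real class (the stub and its result attribute are hypothetical). Besides dropping the parentheses, it creates a single executor instead of one per pass of the `for state in states:` loop (which would submit every state once per pass), and it consumes the iterator returned by map, so an exception raised inside a task is re-raised in the main thread instead of being silently lost.

```python
from concurrent import futures


class Processor(object):
    # Hypothetical stub: the real class reads the table and calls the API.
    def __init__(self, state):
        self.state = state
        self.result = 'done: %s' % state


def run_single(state):
    # Return something so the main thread can inspect each task's outcome.
    return Processor(state).result


if __name__ == "__main__":
    states = ['AK', 'AL', 'AR']
    workers = 20

    # One executor for all states; map() receives the function object itself.
    with futures.ThreadPoolExecutor(workers) as executor:
        results = list(executor.map(run_single, states))

    for state, result in zip(states, results):
        print(state, result)
```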

Notice that I also removed sorted: the executor does not guarantee the order in which the tasks will run, so sorting the input list can only give you the false impression that the tasks will be performed in some specific order. It is better to make it clear that this order is arbitrary. (executor.map does return its results in the same order as the input, but the tasks themselves may run and finish in any order.)
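A small sketch of what that means in practice, using a hypothetical work function that sleeps for a different time per state: executor.map still yields results in input order, while futures.as_completed yields each future as soon as it finishes, whatever its position in the input.

```python
import time
from concurrent import futures


def work(item):
    # Hypothetical task: later items in the list sleep for less time.
    state, delay = item
    time.sleep(delay)
    return state


items = [('AK', 0.3), ('AL', 0.2), ('AR', 0.1)]

with futures.ThreadPoolExecutor(max_workers=3) as executor:
    # map() yields results in input order, whatever order the tasks finish in.
    mapped = list(executor.map(work, items))

    # as_completed() yields futures in the order they actually finish.
    pending = [executor.submit(work, item) for item in items]
    finished = [f.result() for f in futures.as_completed(pending)]

print(mapped)  # ['AK', 'AL', 'AR']
print(finished)
```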

(The class name inside run_single was also wrong: it must be Processor, not Geocode.)

Finally, you are creating a new database connection for each task you run. This is usually not good practice, since opening a connection to the database is relatively expensive. It is better to use some kind of connection pool, or simply use one global connection and create a cursor from within your class.
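A minimal sketch of the pool idea using only the standard library, so it runs without a PostgreSQL server: sqlite3 in-memory connections stand in for psycopg2 connections, and SimplePool is a hypothetical helper. With psycopg2 itself, psycopg2.pool.ThreadedConnectionPool offers the same getconn()/putconn() pattern. Each task borrows an already-open connection, creates its own cursor, and returns the connection when done.

```python
import queue
import sqlite3
from concurrent import futures


class SimplePool(object):
    # Hypothetical thread-safe pool: connections are opened once and reused.
    def __init__(self, size, connect):
        self._free = queue.Queue()
        for _ in range(size):
            self._free.put(connect())

    def getconn(self):
        return self._free.get()  # blocks until a connection is free

    def putconn(self, conn):
        self._free.put(conn)


def connect():
    # Stand-in for psycopg2.connect(...); check_same_thread=False lets a
    # connection opened in the main thread be used by a worker thread.
    return sqlite3.connect(':memory:', check_same_thread=False)


pool = SimplePool(2, connect)


def run_single(state):
    conn = pool.getconn()
    try:
        # Each task uses its own cursor on a borrowed connection.
        cursor = conn.cursor()
        cursor.execute('SELECT ?', (state,))
        return cursor.fetchone()[0]
    finally:
        pool.putconn(conn)


with futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(run_single, ['AK', 'AL', 'AR']))

print(results)  # ['AK', 'AL', 'AR']
```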

    
05.10.2016 / 01:37