I'm trying to create a class (Processor) that performs a series of functions to get results from a State-to-State API. (STATE)
1- READ DATA FROM A TABLE.
2- SEND request for the API to process.
3- WAIT jobs to be complete.
4- DOWNLOAD the results when complete.
I would like to implement concurrency so that you can run to multiple states at the same time. I was trying something similar to what Luciano Ramalho implements with the concurrent.futures lib in Fluent Python. (Chapter 17. Concurrency with Futures) but I can not compete
ERROR:
res = executor.map(run_single(), sorted(states))
TypeError: run_single() missing 1 required positional argument: 'state'
But when I include state in run single it only runs sequentially state by state.
I'm using python 3.5. ~ Part of my code below - Thankful for any guidance.
import datetime
import os
import time
import xml.etree.ElementTree as ET
import psycopg2
import psycopg2.extras
import requests
from concurrent import futures
class Processor(object):
database = 'db'
user = 'user'
password = 'password'
def __init__(self, state):
self.base_url = 'api.com'
self.state = state
self.status = None
self.fetch_size = 1000000
self.job_id = ''
self.send_requests(self.state)
def send_requests(self, state):
payload = dict(params)
# connection to postgres db table , fetch data.
conn = psycopg2.connect(
"dbname='%s' user='%s' host='host' password='%s'" % (database, user, password))
cursor = conn.cursor('%s' % state, cursor_factory=psycopg2.extras.DictCursor)
sql = ("select * from table where state='%s' limit 1" % state)
cursor.execute(sql)
try:
# function to build/send requests fetching data by chunks of fetch_size limited.
while True:
fetchs = cursor.fetchmany(self.fetch_size)
if len(fetchs) != 0:
chunk = ''
for fetch in fetchs:
try:
row = fetch[0] + '|' + fetch[1] + '|' + fetch[2] + '\n'
chunk += row
except:
print('>ERROR ->', fetch[0])
pass
header = 'header\n'
row = requests.post(self.base_url, params=payload, data=header + chunk)
response = row.text
print('-> %s: response job_xml: %s' % (state, response))
root = ET.fromstring(response)
self.job_id = root.find('Response/MetaInfo/RequestId').text
print('-> %s: response job_id: %s' % (state, self.job_id))
self.check_jobs(state)
else:
break
except Exception as e:
print(e)
pass
# Function checking the status of the job_id if completed download() the results if not wait and retry.
def check_jobs(self, state):
print('->>> %s: Checking job %s <<<-' % (state, self.job_id))
status = self.get_status(self.job_id)
if status == 'completed':
print('-> %s: status: %s, job_id: %s ' % (state, status, self.job_id))
self.download_results(self.job_id)
else:
time.sleep(4) # 480 large million requests
self.check_jobs(state)
# Function to return status of job_id
def get_status(self, job_id):
url_status = 'url that get status of job_id'
req_status = requests.get(url_status)
root = ET.fromstring(req_status.text)
status = root.find('Response/Status').text
return status
# Function download the results
def download_results(self, job_id):
url_download = 'url to download job_id'
print('-> %s: downloading jod_id: %s @ URL [%s]' % (self.state, job_id, url_download))
r = requests.get(url_download, stream=True)
# create folder for state if not exists
download = os.path.join(self.responses_folder, self.state)
if not os.path.exists(download):
os.makedirs(download)
# Save result to folder
save_as = os.path.join(download, str(job_id + '.zip'))
with open(save_as, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
print('-> %s: downloaded job_id: %s @ folder [ %s ] ' % (self.state, job_id, save_as))
self.delete_results(job_id)
if __name__ == "__main__":
states = ['AK', 'AL', 'AR']
workers = 20
def run_single(state):
Processor(state)
for state in states:
with futures.ThreadPoolExecutor(workers) as executor:
res = executor.map(run_single(), sorted(states))