Multiprocessing in Python: with a Windows machine

# -*- coding: utf-8 -*-
"""
Created on Tue Apr 17 08:53:57 2018
@author: swagat
"""
from concurrent.futures import ProcessPoolExecutor
import nltk
from nltk.corpus import inaugural
from nltk.corpus import stopwords
# filter out punctuation / non-alphanumeric tokens and/or stopwords
def filterPunctAndStopwords(word_list, funct):
    if len(word_list) == 0:
        return []
    filtered_list = []
    if funct == "punct":
        filtered_list = [w for w in word_list if w.isalnum()]
    elif funct == "stopwords":
        if type(word_list[0]) == tuple:
            filtered_list = [(w1, w2) for (w1, w2) in word_list
                             if w1 not in stopwords.words('english')
                             and w2 not in stopwords.words('english')]
        else:
            filtered_list = [w for w in word_list if w not in stopwords.words('english')]
    elif funct == "both":
        if type(word_list[0]) == tuple:
            filtered_list = [(w1, w2) for (w1, w2) in word_list
                             if w1.isalnum() and w2.isalnum()
                             and w1 not in stopwords.words('english')
                             and w2 not in stopwords.words('english')]
        else:
            filtered_list = [w for w in word_list
                             if w.isalnum() and w not in stopwords.words('english')]
    return filtered_list
def calc_freq_dist(text, size=3):
    """
    Extract unigrams and two-word tuples from sliding windows and create an
    nltk.FreqDist for each (returned together in a list).
    """
    print(size)
    unigrams = []
    tuples = []
    # scan over windows of the appropriate size
    for center in range(size, len(text) - size):
        # record the co-occurrences of the center word with each other word in the window
        wunis = set()
        wtuples = set()  # tuples in this window; a set so each is counted only once
        thisword = text[center]
        # iterate through the rest of the window
        for i in range(1, size + 1):  # i starts from 1 (center +/- i)
            nextleft = text[center - i]
            nextright = text[center + i]
            # add the neighbouring words to this window's unigram set
            wunis.add(nextleft)
            wunis.add(nextright)
            # create the left tuple (alphabetically ordered)
            if not thisword == nextleft:
                if thisword < nextleft:
                    tup = (thisword, nextleft)
                else:
                    tup = (nextleft, thisword)
                # and add it to this window's tuple set
                wtuples.add(tup)
            # create the right tuple (alphabetically ordered)
            if not thisword == nextright:
                if thisword < nextright:
                    tup = (thisword, nextright)
                else:
                    tup = (nextright, thisword)
                # and add it to this window's tuple set
                wtuples.add(tup)
        # add this window's unigrams to the text-level list
        for wuni in wunis:
            unigrams.append(wuni)
        # add this window's tuples to the text-level list
        for wtup in wtuples:
            tuples.append(wtup)
        if center % 100 == 0:
            print("Processed %d records.........." % (center))
    unigrams = [w for w in unigrams if len(w) > 2]
    unigrams = filterPunctAndStopwords(unigrams, "stopwords")
    tuples = [(w1, w2) for (w1, w2) in tuples if len(w1) > 2 and len(w2) > 2]
    tuples = filterPunctAndStopwords(tuples, "stopwords")
    # create a frequency distribution from the unigrams and the tuples
    ufd = nltk.FreqDist(unigrams)
    cfd = nltk.FreqDist(tuples)
    # and return the distributions in a list
    return [ufd, cfd]
def main():
    # divide the corpus into pre-/post-WWI parts
    preWWIFileIds = []
    postWWIFileIds = []
    for filename in inaugural.fileids():
        if filename.split('-')[0] <= '1913':
            preWWIFileIds.append(filename)
        elif filename.split('-')[0] > '1917':
            postWWIFileIds.append(filename)
    print("Items in preWWI corpus = %d\nItems in postWWI corpus = %d" % (len(preWWIFileIds), len(postWWIFileIds)))
    # create unigrams out of the preWWI text
    unigrams_preWWI = [w.lower() for w in inaugural.words(preWWIFileIds)]
    # create unigrams out of the postWWI text
    unigrams_postWWI = [w.lower() for w in inaugural.words(postWWIFileIds)]
    print("len of preWWI tokens = %d\nlen of postWWI tokens = %d\n" % (len(unigrams_preWWI), len(unigrams_postWWI)))
    list_text = []
    step = 1000
    sliding_win_size = 3
    all_token_list = unigrams_postWWI[:10000]
    # partition the tokens into chunks that overlap by the window size,
    # so no window is lost at the chunk boundaries
    for ind in range(0, len(all_token_list), step):
        start_ind = ind - sliding_win_size if ind > 0 else ind
        end_ind = ind + step + sliding_win_size if ind + step + sliding_win_size < len(all_token_list) else len(all_token_list)
        list_text.append(all_token_list[start_ind:end_ind])
    print("Size of partitions = %d" % (len(list_text)))
    postWWI_unigrams_fd = []
    postWWI_bigrams_fd = []
    with ProcessPoolExecutor(5) as executor:
        for token_list, token_fds in zip(list_text, executor.map(calc_freq_dist, list_text, [sliding_win_size] * len(list_text))):
            print('Len of tokens processed = %d\nLen of unigrams = %d\nLen of tuples = %d' % (len(token_list), len(token_fds[0]), len(token_fds[1])))
            postWWI_unigrams_fd.extend(token_fds[0])
            postWWI_bigrams_fd.extend(token_fds[1])
    print('Len of unigrams processed = %d\nLen of bigrams = %d' % (len(postWWI_unigrams_fd), len(postWWI_bigrams_fd)))

if __name__ == '__main__':
    main()

Parallel execution in Python generally takes one of two approaches: threads or multiple processes. Although multiple processes carry the overhead of combining the results from the separate workers, they make better use of the multiple CPU cores that are commonplace these days. There are three main elements to putting this into practice: dividing the workload into logical chunks, passing the chunks to individual processes, and compiling the output from each of those processes.
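A minimal sketch of how those three elements map onto code, using the standard-library ProcessPoolExecutor covered below (word_count here is just a hypothetical stand-in for the real per-chunk work):

 from concurrent.futures import ProcessPoolExecutor

 def word_count(chunk):
     # stand-in for the real per-process work
     return len(chunk)

 if __name__ == '__main__':   # required on Windows, where worker processes re-import this module
     tokens = ["some", "long", "list", "of", "tokens"] * 2000
     # 1. divide the workload into logical chunks
     chunks = [tokens[i:i + 1000] for i in range(0, len(tokens), 1000)]
     with ProcessPoolExecutor() as executor:
         # 2. pass the chunks to individual processes
         results = executor.map(word_count, chunks)
         # 3. compile the output from each of those processes
         total = sum(results)
     print(total)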

Python has several libraries for achieving real parallel programming, one popular choice being multiprocessing. There are plenty of blog posts describing the nitty-gritty of working with that library (I have listed a couple in the references section), so this post explores two other choices we can just as easily use to achieve the same thing.

Concurrent.futures

This module provides a high-level interface for asynchronously executing callables. A nice bonus is that it ships with the Python standard library, so there is nothing to install. For true process-based parallelism, the module provides the class ProcessPoolExecutor.
It accepts the number of worker processes to spawn as an argument (max_workers). If not given, this defaults to the number of logical processors on the machine (for my quad-core machine with hyper-threading, that translates to 8 workers). Newer versions of the module add a few more arguments, e.g. mp_context (the multiprocessing context), initializer and initargs, which give greater control over how the worker processes are started and initialized. These, however, are not covered in the current post.
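As a quick illustration of the constructor and of submitting a single call (a minimal sketch; cube is just a throwaway function for demonstration):

 import os
 from concurrent.futures import ProcessPoolExecutor

 def cube(x):
     return x ** 3

 if __name__ == '__main__':   # the guard matters on Windows, where workers are spawned
     print("Logical processors:", os.cpu_count())   # the default pool size if max_workers is omitted
     with ProcessPoolExecutor(max_workers=4) as executor:
         future = executor.submit(cube, 3)   # schedule one call asynchronously
         print(future.result())              # blocks until the worker returns 27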

I have used this class in the example code above (PMI_parallel.py), a simple program that extracts unigrams and two-word tuples from a given text and builds frequency distributions from them. The parallel-processing magic, though, happens in this piece:

 with ProcessPoolExecutor(5) as executor:
     for token_list, token_fds in zip(list_text, executor.map(calc_freq_dist, list_text, [sliding_win_size] * len(list_text))):
         print('Len of tokens processed = %d\nLen of unigrams = %d\nLen of tuples = %d' % (len(token_list), len(token_fds[0]), len(token_fds[1])))
         postWWI_unigrams_fd.extend(token_fds[0])
         postWWI_bigrams_fd.extend(token_fds[1])

Here we first initialize a ProcessPoolExecutor with 5 worker processes, and with that pool of workers we run a loop that hands the workload to the individual workers through this statement:

zip(list_text, executor.map(calc_freq_dist, list_text, [sliding_win_size] * len(list_text)))

list_text is simply the list of text segments to be processed (the workload division), and calc_freq_dist is the function that does all the work. The call that distributes the workload across the worker processes is executor.map: its first argument is the callable (calc_freq_dist) and the remaining arguments are iterables of inputs, one element per task. Wrapping the call in zip keeps each element of list_text paired with the result computed for it. Finally, the compilation of output happens inside the for loop, where I simply append each result to two separate lists in the order it is received.
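The same pattern in miniature (a sketch; count_longer_than is a hypothetical stand-in for calc_freq_dist):

 from concurrent.futures import ProcessPoolExecutor

 def count_longer_than(tokens, min_len):
     # one positional parameter per iterable passed to executor.map
     return sum(1 for t in tokens if len(t) > min_len)

 if __name__ == '__main__':
     list_text = [["four", "score", "and", "seven"], ["years", "ago", "our", "fathers"]]
     min_lens = [3] * len(list_text)   # mirrors [sliding_win_size] * len(list_text)
     with ProcessPoolExecutor(2) as executor:
         # map preserves input order, so zip keeps each chunk paired with its own result
         for chunk, result in zip(list_text, executor.map(count_longer_than, list_text, min_lens)):
             print(len(chunk), result)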

Parallel Python (pp)

This is another module built to support parallel processing on SMP machines (systems with multiple processors or cores) or in a clustered environment. It offers several advantages over concurrent.futures, which are listed on the module's home page.
It does, however, require installation; instructions and downloads can be found on the home page. Because of portability issues with Python 3, the authors provide a separate zip file for it, and the instructions can be found here.

I have created a sample program that appends two columns of a dataset, available here. The snippet that uses pp for the parallelization is given below:

 # parallel processing code with pp
 # (pp and time are imported, and data/test_func are defined, earlier in the full program)
 ppservers = ()  # mainly used in a clustered environment, where the node names go here;
                 # included here for demonstration purposes only
 job_server = pp.Server(ncpus=5, ppservers=ppservers)  # start the job server with 5 workers
 print("Starting pp with %d workers" % (job_server.get_ncpus()))
 start_time = time.time()
 x_all = []
 # input load division
 inputs = (data[0:60000], data[60000:120000], data[120000:180000], data[180000:240000],
           data[240000:300000], data[300000:360000], data[360000:len(data)])
 # job submission
 jobs = [(input_x, job_server.submit(test_func, (input_x,), (None,), ("pandas",))) for input_x in inputs]
 # result accumulation
 for input_x, job in jobs:
     x_all.append(job())
 print("Accumulated %d results" % (len(x_all)))
 print("Time elapsed: %f s" % (time.time() - start_time))
 job_server.print_stats()

To set up the worker processes, we create a pp.Server, passing the number of processes as ncpus. The input load is divided with a simple tuple of data slices. Each job is then submitted by calling submit with the following parameters:
  • callable: the function to run (test_func in this case)
  • args: the input for the callable (input_x, one element of the tuple of inputs)
  • depfuncs: functions called internally by the callable (None in this case)
  • modules: modules that must be imported for the function to work ("pandas" in this case)
Finally, we accumulate the results by calling each job object, job(), which blocks until that job has finished and returns its result (a stripped-down sketch of this pattern follows the output below). The output of this code snippet looks like this:

 Time elapsed: 24.610267 s  
 Job execution statistics:  
  job count | % of all jobs | job time sum | time per job | job server  
      7 |    100.00 |   27.4447 |   3.920671 | local  
 Time elapsed since server creation 45.127434968948364  
 0 active tasks, 5 cores  
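For reference, the same submit-and-collect pattern boiled down to a minimal sketch (assuming pp is installed; square is just a stand-in worker function):

 import pp

 def square(x):
     # trivial stand-in for the real worker function
     return x * x

 job_server = pp.Server(ncpus=2)   # local job server with 2 workers
 # submit(callable, args, depfuncs, modules) returns a job object
 jobs = [job_server.submit(square, (n,), (), ()) for n in range(10)]
 results = [job() for job in jobs]   # calling job() blocks until that job finishes
 print(results)
 job_server.print_stats()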

Thanks for reading!

References

Multiprocessing

  • https://sebastianraschka.com/Articles/2014_multiprocessing.html
  • https://www.ploggingdev.com/2017/01/multiprocessing-and-multithreading-in-python-3

Concurrent.futures

  • https://docs.python.org/3/library/concurrent.futures.html
