Joe Armstrong, creator of the Erlang language
Rob Pike, "Concurrency is not parallelism"
The "classical" approach: threads, semaphores, locks:
from threading import Thread
from traceback import print_exc

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        """Run thread loop."""
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception:
                print_exc()
            self.tasks.task_done()
from Queue import Queue

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in xrange(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()
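A minimal usage sketch of the pool above; the pool size and the fetch_url task are illustrative placeholders, not part of the original:

# Hypothetical usage of the ThreadPool above; fetch_url is a placeholder task
def fetch_url(url):
    print "fetching %s" % url  # any side-effecting work goes here

pool = ThreadPool(4)                       # four worker threads
for url in ["http://a.example", "http://b.example"]:
    pool.add_task(fetch_url, url)          # enqueue work items
pool.wait()                                # block until the queue is drained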
What about shared resources? That's easy, just use locks, e.g.
import threading

lock = threading.Lock()

def get_first_part():
    lock.acquire()
    try:
        # ... fetch data for first part from shared object
        pass
    finally:
        lock.release()
    return data

def get_second_part():
    lock.acquire()
    try:
        # ... fetch data for second part from shared object
        pass
    finally:
        lock.release()
    return data
The standard lock object doesn't care which thread is currently holding the
lock; if the lock is held, any thread that attempts to acquire it will
block, even if the same thread already holds the lock.
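A minimal sketch of that behavior; the second acquire() never returns because the plain Lock is not re-entrant:

import threading
lock = threading.Lock()
lock.acquire()
lock.acquire()  # deadlocks: a plain Lock blocks even the thread that holds it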
def get_both_parts():
    first = get_first_part()
    second = get_second_part()
    return first, second
Are you sure this works? If some other thread modifies the resource between
the two calls, we may end up with inconsistent data:
def get_both_parts():
    lock.acquire()
    try:
        first = get_first_part()
        second = get_second_part()
    finally:
        lock.release()
    return first, second
However, this won't work either: the individual access functions will get
stuck, because the outer function already holds the lock. To work around
this, you could add flags to the access functions that let the outer
function disable locking, but this is error-prone and can quickly get out
of hand. The solution is to use a re-entrant lock (threading.RLock), which
the same thread may acquire more than once.
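A minimal sketch of the same accessors rewritten with threading.RLock; the function bodies remain the pseudocode placeholders from above, and the nested calls no longer deadlock:

import threading

lock = threading.RLock()  # re-entrant: the owning thread may acquire it again

def get_first_part():
    with lock:
        # ... fetch data for first part from shared object
        return data

def get_second_part():
    with lock:
        # ... fetch data for second part from shared object
        return data

def get_both_parts():
    with lock:  # held across both calls; safely re-acquired inside each one
        return get_first_part(), get_second_part()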
Everything that can go wrong will go wrong (c) Murphy
Chris Jones
Apache vs Yaws (Erlang) (2002)
Throughput (KBytes/second) vs. load: Apache (blue: run on NFS; green: run on a local filesystem) dies at 4k parallel requests, while Yaws (red, run on NFS) stays alive even at 80k parallel requests.
Joe Armstrong, "Concurrency is easy"
-module(myserver).
-export([request/1]).

%% Server loop: state lives in the argument, no shared memory
server(Data) ->
    receive
        {From, {request, X}} ->
            {R, Data1} = fn(X, Data),
            From ! {myserver, {reply, R}},
            server(Data1)
    end.

%% Client API: send a request to the registered server and await the reply
request(Req) ->
    myserver ! {self(), {request, Req}},
    receive
        {myserver, {reply, Rep}} ->
            Rep
    end.
widget w; // (a)
widget w(); // (b)
widget w{}; // (c)
widget w(x); // (d)
widget w{x}; // (e)
widget w = x; // (f)
widget w = {x}; // (g)
auto w = x; // (h)
auto w = widget{x}; // (i)
def gen():
    for i in range(0, 5):
        yield i

a = [i for i in gen()]  # list comprehension, fully materialized
a = (i for i in gen())  # generator expression, lazy
a = 1
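The last two assignments differ in evaluation: the list comprehension materializes all values at once, while the generator expression produces them on demand. A small illustration:

g = (i for i in gen())       # nothing is computed yet
print g.next()               # 0 -- values are produced one at a time
print g.next()               # 1
print sum(i for i in gen())  # generators compose with consumers like sum()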
Type of solution            Time taken (sec)
Python (estimate)               1500.0
Python + Psyco (estimate)       1138.0
Python + NumPy Expression         29.3
Blitz                              9.5
Inline                             4.3
Fast Inline                        2.3
Python/Fortran                     2.9
Pyrex                              2.5
Matlab (estimate)                 29.0
Octave (estimate)                 60.0
Pure C++                           2.16

(source: NumPy/SciPy project)
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func boring(msg string, c chan string) {
    for i := 0; ; i++ {
        c <- fmt.Sprintf("%s %d", msg, i) // Expression to be sent
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

func main() {
    c := make(chan string)  // create output channel
    go boring("boring!", c) // launch a goroutine, a la thread
    for i := 0; i < 5; i++ {
        fmt.Printf("You say: %q\n", <-c) // Receive msg and print it
    }
    fmt.Println("You're boring; I'm leaving.")
}
Code snippets from Rob Pike's concurrency slides.
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func boring(msg string) <-chan string { // Returns receive-only channel
    c := make(chan string)
    go func() { // We launch the goroutine from inside the function.
        for i := 0; ; i++ {
            c <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
    }()
    return c // Return the channel to the caller.
}

func multiplexer(input1, input2 <-chan string) <-chan string {
    c := make(chan string) // output channel
    go func() {
        for {
            select { // forward whichever input is ready
            case s := <-input1:
                c <- s
            case s := <-input2:
                c <- s
            }
        }
    }()
    return c
}

func main() {
    c := multiplexer(boring("Joe"), boring("Ann"))
    for i := 0; i < 10; i++ {
        fmt.Println(<-c)
    }
    fmt.Println("You're both boring; I'm leaving.")
}
No locks. No condition variables. No callbacks.
Code snippets from Rob Pike's concurrency slides.
process_urls(L) ->
    process_urls(L, []).

process_urls([Url|T], Results) ->
    Id = urlfetch_uuid:new(),
    Payload = "",
    Headers = "",
    spawn(fun() -> fetch(Id, Url, get, Payload, Headers) end),
    Output = get_result(Id),
    process_urls(T, Results ++ [Output]);
process_urls([], Results) -> Results.
Each recursion step spawns a process running the fetch function and collects its result via get_result/1.
func RequestHandler(w http.ResponseWriter, r *http.Request) {
    urls := parse_input_request(r)
    ch := make(chan []byte)
    for _, url := range urls {
        go Fetch(url, ch) // fan out: one goroutine per URL
    }
    for i := 0; i < len(urls); i++ {
        w.Write(<-ch) // fan in: collect results as they arrive
        w.Write([]byte("\n"))
    }
}
Submit goroutines and collect results via a channel.
import pycurl
import StringIO

def getdata(urls, ckey, cert, headers=None, num_conn=100):
    # Make a queue with urls
    queue = [u for u in urls if validate_url(u)]
    # Check args
    num_urls = len(queue)
    num_conn = min(num_conn, num_urls)
    # Pre-allocate a list of curl objects
    mcurl = pycurl.CurlMulti()
    mcurl.handles = []
    for _ in range(num_conn):
        curl = pycurl.Curl()
        curl.fp = None
        curl.setopt(pycurl.FOLLOWLOCATION, 1)
        curl.setopt(pycurl.MAXREDIRS, 5)
        curl.setopt(pycurl.CONNECTTIMEOUT, 30)
        curl.setopt(pycurl.TIMEOUT, 300)
        curl.setopt(pycurl.NOSIGNAL, 1)
        curl.setopt(pycurl.SSLKEY, ckey)
        curl.setopt(pycurl.SSLCERT, cert)
        curl.setopt(pycurl.SSL_VERIFYPEER, False)
        if headers:
            curl.setopt(pycurl.HTTPHEADER,
                        ["%s: %s" % (k, v) for k, v in headers.iteritems()])
        mcurl.handles.append(curl)

    # Main loop
    freelist = mcurl.handles[:]
    num_processed = 0
    while num_processed < num_urls:
        # If there is an url to process and a free curl object,
        # add to multi-stack
        while queue and freelist:
            url = queue.pop(0)
            curl = freelist.pop()
            curl.setopt(pycurl.URL, url.encode('ascii', 'ignore'))
            bbuf = StringIO.StringIO()
            hbuf = StringIO.StringIO()
            curl.setopt(pycurl.WRITEFUNCTION, bbuf.write)
            curl.setopt(pycurl.HEADERFUNCTION, hbuf.write)
            mcurl.add_handle(curl)
            # store some info
            curl.hbuf = hbuf
            curl.bbuf = bbuf
            curl.url = url
        # Run the internal curl state machine for the multi stack
        while 1:
            ret, _num_handles = mcurl.perform()
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        # Check for curl objects which have terminated, and add them to the
        # freelist
        while 1:
            num_q, ok_list, err_list = mcurl.info_read()
            for curl in ok_list:
                hdrs = curl.hbuf.getvalue()
                data = curl.bbuf.getvalue()
                url = curl.url
                curl.bbuf.flush()
                curl.bbuf.close()
                curl.hbuf.close()
                curl.hbuf = None
                curl.bbuf = None
                mcurl.remove_handle(curl)
                freelist.append(curl)
                yield {'url': url, 'data': data, 'headers': hdrs}
            for curl, errno, errmsg in err_list:
                hdrs = curl.hbuf.getvalue()
                data = curl.bbuf.getvalue()
                url = curl.url
                curl.bbuf.flush()
                curl.bbuf.close()
                curl.hbuf.close()
                curl.hbuf = None
                curl.bbuf = None
                mcurl.remove_handle(curl)
                freelist.append(curl)
                yield {'url': url, 'data': None, 'headers': hdrs,
                       'error': errmsg, 'code': errno}
            num_processed = num_processed + len(ok_list) + len(err_list)
            if num_q == 0:
                break
        # Currently no more I/O is pending, could do something in the meantime
        # (display a progress bar, etc.).
        # We just call select() to sleep until some more data is available.
        mcurl.select(1.0)
    cleanup(mcurl)

def cleanup(mcurl):
    "Clean-up MultiCurl handles"
    for curl in mcurl.handles:
        if curl.hbuf is not None:
            curl.hbuf.close()
            curl.hbuf = None
        if curl.bbuf is not None:
            curl.bbuf.close()
            curl.bbuf = None
        curl.close()
    mcurl.close()
Multiple nested loops are required, even with a third-party library (pycurl); compare with the Go version above.
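A possible usage sketch of getdata above; the URLs and the key/certificate paths are illustrative assumptions, and since getdata is a generator, results are consumed lazily as the transfers complete:

# Hypothetical inputs; adjust paths and URLs to your environment
urls = ['http://a.example/data', 'http://b.example/data']
ckey, cert = '/path/to/userkey.pem', '/path/to/usercert.pem'
for row in getdata(urls, ckey, cert, headers={'Accept': 'application/json'}):
    if 'error' in row:
        print "FAIL %s: %s" % (row['url'], row['error'])
    else:
        print "OK   %s: %d bytes" % (row['url'], len(row['data']))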