It's easy to forget how powerful parallel and concurrent programming is. Yet whenever we can find a way to parallelize an algorithm we are working on, the speed improvements on modern multi-core machines can be truly impressive.
In the land of imperative languages, making things run in parallel can get tricky very quickly: you need to be careful about mutating data so that only one thread at a time commits a change. The tool that is supposed to help you with that is called a lock, and there are many ways locks can go wrong, leading to deadlocks and surprising behaviours that are difficult to reproduce and debug. It also may not be easy to distribute the workload properly: even if you divide your data evenly between threads, your program may not get the perfect speed-up, because some threads may finish before the others simply because computations on different data sets can take different amounts of time.
Parallel and concurrent programming is much easier in Haskell:
- it's pure, which means that there are no mutations to observe and all data can be shared between threads freely;
- it supports modern techniques like STM (among many other options for parallel and concurrent programming);
- it has lightweight green threads, which help divide workload evenly.
In this blog post I'm going to describe how we achieved an almost 100× speed-up in a client's project. It was simple to do and allowed us to reduce execution time from more than a week to just three hours!
The problem
The part of the project that is interesting to us queries the API of a web service, puts some data into a database, and generates CSV files along the way. Here is a simple diagram of the process:
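paginated API ──query──▶ fetch & process ──▶ database
                               │
                               └──▶ CSV files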
The API returns entries in paginated form and we need to use the “limit” and “offset” query parameters to traverse the whole list. To make things easier for us, the API does not allow the limit value to be greater than 100, nor the offset value to be greater than 3500. This leads to the question: how do we divide all items into segments of size up to 3500? In the real application we use a combination of parameters, but in this blog post we will assume that varying just one parameter is enough: price.
So far, the approach seems to be the following:

1. Choose a price range (for example, $1500–$1550).
2. With that price range, use “limit” and “offset” to iterate through the paginated results and collect all the data (see the sketch after this list).
3. Process the fetched data as necessary and put it into the database, and also create or append to some CSV files using the information.
4. Shift the price range and repeat from step 2.
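Here is a minimal sketch of step 2. The PriceRange and Entry types and the fetchPage client function are hypothetical stand-ins for the real client code; the constants come straight from the API limits described above.

-- Hypothetical stand-ins for the real types and API client.
data PriceRange = PriceRange Word Word
data Entry = Entry                           -- placeholder for the real entry type

fetchPage :: PriceRange -> Word -> Word -> IO [Entry]
fetchPage _range _limit _offset = undefined  -- the real HTTP client goes here

-- | Walk the paginated results for one price range, respecting the
-- limit <= 100 and offset <= 3500 restrictions of the API.
fetchRange :: PriceRange -> IO [Entry]
fetchRange range = go 0
  where
    go offset = do
      page <- fetchPage range 100 offset   -- always ask for the maximum page size
      let next = offset + 100
      if length page < 100 || next > 3500
        then return page                   -- last page for this range
        else (page ++) <$> go next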
Now for the interesting part: the total number of entries we need to fetch is over 4 million. One query takes about half a second, so yeah, it's going to take a while…
Making it faster with concurrent Haskell
Or is it? It's not entirely obvious how to divide the work between threads evenly, but the task clearly has the potential to be performed in parallel.
Price segmentation
I think the question to answer is: how do we divide all the work we need to do so as to keep all cores busy? One approach would be to divide the entire price range ($0–$500000) into price sub-ranges ($50 each, for example) and start a thread per sub-range (10000 threads in total). The idea may seem crazy to non-Haskellers, but Haskell has extremely lightweight threads and the run-time system can handle up to a million of them (and more) working simultaneously; the number of threads is limited only by available memory. This makes it easy to build the right abstractions for parallel execution: you describe how to process one entry and then just run an operation per entry in a separate Haskell thread, letting the run-time system keep all physical threads/cores loaded evenly.
Other parts of the system do not scale so well, though, and this is a pretty common situation. In particular, there are limitations on the number of simultaneous connections. Through experiments, we found that 100 threads is probably the best choice. We also cannot choose price sub-ranges that are too wide, because then the “offset” limitation won't allow us to grab all the data; ranges should be short enough, and in our case $50 seems to be the golden mean.
With only 100 threads and $50 as the sub-range width, every thread has to shift its price range several times. One solution is to assign each thread a contiguous block of sub-ranges: thread 1 takes $0–$5000, thread 2 takes $5000–$10000, and so on.
But it's not a good idea, because the distribution of entries over the price continuum is not uniform. For example, upper sub-ranges are less saturated than middle sub-ranges, which means that the threads processing upper sub-ranges would finish before the others and the speed-up wouldn't be optimal. A better approach is to interleave the sub-ranges:
Divide the price axis into segments, where a segment is a collection of 100 adjoined sub-ranges and every sub-range within a segment is processed by a separate thread. Once a thread has finished with its price sub-range in one segment, it jumps to its sub-range in the next one. Thus all threads get an approximately equal amount of work. This way, common sense and Haskell's lightweight threads work together to divide work evenly for maximal speed-up.
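A quick sketch of this assignment, using the numbers from above (the names here are illustrative, not the project's actual code):

threadCount, subRangeWidth, maxPrice :: Word
threadCount   = 100      -- one thread per sub-range within a segment
subRangeWidth = 50       -- $50 sub-ranges
maxPrice      = 500000   -- upper bound of the whole price axis

-- | The sub-ranges (low, high) assigned to thread @i@ (0-based): the i-th
-- sub-range of every segment, where a segment spans 100 * $50 = $5000.
subRangesFor :: Word -> [(Word, Word)]
subRangesFor i =
  takeWhile (\(lo, _) -> lo < maxPrice)
    [ (lo, lo + subRangeWidth)
    | segment <- [0 ..]
    , let lo = segment * segmentWidth + i * subRangeWidth
    ]
  where
    segmentWidth = threadCount * subRangeWidth

Thread 0 thus works through $0–$50, $5000–$5050, $10000–$10050, and so on, and all threads move from segment to segment at roughly the same pace.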
Interaction between threads
Suppose we have solved the problem of querying; how do the other parts of the system work? We need to write information to CSV files, one file per value of some discrete parameter that all entries have, and we have to do it “on the fly”: whenever we get some items, we should either create a new file or append entries to an existing CSV file if some entries with the same value of that parameter have already been written.
We cannot write to disk from several threads, for two reasons:

- writing to disk is usually a sequential operation, so it won't be very efficient even if we initiate it from many threads simultaneously;
- doing so, we would likely end up with corrupted files in which entries from different threads are mixed up.
We could use some mechanism to make sure that only one thread at a time writes data to a particular file, but placing this code in the querying threads would likely defeat our original intention: making things faster.
So for writing the CSV files we need a separate thread; let's call it the “writing thread”. The querying threads fetch data from the API, do all the processing, and then “register” items to be written to disk for the writing thread to pick up. This looks like a FIFO channel between the querying threads and the writing thread.
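A minimal sketch of that channel with STM's TChan; the Msg payload here (a target file plus rendered rows) is an assumption, not the project's actual type:

import Control.Concurrent.STM
import Control.Monad (forever)

-- Hypothetical message type: which CSV file to touch and what to append.
data Msg = Msg FilePath [String]

-- | The writing thread: drain the channel, appending rows to the right file.
-- 'appendFile' creates the file if it does not exist yet.
writingThread :: TChan Msg -> IO ()
writingThread chan = forever $ do
  Msg path rows <- atomically (readTChan chan)
  appendFile path (unlines rows)

-- A querying thread registers items like this:
registerItems :: TChan Msg -> FilePath -> [String] -> IO ()
registerItems chan path rows = atomically (writeTChan chan (Msg path rows))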
What about database interaction? There is no reason not to insert entries concurrently: we just create a connection pool of size 100 and place the inserting code in the querying threads.
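For instance, with persistent-postgresql (an assumption on our part; the ConnectionPool type below suggests persistent, but any pooling library works the same way, and the connection string is made up):

{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.Logger (runNoLoggingT)
import Database.Persist.Postgresql (ConnectionPool, createPostgresqlPool)

-- | Create a pool matching the number of querying threads.
mkPool :: IO ConnectionPool
mkPool = runNoLoggingT $
  createPostgresqlPool "host=localhost dbname=listings" 100

-- Each querying thread then inserts through the shared pool, e.g.:
--   runSqlPool (insertMany_ items) pool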
With all that said, the interaction between threads looks like this:
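querying threads (×100) ──inserts via connection pool──▶ database
        │
        └──Msg via TChan──▶ writing thread ──▶ CSV files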
Implementation details
How exactly do we implement it in Haskell?
To start, we need to think about how exceptions should work in our code, because when writing concurrent code, exceptions should not take you by surprise! We want to start 101 threads apart from our main thread: 100 querying threads and one thread to write CSV files. If any of them (including the main thread) throws an exception or is terminated by an asynchronous exception (such as a user interrupt), we want to:
- Propagate it to the main thread
- Terminate all sibling threads
The async package is the right tool for the job; it has the mapConcurrently function:
-- | maps an @IO@-performing function over any @Traversable@ data
-- type, performing all the @IO@ actions concurrently, and returning
-- the original data structure with the arguments replaced by the
-- results.
--
-- For example, @mapConcurrently@ works with lists:
--
-- > pages <- mapConcurrently getURL ["url1", "url2", "url3"]
--
mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)
However, we will want to have access to a number of things:
- the database connection pool;
- a TChan, the STM implementation of a FIFO channel, which allows the querying threads to communicate with the writing thread;
- possibly other TVars, to count processed entries, for example.
MonadReader will help with that. We can just put all the TVars and other values in one place and make it accessible everywhere in the working threads. This is a very common pattern in real-world Haskell applications.
But here we have a problem: async is hard-coded to work with the IO monad, and yet we want to work in ReaderT GenConfig IO. The solution is to use the lifted-async package.
The following code gives us the right behaviour with respect to exception handling and allows us to access the TVars and the connection pool from our threads:
data GenConfig = GenConfig
{ cfMsgChan :: TChan Msg
-- ^ The channel that connects querying threads and the writing thread
, cfRetrieved :: TVar Int
-- ^ STM variable holding total number of entries retrieved
, cfGenerated :: TVar Int
-- ^ STM variable holding total number of entries produced
, cfConnPool :: ConnectionPool
-- ^ Database connection pool
, cfQuery :: PriceRange -> Word -> IO (Either ServantError Listing)
-- ^ Action that returns listing of items for given price range and offset
}
type Gen = ReaderT GenConfig IO
-- | Run the 'Gen' monad.
runGen
:: GenConfig -- ^ Generation config
-> Gen a -- ^ The monad to run
-> IO a
runGen cfg m = runReaderT m cfg
generate :: PostgresConf -> IO (Int, Int)
generate dbConfig = do
  channel   <- newTChanIO
  retrieved <- newTVarIO 0
  generated <- newTVarIO 0
  pool      <- createConnectionPool dbConfig
  let config = GenConfig
        { cfMsgChan   = channel
        , cfRetrieved = retrieved
        , cfGenerated = generated
        , cfConnPool  = pool
        , cfQuery     = undefined }
  runGen config $
    void . mapConcurrently id $
      csvWriter : (queryingAction 0 <$> priceRanges)
  retrieved' <- readTVarIO retrieved
  generated' <- readTVarIO generated
  return (retrieved', generated')
csvWriter :: Gen ()
queryingAction :: Word -> PriceRange -> Gen ()
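To give a feel for the remaining pieces, here is a sketch of what queryingAction could look like; listingItems, itemsToMsg, and the error handling are assumptions, not the project's actual code:

{-# LANGUAGE RecordWildCards #-}
import Control.Concurrent.STM
import Control.Monad (unless)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ask)

queryingAction :: Word -> PriceRange -> Gen ()
queryingAction offset range = do
  GenConfig {..} <- ask
  result <- liftIO (cfQuery range offset)
  case result of
    Left err -> liftIO (print err)               -- the real code would log and retry
    Right listing -> do
      let items = listingItems listing           -- hypothetical accessor
      liftIO . atomically $ do
        writeTChan cfMsgChan (itemsToMsg items)  -- hand off to the writing thread
        modifyTVar' cfRetrieved (+ length items)
      -- fetch the next page unless this one was the last
      unless (length items < 100 || offset + 100 > 3500) $
        queryingAction (offset + 100) range

Database insertion through cfConnPool and the jump to the thread's sub-range in the next segment would slot in here as well.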
With the proposed approach and the principles of operation described earlier, it should be clear how to finish the application.
Conclusion
All 101 threads are busy 99.9% of the time, and we get an almost 100× speed-up. Currently the bottleneck is writing to the file system, but it performs reasonably well on modern SSDs: when the querying finishes, the writing thread catches up pretty quickly, even after three hours of the concurrent processing stage.
Concurrent programming feels just a little bit more imperative than the rest of Haskell, because you compose actions that actually mutate something, even if atomically. Still, it's much easier to get right than in traditional imperative languages, because it's easier to divide the workload between threads and to compose STM actions. As always, thinking about and working with the limitations the real world imposes on us is the most difficult and interesting part of the work.