Flow

Flow is a lightweight C++ inter-thread flow library, built on top of the boost::thread library. It lets a program dispatch parts of a complex computation to one or more additional threads, in a transparent and idiomatic way. It was written in early 2008 by me (Victor Nicollet), and it is available under the terms of the new BSD license.

The flow library implements thread-safe FIFO queues (flows), and provides facilities for manipulating them that are compatible with the Standard C++ Library.

Downloads

Using this library requires Boost.Thread and Boost.Optional to be installed and available to the program. To use the elements defined in the flow library, one only has to include the header file flow.hpp: since all code is templated, the flow library itself needs no separate compilation or linking.

  • flow.hpp : include this file to use any entity in the library.
  • prime.cpp : a naive prime number test using several cores.
  • sort.cpp : a constant-time sort using an auxiliary thread.

Concepts

A flow is a FIFO queue where enqueueing and dequeueing are atomic. The type flows::flow<T> represents a flow of elements of type T.

Flows are manipulated through readers and writers, which are objects used to read from and write to the flow. Reading will remove the oldest element from the flow, while writing will append a new element to the flow, in a FIFO fashion. Attempting to read from an empty flow will block until another thread writes to the flow. Readers are represented by class flows::reader<T>, and writers are represented by class flows::writer<T>.

When all writers of a flow disappear, the flow is closed: all the readers are notified that no more data will be appended to the flow, triggering an end-of-flow state once all the remaining elements have been removed from the flow.

When all readers of a flow disappear, the flow is silent: all the writers are notified that nobody will ever read the data written to the flow, providing opportunities for optimization.

When all readers and writers of a flow disappear, that flow is automatically garbage-collected and is destroyed by the thread that destroyed the last reader or writer (along with all the elements it still contained).
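
The closed, silent and garbage-collected states can all be derived from counting live handles. The following is a minimal sketch of that idea, not the code in flow.hpp: flow_state, writer_handle, reader_handle, lifetime_report and observe_lifetime are hypothetical names, and it uses std::shared_ptr and std::atomic from the standard library instead of Boost.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical shared state of one flow: how many handles of each kind are alive.
struct flow_state {
  std::atomic<int> readers{0};
  std::atomic<int> writers{0};
};

// Writer handle: counts itself in on construction and copy, out on destruction.
class writer_handle {
public:
  explicit writer_handle(std::shared_ptr<flow_state> s) : state_(std::move(s)) {
    assert(state_ && "a handle needs a flow to attach to");
    ++state_->writers;
  }
  writer_handle(const writer_handle& other) : state_(other.state_) { ++state_->writers; }
  writer_handle& operator=(const writer_handle&) = delete;  // left out of the sketch
  ~writer_handle() { --state_->writers; }
  bool silent() const { return state_->readers.load() == 0; }  // no reader left
private:
  std::shared_ptr<flow_state> state_;
};

// Reader handle: same bookkeeping on the reader counter.
class reader_handle {
public:
  explicit reader_handle(std::shared_ptr<flow_state> s) : state_(std::move(s)) {
    assert(state_ && "a handle needs a flow to attach to");
    ++state_->readers;
  }
  reader_handle(const reader_handle& other) : state_(other.state_) { ++state_->readers; }
  reader_handle& operator=(const reader_handle&) = delete;  // left out of the sketch
  ~reader_handle() { --state_->readers; }
  bool closed() const { return state_->writers.load() == 0; }  // no writer left
private:
  std::shared_ptr<flow_state> state_;
};

struct lifetime_report {
  bool closed_with_writer;     // observed while a writer is still alive
  bool closed_without_writer;  // observed after the last writer is destroyed
  bool freed_at_end;           // whether the state was destroyed with the last handle
};

lifetime_report observe_lifetime() {
  lifetime_report report{};
  auto state = std::make_shared<flow_state>();
  std::weak_ptr<flow_state> watch = state;
  auto writer = std::make_unique<writer_handle>(state);
  auto reader = std::make_unique<reader_handle>(state);
  state.reset();  // from here on, only the handles keep the state alive
  report.closed_with_writer = reader->closed();
  writer.reset();  // last writer disappears: the flow counts as closed
  report.closed_without_writer = reader->closed();
  reader.reset();  // last handle disappears: shared_ptr destroys the state
  report.freed_at_end = watch.expired();
  return report;
}
```

The thread that drops the last handle is the one that runs the destructor of the shared state, which matches the garbage-collection rule described above.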

Documentation

class flows::reader<T>

This class represents a reader, which may be associated to a flow (valid reader), or not be associated to any flow (invalid reader). The class itself can be summarized as:

template <typename T>
class reader
{
public:
  typedef T                  value_type;
  typedef boost::optional<T> option_type;
  typedef /* unspecified */  iterator; 

  iterator    begin();
  iterator    end();
              reader();
              reader(const reader&);
  reader&     operator =(const reader&);
  void        close();
  option_type get();
};

The default constructor creates an invalid reader. A valid reader can only be obtained from flows::flow<T>. Assigning to a reader dissociates that reader from its flow (if any), which may lead to that flow being silenced or even destroyed if this was the last reader for that flow, and then binds the reader to the flow of the reader that was assigned to it (if any). Note that flows::reader<T>::close() is equivalent to assigning a default-constructed reader: the reader becomes invalid and detached from its associated flow (if any).

The flows::reader<T>::get() function behaves as follows:

  • If the reader is invalid, returns no value.
  • If the reader is valid, but the associated flow is closed and empty, returns no value.
  • If the reader is valid, but the associated flow is empty and not yet closed, blocks until a value becomes available or the flow is closed, and tries again.
  • If the reader is valid and the associated flow is not empty, atomically removes the oldest value and returns it.

As a consequence, if two or more readers are associated to the same flow, every element in the flow will be read by at most one reader (and all elements will be either read by a reader or destroyed along with the flow).
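
The rules for get() can be sketched with a mutex and a condition variable. This is only an illustration under assumptions: channel is a hypothetical class that merges the flow and both of its ends into one object, it uses std::optional where the library uses boost::optional, and it leaves out the invalid-reader case.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>

// Hypothetical stand-in for a flow together with its reader and writer ends.
template <typename T>
class channel {
public:
  // Writer side: append a copy of the value and wake one blocked reader.
  void put(const T& value) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      assert(!closed_ && "put() after close() is a usage error in this sketch");
      items_.push_back(value);
    }
    ready_.notify_one();
  }

  // Writer side: announce that no more values will ever be appended.
  void close() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;
    }
    ready_.notify_all();
  }

  // Reader side: returns the oldest value, or no value once the channel
  // is closed and empty. Blocks while it is empty but still open.
  std::optional<T> get() {
    std::unique_lock<std::mutex> lock(mutex_);
    ready_.wait(lock, [this] { return closed_ || !items_.empty(); });
    if (items_.empty()) return std::nullopt;  // closed and drained
    T value = items_.front();
    items_.pop_front();                       // removal happens under the lock
    return value;
  }

private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::deque<T> items_;
  bool closed_ = false;
};
```

Because the removal happens while the lock is held, two threads calling get() on the same channel can never receive the same element.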

flows::reader<T>::begin() and flows::reader<T>::end() are the first and past-the-end iterators of the read sequence of elements. They are input iterators (in the sense defined by the Standard C++ Library) which represent the sequence of elements read by a particular reader:

  • Creating an iterator using begin() or incrementing an iterator reads the next value from the flow by calling the flows::reader<T>::get() function on the associated reader. This function call may block until the flow is written to. If it returns a value, then the iterator is bound to that value. Otherwise, the iterator becomes equal to the past-the-end iterator.
  • Dereferencing an iterator returns the value bound to the iterator. This is guaranteed to occur in constant time.

Iterators are invalidated on every assignment to the reader.

An analogy for read iterators is the std::istream_iterator, but the data is read from a flow using flows::reader<T>::get(), instead of being read from a stream using operator>>().
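
For readers unfamiliar with that analogy, here is how std::istream_iterator behaves, using only the standard library. The helper name sum_stream is made up for this example. A reader's begin() and end() are meant to be used the same way, with the flow taking the place of the stream.

```cpp
#include <cassert>
#include <iterator>
#include <numeric>
#include <sstream>
#include <string>

// Sums whitespace-separated integers by walking a single-pass input sequence.
int sum_stream(const std::string& text) {
  std::istringstream in(text);
  std::istream_iterator<int> first(in);  // construction already reads one value
  std::istream_iterator<int> last;       // default-constructed: past-the-end
  int total = std::accumulate(first, last, 0);
  // The walk stops at the first failed read; here that must be end of input.
  assert(in.eof() && "input contained something that is not an integer");
  return total;
}
```

As with read iterators on a flow, the sequence can only be traversed once: every increment consumes a value from the underlying source.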

class flows::writer<T>

This class represents a writer, which may be associated to a flow (valid writer), or not be associated to any flow (invalid writer). The class can be summarized as:

template <typename T>
class writer
{
public:
  typedef T                 value_type;
  typedef /* unspecified */ iterator; 

  iterator begin();
           writer();
           writer(const writer&);
  writer&  operator =(const writer&);
  void     close();
  bool     silent();
  void     put(const T&);
};

The default constructor creates an invalid writer. The only way to create a valid writer is to use flows::flow<T>. Assigning to a writer dissociates that writer from its flow (if any), and then binds it to the flow of the writer that was assigned to it (if any). This may lead to the flow becoming closed or even garbage-collected. flows::writer<T>::close() is equivalent to (but more idiomatic than) assigning a default-constructed writer.

The flows::writer<T>::put() function atomically writes a copy of its argument to the flow associated to the writer. If there is no associated flow, the behavior of this function is undefined.

The flows::writer<T>::silent() function returns whether the associated flow is silent or not. If there is no associated flow, the behavior of this function is undefined. This is provided as an optimization hint, to avoid computing values only to place them in a silent flow. It can be used to notify search threads that one of them has found what was being searched for. See the prime.cpp example listed above for an illustration.
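
The search-thread pattern can be sketched without the library at all. In the hypothetical example below, a plain std::atomic<bool> named silent stands in for writer.silent(), and setting it stands in for the reader closing after it has received its first answer. The function find_divisor is not taken from prime.cpp.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Looks for a non-trivial divisor of n using `workers` threads.
// Returns 0 when no divisor d with d * d <= n exists (n is prime or below 4).
unsigned long find_divisor(unsigned long n, unsigned workers) {
  assert(workers >= 1);
  std::atomic<bool> silent(false);        // stand-in for writer.silent()
  std::atomic<unsigned long> found(0);
  std::vector<std::thread> pool;
  for (unsigned w = 0; w < workers; ++w) {
    pool.emplace_back([&, w] {
      // Worker w tests the candidates 2 + w, 2 + w + workers, ...
      for (unsigned long d = 2 + w; d * d <= n; d += workers) {
        if (silent.load()) return;        // nobody is listening any more
        if (n % d == 0) {
          found.store(d);
          silent.store(true);             // the reader would close at this point
          return;
        }
      }
    });
  }
  for (std::thread& t : pool) t.join();
  return found.load();
}
```

When several divisors exist, which one is returned depends on thread scheduling, so callers should only rely on the result dividing n.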

The flows::writer<T>::begin() function returns the output iterator for this writer. This iterator behaves like a std::back_insert_iterator, but calls flows::writer<T>::put() on its associated writer instead of calling push_back() on a container. This iterator is invalidated by assignment to the writer.
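
An output iterator of this kind takes very little code. The sketch below shows one possible shape and is not the iterator from flow.hpp: put_iterator, vector_writer and copy_through_put are hypothetical names, and vector_writer is a fake writer whose put() appends to a std::vector so that the example runs without threads.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <vector>

// Hypothetical writer stand-in: put() appends to a plain vector.
struct vector_writer {
  std::vector<int> items;
  void put(const int& value) { items.push_back(value); }
};

// Output iterator that forwards every assignment to Writer::put().
template <typename Writer>
class put_iterator {
public:
  using iterator_category = std::output_iterator_tag;
  using value_type = void;
  using difference_type = std::ptrdiff_t;
  using pointer = void;
  using reference = void;

  explicit put_iterator(Writer& writer) : writer_(&writer) {}

  // "*it = value" ends up here and becomes a call to put().
  template <typename V>
  put_iterator& operator=(const V& value) {
    writer_->put(value);
    return *this;
  }
  // Dereference and increment do nothing, as in std::back_insert_iterator.
  put_iterator& operator*() { return *this; }
  put_iterator& operator++() { return *this; }
  put_iterator operator++(int) { return *this; }

private:
  Writer* writer_;
};

// Copies a sequence into a writer through the iterator.
std::vector<int> copy_through_put(const std::vector<int>& source) {
  vector_writer writer;
  std::copy(source.begin(), source.end(), put_iterator<vector_writer>(writer));
  assert(writer.items.size() == source.size());
  return writer.items;
}
```

Any standard algorithm that writes through an output iterator, such as std::copy or std::transform, can feed a writer this way.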

class flows::flow<T>

This class represents an active flow, and allows creating fresh readers and writers associated to that flow. For that reason, for as long as this instance exists, the underlying flow can be neither closed nor silenced (because a new reader or writer could be obtained from it at any time). Therefore, this instance should be used to prepare a computation, and destroyed before the computation starts.

When the instance is destroyed, the flow itself remains in memory for as long as readers or writers are associated to it. Once all readers and writers are gone, the flow is destroyed immediately, destroying any unread data left.

Note that the library provides several functions that dispatch computations to other threads without requiring the explicit creation of a flow from user code. Ideally, flows::flow<T> should only be used in situations where these functions do not meet the needs of the program, and even then it should only be used with care.

The class can be summarized as:

template <typename T> 
class flow
{
public:
  flow();

  const writer<T>& input() const;
  const reader<T>& output() const;
};

As expected, flows::flow<T>::input() returns a writer associated to the flow, and flows::flow<T>::output() returns a reader associated to the flow. Unlike the naming approach of the C++ Standard Library, where an input stream is a reader and an output stream is a writer, the input of a flow is the writer (where the flow gets its input) and the output of a flow is the reader (where the flow sends its output).

Generators

A generator is a separate thread that computes values and outputs them to a flow. The original thread keeps a reader associated to that flow, and may read values from the flow as they become available.

Two functions allow the creation of generators:

template <typename T, typename F>
reader<T> generate(F f); 

template <typename T, typename F, typename I>
reader<T> generate(F f, I b, I e);

The first function creates a new flow and a new thread. Then, in that new thread, it calls f(w) where f is the argument to the generate function, and w is a writer associated to that flow. Simultaneously, it returns a reader associated to the flow in the original thread. The created thread ends when the argument function returns.

The second function creates one new flow, and creates a thread for each iterator i between b and e. Each thread calls f(w,*i), and the function simultaneously returns a reader for the flow. Each created thread ends when its function call returns.

That is, the first function creates a generator, and the second function creates one generator for each iterator in the passed sequence.
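
What a single generator does can be sketched with std::thread and a small blocking queue. This is an approximation under assumptions, not the implementation of flows::generate: channel and sum_of_squares are hypothetical names, the queue is closed explicitly where the library would close it when the last writer disappears, and the thread is joined by hand.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>

// Hypothetical stand-in for a flow: a blocking FIFO with a "closed" flag.
template <typename T>
class channel {
public:
  void put(const T& value) {
    { std::lock_guard<std::mutex> lock(mutex_); items_.push_back(value); }
    ready_.notify_one();
  }
  void close() {
    { std::lock_guard<std::mutex> lock(mutex_); closed_ = true; }
    ready_.notify_all();
  }
  std::optional<T> get() {
    std::unique_lock<std::mutex> lock(mutex_);
    ready_.wait(lock, [this] { return closed_ || !items_.empty(); });
    if (items_.empty()) return std::nullopt;
    T value = items_.front();
    items_.pop_front();
    return value;
  }
private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::deque<T> items_;
  bool closed_ = false;
};

// A generator thread produces 1*1, 2*2, ..., n*n while the caller sums them.
int sum_of_squares(int n) {
  assert(n >= 0);
  channel<int> flow;
  std::thread generator([&flow, n] {
    for (int i = 1; i <= n; ++i) flow.put(i * i);
    flow.close();  // plays the role of the writer going away
  });
  int sum = 0;
  while (std::optional<int> value = flow.get()) sum += *value;  // until closed and empty
  generator.join();
  return sum;
}
```

The original thread starts consuming values while the generator is still producing them, which is the point of the construct.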

Processing

A processor is a thread which reads from a flow, does some computations, and writes to another flow. The most generic way of doing this (by specifying the exact function that reads and writes) is with the flows::process<T>() function:

template<typename T, typename U, typename F>
reader<T> process(const reader<U> &source, F f);

This function creates a new flow and a new thread which runs f(source,dest), where dest is a writer associated to the new flow. It immediately returns a reader associated to the new flow.

A special case of processing is mapping a function to a flow: the processor reads a value, applies a function to it, and writes the result to another flow. Mapping is done using the flows::transform<T>() function:

template <typename T, typename U, typename F>
reader<T> transform(const reader<U> &source, F f, unsigned n = 1);

This function creates n new threads and a single flow, and returns a reader associated to that flow. Every thread repeatedly reads a value x from the source reader and writes f(x) to the new flow. Note that if n = 1, the order is preserved, but if n > 1 the results may arrive in any order. The new flow is closed when all the data from the original flow has been processed.
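
The behavior of a mapping with n workers, including the loss of ordering, can be reproduced with the standard library. The sketch below is written under assumptions and does not use flow.hpp: channel and transform_sketch are hypothetical names, the input comes from a std::vector for simplicity, and an atomic counter decides which worker closes the output.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>
#include <vector>

// Hypothetical stand-in for a flow: a blocking FIFO with a "closed" flag.
template <typename T>
class channel {
public:
  void put(const T& value) {
    { std::lock_guard<std::mutex> lock(mutex_); items_.push_back(value); }
    ready_.notify_one();
  }
  void close() {
    { std::lock_guard<std::mutex> lock(mutex_); closed_ = true; }
    ready_.notify_all();
  }
  std::optional<T> get() {
    std::unique_lock<std::mutex> lock(mutex_);
    ready_.wait(lock, [this] { return closed_ || !items_.empty(); });
    if (items_.empty()) return std::nullopt;
    T value = items_.front();
    items_.pop_front();
    return value;
  }
private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::deque<T> items_;
  bool closed_ = false;
};

// Applies f to every input value using n worker threads.
template <typename F>
std::vector<int> transform_sketch(const std::vector<int>& input, F f, unsigned n) {
  assert(n >= 1 && "with no worker the output would never be closed");
  channel<int> source, dest;
  for (int x : input) source.put(x);
  source.close();
  std::atomic<unsigned> remaining(n);
  std::vector<std::thread> workers;
  for (unsigned i = 0; i < n; ++i) {
    workers.emplace_back([&] {
      while (std::optional<int> x = source.get()) dest.put(f(*x));
      if (--remaining == 0) dest.close();  // the last worker closes the output
    });
  }
  std::vector<int> output;
  while (std::optional<int> y = dest.get()) output.push_back(*y);
  for (std::thread& worker : workers) worker.join();
  return output;
}
```

With n = 1 the output order matches the input order. With more workers only the set of results is predictable, so callers that need ordering have to sort or tag the values themselves.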

Accumulating

An accumulator is a thread which reads from a flow and applies a binary operation to collapse all the values into a single one, using an initial value for the first application. Once all the data is processed, it writes the final value to an output flow.

Accumulation is performed using the flows::accumulate<T>() function:

template <typename T, typename U, typename F>
reader<T> accumulate(const reader<U> &source, T initial, F f);

This function creates a flow and a thread. The thread performs the operation initial = f(initial, *it) for every it between source.begin() and source.end(), then writes the final value of initial to the flow. The function returns the reader associated to the flow immediately.
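
The fold performed by the accumulator thread can be sketched as follows. This is an illustration under assumptions rather than the library code: channel and fold_in_thread are hypothetical names, the source values are fed from a std::vector, and the function waits for the result instead of returning a reader immediately.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>
#include <vector>

// Hypothetical stand-in for a flow: a blocking FIFO with a "closed" flag.
template <typename T>
class channel {
public:
  void put(const T& value) {
    { std::lock_guard<std::mutex> lock(mutex_); items_.push_back(value); }
    ready_.notify_one();
  }
  void close() {
    { std::lock_guard<std::mutex> lock(mutex_); closed_ = true; }
    ready_.notify_all();
  }
  std::optional<T> get() {
    std::unique_lock<std::mutex> lock(mutex_);
    ready_.wait(lock, [this] { return closed_ || !items_.empty(); });
    if (items_.empty()) return std::nullopt;
    T value = items_.front();
    items_.pop_front();
    return value;
  }
private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::deque<T> items_;
  bool closed_ = false;
};

// Folds the values with f in a separate thread and returns the single result.
template <typename T, typename U, typename F>
T fold_in_thread(const std::vector<U>& values, T initial, F f) {
  channel<U> source;
  channel<T> result;
  std::thread accumulator([&] {
    T acc = initial;
    while (std::optional<U> x = source.get()) acc = f(acc, *x);  // acc = f(acc, x)
    result.put(acc);  // exactly one value is written to the output
    result.close();
  });
  for (const U& value : values) source.put(value);
  source.close();
  std::optional<T> out = result.get();
  accumulator.join();
  assert(out.has_value());
  return *out;
}
```

Since a single thread consumes the source in FIFO order, the operation does not need to be commutative or associative.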

Further work

The implementation could be improved. First, it could be adapted to work with lock-free structures. Second, it could also be adapted to use worker threads instead of spawning new threads for every operation.

The interface could also use some improvement. For instance, it’s still too easy to create deadlocks by forgetting to close a writer, or by closing it in the wrong place. And the code does not interact with the standard library as well as it should.

2 Responses to “Flow”


  1. http://www.gamedev.net/community/forums/topic.asp?topic_id=514871&whichpage=1&#3351296

    Please update… give it your all… if you write a good threading library for multicore systems and keep it up-to-date, you’ll never regret it. it’ll save you and others a lot of problems down the line.

    http://nocturnal.insomniacgames.com/index.php/Main_Page
    This might also be useful. Good luck.

    As for me… I’ll be doing the same… but not at a quality level I could share as I’m new to C++ and threading in general. Why are the abstractions we use so alien to our minds… there must be a better way. Good luck.

    The interface could also use some improvement. For instance, it’s still too easy to create deadlocks by forgetting to close a writer in the wrong place.
    You could use the C++ reflection in the nocturnal initiative to determine where you made that glitch at runtime… I think… sometimes debugging compile time errors at runtime isn’t a bad idea… but then again if you never run that little code segment, you’re screw’d.
