Entrepreneur. Creator of Groovy++. Alex is a DZone Zone Leader and has posted 31 posts at DZone. You can read more from them at their website.

Fast immutable persistent functional queues for concurrency with Groovy

02.22.2010

Everybody has probably heard that functional data structures are great for concurrency. In this note I am going to show why that is so, and how easy it is to implement a highly performant functional queue with Groovy++. You can find the source code and more examples in the Groovy++ distribution.

Let us start with some philosophy. What is the main problem with concurrency? Obviously, keeping changes to shared data synchronized between consumers. The reason is that synchronization immediately leads us to deadlocks, thread starvation and all the other goodies we hate so much.

The best thing, of course, would be to have no shared data at all. Usually that is not possible. At some level we can use a message-passing approach, where instead of sharing data, objects send each other messages containing immutable data, but even that uses shared data underneath (message queues, thread pools, etc.). How do we deal with that?

A well-known idea here is to use shared mutable references to immutable data. We have a volatile field pointing to immutable data and a mutation method that produces a new, modified copy of the data. Then, in an infinite loop, we read the field value, compute the mutated value, and try to compare-and-set it into the field:

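for (;;) {
    def oldValue = volatileField                  // read the current immutable snapshot
    def mutatedValue = mutate(oldValue)           // build a modified copy; oldValue stays untouched
    if (volatileField.compareAndSet(oldValue, mutatedValue)) {
        break                                     // field still pointed to oldValue: our update is in
    }
    // another thread won the race: loop and retry with the fresh value
}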

In Groovy++ we can apply the methods of the AtomicXXX classes to any volatile field; the compiler takes care of creating the corresponding static final AtomicXXXFieldUpdater.

The great thing here is that no blocking operations are involved at all.
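The article notes that Groovy++ generates a static final AtomicXXXFieldUpdater behind the scenes. For comparison, here is a minimal hand-written plain-Java sketch of the same pattern: a volatile field holding an immutable value (a boxed Integer, for simplicity), one static final AtomicReferenceFieldUpdater, and the retry loop shown above. The class name CasHolder and its methods are hypothetical; this is not the code the Groovy++ compiler actually emits.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.UnaryOperator;

// Hypothetical holder: a volatile field pointing to immutable data (a boxed Integer here).
final class CasHolder {
    private volatile Integer value = 0;

    // One static final updater per volatile field, the kind of helper the text says Groovy++ creates.
    private static final AtomicReferenceFieldUpdater<CasHolder, Integer> VALUE =
            AtomicReferenceFieldUpdater.newUpdater(CasHolder.class, Integer.class, "value");

    Integer get() { return value; }

    // The retry loop: read, compute a new value, try to CAS it in, retry on collision.
    Integer update(UnaryOperator<Integer> mutate) {
        for (;;) {
            Integer oldValue = value;
            Integer mutatedValue = mutate.apply(oldValue);
            if (VALUE.compareAndSet(this, oldValue, mutatedValue)) {
                return mutatedValue;
            }
        }
    }

    // Demo: several threads increment concurrently without any lock.
    static int hammer(int threads, int perThread) {
        CasHolder holder = new CasHolder();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) holder.update(x -> x + 1);
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) w.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return holder.get();
    }

    public static void main(String[] args) {
        System.out.println(hammer(4, 10_000)); // 40000
    }
}
```

Because a losing thread simply recomputes from the fresh value and retries, no increment is lost even though no lock is ever taken.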

Of course, mutation should be a very cheap operation. For example, copying the whole list is not an option. Another requirement is that mutation cannot change anything in the initial data. We need special data structures that allow such operations. Such data structures are known as persistent.

Let us design the simplest persistent functional data structure: a LIFO one-directional (singly linked) list.

A functional list is a pair of a head element and a tail list. It has very few basic operations: size, getHead, getTail, addFirst and removeFirst. addFirst is a cheap, tiny allocation, and removeFirst is even simpler, since it just returns the tail.

We can also iterate, remove arbitrary elements (linear cost) and reverse the list (linear in both time and space).
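To make this cost model concrete, here is a minimal plain-Java sketch of the same kind of list. ConsList and its method names are hypothetical and only loosely follow FList below: addFirst allocates one node and shares the existing list as its tail, removeFirst just returns the tail, while remove and reverse walk the list. Unlike FList.minus, this remove returns the original list itself when the element is absent.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;

// Hypothetical minimal immutable singly linked list: a (head, tail) pair whose nodes are never modified.
final class ConsList<T> implements Iterable<T> {
    private static final ConsList<?> EMPTY = new ConsList<>(null, null, 0);

    final T head;
    final ConsList<T> tail;
    final int size;

    private ConsList(T head, ConsList<T> tail, int size) {
        this.head = head; this.tail = tail; this.size = size;
    }

    @SuppressWarnings("unchecked")
    static <T> ConsList<T> empty() { return (ConsList<T>) EMPTY; }

    boolean isEmpty() { return size == 0; }

    // addFirst: O(1), one new node; the whole existing list is shared as its tail.
    ConsList<T> addFirst(T element) { return new ConsList<>(element, this, size + 1); }

    // removeFirst: O(1), just hands back the shared tail.
    ConsList<T> removeFirst() {
        if (isEmpty()) throw new NoSuchElementException();
        return tail;
    }

    // remove: O(n); copies only the nodes before the first match and shares everything after it.
    ConsList<T> remove(T element) {
        ConsList<T> prefix = empty();                 // nodes seen so far, in reverse order
        for (ConsList<T> cur = this; !cur.isEmpty(); cur = cur.tail) {
            if (Objects.equals(cur.head, element)) {
                ConsList<T> result = cur.tail;        // shared suffix
                for (T x : prefix) result = result.addFirst(x);
                return result;
            }
            prefix = prefix.addFirst(cur.head);
        }
        return this;                                  // not found: nothing to copy
    }

    // reverse: O(n) time and O(n) new nodes.
    ConsList<T> reverse() {
        ConsList<T> acc = empty();
        for (ConsList<T> cur = this; !cur.isEmpty(); cur = cur.tail) acc = acc.addFirst(cur.head);
        return acc;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private ConsList<T> cur = ConsList.this;
            @Override public boolean hasNext() { return !cur.isEmpty(); }
            @Override public T next() {
                if (cur.isEmpty()) throw new NoSuchElementException();
                T h = cur.head;
                cur = cur.tail;
                return h;
            }
        };
    }
}
```

For example, if a is [2, 1], then a.addFirst(3) is [3, 2, 1] and its tail is the very same object a; nothing was copied.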

/**
 * Simple implementation of a one-directional (singly linked) immutable functional list
 */
@Typed abstract static class FList<T> implements Iterable<T> {
    /**
     * Singleton for the empty list
     */
    static final FList emptyList = new EmptyList ()

    /**
     * Number of elements in the list
     */
    final int size

    FList (int size) { this.size = size }

    /**
     * Element last added to the list
     */
    abstract T getHead ()

    /**
     * Tail of the list
     */
    abstract FList<T> getTail ()

    /**
     * Checks whether this list is empty
     */
    final boolean isEmpty () { size == 0 }

    final int size () { size }

    /**
     * Creates a new list containing the given element followed by all elements of this list
     */
    abstract FList<T> plus (T element)

    /**
     * Creates a new list containing all elements of this list except the first occurrence of the given one
     */
    final FList<T> minus (T element, FList<T> accumulated = FList.emptyList) {
        !size ?
            accumulated.reverse() :
            head == element ?
                tail.addAll(accumulated) :
                tail.minus(element, accumulated + head)
    }

    /**
     * Creates a new list by adding each of the given elements, in iteration order, to the front of this list
     */
    final FList<T> addAll (Iterable<T> elements) {
        def res = this
        for (el in elements)
            res += el
        res
    }

    /**
     * Utility method allowing the convenient syntax <code>flist ()</code> for accessing the head of the list
     */
    final T call () { head }

    /**
     * Creates a reversed copy of the list
     */
    final FList<T> reverse (FList<T> accumulated = FList.emptyList) {
        if (!size) { accumulated } else { tail.reverse(accumulated + head) }
    }

    /**
     * Checks whether this list contains the given element
     */
    final boolean contains (T element) {
        size && (head == element || tail.contains(element))
    }
}

We actively use Groovy++ features in the code above and below: default method arguments, tail-recursive calls, properties, []/[:] syntax for object construction, optional return statements and the optional public modifier all help us be very expressive.

So we have an abstract implementation of the list. The idea (mostly for performance reasons) is to have separate implementations for the empty list, the one-element list and the general case.

Let us start with the empty one. The implementation is almost trivial:

private static class EmptyList<T> extends FList<T> {
    EmptyList () { super(0) }

    Iterator<T> iterator () {
        [
            hasNext: { false },
            next: { throw new NoSuchElementException() },
            remove: { throw new UnsupportedOperationException() }
        ]
    }

    final OneElementList<T> plus (T element) {
        [element]
    }

    T getHead () {
        throw new NoSuchElementException()
    }

    FList<T> getTail () {
        throw new NoSuchElementException()
    }
}

Now we can implement the one-element list, which will also serve as the base class for the general case (that is why it has an additional protected constructor):

private static class OneElementList<T> extends FList<T> {
    // final, so no setter is generated and the node stays immutable
    final T head

    OneElementList (T head) {
        super(1)
        this.head = head
    }

    // used by MoreThanOneElementList: total size is the tail size plus this head
    protected OneElementList (T head, int addSize) {
        super(addSize + 1)
        this.head = head
    }

    final MoreThanOneElementList<T> plus (T element) {
        [element, this]
    }

    Iterator<T> iterator () {
        head.singleton().iterator()
    }

    FList<T> getTail () {
        emptyList
    }
}

The singleton method used in iterator() is our old friend java.util.Collections.singleton(): Groovy++ makes all methods of Arrays and Collections available to us, so head.singleton() is simply Collections.singleton(head).

Now we are ready to implement the general case. It is even easier:

private static class MoreThanOneElementList<T> extends OneElementList<T> {
    final FList<T> tail

    MoreThanOneElementList (T head, FList<T> tail) {
        super (head, tail.size)
        this.tail = tail
    }

    Iterator<T> iterator () {
        [
            cur: (FList<T>)this,
            hasNext: { cur.size > 0 },
            next: { def that = cur; cur = cur.tail; that.head },
            remove: { throw new UnsupportedOperationException() }
        ]
    }
}

And we are done.

Of course, our list has very limited functionality, but it is a great building block for concurrent algorithms. Now let us build our main example of such an algorithm: a functional queue.

Simplifying things a bit, a functional queue is an immutable data structure that supports very few basic operations:

- size
- addFirst and addLast, which return a new queue containing the new element and all elements of the original queue
- removeFirst, which returns a pair consisting of the first element and the queue remaining after its removal

Technically, a functional queue is implemented as a pair of functional lists, input and output. addLast adds to input, addFirst adds to output, and removeFirst takes the first element from output if it is non-empty; otherwise, before removal we need to transfer the reversed input into output.
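The two-list scheme can be sketched in plain Java as follows. This is a minimal sketch, assuming null as the empty list and AbstractMap.SimpleImmutableEntry as a stand-in for the Groovy++ Pair; TwoListQueue is a hypothetical name, and unlike the Groovy++ implementation that follows, it does not special-case empty and one-element queues.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical minimal persistent queue built from two immutable singly linked lists.
final class TwoListQueue<T> {
    // Immutable cons cell; null stands for the empty list.
    private static final class Node<T> {
        final T head; final Node<T> tail;
        Node(T head, Node<T> tail) { this.head = head; this.tail = tail; }
    }

    private final Node<T> output; // front part: first element of the queue at the head
    private final Node<T> input;  // back part: most recently added element at the head
    private final int size;

    private TwoListQueue(Node<T> output, Node<T> input, int size) {
        this.output = output; this.input = input; this.size = size;
    }

    static <T> TwoListQueue<T> empty() { return new TwoListQueue<>(null, null, 0); }

    int size() { return size; }

    boolean isEmpty() { return size == 0; }

    // O(1): push onto input; the old queue is left untouched.
    TwoListQueue<T> addLast(T e) { return new TwoListQueue<>(output, new Node<>(e, input), size + 1); }

    // O(1): push onto output.
    TwoListQueue<T> addFirst(T e) { return new TwoListQueue<>(new Node<>(e, output), input, size + 1); }

    // Returns (first element, remaining queue). When output is empty, the reversed input becomes
    // the new output first; as long as each version is consumed once, every element moves at most once.
    Map.Entry<T, TwoListQueue<T>> removeFirst() {
        if (isEmpty()) throw new NoSuchElementException();
        Node<T> out = output, in = input;
        if (out == null) {
            for (Node<T> n = in; n != null; n = n.tail) out = new Node<>(n.head, out);
            in = null;
        }
        return new AbstractMap.SimpleImmutableEntry<>(out.head, new TwoListQueue<>(out.tail, in, size - 1));
    }
}
```

Each removeFirst call returns a new version and leaves the old one intact, so the original queue can still be read after a copy of it has been drained.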

The beauty is that adding is an O(1) operation and removal is O(1) on average (amortized), because every element is transferred from input to output at most once. Neither operation involves copying the full structure. Of course, when we keep a functional queue in a volatile field, we should remember about potential collisions (failed compare-and-set attempts that must be retried), which are still way cheaper than blocking operations.
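To illustrate what a collision means in practice, here is a hypothetical plain-Java sketch in which several threads share one AtomicReference to an immutable two-list queue of ints. Every offer and poll builds a new version and publishes it with compareAndSet; a thread that loses the race counts a collision and retries with the fresh version. The class and method names are illustrative assumptions, and the collision counter exists only for observation.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: threads share one AtomicReference to an immutable two-list queue of ints.
final class SharedQueueDemo {
    // Immutable cons cell; null stands for the empty list.
    static final class Node {
        final int head; final Node tail;
        Node(int head, Node tail) { this.head = head; this.tail = tail; }
    }

    // One immutable queue version: output holds the front, input holds the back (newest first).
    static final class Snapshot {
        final Node output, input; final int size;
        Snapshot(Node output, Node input, int size) { this.output = output; this.input = input; this.size = size; }
    }

    static final AtomicReference<Snapshot> REF = new AtomicReference<>(new Snapshot(null, null, 0));
    static final AtomicLong COLLISIONS = new AtomicLong(); // failed CAS attempts that had to be retried

    // Lock-free addLast: build a new version and publish it with CAS; retry if another thread won.
    static void offer(int e) {
        for (;;) {
            Snapshot old = REF.get();
            Snapshot next = new Snapshot(old.output, new Node(e, old.input), old.size + 1);
            if (REF.compareAndSet(old, next)) return;
            COLLISIONS.incrementAndGet();
        }
    }

    // Lock-free removeFirst; returns null when the queue is empty.
    static Integer poll() {
        for (;;) {
            Snapshot old = REF.get();
            if (old.size == 0) return null;
            Node out = old.output, in = old.input;
            if (out == null) {                 // transfer the reversed input into output
                for (Node n = in; n != null; n = n.tail) out = new Node(n.head, out);
                in = null;
            }
            if (REF.compareAndSet(old, new Snapshot(out.tail, in, old.size - 1))) return out.head;
            COLLISIONS.incrementAndGet();      // lost the race: this attempt's work is discarded
        }
    }

    // Runs concurrent producers, then concurrent consumers; returns {elements polled, sum of elements}.
    static long[] run(int threads, int perThread) {
        AtomicLong count = new AtomicLong(), sum = new AtomicLong();
        Thread[] producers = new Thread[threads], consumers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            producers[t] = new Thread(() -> { for (int i = 1; i <= perThread; i++) offer(i); });
            producers[t].start();
        }
        join(producers);
        for (int t = 0; t < threads; t++) {
            consumers[t] = new Thread(() -> {
                for (Integer v; (v = poll()) != null; ) { count.incrementAndGet(); sum.addAndGet(v); }
            });
            consumers[t].start();
        }
        join(consumers);
        return new long[] { count.get(), sum.get() };
    }

    private static void join(Thread[] ts) {
        try {
            for (Thread t : ts) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        long[] r = run(4, 10_000);
        System.out.println(r[0] + " elements, sum " + r[1] + ", collisions " + COLLISIONS.get());
    }
}
```

When poll loses a race right after reversing the input, that reversal is simply discarded; such wasted work is part of the price of a collision, yet no thread ever blocks.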

So let us implement it. We start with the abstract definition:

@Typed
abstract class FQueue<T> implements Iterable<T> {
    abstract boolean isEmpty ()

    T getFirst () { throw new NoSuchElementException() }

    Pair<T, FQueue<T>> removeFirst () { throw new NoSuchElementException() }

    abstract FQueue<T> addLast (T element)

    abstract FQueue<T> addFirst (T element)

    /**
     * Singleton for the empty queue
     */
    static final EmptyQueue emptyQueue = []

    /**
     * Number of elements in the queue
     */
    abstract int size ()
}

Again, for performance reasons, we implement the empty and one-element queues separately. In theory, we could also special-case an output-only queue, but we leave that as an exercise for the reader.

Let us implement the empty queue now:

private static final class EmptyQueue<T> extends FQueue<T> {
    EmptyQueue () {
    }

    OneElementQueue<T> addLast (T element) { [element] }

    OneElementQueue<T> addFirst (T element) { [element] }

    Iterator<T> iterator () {
        [
            hasNext: { false },
            // Iterator contract: next() on an exhausted iterator throws NoSuchElementException
            next: { throw new NoSuchElementException() },
            remove: { throw new UnsupportedOperationException() }
        ]
    }

    final int size () { 0 }

    boolean isEmpty () { true }
}

The one-element queue is also extremely simple:

private static final class OneElementQueue<T> extends FQueue<T> {
    final T head

    OneElementQueue (T head) {
        this.head = head
    }

    // new queue: output = [head, element], input = empty
    MoreThanOneElementQueue<T> addLast (T element) { [(FList.emptyList + element) + head, FList.emptyList] }

    // new queue: output = [element, head], input = empty
    MoreThanOneElementQueue<T> addFirst (T element) { [(FList.emptyList + head) + element, FList.emptyList] }

    T getFirst () { head }

    Pair<T, FQueue<T>> removeFirst () {
        [head, FQueue.emptyQueue]
    }

    Iterator<T> iterator () {
        head.singleton().iterator()
    }

    final int size () { 1 }

    boolean isEmpty () { false }
}

The general case is not too complicated either. Maybe the only interesting detail is the removeFirst method below. The point there is that we never keep fewer than two elements in the output list. If that is about to happen, we either transfer the reversed input into the output or fall back to a one-element queue.
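The case analysis in removeFirst can be checked with a small Java 17 transcription of the three queue shapes. It is a hypothetical sketch, not part of Groovy++: sealed records stand in for the Groovy++ classes, elements are plain ints, only addLast and removeFirst are shown, and Many rejects any output with fewer than two elements, so running a sequence of operations also exercises the invariant.

```java
import java.util.NoSuchElementException;

// Hypothetical Java 17 transcription of the three queue shapes (empty, one element, general case).
final class SpecializedQueue {
    // Immutable cons cell with a cached size; null stands for the empty list.
    record Node(int head, Node tail, int size) {
        static Node push(int e, Node tail) { return new Node(e, tail, 1 + sizeOf(tail)); }
        static int sizeOf(Node n) { return n == null ? 0 : n.size(); }
        static Node reverse(Node n) {
            Node acc = null;
            for (; n != null; n = n.tail()) acc = push(n.head(), acc);
            return acc;
        }
    }

    sealed interface Q permits Empty, One, Many {}
    record Empty() implements Q {}
    record One(int head) implements Q {}
    record Many(Node output, Node input) implements Q {
        Many {  // enforce the invariant: output holds at least two elements
            if (Node.sizeOf(output) < 2) throw new IllegalStateException("output must hold at least two elements");
        }
    }
    record Removed(int first, Q rest) {}

    static Q addLast(Q q, int e) {
        if (q instanceof Empty) return new One(e);
        if (q instanceof One o) return new Many(Node.push(o.head(), Node.push(e, null)), null);
        Many m = (Many) q;
        return new Many(m.output(), Node.push(e, m.input()));
    }

    static Removed removeFirst(Q q) {
        if (q instanceof Empty) throw new NoSuchElementException();
        if (q instanceof One o) return new Removed(o.head(), new Empty());
        Many m = (Many) q;
        Node out = m.output(), in = m.input();
        if (Node.sizeOf(out) + Node.sizeOf(in) == 2)  // two elements left: fall back to a one-element queue
            return new Removed(out.head(), new One(out.tail().head()));
        if (Node.sizeOf(out) > 2)                      // output keeps >= 2 after dropping its head
            return new Removed(out.head(), new Many(out.tail(), in));
        // output has exactly two and input is non-empty: second element goes in front of the reversed input
        return new Removed(out.head(), new Many(Node.push(out.tail().head(), Node.reverse(in)), null));
    }
}
```

Adding 1 through 5 and then removing everything walks through all three branches: the first removal refills output from the reversed input, the next two just drop the head, and the last two go through One and Empty.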

private static final class MoreThanOneElementQueue<T> extends FQueue<T> {
    // Invariant: output always holds at least two elements
    private final FList<T> input, output

    MoreThanOneElementQueue (FList<T> output, FList<T> input) {
        this.input = input
        this.output = output
    }

    MoreThanOneElementQueue<T> addLast (T element) {
        [output, input + element]
    }

    MoreThanOneElementQueue<T> addFirst (T element) {
        [output + element, input]
    }

    T getFirst () { output.head }

    Pair<T, FQueue<T>> removeFirst () {
        if (size () == 2) // exactly two elements, both in output: fall back to a one-element queue
            [output.head, new OneElementQueue(output.tail.head)]
        else {
            if (output.size > 2) // output still holds at least two elements after dropping its head
                [output.head, (MoreThanOneElementQueue<T>)[output.tail, input]]
            else { // output holds exactly two: put the second one in front of the reversed input
                [output.head, (MoreThanOneElementQueue<T>)[input.reverse() + output.tail.head, FList.emptyList]]
            }
        }
    }

    Iterator<T> iterator () {
        // elements of output first, then input reversed back into insertion order
        output.iterator() | input.reverse().iterator()
    }

    final int size () {
        input.size + output.size
    }

    boolean isEmpty () { false }
}

We are done with the implementation of fast immutable persistent functional queues. Probably the only open question now is how fast it is.

In the next article I will talk about the Groovy++ message-passing architecture based on the techniques described above, and give some figures for different strategies and a comparison with other frameworks. For now, I will only say that it can process 3.5 to 4 million actor-style messages per second and is 15-20% faster than the amazingly fast Jetlang.

I hope this article was useful and motivates you to learn more about concurrency and functional algorithms.

Till next time.

Published at DZone with permission of its author, Alex Tkachman.

Comments

Artur Biesiadowski replied on Tue, 2010/02/23 - 1:47am

Can you tell how big a benefit was gained by specializing single-element lists? So far I have seen such constructs done with just multi + empty nodes (and a single-element list was just a multi of (element, empty)).
