Václav is a programming enthusiast who's constantly seeking ways to make development more effective and enjoyable. He's particularly interested in server-side Java technologies, distributed systems, concurrency, agile methodologies, modern programming languages and DSLs. He works for JetBrains as a senior software developer and a technology evangelist. He is also a board member of the JetBrains Academy. On the side, he's leading the GPars project, an opensource concurrency library, and investigates the domains of neural networks, evolutionary programming and data mining. Václav is a DZone Zone Leader and has posted 45 posts at DZone.

Secret Agents helping your code handle concurrency

09.15.2009

Among all the available options to safely and correctly manage shared state in concurrent programs, I've selected the concept of Agents for today's post. I'll describe the principle and show how to use Agents in Groovy, leveraging the GParallelizer library.

The brave among you, who constantly live on the edge, might have tried Clojure, a functional Lisp-like JVM programming language. Among other clever concurrency constructs, Clojure provides Agents: lightweight, actor-like objects wrapping shared modifiable values. I find the concept quite promising.

Think of agents as independent active objects (aka actors) that completely encapsulate their internal state and do not expose any methods except for a send() operation, through which you can pass messages to the agent. Most importantly, however, Agents only accept code blocks (lambdas, closures, functions) as messages. You're sending them code, not data. Are you with me?

For example, if agents were humans, you couldn't say to an agent "hey, cook a meal for me, here's the recipe",

agent.cookMeal(recipe)

or

agent.send(recipe)

but instead you'd have to say something like "hey, here's my chef, let him cook a meal in your kitchen".

agent.send(myChef)

So essentially it's a kind of command-processor pattern, if you will, with one noteworthy distinction. Since the agent has its own (green/pooled) thread and processes all received commands in this single thread, one at a time, no races can occur among concurrently arriving commands. Agents thus guarantee thread safety for the encapsulated state, without the commands having to bother with locking or synchronization. You send raw, thread-ignorant code to Agents.

Schematically, a multi-threaded communication scenario with an agent might look something like this:

agent = new Agent(0)  //created a new Agent wrapping an integer with initial value 0
agent.send {increment()} //asynchronous send operation, sending the increment() function

thread {
    agent.send {increment()}
}

thread {
    agent.send {double()}
    agent.send {decrement()}
}

...

//after some delay to process the message the internal Agent's state has been updated

...

println 'The current snapshot of the value is: ' + agent.val
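
To make the schematic above concrete, here is a minimal sketch of the idea built only on the JDK. MiniAgent, its send() semantics (the closure's return value becomes the new state) and shutdownAndWait() are my own assumptions for illustration; this is not the GParallelizer implementation.

```groovy
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// MiniAgent is a hypothetical stand-in, NOT a GParallelizer class.
class MiniAgent {
    // volatile so that snapshots read from other threads see the latest value
    private volatile Object state
    // A single worker thread runs all received commands, one after another
    private final ExecutorService worker = Executors.newSingleThreadExecutor()

    MiniAgent(Object initial) { state = initial }

    // Asynchronous: queues the command and returns immediately.
    // Assumption of this sketch: the command's return value becomes the new state.
    void send(Closure command) {
        worker.execute { state = command(state) }
    }

    // A snapshot of the current state; queued commands may not have run yet
    Object getVal() { state }

    // Stops accepting commands and waits for the queued ones to finish
    void shutdownAndWait() {
        worker.shutdown()
        worker.awaitTermination(5, TimeUnit.SECONDS)
    }
}

def agent = new MiniAgent(0)
agent.send { it + 1 }                 // increment

def t1 = Thread.start { agent.send { it + 1 } }
def t2 = Thread.start {
    agent.send { it * 2 }             // double
    agent.send { it - 1 }             // decrement
}
[t1, t2]*.join()
agent.shutdownAndWait()

// The result depends on how the two threads' messages interleave
println "The current snapshot of the value is: ${agent.val}"
```

None of the closures does any locking. They are safe only because a single thread ever touches the state, and the final value still depends on the order in which the messages happened to arrive.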

Leverage Agents in Groovy

The GParallelizer library implements the agents concept in the SafeVariable class, which leverages the GParallelizer actor implementation. Instances of SafeVariable wrap a modifiable state and accept Groovy closures as messages. The supplied closures are invoked in turn by the SafeVariable's thread. They get the agent's internal state passed in as a parameter and may freely alter it or replace it with a completely new instance.

def registrations = new SafeVariable([])  //we're wrapping a list
registrations.send {it.add 'Joe'} //safely adding without risk of races
registrations.send {it.add 'Dave'} //from other threads making their own registrations

Checking the current state of an agent becomes a matter of retrieving the val property.

println 'Currently registered volunteers: ' + registrations.val
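
Since send is asynchronous, a snapshot taken right after sending may not yet reflect messages still waiting in the queue. One way to wait for them, sketched below, is to send a final "marker" closure that signals a latch. ListAgent is a hypothetical JDK-only stand-in written for this example, not SafeVariable itself; I'm only assuming that messages are processed in order by a single thread, as described above.

```groovy
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// ListAgent is a hypothetical stand-in, NOT the GParallelizer SafeVariable.
class ListAgent {
    private final List state = []
    private final ExecutorService worker = Executors.newSingleThreadExecutor()

    // Queues a closure; it later runs on the worker thread and may mutate the list
    void send(Closure command) { worker.execute { command(state) } }

    List getVal() { state }

    void shutdown() { worker.shutdown() }
}

def registrations = new ListAgent()
registrations.send { it.add 'Joe' }
registrations.send { it.add 'Dave' }

// Messages are processed in order by one thread, so once this marker
// message has run, both registrations above have been applied.
def processed = new CountDownLatch(1)
registrations.send { processed.countDown() }
assert processed.await(5, TimeUnit.SECONDS)

println "Currently registered volunteers: ${registrations.val}"  // [Joe, Dave]
registrations.shutdown()
```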

Smoothing the API

In most scenarios it is worthwhile to hide the command-processor-like nature of Agents behind a traditional method-oriented API. So you get a class that from the outside looks like an ordinary plain object, but if you look carefully inside you'll see an Agent receiving command messages and processing them by means of its own thread. A real secret agent, isn't it?
The following example of an agent-based shopping cart might give you some inspiration on how to do so.

import org.gparallelizer.actors.pooledActors.SafeVariable

class ShoppingCart {
    private def cartState = new SafeVariable([:])

    //----------------- public methods below here ----------------------------------

    public void addItem(String product, int quantity) {
        cartState << {it[product] = quantity} //the << operator sends
                                              //a message to the SafeVariable
    }

    public void removeItem(String product) {
        cartState << {it.remove(product)}
    }

    public Object listContent() {
        return cartState.val
    }

    public void clearItems() {
        cartState << performClear
    }

    public void increaseQuantity(String product, int quantityChange) {
        cartState << this.&changeQuantity.curry(product, quantityChange)
    }

    //----------------- private methods below here ---------------------------------

    private void changeQuantity(String product, int quantityChange, Map items) {
        items[product] = (items[product] ?: 0) + quantityChange
    }

    private Closure performClear = { it.clear() }
}


//----------------- script code below here -------------------------------------


final ShoppingCart cart = new ShoppingCart()
cart.addItem 'Pilsner', 10
cart.addItem 'Budweisser', 5
cart.addItem 'Staropramen', 20

cart.removeItem 'Budweisser'
cart.addItem 'Budweisser', 15

println "Contents ${cart.listContent()}"

cart.increaseQuantity 'Budweisser', 3
println "Contents ${cart.listContent()}"

cart.clearItems()
println "Contents ${cart.listContent()}"

You might have noticed two implementation strategies in the code.

1. Public methods may internally just send the required code off to the Agent, instead of executing the same functionality directly

And so sequential code like

public void addItem(String product, int quantity) {
    cartState[product]=quantity
}

becomes

public void addItem(String product, int quantity) {
    cartState << {it[product] = quantity}
}

2. Public methods may send references to internal private methods or closures, which hold the desired functionality to perform

public void clearItems() {
    cartState << performClear
}

private Closure performClear = { it.clear() }

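Currying might be necessary if the closure takes other arguments besides the current internal state instance. See the increaseQuantity method, where the product and quantityChange arguments are bound up front so that only the state parameter remains for the Agent to supply.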
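
If currying is new to you, the following stand-alone sketch shows what happens, in plain Groovy with no Agent involved. The Stock class and the variable names are made up for this illustration; only curry() and the .& method-pointer operator are standard Groovy.

```groovy
// Hypothetical helper class, used only to demonstrate method pointers
class Stock {
    void changeQuantity(String product, int quantityChange, Map items) {
        items[product] = (items[product] ?: 0) + quantityChange
    }
}

// A three-argument closure; the last parameter plays the role of the agent's state
def changeQuantity = { String product, int quantityChange, Map items ->
    items[product] = (items[product] ?: 0) + quantityChange
}

// curry() fixes the leading arguments and returns a closure
// that expects only the remaining one, here the items map
def addThreePilsners = changeQuantity.curry('Pilsner', 3)

def items = [Pilsner: 10]
addThreePilsners(items)               // what an Agent would do with the message
assert items == [Pilsner: 13]

// The same works for a method pointer obtained with the .& operator
def stock = new Stock()
def addFiveStaropramens = stock.&changeQuantity.curry('Staropramen', 5)
addFiveStaropramens(items)
assert items == [Pilsner: 13, Staropramen: 5]
```

After currying, the closure has exactly the shape an Agent expects: a single parameter that receives the internal state.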

Conclusion

I feel Agents can offer pretty safe and yet useful programming models for shared mutable state. The implementation in GParallelizer is available for you to try and use. Check out the GParallelizer SafeVariable wiki.
I'll be very happy if you come back with feedback, proposals and ideas. Enjoy Groovy concurrency!
Published at DZone with permission of its author, Václav Pech.