Bridging Iris and Rea

Decentralizing a CPS workflow with cloud messaging

12 September 2014

Péter Szilágyi, Dávid Juhász, Barnabás Králik

European Institute for Innovation and Technology

Eötvös Loránd University, Budapest, Hungary

Babeş-Bolyai University, Cluj-Napoca, Romania

Note: these are the offline slides of the presentation. For executable code, please check playground availability at http://iris.karalabe.com/talks.

Cyber-physical workflows

Cyber-physical systems

Collection of distributed physical and computational entities, collaborating for a higher goal.

Middle ground between embedded and pervasive computing:

Usual characteristics:

Cyber-physical workflow systems

Purely functional fortress on top of the notion of workflows based on task orientation.

Task oriented programming:

Task based distributed workflows:

Rea workflow primitives

How to distribute the workflow?

Anything but a trivial task:

Many messaging designs, most fall short:

Iris decentralized messaging

Very concise background

Decentralized messaging system:

Supported communication primitives

Bridging the two worlds

Resource discovery

Find a suitable node to run a task on

Resource discovery – Invert the topology

Encode the resources within the structure of the network
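
One way to picture the inversion is as an index from resource names to the nodes that offer them, which is roughly what topic subscriptions provide. The block below is only an in-memory sketch of that idea: `ResourceIndex`, `register` and `lookup` are hypothetical names introduced for illustration and are not part of Iris or Rea.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for topic subscriptions: each resource name
// acts as a topic, and its subscriber set is the set of nodes offering it.
class ResourceIndex {
  private val topics = mutable.Map.empty[String, Set[String]]

  // Registering a node subscribes it to one topic per resource it owns.
  def register(node: String, resources: Seq[String]): Unit =
    resources.foreach { r =>
      topics(r) = topics.getOrElse(r, Set.empty[String]) + node
    }

  // A lookup touches a single topic and never scans the full node list.
  def lookup(resource: String): Set[String] =
    topics.getOrElse(resource, Set.empty[String])
}

object ResourceIndexDemo {
  def main(args: Array[String]): Unit = {
    val index = new ResourceIndex
    index.register("Burglar alarm", Seq("Motion sensor", "WiFi interface"))
    index.register("SISY Attendee", Seq("WiFi interface", "Bluetooth"))
    println(index.lookup("WiFi interface"))
    println(index.lookup("Bluetooth"))
  }
}
```

In the real system the index is not stored anywhere centrally; the assumption here is that the messaging layer's topic membership plays that role.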

Step #1 – Resource registration

class Node(id: String, res: Array[String]) extends ServiceHandler with TopicHandler {
  private val iris = Iris.register(55555, id, this)

  override def init(conn: Connection) {
    res.foreach(conn.subscribe(_, this))
  }
  def close() { iris.close() }
}

def main(args: Array[String]) {
  new Node("Burglar alarm", Array("Motion sensor", "WiFi interface")).close()
}

Step #2 – Resource lookup

def search(resources: Array[String]) {                                        // (local call)
  println(id + " searching for: " + resources.mkString(", "))

  val query = SerializationUtils.serialize((id, resources))
  conn.publish(resources(0), query)
}

override def handleEvent(event: Array[Byte]) {                           // (remote callback)
  val query = SerializationUtils.deserialize[(String, Array[String])](event)
  val (origin, needed) = query

  if (needed.toSet.subsetOf(res.toSet)) {            // this node offers every requested resource
    println(id + " assigning to " + origin + ": " + needed.mkString(", "))
  }
}

Besides the burglar alarm, a SISY attendee is also searching for WiFi access (code hidden)
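
The two stages of the lookup (publish on the topic of the first resource, then let each subscriber check the full requirement list) can be tried out without a network. This is a minimal sketch under that assumption: `NodeInfo` and `Lookup.candidates` are hypothetical helpers, and the node list stands in for topic membership.

```scala
// Hypothetical node description used only for this sketch.
final case class NodeInfo(id: String, resources: Set[String])

object Lookup {
  // Mirrors the two stages of the lookup: "publishing" on the topic of the
  // first resource reaches only that topic's subscribers, and each of them
  // answers only if it owns every requested resource.
  def candidates(nodes: Seq[NodeInfo], needed: Seq[String]): Seq[String] =
    needed.headOption match {
      case None => Seq.empty
      case Some(topic) =>
        nodes
          .filter(_.resources.contains(topic))              // topic subscribers
          .filter(n => needed.forall(n.resources.contains)) // full match check
          .map(_.id)
    }
}

object LookupDemo {
  def main(args: Array[String]): Unit = {
    val nodes = Seq(
      NodeInfo("Burglar alarm", Set("Motion sensor", "WiFi interface")),
      NodeInfo("SISY Attendee", Set("WiFi interface", "Bluetooth"))
    )
    println(Lookup.candidates(nodes, Seq("WiFi interface")))
    println(Lookup.candidates(nodes, Seq("WiFi interface", "Bluetooth")))
  }
}
```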


Step #3 – Resource assignment

def assign(origin: String) {                                                 // (remote call)
  conn.tunnel(origin, 1000).close()
}

override def handleTunnel(tunnel: Tunnel) {                               // (local callback)
  notify(tunnel) // Thread notification omitted for clarity
}

val alarm = new Node("Burglar alarm", Array("Motion sensor", "WiFi interface"))
val phone = new Node("SISY Attendee", Array("WiFi interface", "Bluetooth"))

try {
  phone.search(Array("Bluetooth"), 1000).close()
} finally {
  phone.close(); alarm.close()
}
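
The slides omit how the tunnel callback wakes up the searching thread. One possible way to do it, shown here purely as an assumption and not as the presenters' implementation, is a blocking queue with a timed poll. `Handoff`, `deliver` and `awaitTunnel` are hypothetical names, and the type parameter `T` stands in for the tunnel type so the sketch runs without Iris.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical hand-off between the tunnel callback and a blocked search.
class Handoff[T] {
  private val inbound = new LinkedBlockingQueue[T]()

  // Called from the callback thread (the role played by handleTunnel).
  def deliver(tunnel: T): Unit = inbound.put(tunnel)

  // Called by the searching thread: waits up to timeoutMs for an assignment
  // and returns None if nobody answered in time.
  def awaitTunnel(timeoutMs: Long): Option[T] =
    Option(inbound.poll(timeoutMs, TimeUnit.MILLISECONDS))
}
```

Returning an `Option` makes the timeout case explicit for the caller; the `search(resources, timeout)` call in the slides may well handle it differently.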

Assembling the tasks

Primitive tasks

type Task = (Array[String], Function1[Tunnel, Unit])                  // (resources, closure)
def start(task: Task, timeout: Long): Tunnel = {                              // (local call)
  val tunnel = search(task._1, timeout)
  tunnel.send(SerializationUtils.serialize(task)); tunnel
}

def execute(tunnel: Tunnel) {                                            // (remote callback)
  val task = SerializationUtils.deserialize[Task](tunnel.receive())
  task._2.apply(tunnel); tunnel.send(Array[Byte](0x00))
}

val alarm = new Node("Burglar alarm", Array("Motion sensor", "WiFi interface"))
val phone = new Node("SISY Attendee", Array("GSM access", "Bluetooth"))

val task = (Array("WiFi interface"), (input: Tunnel) => println("Burglar detected!"))
val link = alarm.start(task, 1000)
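
The completion protocol above (run the closure, then send a single `0x00` byte) can be exercised locally. The following is a rough sketch that assumes a fake in-memory tunnel: `FakeTunnel`, `PrimitiveTask` and `finished` are hypothetical, and serialization and discovery are left out entirely.

```scala
import scala.collection.mutable

// Hypothetical in-memory tunnel: a single FIFO of byte arrays.
class FakeTunnel {
  private val queue = mutable.Queue.empty[Array[Byte]]
  def send(msg: Array[Byte]): Unit = queue.enqueue(msg)
  def receive(): Array[Byte] = queue.dequeue()
}

object PrimitiveTask {
  type Task = (Array[String], FakeTunnel => Unit) // (resources, closure)

  // Executing side: run the closure, then signal completion with one 0x00 byte.
  def execute(task: Task, tunnel: FakeTunnel): Unit = {
    task._2.apply(tunnel)
    tunnel.send(Array[Byte](0x00))
  }

  // Starting side: the task counts as done once the completion marker arrives.
  def finished(tunnel: FakeTunnel): Boolean =
    tunnel.receive().sameElements(Array[Byte](0x00))
}

object PrimitiveTaskDemo {
  def main(args: Array[String]): Unit = {
    val tunnel = new FakeTunnel
    val task: PrimitiveTask.Task =
      (Array("WiFi interface"), (_: FakeTunnel) => println("Burglar detected!"))
    PrimitiveTask.execute(task, tunnel)
    println(PrimitiveTask.finished(tunnel))
  }
}
```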

Sequential combinator

type Task = (Array[String], Function2[Tunnel, Tunnel, Unit], Object)  // (res, closure, next)
var output: Tunnel = null                                    // (previous 'execute' extended)
if (task._3 != null) {
  output = start(task._3.asInstanceOf[Task], 1000)
}
task._2.apply(tunnel, output); tunnel.send(Array[Byte](0x00))
if (output != null) {
  output.close()
}

val alarm = new Node("Burglar alarm", Array("Motion sensor", "Security camera"))
val phone = new Node("SISY Attendee", Array("GSM access", "Bluetooth", "WiFi interface"))

val track = (Array("Security camera"), (in: Tunnel, out: Tunnel) => println("Tracking"), null)
val alert = (Array("WiFi interface"), (in: Tunnel, out: Tunnel) => println("Spotted"), track)
val link = alarm.start(alert, 1000)
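
The chain structure itself can also be written with an explicit `Option` instead of an untyped `Object` that may be `null`. This is an illustrative variant only, not the representation used in the talk: `Step`, `Chain.requirements` and `Chain.runLocally` are hypothetical, and the local run is synchronous, so its ordering may differ from the distributed execution.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical typed variant of the task chain.
final case class Step(resources: Seq[String], body: () => Unit, next: Option[Step])

object Chain {
  // Lists, in order, the resource sets that discovery would have to satisfy.
  def requirements(first: Step): List[Seq[String]] =
    first.resources :: first.next.map(requirements).getOrElse(Nil)

  // Local, synchronous stand-in for the distributed run: in the slides each
  // step is shipped to a discovered node instead of being called directly.
  def runLocally(first: Step): Unit = {
    first.body()
    first.next.foreach(runLocally)
  }
}

object ChainDemo {
  def main(args: Array[String]): Unit = {
    val log = ListBuffer.empty[String]
    val track = Step(Seq("Security camera"), () => { log += "Tracking"; () }, None)
    val alert = Step(Seq("WiFi interface"), () => { log += "Spotted"; () }, Some(track))
    Chain.runLocally(alert)
    println(log.mkString(", "))
    println(Chain.requirements(alert))
  }
}
```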

Advanced combinators

Parallel combinator:

Controller:

Pipes are standard publish/subscribe and have been omitted

Wrap up – The kettle of Budapest

Thank you
