5 Coroutines and Concurrency

A BETA object may be the basis for an execution thread. Such a thread will consist of a stack of objects currently being executed. An object which can be used as the basis for an execution thread has to be declared as an object of kind component as shown in the following declaration:

5.1 Components with execution threads

A: @|activity

The symbol '|' describes that the object A is a component. A component (thread) may be executed as a coroutine or it may be forked as a concurrent process. Consider the following description of activity:

activity:
  (#
  do cycle
      (#
      do getOrder; suspend;
         processOrder; suspend;
         deliverOrder; suspend
  #)#)

The component object may be invoked by an imperative

A

which implies that the do-part is executed. The execution of A is temporarily suspended when A executes a suspend-imperative. In the above example this happens after the execution of getOrder. A subsequent invocation of A will resume execution after the suspend-imperative. In the above example this means that processOrder will be executed. If B is also an instance of activity, then the calling object may alternate between executing A and B:

cycle(# do A;  B;  #)

The above example shows how to use components as deterministic coroutines in the sense that the calling object controls the scheduling of the coroutines. In section 9.1 below another example of using coroutines will be given.

It is also possible to execute component objects concurrently. By executing

A[]->fork; B[]->fork

the component objects A and B will be executed concurrently. As for the deterministic coroutine situation, A and B will temporarily suspend execution when they execute a suspend-imperative. Further examples of concurrent objects will be given below in section 9.2.

5.2 Coroutines

Deterministic coroutines have demonstrated their usefulness through many years of usage. Below we give a typical example of using coroutines.

Suppose we have a register for the permanent workers and another one for the hourly paid workers. Suppose also that it is possible to sort these registers according to a given criterion like the total hours worked by the employee. Suppose that we want to produce a list of names of all employees sorted according to the total hours worked. This may be done by merging the two registers. A register object has a scan operation that makes it possible to go through all elements of the register. Instead we define an operation of register in the form of a coroutine getNext, which delivers the next element of the register when called:

register:
  (# 
     getNext: | @
       (# elm: ^employee
       do scan(# do current[]->elm[]; suspend #);
          none->elm[]
       exit elm[]
       #);
  #);
  pReg: @permanentRegister; hReg: @hourlyPaidRegister;
  
  pReg.getNext->e1[]; hReg.getnext->e2[];
  L: cycle
    (#
    do (if e1[] = none then (*empty hReg*); leave L if);
       (if e2[] = none then (*empty pReg*); leave L if);
       (if e1.totalHours < e2.totalHours then
           e1.print; pReg.getNext->e1[]
        else 
           e2.print; hReg.getNext->e2[]
       if)
    #)

5.2.1 Suspending and resuming

The attributes getNext of the objects pReg and hReg have their own thread of execution. When called in an imperative like pReg.getNext->e1[], the thread is executed until it either executes a suspend or terminates. If it executes a suspend, it may be called again in which case it will resume execution at the point of suspend. The first time getNext is called, it will start executing scan. For each element in the register, it will suspend execution and exit the current element via the exit variable elm[]. When the register is empty, NONE is returned.

5.3 Concurrency

As previously mentioned, it is possible to perform concurrent execution of components by means of the fork operation as sketched in the following example:

(# S1: @| (#  do  #);
   S2: @| (#  do  #);
   S3: @| (#  do  #)
  do S1[] -> fork; S2[] -> fork; S3[] -> fork;  
#)

The execution of S1, S2 and S3 will take place concurrently with each other and with the object executing the fork operations. Concurrent objects may access the same shared objects without synchronization, but may synchronize access to shared objects by means of semaphores. In section 5 above the pattern semaphore has been described. It is well known that a semaphore is a low level synchronization mechanism which may be difficult to use in other than simple situations. For this reason the Mjølner library has a number of patterns defining higher level synchronization mechanisms. This library includes a monitor pattern as described in section 5 above. The library also includes patterns defining synchronization in the form of rendezvous as in Ada.

5.4 Monitor Example

The following example describes a company with a number of salesmen, workers and carriers. The salesmen obtain orders from customers and store them in an order pool. The workers obtain orders from the order pool, process them and deliver the resulting item in an item pool. The carriers pick up the items from the item pool and bring them to the customer. Salesmen, workers and carriers are described as active objects whereas the order- and item pools are represented as monitor objects.

(# salesman: employee
     (# getOrder: (#  exit anOrder[] #)
     do cycle (# do getOrder -> jobPool.put #)
     #);
   S1,S2, : @|salesman;
   jobPool: @monitor
     (# jobs: @register(# type::< order #);
        put: entry
          (# ord: ^order enter ord[] do ord[] ->jobs.insert #);
        get: entry
          (# ord: ^order do jobs.remove -> ord[] exit ord[] #)  
     #);
   worker: employee
     (# processJob: (#  enter anOrder[] do  exit anItem[] #)
     do cycle(# do jobPool.get -> processJob -> itemPool.put #)
     #);
   W1,W2,: @| worker;
   itemPool: @monitor(#  #);
   carrier: employee
     (# deliverItem: (# enter anItem[] do  #)
     do cycle(# do itemPool.get ->DeliverItem #)
     #);
   C1,C2, : @| carrier;
do jobPool.init; itemPool.init; 
   conc(# do S1[]->start;  W1[]->start;  C1[]->start;  #)
#)

5.4.1 Procedure pattern conc

The procedure pattern conc is another example of a high-level concurrency pattern from the Mjølner library. It does not terminate execution until components being started (by S1[]->start, etc.) have terminated their execution.

5.4.2 Rendezvous Example

Next we show an example of using the library patterns for describing synchronized rendezvous. The example shows a drink machine that provides coffee and soup. A customer operates the machine by pushing either makeCoffee or makeSoup. If makeCoffee has been pushed, then the customer may obtain the coffee by means of getCoffee. Similarly if makeSoup has been pushed then the soup may be obtained by means of getSoup.

The system pattern has a port attribute which may be used to define synchronization ports. The drink machine described below has three such ports, activate, coffeeReady, and soupReady. A port object has a pattern attribute entry which may be used to define procedure patterns associated with port. For the port activate, two procedure patterns makeCoffee and makeSoup are defined. For coffeeReady and soupReady, the procedure patterns getCoffee and getSoup are defined.

An execution of a port-entry operation like aDrinkMachine.makeCoffee will only be executed if the drinkMachine has executed a corresponding accept by means of activate.accept.

drinkMachine: system
  (# activate: @port;
     makeCoffee: activate.entry
       (# do  coffeeReady[]->drinkReady[] #);
    makeSoup: activate.entry(# do  soupReady[]->drinkReady[] #);
    coffeeReady, soupReady: @port;
    getCoffee: coffeeReady.entry(# do  exit someCoffee [] #);
    getSoup: soupReady.entry(# do  exit someSoup [] #);
    drinkReady: ^port
  do cycle(# do activate.accept; drinkReady.accept #)
  #)

The drinkMachine may be used in the following way:

aDrinkMachine: @| drinkMachine
  
  aDrinkMachine.makeCoffee;  aDrinkMachine.getCoffee;
  aDrinkMachine.makeSoup;  aDrinkMachine.getSoup;

As may be seen the use of the patterns system, port and entry makes it possible to describe a concurrent program in the style of Ada tasks that synchronize their execution by means of rendezvous. A port object defines two semaphores for controlling the execution of the associated entry patterns. The actual details will not be given in this language introduction.

It is possible to specialize the drinkMachine into a machine that accepts further operations:

extendedDrinkMachine: drinkMachine
  (# makeTea: activate.entry(# do  teaReady[]->drinkReady[] #);
     teaReady: @port;
     getTea: teaReady.entry(#  exit someTea[] #)
  #)

The extendedDrinkMachine inherits the operations and protocol from drinkMachine and adds new operations to the protocol.

The basic mechanisms in BETA for providing concurrency are component-objects (providing threads), the fork-imperative (for initiating concurrent execution) and the semaphore (for providing synchronization). As has been mentioned already, these mechanisms are inadequate for many situations. The abstraction mechanisms of BETA make it possible to define higher-level abstractions for concurrency and synchronization.

5.4.3 More information

Please see the manual [MIA 90-8] for details about the concurrency library.


BETA Language Introduction
© 1994-2004 Mjølner Informatics
[Modified: Saturday October 21st 2000 at 18:34]