// E07-ProducerConsumer.cpp -- demonstrates cooperation between multiple threads within a program

// In this example one thread (Producer) puts some data into a shared buffer, while
//  several other threads (Consumers) concurrently retrieve the data from the buffer
//  and perform some work on it. If there is no data in the buffer the consumers will
//  wait until the Producer makes more data available and notifies the Consumer threads
//  to resume their work.

// The main program thread acts as a superviser of the entire operation and once the
//  Producer thread has completed its operation (i.e. put all the data it has into
//  the shared buffer) the supervisor cancels the Consumer threads. Upon cancelletion
//  each of the Consumer threads will put the result of their work into a common result
//  collector buffer and finish execution.

// The shared buffer is implemented as a FIFO queue of a fixed size. Once full the buffer
//  will block the Producer thread trying to put more data into it until the data is
//  retrieved by any one of the Consumer threads.

#include <Platform.h>
using namespace Platform;
using namespace Platform::Containers;
using namespace Platform::Containers::Names;

// the limit of the shared buffer (in payload items)
#define BufferLimit     3
// the number of payload items to process (sent by the Producer thread)
#define TransmitItems   33


// implementation of the result collector buffer
//  the result collector collects the data processed by the Consumer threads
//====================================================================================================
struct ResultCollector:
    protected Lang::TypeConcepts::SynchronizableType
{
    // processed data storage
    IntArray data;

    // this method is called by the Consumer threads when they finish their work
    Void collect(const IntArray& processedData)
    {
        // acquire a lock on this monitor for the duration of this method
        //  (the lock is scope based)
        Thread::Lock lock(this);

        // put the processed data into storage
        data.addAll(processedData);
    }
};


// implementation of the shared buffer
//====================================================================================================
struct SharedBuffer:
    protected Lang::TypeConcepts::SynchronizableType
{
    IntQueue buffer;

    // put the payload into the shared buffer (done by the Producer thread)
    Void put(Int payload)
    {
        // acquire a lock on this monitor for the duration of this method
        //  (the lock is scope based)
        Thread::Lock lock(this);

        // if the shared buffer is full, block this thread and place it in the
        //  waiting queue for this monitor; the wait method will automatically
        //  release the monitor
        while(buffer.size() >= BufferLimit)
            Thread::wait(this);

        // when we get here there is space available in the shared buffer,
        //  and so we put the payload into the buffer
        buffer.enqueue(payload);

        // notify all the threads waiting on this monitor that some new data
        //  has become available; at this point the threads queued on this
        //  monitor will be unblocked and begin to compete for ownership of
        //  the monitor
        Thread::notifyAll(this);
    }
    // get the payload from the shared buffer (done by the Consumer threads)
    Int get()
    {
        // acquire a lock on this monitor for the duration of this method
        //  (the lock is scope based)
        Thread::Lock lock(this);

        // if the shared buffer is empty, block this thread and place it in the
        //  waiting queue for this monitor; the wait method will automatically
        //  release the monitor
        while(buffer.isEmpty())
            Thread::wait(this);

        // when we get here the Producer has put the payload into the shared buffer,
        //  and so we retrieve the payload
        Int payload = buffer.dequeue();

        // notify all the threads waiting on this monitor that there is space
        //  in the buffer available; at this point the threads queued on this
        //  monitor will be unblocked and begin to compete for ownership of
        //  the monitor
        Thread::notifyAll(this);

        // return the payload
        return payload;
    }
};


// implementation of the Producer thread
//====================================================================================================
struct Producer:
    public Thread::IRunnable
{
    // reference to the shared buffer into which the Producer puts payload data
    //  to be processed by the Consumer threads
    SharedBuffer& buffer;

    // Producer constructor
    Producer(SharedBuffer& inBuffer):
        buffer(inBuffer)
    {}

    // Producer thread implementation
    Void run()
    {
        // the Producer thread simply puts the payload data into a shared buffer;
        //  a real-world implementation could for instance retrieve the data from
        //  some remote source (e.g. a socket) and multiple Producer threads could
        //  be running simultaneously
        for(Int i = 0; i < TransmitItems; i++)
            buffer.put(i);
    }
};


// implementation of the Consumer threads
//====================================================================================================
struct Consumer:
    public Thread::IRunnable
{
    // reference to the shared buffer from which the Consumer threads retrieve
    //  payload data to be processed
    SharedBuffer&       buffer;

    // reference to the buffer into which the Consumer threads will put the
    //  processed data
    ResultCollector&    result;

    // stores data processed by the Consumer; when the Consumer is finished
    //  the processed data is collected into a separate storage
    IntArray processedData;

    // Consumer constructor
    Consumer(SharedBuffer& inBuffer, ResultCollector& inResult):
        buffer(inBuffer),
        result(inResult)
    {}

    // Consumer thread implementation
    Void run()
    {
        // we enclose the thread's code into try-catch block to be able
        //  to receive and handle interrupt/cancel signals from the parent
        //  thread which provides supervision of the entire process
        try
        {
            // loop forever
            while(True)
            {
                // retrieve a payload from the shared buffer
                Int payload = buffer.get();

                // at this point the payload data would be processed in a
                //  real-world program

                // here we simply negate the payload integer value
                payload = -payload;

                // put the processed payload data into this thread's storage,
                //  it will be collected when this thread finishes
                processedData.add(payload);
            }
        }
        // catch and handle the cancel signal sent by the superviser thread
        catch(Thread::CancelSignal e)
        {
            // the superviser (main) thread sends a cancel signal to all the
            //  Consumer threads telling that no more data is to be processed

            // at this point we simply put the data processed by this Consumer
            //  thread into the result storage and return thus ending this thread
            result.collect(processedData);
        }
    }
};


// program entry point - this is the main (supervisor) thread
//====================================================================================================
AppMain(args)
{
    // define the result data storage
    ResultCollector result;
    // define the shared buffer storage
    SharedBuffer    storage;

    // define an array of runnable threads which will perform the some work
    Array<Thread::IRunnable*> threads;

    // create an instance of a Producer runnable and put it into the array
    //  (it is placed at index 0 in the array)
    threads.add(new Producer(storage));

    // create ten instances of the Consumer runnable and put them into the array
    for(Int i = 0; i < 10; i++)
        threads.add(new Consumer(storage, result));

    // create a task group using the runnable instances in the array
    //  note that the threads are started immediately as Task objects are
    //  created by the TaskGroup's constructor
    Thread::TaskGroup tasks(threads.sequence(), threads.size());

    // block this (main) thread and let the Producer finish
    //  the Producer thread will put the data to be processed by the Consumer
    //  threads into the shared buffer
    tasks[0].join();

    // block this thread letting each of the consumers to process the data
    //  placed into the shared buffer by the Producer thread
    for(Int i = 1; i < tasks.size(); i++)
        tasks[i].cancel();

    // wait until all the threads in the group have finished execution
    tasks.join();

    // clean up: delete the runnable instances in the threads array
    deleteAll(threads);

    // print the collected result to standard output
    Runtime::StdOut::fprintln(appText("The result is: %s"), result.data.sort().toString().value());

    return 0;
}


/* EOF */