class Supply

Asynchronous data stream with multiple subscribers

class Supply {}

A supply is a thread-safe, asynchronous data stream like a Channel, but it can have multiple subscribers (taps) that all get the same values flowing through the supply.

It is a thread-safe implementation of the Observer Pattern, and central to supporting reactive programming in Perl 6.

There are two types of Supplies: live and on demand. When tapping into a live supply, the tap will only see values that are flowing through the supply after the tap has been created. Such supplies are normally infinite in nature, such as mouse movements. Closing such a tap does not stop mouse events from occurring, it just means that the values will go by unseen. All tappers see the same flow of values.

A tap on an on demand supply will initiate the production of values, and tapping the supply again may result in a new set of values. For example, Supply.interval produces a fresh timer with the appropriate interval each time it is tapped. If the tap is closed, the timer simply stops emitting values to that tap.

A live Supply is obtained from the Supplier factory method Supply. New values are emitted by calling emit on the Supplier object.

my $supplier = Supplier.new;
my $supply = $supplier.Supply;
$supply.tap(-> $v { say "$v" });
$supplier.emit(42); # Will cause the tap to output "42" 

The live method returns True on live supplies. Factory methods such as interval, from-list will return on demand supplies.

A live Supply that keeps values until tapped the first time can be created with Supplier::Preserving.

Further examples can be found in the concurrency page.

Methods that return Taps

method tap

method tap(Supply:D: &emit = -> $ { },
        :&done,
        :&quit,
    --> Tap:D)

Creates a new tap (a kind of subscription if you will), in addition to all existing taps. The first positional argument is a piece of code that will be called when a new value becomes available through the emit call.

The &done callback is called when the done method on the supply is called, indicating the end of life of the channel. For a live supply the done routine will be called on the parent Supplier.

The &quit callback is called when the quit method on the supply is called, indicating an erroneous termination of the supply. For a live supply the quit routine will be called on the parent Supplier

Method tap returns an object of type Tap, on which you can call the close method to cancel the subscription.

my $s = Supply.from-list(0 .. 5);
my $t = $s.tap(-> $v { say $v }done => { say "no more ticks" });

Produces:

0
1
2
3
4
5
no more ticks

method act

method act(Supply:D: &act --> Tap:D)

Creates a tap on the given supply with the given code. Differently from tap, the given code is guaranteed to be only executed by one thread at a time.

Utility methods

method Capture

Defined as:

method Capture(Supply:D --> Capture:D)

Equivalent to calling .List.Capture on the invocant.

method Channel

method Channel(Supply:D: --> Channel:D)

Returns a Channel object that will receive all future values from the supply, and will be closed when the Supply is done, and quit (shut down with error) when the supply is quit.

method Promise

method Promise(Supply:D: --> Promise:D)

Returns a Promise that will be kept when the Supply is done. If the Supply also emits any values, then the Promise will be kept with the final value. Otherwise, it will be kept with Nil. If the Supply ends with a quit instead of a done, then the Promise will be broken with that exception.

my $supplier = Supplier.new;
my $s = $supplier.Supply;
my $p = $s.Promise;
$p.then(-> $v { say "got $v.result()" });
$supplier.emit('cha');         # not output yet 
$supplier.done();              # got cha 

The Promise method is most useful when dealing with supplies that will tend to produce just one value, when only the final value is of interest, or when only completion (successful or not) is relevant.

method live

method live(Supply:D: --> Bool:D)

Returns True if the supply is "live", that is, values are emitted to taps as soon as they arrive. Always returns True in the default Supply (but for example on the supply returned from Supply.from-list it's False).

say Supplier.new.Supply.live;    # OUTPUT: «True␤» 

method schedule-on

method schedule-on(Supply:D: Scheduler $scheduler)

Runs the emit, done and quit callbacks on the specified scheduler.

This is useful for GUI toolkits that require certain actions to be run from the GUI thread.

Methods that wait until the supply is done

method wait

method wait(Supply:D:)

Taps the Supply it is called on, and blocks execution until the either the supply is done (in which case it evaluates to the final value that was emitted on the Supply, or Nil if not value was emitted) or quit (in which case it will throw the exception that was passed to quit).

my $s = Supplier.new;
start {
  sleep 1;
  say "One second: running.";
  sleep 1;
  $s.emit(42);
  $s.done;
}
$s.Supply.wait;
say "Two seconds: done";

method list

method list(Supply:D: --> List:D)

Taps the Supply it is called on, and returns a lazy list that will be reified as the Supply emits values. The list will be terminated once the Supply is done. If the Supply quits, then an exception will be thrown once that point in the lazy list is reached.

method grab

method grab(Supply:D: &when-done --> Supply:D)

Taps the Supply it is called on. When it is done, calls &when-done and then emits the list of values that it returns on the result Supply. If the original Supply quits, then the exception is immediately conveyed on the return Supply.

my $s = Supply.from-list(41032);
my $t = $s.grab(&sum);
$t.tap(&say);           # OUTPUT: «19␤» 

method reverse

method reverse(Supply:D: --> Supply:D)

Taps the Supply it is called on. Once that Supply emits done, all of the values it emitted will be emitted on the returned Supply in reverse order. If the original Supply quits, then the exception is immediately conveyed on the return Supply.

my $s = Supply.from-list(123);
my $t = $s.reverse;
$t.tap(&say);           # OUTPUT: «3␤2␤1␤» 

method sort

method sort(Supply:D: &custom-routine-to-use? --> Supply:D)

Taps the Supply it is called on. Once that Supply emits done, all of the values that it emitted will be sorted, and the results emitted on the returned Supply in the sorted order. Optionally accepts a comparator Block. If the original Supply quits, then the exception is immediately conveyed on the return Supply.

my $s = Supply.from-list(41032);
my $t = $s.sort();
$t.tap(&say);           # OUTPUT: «2␤3␤4␤10␤» 

Methods that return another Supply

method from-list

method from-list(Supply:U: +@values --> Supply:D)

Creates an on-demand supply from the values passed to this method.

my $s = Supply.from-list(123);
$s.tap(&say);           # OUTPUT: «1␤2␤3␤» 

method share

method share(Supply:D: --> Supply:D)

Creates a live supply from an on-demand supply, thus making it possible to share the values of the on-demand supply on multiple taps, instead of each tap seeing its own copy of all values from the on-demand supply.

# this says in turn: "first 1" "first 2" "second 2" "first 3" "second 3" 
my $s = Supply.interval(1).share;
$s.tap: { "first $_".say };
sleep 1.1;
$s.tap: { "second $_".say };
sleep 2

method flat

method flat(Supply:D: --> Supply:D)

Creates a supply on which all of the values seen in the given supply are flattened before being emitted again.

method do

method do(Supply:D: &do --> Supply:D)

Creates a supply to which all values seen in the given supply, are emitted again. The given code, executed for its side-effects only, is guaranteed to be only executed by one thread at a time.

method on-close

method on-close(Supply:D: &on-close --> Supply:D)

Returns a new Supply which will run &on-close whenever a Tap of that Supply is closed. This includes if further operations are chained on to the Supply. (for example, $supply.on-close(&on-close).map(*.uc)). When using a react or supply block, using the CLOSE phaser is usually a better choice.

my $s = Supplier.new;
my $tap = $s.Supply.on-close({ say "Tap closed" }).tap(
    -> $v { say "the value is $v" },
    done    => { say "Supply is done" },
    quit    => -> $ex { say "Supply finished with error $ex" },
);
 
$s.emit('Perl 6');
$tap.close;        # OUTPUT: «Tap closed␤» 

method interval

method interval(Supply:U: $interval$delay = 0:$scheduler = $*SCHEDULER --> Supply:D)

Creates a supply that emits a value every $interval seconds, starting $delay seconds from the call. The emitted value is an integer, starting from 0, and is incremented by one for each value emitted.

Implementations may treat too-small values as lowest resolution they support, possibly warning in such situations; e.g. treating 0.0001 as 0.001.

method grep

method grep(Supply:D: Mu $test --> Supply:D)

Creates a new supply that only emits those values from the original supply that smartmatch against $test.

my $supplier = Supplier.new;
my $all      = $supplier.Supply;
my $ints     = $all.grep(Int);
$ints.tap(&say);
$supplier.emit($_for 1'a string'3.14159;   # prints only 1 

method map

method map(Supply:D: &mapper --> Supply:D)

Returns a new supply that maps each value of the given supply through &mapper and emits it to the new supply.

my $supplier = Supplier.new;
my $all      = $supplier.Supply;
my $double   = $all.map(-> $value { $value * 2 });
$double.tap(&say);
$supplier.emit(4);           # RESULT: «8» 

method batch

method batch(Supply:D: :$elems:$seconds --> Supply:D)

Creates a new supply that batches the values of the given supply by either the number of elements in the batch (using :elems) or the maximum number of seconds (using the :seconds) or both. Any remaining values are emitted in a final batch when the supply is done.

method elems

method elems(Supply:D: $seconds? --> Supply:D)

Creates a new supply in which changes to the number of values seen are emitted. It optionally also takes an interval (in seconds) if you only want to be updated every so many seconds.

method head

method head(Supply:D: Int(Cool$number = 1 --> Supply:D)

Creates a "head" supply with the same semantics as List.head.

my $s = Supply.from-list(41032);
my $hs = $s.head(2);
$hs.tap(&say);           # OUTPUT: «4␤10␤» 

method tail

method tail(Supply:D: Int(Cool$number = 1 --> Supply:D)

Creates a "tail" supply with the same semantics as List.tail.

my $s = Supply.from-list(41032);
my $ts = $s.tail(2);
$ts.tap(&say);           # OUTPUT: «3␤2␤» 

method rotor

method rotor(Supply:D: @cycle --> Supply:D)

Creates a "rotoring" supply with the same semantics as List.rotor.

method delayed

method delayed(Supply:D: $seconds:$scheduler = $*SCHEDULER --> Supply:D)

Creates a new supply in which all values flowing through the given supply are emitted, but with the given delay in seconds.

method throttle

method throttle(Supply:D:
  $limit,                 # values / time or simultaneous processing 
  $seconds or $callable,  # time-unit / code to process simultaneously 
  $delay = 0,             # initial delay before starting, in seconds 
  :$control,              # supply to emit control messages on (optional) 
  :$status,               # supply to tap status messages from (optional) 
  :$bleed,                # supply to bleed messages to (optional) 
  :$vent-at,              # bleed when so many buffered (optional) 
  :$scheduler,            # scheduler to use, default $*SCHEDULER 
  --> Supply:D)

Produces a Supply from a given Supply, but makes sure the number of messages passed through, is limited.

It has two modes of operation: per time-unit or by maximum number of execution of a block of code: this is determined by the second positional parameter.

The first positional parameter specifies the limit that should be applied.

If the second positional parameter is a Callable, then the limit indicates the maximum number of parallel processes executing the Callable, which is given the value that was received. The emitted values in this case will be the Promises that were obtained from starting the Callable.

If the second positional parameter is a numeric value, it is interpreted as the time-unit (in seconds). If you specify .1 as the value, then it makes sure you don't exceed the limit for every tenth of a second.

If the limit is exceeded, then incoming messages are buffered until there is room to pass on / execute the Callable again.

The third positional parameter is optional: it indicates the number of seconds the throttle will wait before passing on any values.

The :control named parameter optionally specifies a Supply that you can use to control the throttle while it is in operation. Messages that can be sent, are strings in the form of "key:value". Please see below for the types of messages that you can send to control the throttle.

The :status named parameter optionally specifies a Supply that will receive any status messages. If specified, it will at least send one status message after the original Supply is exhausted. See status message below.

The :bleed named parameter optionally specifies a Supply that will receive any values that were either explicitly bled (with the bleed control message), or automatically bled (if there's a vent-at active).

The :vent-at named parameter indicates the number of values that may be buffered before any additional value will be routed to the :bleed Supply. Defaults to 0 if not specified (causing no automatic bleeding to happen). Only makes sense if a :bleed Supply has also been specified.

The :scheduler named parameter indicates the scheduler to be used. Defaults to $*SCHEDULER.

control messages

These messages can be sent to the :control Supply. A control message consists of a string of the form "key: value", e.g. "limit: 4".

Change the number of messages (as initially given in the first positional) to the value given.

Route the given number of buffered messages to the :bleed Supply.

Change the maximum number of buffered values before automatic bleeding takes place. If the value is lower than before, will cause immediate rerouting of buffered values to match the new maximum.

Send a status message to the :status Supply with the given id.

status message

The status return message is a hash with the following keys:

The current number of messages / callables that is still allowed to be passed / executed.

The number of messages routed to the :bleed Supply.

The number of messages currently buffered because of overflow.

The number of messages emitted (passed through).

The id of this status message (a monotonically increasing number). Handy if you want to log status messages.

The current limit that is being applied.

The maximum number of messages that may be buffered before they're automatically re-routed to the :bleed Supply.

Examples

Have a simple piece of code announce when it starts running asynchronously, wait a random amount of time, then announce when it is done. Do this 6 times, but don't let more than 3 of them run simultaneously.

my $s = Supply.from-list(^6);  # set up supply 
my $t = $s.throttle: 3,        # only allow 3 at a time 
{                              # code block to run 
    say "running $_";          # announce we've started 
    sleep rand;                # wait some random time 
    say "done $_"              # announce we're done 
}                              # don't need ; because } at end of line 
$t.wait;                       # wait for the supply to be done 

and the result of one run will be:

running 0
running 1
running 2
done 2
running 3
done 1
running 4
done 4
running 5
done 0
done 3
done 5

method stable

method stable(Supply:D: $time:$scheduler = $*SCHEDULER --> Supply:D)

Creates a new supply that only passes on a value flowing through the given supply if it wasn't superseded by another value in the given $time (in seconds). Optionally uses another scheduler than the default scheduler, using the :scheduler parameter.

To clarify the above, if, during the timeout $time, additional values are emitted to the Supplier all but the last one will be thrown away. Each time an additional value is emitted to the Supplier, during the timeout, $time is reset.

This method can be quite useful when handling UI input, where it is not desired to perform an operation until the user has stopped typing for a while rather than on every keystroke.

my $supplier = Supplier.new;
my $supply1 = $supplier.Supply;
$supply1.tap(-> $v { say "Supply1 got: $v" });
$supplier.emit(42);
 
my Supply $supply2 = $supply1.stable(5);
$supply2.tap(-> $v { say "Supply2 got: $v" });
sleep(3);
$supplier.emit(43);  # will not be seen by $supply2 but will reset $time 
$supplier.emit(44);
sleep(10);
# OUTPUT: «Supply1 got: 42␤Supply1 got: 43␤Supply1 got: 44␤Supply2 got: 44␤» 

As can be seen above, $supply1 received all values emitted to the Supplier while $supply2 only received one value. The 43 was thrown away because it was followed by another 'last' value 44 which was retained and sent to $supply2 after approximately eight seconds, this due to the fact that the timeout $time was reset after three seconds.

method reduce

method reduce(Supply:D: &with --> Supply:D)

Creates a "reducing" supply with the same semantics as List.reduce.

my $supply = Supply.from-list(1..5).reduce({$^a + $^b});
$supply.tap(-> $v { say "$v" }); # OUTPUT: «15␤» 

method produce

method produce(Supply:D: &with --> Supply:D)

Creates a "producing" supply with the same semantics as List.produce.

my $supply = Supply.from-list(1..5).produce({$^a + $^b});
$supply.tap(-> $v { say "$v" }); # OUTPUT: «1␤3␤6␤10␤15␤» 

method lines

method lines(Supply:D: :$chomp = True --> Supply:D)

Creates a supply that will emit the characters coming in line by line from a supply that's usually created by some asynchronous I/O operation. The optional :chomp parameter indicates whether to remove line separators: the default is True.

method words

method words(Supply:D: --> Supply:D)

Creates a supply that will emit the characters coming in word for word from a supply that's usually created by some asynchronous I/O operation.

my $s = Supply.from-list("Hello Word!".comb);
my $ws = $s.words;
$ws.tap(&say);           # OUTPUT: «Hello␤Word!␤» 

method unique

method unique(Supply:D: :$as:$with:$expires --> Supply:D)

Creates a supply that only provides unique values, as defined by the optional :as and :with parameters (same as with List.unique). The optional :expires parameter how long to wait (in seconds) before "resetting" and not considering a value to have been seen, even if it's the same as an old value.

method squish

method squish(Supply:D: :$as:$with --> Supply:D)

Creates a supply that only provides unique values, as defined by the optional :as and :with parameters (same as with List.squish).

method max

method max(Supply:D: &custom-routine-to-use = &infix:<cmp> --> Supply:D)

Creates a supply that only emits values from the given supply if they are larger than any value seen before. In other words, from a continuously ascending supply it will emit all the values. From a continuously descending supply it will only emit the first value. The optional parameter specifies the comparator, just as with Any.max.

method min

method min(Supply:D: &custom-routine-to-use = &infix:<cmp> --> Supply:D)

Creates a supply that only emits values from the given supply if they are smaller than any value seen before. In other words, from a continuously descending supply it will emit all the values. From a continuously ascending supply it will only emit the first value. The optional parameter specifies the comparator, just as with Any.min.

method minmax

method minmax(Supply:D: &custom-routine-to-use = &infix:<cmp> --> Supply:D)

Creates a supply that emits a Range every time a new minimum or maximum values is seen from the given supply. The optional parameter specifies the comparator, just as with Any.minmax.

method skip

method skip(Supply:D: Int(Cool$number = 1 --> Supply:D)

Returns a new Supply which will emit all values from the given Supply except for the first $number values, which will be thrown away.

my $supplier = Supplier.new;
my $supply = $supplier.Supply;
$supply = $supply.skip(3);
$supply.tap({ say $_ });
$supplier.emit($_for 1..10# OUTPUT: «4␤5␤6␤7␤8␤9␤10␤» 

method start

method start(Supply:D: &startee --> Supply:D)

Creates a supply of supplies. For each value in the original supply, the code object is scheduled on another thread, and returns a supply either of a single value (if the code succeeds), or one that quits without a value (if the code fails).

This is useful for asynchronously starting work that you don't block on.

Use migrate to join the values into a single supply again.

method migrate

method migrate(Supply:D: --> Supply:D)

Takes a Supply which itself has values that are of type Supply as input. Each time the outer Supply emits a new Supply, this will be tapped and its values emitted. Any previously tapped Supply will be closed. This is useful for migrating between different data sources, and only paying attention to the latest one.

For example, imagine an application where the user can switch between different stocks. When they switch to a new one, a connection is established to a web socket to get the latest values, and any previous connection should be closed. Each stream of values coming over the web socket would be represented as a Supply, which themselves are emitted into a Supply of latest data sources to watch. The migrate method could be used to flatten this supply of supplies into a single Supply of the current values that the user cares about.

Here is a simple simulation of such a program:

my Supplier $stock-sources .= new;
 
sub watch-stock($symbol{
    $stock-sources.emit: supply {
        say "Starting to watch $symbol";
        whenever Supply.interval(1{
            emit "$symbol: 111." ~ 99.rand.Int;
        }
        CLOSE say "Lost interest in $symbol";
    }
}
 
$stock-sources.Supply.migrate.tap: *.say;
 
watch-stock('GOOG');
sleep 3;
watch-stock('AAPL');
sleep 3;

Which produces output like:

Starting to watch GOOG
GOOG: 111.67
GOOG: 111.20
GOOG: 111.37
Lost interest in GOOG
Starting to watch AAPL
AAPL: 111.55
AAPL: 111.6
AAPL: 111.6

Methods that combine supplies

method merge

method merge(Supply @*supplies --> Supply:D)

Creates a supply to which any value seen from the given supplies, is emitted. The resulting supply is done Only when all given supplies are done. Can also be called as a class method.

method zip

method zip(Supply @*supplies:&with = &[,] --> Supply:D)

Creates a supply that emits combined values as soon as there is a new value seen on all of the supplies. By default, Lists are created, but this can be changed by specifying your own combiner with the :with parameter. The resulting supply is done as soon as any of the given supplies are done. Can also be called as a class method.

method zip-latest

method zip-latest(Supply @*supplies:&with = &[,], :$initial --> Supply:D)

Creates a supply that emits combined values as soon as there is a new value seen on any of the supplies. By default, Lists are created, but this can be changed by specifying your own combiner with the :with parameter. The optional :initial parameter can be used to indicate the initial state of the combined values. By default, all supplies have to have at least one value emitted on them before the first combined values is emitted on the resulting supply. The resulting supply is done as soon as any of the given supplies are done. Can also be called as a class method.

I/O features exposed as supplies

sub signal

sub signal(*@signals:$scheduler = $*SCHEDULER)

Creates a supply for the Signal enums (such as SIGINT) specified, and an optional :scheduler parameter. Any signals received, will be emitted on the supply. For example:

signal(SIGINT).tap{ say "Thank you for your attention"exit 0 } );

would catch Control-C, thank you, and then exit.

To go from a signal number to a Signal, you can do something like this:

signal(Signal(2)).tap-> $sig { say "Received signal: $sig" } );

The list of supported signals can be found by checking Signal::.keys (as you would any enum). For more details on how enums work see enum.

Note: Rakudo versions up to 2018.05 had a bug due to which numeric values of signals were incorrect on some systems. For example, Signal(10) was returning SIGBUS even if it was actually SIGUSR1 on a particular system. That being said, using signal(SIGUSR1) was working as expected on all Rakudo versions except 2018.04, 2018.04.1 and 2018.05, where the intended behavior can be achieved by using signal(SIGBUS) instead. These issues are resolved in Rakudo releases after 2018.05.

method IO::Notification.watch-path

method watch-path($path --> Supply:D)

Creates a supply to which the OS will emit values to indicate changes on the filesystem for the given path. Also has a shortcut with the watch method on an IO object, like this:

IO::Notification.watch-path(".").act{ say "$^file changed" } );
".".IO.watch.act(                     { say "$^file changed" } );   # same 

Type Graph

Type relations for Supply
perl6-type-graph Supply Supply Any Any Supply->Any Mu Mu Any->Mu

Expand above chart