Atomic updates
#| A collection of non-negative integers, with atomic operations.
class BucketStore {
has $.elems is required;
has @!buckets = ^1024 .pick xx $!elems;
has $lock = Lock.new;
#| Returns an array with the contents of all buckets.
method buckets {
$lock.protect: { [@!buckets] }
}
#| Transfers $amount from bucket at index $from, to bucket at index $to.
method transfer ($amount, :$from!, :$to!) {
return if $from == $to;
$lock.protect: {
my $clamped = $amount min @!buckets[$from];
@!buckets[$from] -= $clamped;
@!buckets[$to] += $clamped;
}
}
}
# Create bucket store
my $bucket-store = BucketStore.new: elems => 8;
my $initial-sum = $bucket-store.buckets.sum;
# Start a thread to equalize buckets
Thread.start: {
loop {
my @buckets = $bucket-store.buckets;
# Pick 2 buckets, so that $to has not more than $from
my ($to, $from) = @buckets.keys.pick(2).sort({ @buckets[$_] });
# Transfer half of the difference, rounded down
$bucket-store.transfer: ([-] @buckets[$from, $to]) div 2, :$from, :$to;
}
}
# Start a thread to distribute values among buckets
Thread.start: {
loop {
my @buckets = $bucket-store.buckets;
# Pick 2 buckets
my ($to, $from) = @buckets.keys.pick(2);
# Transfer a random portion
$bucket-store.transfer: ^@buckets[$from] .pick, :$from, :$to;
}
}
# Loop to display buckets
loop {
sleep 1;
my @buckets = $bucket-store.buckets;
my $sum = @buckets.sum;
say "{@buckets.fmt: '%4d'}, total $sum";
if $sum != $initial-sum {
note "ERROR: Total changed from $initial-sum to $sum";
exit 1;
}
}
Output:
23 52 831 195 1407 809 813 20, total 4150
1172 83 336 306 751 468 615 419, total 4150
734 103 1086 88 313 136 1252 438, total 4150
512 323 544 165 200 3 2155 248, total 4150
...
Last updated