scala.collection.parallel

package parallel

Package object for parallel collections.

Deprecated Value Members

object ThreadPoolTasks

Type Members

trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks

trait AdaptiveWorkStealingTasks extends Tasks

This trait implements scheduling by employing an adaptive work stealing technique.

trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveWorkStealingTasks

implicit class CollectionsHaveToParArray[C, T] extends AnyRef

Adds toParArray method to collection classes.

trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel

The base trait for all combiners. A combiner incremental collection construction just like a regular builder, but also implements an efficient merge operation of two builders via combine method. Once the collection is constructed, it may be obtained by invoking the result method.

The complexity of the combine method should be less than linear for best performance. The result method doesn’t have to be a constant time operation, but may be performed in parallel.

trait CombinerFactory[U, Repr] extends AnyRef

final case class CompositeThrowable(throwables: Set[Throwable]) extends Exception with Product with Serializable

Composite throwable - thrown when multiple exceptions are thrown at the same time.

class ExecutionContextTaskSupport extends TaskSupport with ExecutionContextTasks

A task support that uses an execution context to schedule tasks.

It can be used with the default execution context implementation in the scala.concurrent package. It internally forwards the call to either a forkjoin based task support or a thread pool executor one, depending on what the execution context uses.

By default, parallel collections are parametrized with this task support object, so parallel collections share the same execution context backend as the rest of the scala.concurrent package.

trait ExecutionContextTasks extends Tasks

This tasks implementation uses execution contexts to spawn a parallel computation.

As an optimization, it internally checks whether the execution context is the standard implementation based on fork/join pools, and if it is, creates a ForkJoinTaskSupport that shares the same pool to forward its request to it.

Otherwise, it uses an execution context exclusive Tasks implementation to divide the tasks into smaller chunks and execute operations on it.

trait FactoryOps[From, Elem, To] extends AnyRef

class ForkJoinTaskSupport extends TaskSupport with AdaptiveWorkStealingForkJoinTasks

A task support that uses a fork join pool to schedule tasks.

trait ForkJoinTasks extends Tasks with HavingForkJoinPool

An implementation trait for parallel tasks based on the fork/join framework.

trait HavingForkJoinPool extends AnyRef

A trait describing objects that provide a fork/join pool.

trait IterableSplitter[+T] extends AugmentedIterableIterator[T] with Splitter[T] with Signalling with DelegatedSignalling

Parallel iterators allow splitting and provide a remaining method to obtain the number of elements remaining in the iterator.

trait ParIterable[+T] extends GenIterable[T] with GenericParTemplate[T, ParIterable] with ParIterableLike[T, ParIterable[T], scala.Iterable[T]]

A template trait for parallel iterable collections.

This is a base trait for Scala parallel collections. It defines behaviour common to all parallel collections. Concrete parallel collections should inherit this trait and ParIterable if they want to define specific combiner factories.

Parallel operations are implemented with divide and conquer style algorithms that parallelize well. The basic idea is to split the collection into smaller parts until they are small enough to be operated on sequentially.

All of the parallel operations are implemented as tasks within this trait. Tasks rely on the concept of splitters, which extend iterators. Every parallel collection defines:

def splitter: IterableSplitter[T]

which returns an instance of IterableSplitter[T] , which is a subtype of Splitter[T] . Splitters have a method remaining to check the remaining number of elements, and method split which is defined by splitters. Method split divides the splitters iterate over into disjunct subsets:

def split: Seq[Splitter]

which splits the splitter into a sequence of disjunct subsplitters. This is typically a very fast operation which simply creates wrappers around the receiver collection. This can be repeated recursively.

Tasks are scheduled for execution through a scala.collection.parallel.TaskSupport object, which can be changed through the tasksupport setter of the collection.

Method newCombiner produces a new combiner. Combiners are an extension of builders. They provide a method combine which combines two combiners and returns a combiner containing elements of both combiners. This method can be implemented by aggressively copying all the elements into the new combiner or by lazily binding their results. It is recommended to avoid copying all of the elements for performance reasons, although that cost might be negligible depending on the use case. Standard parallel collection combiners avoid copying when merging results, relying either on a two-step lazy construction or specific data-structure properties.

Methods:

def seq: Sequential
def par: Repr

produce the sequential or parallel implementation of the collection, respectively. Method par just returns a reference to this parallel collection. Method seq is efficient - it will not copy the elements. Instead, it will create a sequential version of the collection using the same underlying data structure. Note that this is not the case for sequential collections in general - they may copy the elements and produce a different underlying data structure.

The combination of methods toMap , toSeq or toSet along with par and seq is a flexible way to change between different collection types.

Since this trait extends the GenIterable trait, methods like size must also be implemented in concrete collections, while iterator forwards to splitter by default.

Each parallel collection is bound to a specific fork/join pool, on which dormant worker threads are kept. The fork/join pool contains other information such as the parallelism level, that is, the number of processors used. When a collection is created, it is assigned the default fork/join pool found in the scala.parallel package object.

Parallel collections are not necessarily ordered in terms of the foreach operation (see Traversable ). Parallel sequences have a well defined order for iterators - creating an iterator and traversing the elements linearly will always yield the same order. However, bulk operations such as foreach , map or filter always occur in undefined orders for all parallel collections.

Existing parallel collection implementations provide strict parallel iterators. Strict parallel iterators are aware of the number of elements they have yet to traverse. It’s also possible to provide non-strict parallel iterators, which do not know the number of elements remaining. To do this, the new collection implementation must override isStrictSplitterCollection to false . This will make some operations unavailable.

To create a new parallel collection, extend the ParIterable trait, and implement size , splitter , newCombiner and seq . Having an implicit combiner factory requires extending this trait in addition, as well as providing a companion object, as with regular collections.

Method size is implemented as a constant time operation for parallel collections, and parallel collection operations rely on this assumption.

The higher-order functions passed to certain operations may contain side-effects. Since implementations of bulk operations may not be sequential, this means that side-effects may not be predictable and may produce data-races, deadlocks or invalidation of state if care is not taken. It is up to the programmer to either avoid using side-effects or to use some form of synchronization when accessing mutable data.

trait ParIterableLike[+T, +Repr <: ParIterable[T], +Sequential <: scala.Iterable[T] with IterableLike[T, Sequential]] extends GenIterableLike[T, Repr] with CustomParallelizable[T, Repr] with Parallel with HasNewCombiner[T, Repr]

A template trait for parallel collections of type ParIterable[T] .

This is a base trait for Scala parallel collections. It defines behaviour common to all parallel collections. Concrete parallel collections should inherit this trait and ParIterable if they want to define specific combiner factories.

Parallel operations are implemented with divide and conquer style algorithms that parallelize well. The basic idea is to split the collection into smaller parts until they are small enough to be operated on sequentially.

All of the parallel operations are implemented as tasks within this trait. Tasks rely on the concept of splitters, which extend iterators. Every parallel collection defines:

def splitter: IterableSplitter[T]

which returns an instance of IterableSplitter[T] , which is a subtype of Splitter[T] . Splitters have a method remaining to check the remaining number of elements, and method split which is defined by splitters. Method split divides the splitters iterate over into disjunct subsets:

def split: Seq[Splitter]

which splits the splitter into a sequence of disjunct subsplitters. This is typically a very fast operation which simply creates wrappers around the receiver collection. This can be repeated recursively.

Tasks are scheduled for execution through a scala.collection.parallel.TaskSupport object, which can be changed through the tasksupport setter of the collection.

Method newCombiner produces a new combiner. Combiners are an extension of builders. They provide a method combine which combines two combiners and returns a combiner containing elements of both combiners. This method can be implemented by aggressively copying all the elements into the new combiner or by lazily binding their results. It is recommended to avoid copying all of the elements for performance reasons, although that cost might be negligible depending on the use case. Standard parallel collection combiners avoid copying when merging results, relying either on a two-step lazy construction or specific data-structure properties.

Methods:

def seq: Sequential
def par: Repr

produce the sequential or parallel implementation of the collection, respectively. Method par just returns a reference to this parallel collection. Method seq is efficient - it will not copy the elements. Instead, it will create a sequential version of the collection using the same underlying data structure. Note that this is not the case for sequential collections in general - they may copy the elements and produce a different underlying data structure.

The combination of methods toMap , toSeq or toSet along with par and seq is a flexible way to change between different collection types.

Since this trait extends the GenIterable trait, methods like size must also be implemented in concrete collections, while iterator forwards to splitter by default.

Each parallel collection is bound to a specific fork/join pool, on which dormant worker threads are kept. The fork/join pool contains other information such as the parallelism level, that is, the number of processors used. When a collection is created, it is assigned the default fork/join pool found in the scala.parallel package object.

Parallel collections are not necessarily ordered in terms of the foreach operation (see Traversable ). Parallel sequences have a well defined order for iterators - creating an iterator and traversing the elements linearly will always yield the same order. However, bulk operations such as foreach , map or filter always occur in undefined orders for all parallel collections.

Existing parallel collection implementations provide strict parallel iterators. Strict parallel iterators are aware of the number of elements they have yet to traverse. It’s also possible to provide non-strict parallel iterators, which do not know the number of elements remaining. To do this, the new collection implementation must override isStrictSplitterCollection to false . This will make some operations unavailable.

To create a new parallel collection, extend the ParIterable trait, and implement size , splitter , newCombiner and seq . Having an implicit combiner factory requires extending this trait in addition, as well as providing a companion object, as with regular collections.

Method size is implemented as a constant time operation for parallel collections, and parallel collection operations rely on this assumption.

The higher-order functions passed to certain operations may contain side-effects. Since implementations of bulk operations may not be sequential, this means that side-effects may not be predictable and may produce data-races, deadlocks or invalidation of state if care is not taken. It is up to the programmer to either avoid using side-effects or to use some form of synchronization when accessing mutable data.

trait ParMap[K, +V] extends GenMap[K, V] with GenericParMapTemplate[K, V, ParMap] with ParIterable[(K, V)] with ParMapLike[K, V, ParMap[K, V], Map[K, V]]

A template trait for parallel maps.

The higher-order functions passed to certain operations may contain side-effects. Since implementations of bulk operations may not be sequential, this means that side-effects may not be predictable and may produce data-races, deadlocks or invalidation of state if care is not taken. It is up to the programmer to either avoid using side-effects or to use some form of synchronization when accessing mutable data.

trait ParMapLike[K, +V, +Repr <: ParMapLike[K, V, Repr, Sequential] with ParMap[K, V], +Sequential <: Map[K, V] with MapLike[K, V, Sequential]] extends GenMapLike[K, V, Repr] with ParIterableLike[(K, V), Repr, Sequential]

A template trait for mutable parallel maps. This trait is to be mixed in with concrete parallel maps to override the representation type.

The higher-order functions passed to certain operations may contain side-effects. Since implementations of bulk operations may not be sequential, this means that side-effects may not be predictable and may produce data-races, deadlocks or invalidation of state if care is not taken. It is up to the programmer to either avoid using side-effects or to use some form of synchronization when accessing mutable data.

trait ParSeq[+T] extends GenSeq[T] with ParIterable[T] with GenericParTemplate[T, ParSeq] with ParSeqLike[T, ParSeq[T], scala.Seq[T]]

A template trait for parallel sequences.

Parallel sequences inherit the Seq trait. Their indexing and length computations are defined to be efficient. Like their sequential counterparts they always have a defined order of elements. This means they will produce resulting parallel sequences in the same way sequential sequences do. However, the order in which they perform bulk operations on elements to produce results is not defined and is generally nondeterministic. If the higher-order functions given to them produce no sideeffects, then this won’t be noticeable.

This trait defines a new, more general split operation and reimplements the split operation of ParallelIterable trait using the new split operation.

The higher-order functions passed to certain operations may contain side-effects. Since implementations of bulk operations may not be sequential, this means that side-effects may not be predictable and may produce data-races, deadlocks or invalidation of state if care is not taken. It is up to the programmer to either avoid using side-effects or to use some form of synchronization when accessing mutable data.

trait ParSeqLike[+T, +Repr <: ParSeq[T], +Sequential <: scala.Seq[T] with SeqLike[T, Sequential]] extends GenSeqLike[T, Repr] with ParIterableLike[T, Repr, Sequential]

A template trait for sequences of type ParSeq[T] , representing parallel sequences with element type T .

Parallel sequences inherit the Seq trait. Their indexing and length computations are defined to be efficient. Like their sequential counterparts they always have a defined order of elements. This means they will produce resulting parallel sequences in the same way sequential sequences do. However, the order in which they perform bulk operations on elements to produce results is not defined and is generally nondeterministic. If the higher-order functions given to them produce no sideeffects, then this won’t be noticeable.

This trait defines a new, more general split operation and reimplements the split operation of ParallelIterable trait using the new split operation.

  • T
    • the type of the elements contained in this collection
  • Repr
    • the type of the actual collection containing the elements
  • Sequential
    • the type of the sequential version of this parallel collection
  • Self Type
    • ParSeqLike [T, Repr, Sequential]
  • Source

trait ParSet[T] extends GenSet[T] with GenericParTemplate[T, ParSet] with ParIterable[T] with ParSetLike[T, ParSet[T], Set[T]]

A template trait for parallel sets.

The higher-order functions passed to certain operations may contain side-effects. Since implementations of bulk operations may not be sequential, this means that side-effects may not be predictable and may produce data-races, deadlocks or invalidation of state if care is not taken. It is up to the programmer to either avoid using side-effects or to use some form of synchronization when accessing mutable data.

trait ParSetLike[T, +Repr <: ParSetLike[T, Repr, Sequential] with ParSet[T], +Sequential <: Set[T] with SetLike[T, Sequential]] extends GenSetLike[T, Repr] with ParIterableLike[T, Repr, Sequential]

A template trait for parallel sets. This trait is mixed in with concrete parallel sets to override the representation type.

The higher-order functions passed to certain operations may contain side-effects. Since implementations of bulk operations may not be sequential, this means that side-effects may not be predictable and may produce data-races, deadlocks or invalidation of state if care is not taken. It is up to the programmer to either avoid using side-effects or to use some form of synchronization when accessing mutable data.

trait PreciseSplitter[+T] extends Splitter[T]

A precise splitter (or a precise split iterator) can be split into arbitrary number of splitters that traverse disjoint subsets of arbitrary sizes.

Implementors might want to override the parameterless split method for efficiency.

trait SeqSplitter[+T] extends IterableSplitter[T] with AugmentedSeqIterator[T] with PreciseSplitter[T]

Parallel sequence iterators allow splitting into arbitrary subsets.

trait Splitter[+T] extends Iterator[T]

A splitter (or a split iterator) can be split into more splitters that traverse over disjoint subsets of elements.

trait Task[R, +Tp] extends AnyRef

trait TaskSupport extends Tasks

A trait implementing the scheduling of a parallel collection operation.

Parallel collections are modular in the way operations are scheduled. Each parallel collection is parametrized with a task support object which is responsible for scheduling and load-balancing tasks to processors.

A task support object can be changed in a parallel collection after it has been created, but only during a quiescent period, i.e. while there are no concurrent invocations to parallel collection methods.

There are currently a few task support implementations available for parallel collections. The scala.collection.parallel.ForkJoinTaskSupport uses a fork-join pool internally.

The scala.collection.parallel.ExecutionContextTaskSupport uses the default execution context implementation found in scala.concurrent, and it reuses the thread pool used in scala.concurrent.

The execution context task support is set to each parallel collection by default, so parallel collections reuse the same fork-join pool as the future API.

Here is a way to change the task support of a parallel collection:

import scala.collection.parallel._
val pc = mutable.ParArray(1, 2, 3)
pc.tasksupport = new ForkJoinTaskSupport(
  new java.util.concurrent.ForkJoinPool(2))

trait Tasks extends AnyRef

A trait that declares task execution capabilities used by parallel collections.

class ThreadPoolTaskSupport extends TaskSupport with AdaptiveWorkStealingThreadPoolTasks

A task support that uses a thread pool executor to schedule tasks.

trait ThreadPoolTasks extends Tasks

An implementation of tasks objects based on the Java thread pooling API.

trait ThrowableOps extends AnyRef

trait TraversableOps[T] extends AnyRef

Value Members

object ForkJoinTasks

object FutureThreadPoolTasks

object ParIterable extends ParFactory[ParIterable]

This object provides a set of operations to create ParIterable values.

object ParMap extends ParMapFactory[ParMap]

object ParSeq extends ParFactory[ParSeq]

object ParSet extends ParSetFactory[ParSet]

object Splitter

package immutable

package mutable

Value Members From scala.collection.parallel

val CHECK_RATE: Int

(defined at scala.collection.parallel)

val defaultTaskSupport: TaskSupport

(defined at scala.collection.parallel)

def setTaskSupport[Coll](c: Coll, t: TaskSupport): Coll

(defined at scala.collection.parallel)

def thresholdFromSize(sz: Int, parallelismLevel: Int): Int

Computes threshold from the size of the collection and the parallelism level. (defined at scala.collection.parallel)

Full Source:

/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2013, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

package scala
package collection

import scala.collection.generic.CanBuildFrom
import scala.collection.generic.CanCombineFrom
import scala.collection.parallel.mutable.ParArray
import scala.collection.mutable.UnrolledBuffer
import scala.annotation.unchecked.uncheckedVariance
import scala.language.implicitConversions

/** Package object for parallel collections.
 */
package object parallel {
  /* constants */
  val MIN_FOR_COPY = 512
  val CHECK_RATE = 512
  val SQRT2 = math.sqrt(2)
  val availableProcessors = java.lang.Runtime.getRuntime.availableProcessors

  /* functions */

  /** Computes threshold from the size of the collection and the parallelism level.
   */
  def thresholdFromSize(sz: Int, parallelismLevel: Int) = {
    val p = parallelismLevel
    if (p > 1) 1 + sz / (8 * p)
    else sz
  }

  val defaultTaskSupport: TaskSupport = new ExecutionContextTaskSupport

  def setTaskSupport[Coll](c: Coll, t: TaskSupport): Coll = {
    c match {
      case pc: ParIterableLike[_, _, _] => pc.tasksupport = t
      case _ => // do nothing
    }
    c
  }

  /** Adds toParArray method to collection classes. */
  implicit class CollectionsHaveToParArray[C, T](c: C)(implicit asGto: C => scala.collection.GenTraversableOnce[T]) {
    def toParArray = {
      val t = asGto(c)
      if (t.isInstanceOf[ParArray[_]]) t.asInstanceOf[ParArray[T]]
      else {
        val it = t.toIterator
        val cb = mutable.ParArrayCombiner[T]()
        while (it.hasNext) cb += it.next
        cb.result
      }
    }
  }
}


package parallel {
  /** Implicit conversions used in the implementation of parallel collections. */
  private[collection] object ParallelCollectionImplicits {
    implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new FactoryOps[From, Elem, To] {
      def isParallel = bf.isInstanceOf[Parallel]
      def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]]
      def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) = new Otherwise[R] {
        def otherwise(notbody: => R) = if (isParallel) isbody(asParallel) else notbody
      }
    }
    implicit def traversable2ops[T](t: scala.collection.GenTraversableOnce[T]) = new TraversableOps[T] {
      def isParallel = t.isInstanceOf[Parallel]
      def isParIterable = t.isInstanceOf[ParIterable[_]]
      def asParIterable = t.asInstanceOf[ParIterable[T]]
      def isParSeq = t.isInstanceOf[ParSeq[_]]
      def asParSeq = t.asInstanceOf[ParSeq[T]]
      def ifParSeq[R](isbody: ParSeq[T] => R) = new Otherwise[R] {
        def otherwise(notbody: => R) = if (isParallel) isbody(asParSeq) else notbody
      }
    }
    implicit def throwable2ops(self: Throwable) = new ThrowableOps {
      def alongWith(that: Throwable) = (self, that) match {
        case (self: CompositeThrowable, that: CompositeThrowable) => new CompositeThrowable(self.throwables ++ that.throwables)
        case (self: CompositeThrowable, _) => new CompositeThrowable(self.throwables + that)
        case (_, that: CompositeThrowable) => new CompositeThrowable(that.throwables + self)
        case _ => new CompositeThrowable(Set(self, that))
      }
    }
  }

  trait FactoryOps[From, Elem, To] {
    trait Otherwise[R] {
      def otherwise(notbody: => R): R
    }

    def isParallel: Boolean
    def asParallel: CanCombineFrom[From, Elem, To]
    def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R): Otherwise[R]
  }

  trait TraversableOps[T] {
    trait Otherwise[R] {
      def otherwise(notbody: => R): R
    }

    def isParallel: Boolean
    def isParIterable: Boolean
    def asParIterable: ParIterable[T]
    def isParSeq: Boolean
    def asParSeq: ParSeq[T]
    def ifParSeq[R](isbody: ParSeq[T] => R): Otherwise[R]
  }

  @deprecated("This trait will be removed.", "2.11.0")
  trait ThrowableOps {
    @deprecated("This method will be removed.", "2.11.0")
    def alongWith(that: Throwable): Throwable
  }

  /* classes */

  trait CombinerFactory[U, Repr] {
    /** Provides a combiner used to construct a collection. */
    def apply(): Combiner[U, Repr]
    /** The call to the `apply` method can create a new combiner each time.
     *  If it does, this method returns `false`.
     *  The same combiner factory may be used each time (typically, this is
     *  the case for concurrent collections, which are thread safe).
     *  If so, the method returns `true`.
     */
    def doesShareCombiners: Boolean
  }

  /** Composite throwable - thrown when multiple exceptions are thrown at the same time. */
  @deprecated("This class will be removed.", "2.11.0")
  final case class CompositeThrowable(throwables: Set[Throwable]) extends Exception(
    "Multiple exceptions thrown during a parallel computation: " +
      throwables.map(t => t + "\n" + t.getStackTrace.take(10).++("...").mkString("\n")).mkString("\n\n")
  )


  /** A helper iterator for iterating very small array buffers.
   *  Automatically forwards the signal delegate when splitting.
   */
  private[parallel] class BufferSplitter[T]
  (private val buffer: scala.collection.mutable.ArrayBuffer[T], private var index: Int, private val until: Int, _sigdel: scala.collection.generic.Signalling)
  extends IterableSplitter[T] {
    signalDelegate = _sigdel
    def hasNext = index < until
    def next = {
      val r = buffer(index)
      index += 1
      r
    }
    def remaining = until - index
    def dup = new BufferSplitter(buffer, index, until, signalDelegate)
    def split: Seq[IterableSplitter[T]] = if (remaining > 1) {
      val divsz = (until - index) / 2
      Seq(
        new BufferSplitter(buffer, index, index + divsz, signalDelegate),
        new BufferSplitter(buffer, index + divsz, until, signalDelegate)
      )
    } else Seq(this)
    private[parallel] override def debugInformation = {
      buildString {
        append =>
        append("---------------")
        append("Buffer iterator")
        append("buffer: " + buffer)
        append("index: " + index)
        append("until: " + until)
        append("---------------")
      }
    }
  }

  /** A helper combiner which contains an array of buckets. Buckets themselves
   *  are unrolled linked lists. Some parallel collections are constructed by
   *  sorting their result set according to some criteria.
   *
   *  A reference `buckets` to buckets is maintained. Total size of all buckets
   *  is kept in `sz` and maintained whenever 2 bucket combiners are combined.
   *
   *  Clients decide how to maintain these by implementing `+=` and `result`.
   *  Populating and using the buckets is up to the client. While populating them,
   *  the client should update `sz` accordingly. Note that a bucket is by default
   *  set to `null` to save space - the client should initialize it.
   *  Note that in general the type of the elements contained in the buckets `Buck`
   *  doesn't have to correspond to combiner element type `Elem`.
   *
   *  This class simply gives an efficient `combine` for free - it chains
   *  the buckets together. Since the `combine` contract states that the receiver (`this`)
   *  becomes invalidated, `combine` reuses the receiver and returns it.
   *
   *  Methods `beforeCombine` and `afterCombine` are called before and after
   *  combining the buckets, respectively, given that the argument to `combine`
   *  is not `this` (as required by the `combine` contract).
   *  They can be overridden in subclasses to provide custom behaviour by modifying
   *  the receiver (which will be the return value).
   */
  private[parallel] abstract class BucketCombiner[-Elem, +To, Buck, +CombinerType <: BucketCombiner[Elem, To, Buck, CombinerType]]
  (private val bucketnumber: Int)
  extends Combiner[Elem, To] {
  //self: EnvironmentPassingCombiner[Elem, To] =>
    protected var buckets: Array[UnrolledBuffer[Buck]] @uncheckedVariance = new Array[UnrolledBuffer[Buck]](bucketnumber)
    protected var sz: Int = 0

    def size = sz

    def clear() = {
      buckets = new Array[UnrolledBuffer[Buck]](bucketnumber)
      sz = 0
    }

    def beforeCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}

    def afterCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}

    def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = {
      if (this eq other) this
      else other match {
        case _: BucketCombiner[_, _, _, _] =>
          beforeCombine(other)
          val that = other.asInstanceOf[BucketCombiner[Elem, To, Buck, CombinerType]]

          var i = 0
          while (i < bucketnumber) {
            if (buckets(i) eq null)
              buckets(i) = that.buckets(i)
            else if (that.buckets(i) ne null)
              buckets(i) concat that.buckets(i)

            i += 1
          }
          sz = sz + that.size
          afterCombine(other)
          this
        case _ =>
          sys.error("Unexpected combiner type.")
      }
    }
  }
}