scala.collection.parallel.mutable.UnrolledParArrayCombiner#CopyUnrolledToArray

class CopyUnrolledToArray extends Task[Unit, CopyUnrolledToArray]

Type Members

type Result = Unit

  • Definition Classes
    • Task

Value Members From scala.collection.parallel.Task

def repr: CopyUnrolledToArray

  • Definition Classes
    • Task

(defined at scala.collection.parallel.Task)


Instance Constructors From scala.collection.parallel.mutable.UnrolledParArrayCombiner.CopyUnrolledToArray ——————————————————————————–

new CopyUnrolledToArray(array: Array[Any], offset: Int, howmany: Int)

(defined at scala.collection.parallel.mutable.UnrolledParArrayCombiner.CopyUnrolledToArray)


Value Members From scala.collection.parallel.mutable.UnrolledParArrayCombiner.CopyUnrolledToArray ——————————————————————————–

def leaf(prev: Option[Unit]): Unit

Body of the task - non-divisible unit of work done by this task. Optionally is provided with the result from the previous completed task or None if there was no previous task (or the previous task is uncompleted or unknown).

  • Definition Classes
    • CopyUnrolledToArray → Task

(defined at scala.collection.parallel.mutable.UnrolledParArrayCombiner.CopyUnrolledToArray)

def split: immutable.List[CopyUnrolledToArray]

Splits this task into a list of smaller tasks.

  • Definition Classes
    • CopyUnrolledToArray → Task

(defined at scala.collection.parallel.mutable.UnrolledParArrayCombiner.CopyUnrolledToArray)


Value Members From Implicit scala.collection.parallel.CollectionsHaveToParArray ——————————————————————————–

def toParArray: ParArray[T]

  • Implicit information
    • This member is added by an implicit conversion from CopyUnrolledToArray to CollectionsHaveToParArray [CopyUnrolledToArray, T] performed by method CollectionsHaveToParArray in scala.collection.parallel. This conversion will take place only if an implicit value of type (CopyUnrolledToArray) ⇒ GenTraversableOnce [T] is in scope.
  • Definition Classes
    • CollectionsHaveToParArray (added by implicit convertion: scala.collection.parallel.CollectionsHaveToParArray)

Full Source:

/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2013, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

package scala
package collection.parallel.mutable

import scala.collection.mutable.ArraySeq
import scala.collection.mutable.UnrolledBuffer
import scala.collection.mutable.UnrolledBuffer.Unrolled
import scala.collection.parallel.Combiner
import scala.collection.parallel.Task
import scala.reflect.ClassTag

// Todo -- revisit whether inheritance is the best way to achieve this functionality
private[mutable] class DoublingUnrolledBuffer[T](implicit t: ClassTag[T]) extends UnrolledBuffer[T]()(t) {
  override def calcNextLength(sz: Int) = if (sz < 10000) sz * 2 else sz
  protected override def newUnrolled = new Unrolled[T](0, new Array[T](4), null, this)
}


/** An array combiner that uses doubling unrolled buffers to store elements. */
trait UnrolledParArrayCombiner[T]
extends Combiner[T, ParArray[T]] {
//self: EnvironmentPassingCombiner[T, ParArray[T]] =>
  // because size is doubling, random access is O(logn)!
  val buff = new DoublingUnrolledBuffer[Any]

  def +=(elem: T) = {
    buff += elem
    this
  }

  def result = {
    val arrayseq = new ArraySeq[T](size)
    val array = arrayseq.array.asInstanceOf[Array[Any]]

    combinerTaskSupport.executeAndWaitResult(new CopyUnrolledToArray(array, 0, size))

    new ParArray(arrayseq)
  }

  def clear() {
    buff.clear()
  }

  override def sizeHint(sz: Int) = {
    buff.lastPtr.next = new Unrolled(0, new Array[Any](sz), null, buff)
    buff.lastPtr = buff.lastPtr.next
  }

  def combine[N <: T, NewTo >: ParArray[T]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = other match {
    case that if that eq this => this // just return this
    case that: UnrolledParArrayCombiner[t] =>
      buff concat that.buff
      this
    case _ => throw new UnsupportedOperationException("Cannot combine with combiner of different type.")
  }

  def size = buff.size

  /* tasks */

  class CopyUnrolledToArray(array: Array[Any], offset: Int, howmany: Int)
  extends Task[Unit, CopyUnrolledToArray] {
    var result = ()

    def leaf(prev: Option[Unit]) = if (howmany > 0) {
      var totalleft = howmany
      val (startnode, startpos) = findStart(offset)
      var curr = startnode
      var pos = startpos
      var arroffset = offset
      while (totalleft > 0) {
        val lefthere = scala.math.min(totalleft, curr.size - pos)
        Array.copy(curr.array, pos, array, arroffset, lefthere)
        // println("from: " + arroffset + " elems " + lefthere + " - " + pos + ", " + curr + " -> " + array.toList + " by " + this + " !! " + buff.headPtr)
        totalleft -= lefthere
        arroffset += lefthere
        pos = 0
        curr = curr.next
      }
    }
    private def findStart(pos: Int) = {
      var left = pos
      var node = buff.headPtr
      while ((left - node.size) >= 0) {
        left -= node.size
        node = node.next
      }
      (node, left)
    }
    def split = {
      val fp = howmany / 2
      List(new CopyUnrolledToArray(array, offset, fp), new CopyUnrolledToArray(array, offset + fp, howmany - fp))
    }
    def shouldSplitFurther = howmany > scala.collection.parallel.thresholdFromSize(size, combinerTaskSupport.parallelismLevel)
    override def toString = "CopyUnrolledToArray(" + offset + ", " + howmany + ")"
  }
}

object UnrolledParArrayCombiner {
  def apply[T](): UnrolledParArrayCombiner[T] = new UnrolledParArrayCombiner[T] {} // was: with EnvironmentPassingCombiner[T, ParArray[T]]
}