scala.collection.parallel.ParSeqLike#Zip

class Zip[U >: T, S, That] extends Transformer[Combiner[(U, S), That], Zip[U, S, That]]

Type Members

type Result = Combiner[(U, S), That]

  • Definition Classes
    • Task

Instance Constructors From scala.collection.parallel.ParSeqLike.Zip

new Zip(len: Int, cf: CombinerFactory[(U, S), That], pit: SeqSplitter[T], otherpit: SeqSplitter[S])

(defined at scala.collection.parallel.ParSeqLike.Zip)

Value Members From scala.collection.parallel.ParSeqLike.Zip

def leaf(prev: Option[Result]): Unit

Body of the task - non-divisible unit of work done by this task. Optionally is provided with the result from the previous completed task or None if there was no previous task (or the previous task is uncompleted or unknown).

  • Definition Classes
    • Zip → Task

(defined at scala.collection.parallel.ParSeqLike.Zip)

def merge(that: Zip[U, S, That]): Unit

Read of results of that task and merge them into results of this one.

  • Definition Classes
    • Zip → Task

(defined at scala.collection.parallel.ParSeqLike.Zip)

def newSubtask(p: SuperParIterator): Nothing

  • Attributes
    • protected[this]
  • Definition Classes
    • Zip → Accessor

(defined at scala.collection.parallel.ParSeqLike.Zip)

val otherpit: SeqSplitter[S]

(defined at scala.collection.parallel.ParSeqLike.Zip)

val pit: SeqSplitter[T]

  • Attributes
    • protected[this]
  • Definition Classes
    • Zip → Accessor → Accessor

(defined at scala.collection.parallel.ParSeqLike.Zip)

var result: Result

A result that can be accessed once the task is completed.

  • Definition Classes
    • Zip → Task

(defined at scala.collection.parallel.ParSeqLike.Zip)

def split: Seq[Zip[U, S, That]]

Splits this task into a list of smaller tasks.

  • Definition Classes
    • Zip → Accessor → Task

(defined at scala.collection.parallel.ParSeqLike.Zip)

Value Members From scala.collection.parallel.Task

def repr: Zip[U, S, That]

  • Definition Classes
    • Task

(defined at scala.collection.parallel.Task)


Value Members From Implicit scala.collection.parallel.CollectionsHaveToParArray ——————————————————————————–

def toParArray: ParArray[T]

  • Implicit information
    • This member is added by an implicit conversion from Zip [U, S, That] to CollectionsHaveToParArray [Zip [U, S, That], T] performed by method CollectionsHaveToParArray in scala.collection.parallel. This conversion will take place only if an implicit value of type (Zip [U, S, That]) ⇒ GenTraversableOnce [T] is in scope.
  • Definition Classes
    • CollectionsHaveToParArray (added by implicit convertion: scala.collection.parallel.CollectionsHaveToParArray)

Full Source:

/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2013, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

package scala
package collection.parallel

import scala.collection.{ SeqLike, GenSeq, GenIterable, Iterator }
import scala.collection.generic.DefaultSignalling
import scala.collection.generic.AtomicIndexFlag
import scala.collection.generic.CanBuildFrom
import scala.collection.generic.VolatileAbort

import scala.collection.parallel.ParallelCollectionImplicits._

/** A template trait for sequences of type `ParSeq[T]`, representing
 *  parallel sequences with element type `T`.
 *
 *  $parallelseqinfo
 *
 *  @tparam T           the type of the elements contained in this collection
 *  @tparam Repr        the type of the actual collection containing the elements
 *  @tparam Sequential  the type of the sequential version of this parallel collection
 *
 *  @define parallelseqinfo
 *  Parallel sequences inherit the `Seq` trait. Their indexing and length computations
 *  are defined to be efficient. Like their sequential counterparts
 *  they always have a defined order of elements. This means they will produce resulting
 *  parallel sequences in the same way sequential sequences do. However, the order
 *  in which they perform bulk operations on elements to produce results is not defined and is generally
 *  nondeterministic. If the higher-order functions given to them produce no sideeffects,
 *  then this won't be noticeable.
 *
 *  This trait defines a new, more general `split` operation and reimplements the `split`
 *  operation of `ParallelIterable` trait using the new `split` operation.
 *
 *  @author Aleksandar Prokopec
 *  @since 2.9
 */
trait ParSeqLike[+T, +Repr <: ParSeq[T], +Sequential <: Seq[T] with SeqLike[T, Sequential]]
extends scala.collection.GenSeqLike[T, Repr]
   with ParIterableLike[T, Repr, Sequential] {
self =>

  protected[this] type SuperParIterator = IterableSplitter[T]

  /** A more refined version of the iterator found in the `ParallelIterable` trait,
   *  this iterator can be split into arbitrary subsets of iterators.
   *
   *  @return       an iterator that can be split into subsets of precise size
   */
  protected[parallel] def splitter: SeqSplitter[T]

  override def iterator: PreciseSplitter[T] = splitter

  override def size = length

  /** Used to iterate elements using indices */
  protected abstract class Elements(start: Int, val end: Int) extends SeqSplitter[T] with BufferedIterator[T] {
    private var i = start

    def hasNext = i < end

    def next(): T = if (i < end) {
      val x = self(i)
      i += 1
      x
    } else Iterator.empty.next()

    def head = self(i)

    final def remaining = end - i

    def dup = new Elements(i, end) {}

    def split = psplit(remaining / 2, remaining - remaining / 2)

    def psplit(sizes: Int*) = {
      val incr = sizes.scanLeft(0)(_ + _)
      for ((from, until) <- incr.init zip incr.tail) yield {
        new Elements(start + from, (start + until) min end) {}
      }
    }

    override def toString = "Elements(" + start + ", " + end + ")"
  }

  /* ParallelSeq methods */

  /** Returns the length of the longest segment of elements starting at
   *  a given position satisfying some predicate.
   *
   *  $indexsignalling
   *
   *  The index flag is initially set to maximum integer value.
   *
   *  @param p     the predicate used to test the elements
   *  @param from  the starting offset for the search
   *  @return      the length of the longest segment of elements starting at `from` and
   *               satisfying the predicate
   */
  def segmentLength(p: T => Boolean, from: Int): Int = if (from >= length) 0 else {
    val realfrom = if (from < 0) 0 else from
    val ctx = new DefaultSignalling with AtomicIndexFlag
    ctx.setIndexFlag(Int.MaxValue)
    tasksupport.executeAndWaitResult(new SegmentLength(p, 0, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx))._1
  }

  /** Finds the first element satisfying some predicate.
   *
   *  $indexsignalling
   *
   *  The index flag is initially set to maximum integer value.
   *
   *  @param p     the predicate used to test the elements
   *  @param from  the starting offset for the search
   *  @return      the index `>= from` of the first element of this $coll that satisfies the predicate `p`,
   *               or `-1`, if none exists
   */
  def indexWhere(p: T => Boolean, from: Int): Int = if (from >= length) -1 else {
    val realfrom = if (from < 0) 0 else from
    val ctx = new DefaultSignalling with AtomicIndexFlag
    ctx.setIndexFlag(Int.MaxValue)
    tasksupport.executeAndWaitResult(new IndexWhere(p, realfrom, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx))
  }

  /** Finds the last element satisfying some predicate.
   *
   *  $indexsignalling
   *
   *  The index flag is initially set to minimum integer value.
   *
   *  @param p     the predicate used to test the elements
   *  @param end   the maximum offset for the search
   *  @return      the index `<= end` of the first element of this $coll that satisfies the predicate `p`,
   *               or `-1`, if none exists
   */
  def lastIndexWhere(p: T => Boolean, end: Int): Int = if (end < 0) -1 else {
    val until = if (end >= length) length else end + 1
    val ctx = new DefaultSignalling with AtomicIndexFlag
    ctx.setIndexFlag(Int.MinValue)
    tasksupport.executeAndWaitResult(new LastIndexWhere(p, 0, splitter.psplitWithSignalling(until, length - until)(0) assign ctx))
  }

  def reverse: Repr = {
    tasksupport.executeAndWaitResult(new Reverse(() => newCombiner, splitter) mapResult { _.resultWithTaskSupport })
  }

  def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) {
    tasksupport.executeAndWaitResult(
      new ReverseMap[S, That](f, () => bf(repr).asCombiner, splitter) mapResult { _.resultWithTaskSupport }
    )
  } else setTaskSupport(seq.reverseMap(f)(bf2seq(bf)), tasksupport)
  /*bf ifParallel { pbf =>
    tasksupport.executeAndWaitResult(new ReverseMap[S, That](f, pbf, splitter) mapResult { _.result })
  } otherwise seq.reverseMap(f)(bf2seq(bf))*/

  /** Tests whether this $coll contains the given sequence at a given index.
   *
   *  $abortsignalling
   *
   *  @tparam S      the element type of `that` parallel sequence
   *  @param that    the parallel sequence this sequence is being searched for
   *  @param offset  the starting offset for the search
   *  @return        `true` if there is a sequence `that` starting at `offset` in this sequence, `false` otherwise
   */
  def startsWith[S](that: GenSeq[S], offset: Int): Boolean = that ifParSeq { pthat =>
    if (offset < 0 || offset >= length) offset == length && pthat.length == 0
    else if (pthat.length == 0) true
    else if (pthat.length > length - offset) false
    else {
      val ctx = new DefaultSignalling with VolatileAbort
      tasksupport.executeAndWaitResult(
        new SameElements(splitter.psplitWithSignalling(offset, pthat.length)(1) assign ctx, pthat.splitter)
      )
    }
  } otherwise seq.startsWith(that, offset)

  override def sameElements[U >: T](that: GenIterable[U]): Boolean = that ifParSeq { pthat =>
    val ctx = new DefaultSignalling with VolatileAbort
    length == pthat.length && tasksupport.executeAndWaitResult(new SameElements(splitter assign ctx, pthat.splitter))
  } otherwise seq.sameElements(that)

  /** Tests whether this $coll ends with the given parallel sequence.
   *
   *  $abortsignalling
   *
   *  @tparam S       the type of the elements of `that` sequence
   *  @param that     the sequence to test
   *  @return         `true` if this $coll has `that` as a suffix, `false` otherwise
   */
  def endsWith[S](that: GenSeq[S]): Boolean = that ifParSeq { pthat =>
    if (that.length == 0) true
    else if (that.length > length) false
    else {
      val ctx = new DefaultSignalling with VolatileAbort
      val tlen = that.length
      tasksupport.executeAndWaitResult(new SameElements(splitter.psplitWithSignalling(length - tlen, tlen)(1) assign ctx, pthat.splitter))
    }
  } otherwise seq.endsWith(that)

  def patch[U >: T, That](from: Int, patch: GenSeq[U], replaced: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
    val realreplaced = replaced min (length - from)
    if (patch.isParSeq && bf(repr).isCombiner && (size - realreplaced + patch.size) > MIN_FOR_COPY) {
      val that = patch.asParSeq
      val pits = splitter.psplitWithSignalling(from, replaced, length - from - realreplaced)
      val cfactory = combinerFactory(() => bf(repr).asCombiner)
      val copystart = new Copy[U, That](cfactory, pits(0))
      val copymiddle = wrap {
        val tsk = new that.Copy[U, That](cfactory, that.splitter)
        tasksupport.executeAndWaitResult(tsk)
      }
      val copyend = new Copy[U, That](cfactory, pits(2))
      tasksupport.executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult {
        _.resultWithTaskSupport
      })
    } else patch_sequential(from, patch.seq, replaced)
  }

  private def patch_sequential[U >: T, That](fromarg: Int, patch: Seq[U], r: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
    val from = 0 max fromarg
    val b = bf(repr)
    val repl = (r min (length - from)) max 0
    val pits = splitter.psplitWithSignalling(from, repl, length - from - repl)
    b ++= pits(0)
    b ++= patch
    b ++= pits(2)
    setTaskSupport(b.result(), tasksupport)
  }

  def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (bf(repr).isCombiner) {
    tasksupport.executeAndWaitResult(
      new Updated(index, elem, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult {
        _.resultWithTaskSupport
      }
    )
  } else setTaskSupport(seq.updated(index, elem)(bf2seq(bf)), tasksupport)
  /*bf ifParallel { pbf =>
    tasksupport.executeAndWaitResult(new Updated(index, elem, pbf, splitter) mapResult { _.result })
  } otherwise seq.updated(index, elem)(bf2seq(bf))*/

  def +:[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
    patch(0, mutable.ParArray(elem), 0)
  }

  def :+[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
    patch(length, mutable.ParArray(elem), 0)
  }

  def padTo[U >: T, That](len: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (length < len) {
    patch(length, new immutable.Repetition(elem, len - length), 0)
  } else patch(length, Nil, 0)

  override def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf(repr).isCombiner && that.isParSeq) {
    val thatseq = that.asParSeq
    tasksupport.executeAndWaitResult(
      new Zip(length min thatseq.length, combinerFactory(() => bf(repr).asCombiner), splitter, thatseq.splitter) mapResult {
        _.resultWithTaskSupport
      }
    )
  } else super.zip(that)(bf)

  /** Tests whether every element of this $coll relates to the
   *  corresponding element of another parallel sequence by satisfying a test predicate.
   *
   *  $abortsignalling
   *
   *  @param   that    the other parallel sequence
   *  @param   p       the test predicate, which relates elements from both sequences
   *  @tparam  S       the type of the elements of `that`
   *  @return          `true` if both parallel sequences have the same length and
   *                   `p(x, y)` is `true` for all corresponding elements `x` of this $coll
   *                   and `y` of `that`, otherwise `false`
   */
  def corresponds[S](that: GenSeq[S])(p: (T, S) => Boolean): Boolean = that ifParSeq { pthat =>
    val ctx = new DefaultSignalling with VolatileAbort
    length == pthat.length && tasksupport.executeAndWaitResult(new Corresponds(p, splitter assign ctx, pthat.splitter))
  } otherwise seq.corresponds(that)(p)

  def diff[U >: T](that: GenSeq[U]): Repr = sequentially {
    _ diff that
  }

  /** Computes the multiset intersection between this $coll and another sequence.
   *
   *  @param that   the sequence of elements to intersect with.
   *  @tparam U     the element type of `that` parallel sequence
   *  @return       a new collection of type `That` which contains all elements of this $coll
   *                which also appear in `that`.
   *                If an element value `x` appears
   *                ''n'' times in `that`, then the first ''n'' occurrences of `x` will be retained
   *                in the result, but any following occurrences will be omitted.
   *
   *  @usecase def intersect(that: Seq[T]): $Coll[T]
   *    @inheritdoc
   *
   *    $mayNotTerminateInf
   *
   *    @return       a new $coll which contains all elements of this $coll
   *                  which also appear in `that`.
   *                  If an element value `x` appears
   *                  ''n'' times in `that`, then the first ''n'' occurrences of `x` will be retained
   *                  in the result, but any following occurrences will be omitted.
   */
  def intersect[U >: T](that: GenSeq[U]) = sequentially {
    _ intersect that
  }

  /** Builds a new $coll from this $coll without any duplicate elements.
   *  $willNotTerminateInf
   *
   *  @return  A new $coll which contains the first occurrence of every element of this $coll.
   */
  def distinct: Repr = sequentially {
    _.distinct
  }

  override def toString = seq.mkString(stringPrefix + "(", ", ", ")")

  override def toSeq = this.asInstanceOf[ParSeq[T]]

  @deprecated("use .seq.view", "2.11.0")
  override def view = seq.view

  /* tasks */

  protected[this] def down(p: IterableSplitter[_]) = p.asInstanceOf[SeqSplitter[T]]

  protected trait Accessor[R, Tp] extends super.Accessor[R, Tp] {
    protected[this] val pit: SeqSplitter[T]
  }

  protected trait Transformer[R, Tp] extends Accessor[R, Tp] with super.Transformer[R, Tp]

  protected[this] class SegmentLength(pred: T => Boolean, from: Int, protected[this] val pit: SeqSplitter[T])
  extends Accessor[(Int, Boolean), SegmentLength] {
    @volatile var result: (Int, Boolean) = null
    def leaf(prev: Option[(Int, Boolean)]) = if (from < pit.indexFlag) {
      val itsize = pit.remaining
      val seglen = pit.prefixLength(pred)
      result = (seglen, itsize == seglen)
      if (!result._2) pit.setIndexFlagIfLesser(from)
    } else result = (0, false)
    protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException
    override def split = {
      val pits = pit.splitWithSignalling
      for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new SegmentLength(pred, from + untilp, p)
    }
    override def merge(that: SegmentLength) = if (result._2) result = (result._1 + that.result._1, that.result._2)
    override def requiresStrictSplitters = true
  }

  protected[this] class IndexWhere(pred: T => Boolean, from: Int, protected[this] val pit: SeqSplitter[T])
  extends Accessor[Int, IndexWhere] {
    @volatile var result: Int = -1
    def leaf(prev: Option[Int]) = if (from < pit.indexFlag) {
      val r = pit.indexWhere(pred)
      if (r != -1) {
        result = from + r
        pit.setIndexFlagIfLesser(from)
      }
    }
    protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException
    override def split = {
      val pits = pit.splitWithSignalling
      for ((p, untilp) <- pits zip pits.scanLeft(from)(_ + _.remaining)) yield new IndexWhere(pred, untilp, p)
    }
    override def merge(that: IndexWhere) = result = if (result == -1) that.result else {
      if (that.result != -1) result min that.result else result
    }
    override def requiresStrictSplitters = true
  }

  protected[this] class LastIndexWhere(pred: T => Boolean, pos: Int, protected[this] val pit: SeqSplitter[T])
  extends Accessor[Int, LastIndexWhere] {
    @volatile var result: Int = -1
    def leaf(prev: Option[Int]) = if (pos > pit.indexFlag) {
      val r = pit.lastIndexWhere(pred)
      if (r != -1) {
        result = pos + r
        pit.setIndexFlagIfGreater(pos)
      }
    }
    protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException
    override def split = {
      val pits = pit.splitWithSignalling
      for ((p, untilp) <- pits zip pits.scanLeft(pos)(_ + _.remaining)) yield new LastIndexWhere(pred, untilp, p)
    }
    override def merge(that: LastIndexWhere) = result = if (result == -1) that.result else {
      if (that.result != -1) result max that.result else result
    }
    override def requiresStrictSplitters = true
  }

  protected[this] class Reverse[U >: T, This >: Repr](cbf: () => Combiner[U, This], protected[this] val pit: SeqSplitter[T])
  extends Transformer[Combiner[U, This], Reverse[U, This]] {
    @volatile var result: Combiner[U, This] = null
    def leaf(prev: Option[Combiner[U, This]]) = result = pit.reverse2combiner(reuse(prev, cbf()))
    protected[this] def newSubtask(p: SuperParIterator) = new Reverse(cbf, down(p))
    override def merge(that: Reverse[U, This]) = result = that.result combine result
  }

  protected[this] class ReverseMap[S, That](f: T => S, pbf: () => Combiner[S, That], protected[this] val pit: SeqSplitter[T])
  extends Transformer[Combiner[S, That], ReverseMap[S, That]] {
    @volatile var result: Combiner[S, That] = null
    def leaf(prev: Option[Combiner[S, That]]) = result = pit.reverseMap2combiner(f, pbf())
    protected[this] def newSubtask(p: SuperParIterator) = new ReverseMap(f, pbf, down(p))
    override def merge(that: ReverseMap[S, That]) = result = that.result combine result
  }

  protected[this] class SameElements[U >: T](protected[this] val pit: SeqSplitter[T], val otherpit: SeqSplitter[U])
  extends Accessor[Boolean, SameElements[U]] {
    @volatile var result: Boolean = true
    def leaf(prev: Option[Boolean]) = if (!pit.isAborted) {
      result = pit.sameElements(otherpit)
      if (!result) pit.abort()
    }
    protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException
    override def split = {
      val fp = pit.remaining / 2
      val sp = pit.remaining - fp
      for ((p, op) <- pit.psplitWithSignalling(fp, sp) zip otherpit.psplitWithSignalling(fp, sp)) yield new SameElements(p, op)
    }
    override def merge(that: SameElements[U]) = result = result && that.result
    override def requiresStrictSplitters = true
  }

  protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CombinerFactory[U, That], protected[this] val pit: SeqSplitter[T])
  extends Transformer[Combiner[U, That], Updated[U, That]] {
    @volatile var result: Combiner[U, That] = null
    def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf())
    protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException
    override def split = {
      val pits = pit.splitWithSignalling
      for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new Updated(pos - untilp, elem, pbf, p)
    }
    override def merge(that: Updated[U, That]) = result = result combine that.result
    override def requiresStrictSplitters = true
  }

  protected[this] class Zip[U >: T, S, That](len: Int, cf: CombinerFactory[(U, S), That], protected[this] val pit: SeqSplitter[T], val otherpit: SeqSplitter[S])
  extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] {
    @volatile var result: Result = null
    def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, cf())
    protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException
    override def split = {
      val fp = len / 2
      val sp = len - len / 2
      val pits = pit.psplitWithSignalling(fp, sp)
      val opits = otherpit.psplitWithSignalling(fp, sp)
      Seq(
        new Zip(fp, cf, pits(0), opits(0)),
        new Zip(sp, cf, pits(1), opits(1))
      )
    }
    override def merge(that: Zip[U, S, That]) = result = result combine that.result
  }

  protected[this] class Corresponds[S](corr: (T, S) => Boolean, protected[this] val pit: SeqSplitter[T], val otherpit: SeqSplitter[S])
  extends Accessor[Boolean, Corresponds[S]] {
    @volatile var result: Boolean = true
    def leaf(prev: Option[Boolean]) = if (!pit.isAborted) {
      result = pit.corresponds(corr)(otherpit)
      if (!result) pit.abort()
    }
    protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException
    override def split = {
      val fp = pit.remaining / 2
      val sp = pit.remaining - fp
      for ((p, op) <- pit.psplitWithSignalling(fp, sp) zip otherpit.psplitWithSignalling(fp, sp)) yield new Corresponds(corr, p, op)
    }
    override def merge(that: Corresponds[S]) = result = result && that.result
    override def requiresStrictSplitters = true
  }
}