This GitLab instance reached the end of its service life. It won't be possible to create new users or projects.

Please read the deprecation notice for more information concerning the deprecation timeline.

Visit migration.git.tu-berlin.de (internal network only) to import your old projects to the new GitLab platform 📥

TransitiveClosure.scala 1.05 KB
Newer Older
1 2
package de.bbisping.coupledsim.flink

3
import org.apache.flink.api.common.typeinfo.TypeInformation
4 5
import org.apache.flink.api.scala.DataSet
import org.apache.flink.util.Collector
6

7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
import scala.reflect.ClassTag


/**
 * Computes the transitive closure of a binary relation given as a Flink
 * [[DataSet]] of pairs, using a delta iteration.
 *
 * @tparam A the node type; a pair `(a, b)` represents a step from `a` to `b`
 */
class TransitiveClosure[A] {
  
  /**
   * Extends `steps` with every pair `(a, c)` for which a chain
   * `a -> ... -> c` of steps exists.
   *
   * The input pairs themselves are part of the result (they form the initial
   * solution set). The iteration runs for at most
   * `CoupledSimulationFlink.MAX_ITERATIONS` rounds, so chains that need more
   * rounds than that to be discovered are not included.
   *
   * @param steps    the relation to close
   * @param evidence Flink type information for the pair type
   * @param classTag class tag for the pair type
   * @return the original steps together with all derived pairs found within
   *         the iteration bound
   */
  def compute(steps: DataSet[(A, A)])
    (implicit evidence: TypeInformation[(A, A)], classTag: ClassTag[(A, A)])
  : DataSet[(A, A)] = {
    
    // Solution set: all pairs known so far, keyed by both tuple fields.
    // Workset: the pairs discovered in the previous round.
    val closedSteps = steps.iterateDelta(steps, CoupledSimulationFlink.MAX_ITERATIONS, Array(0,1)) { (closedStepsPartial, deltaSteps) =>
      // Prolong every recently found pair (a, b) by one original step (b, c).
      val newSteps: DataSet[(A, A)] = (deltaSteps join steps)
        .where(1).equalTo(0) { (step1: (A,A), step2: (A,A)) =>
          (step1._1, step2._2)
      }
      
      // Keep only pairs that are not yet in the solution set.
      val reallyNewSteps: DataSet[(A, A)] = (newSteps coGroup closedStepsPartial)
        .where(0,1).equalTo(0,1).apply(fun = { (st1: Iterator[(A,A)], st2: Iterator[(A,A)], out: Collector[(A,A)]) => 
          // The grouping key spans both tuple fields, so all elements of st1
          // are copies of the same pair (one per path that produced it).
          // Emitting a single representative keeps the next workset free of
          // duplicates, which would otherwise be re-joined and multiply with
          // the number of paths between two nodes.
          if (st2.isEmpty && st1.hasNext) {
            out.collect(st1.next())
          }
      })
      
      // The new pairs are both the solution-set delta and the next workset.
      (reallyNewSteps, reallyNewSteps)
    }
    
    closedSteps
  }
  
}