This GitLab instance reached the end of its service life. It won't be possible to create new users or projects.

Please read the deprecation notice for more information concerning the deprecation timeline.

Visit migration.git.tu-berlin.de (internal network only) to import your old projects to the new GitLab platform 📥

ActionsStringToLongRelabeling.scala 1.21 KB
Newer Older
1 2 3 4
package de.bbisping.coupledsim.flink

import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph
5
import org.apache.flink.api.scala.utils.DataSetUtils
6 7 8 9 10 11 12 13 14 15 16
import org.apache.flink.api.scala.DataSet
import org.apache.flink.types.NullValue

/**
 * Replaces the string edge labels of a graph by numeric (`Long`) action ids.
 *
 * @param ts graph whose edge values are the action names to be relabeled
 */
class ActionsStringToLongRelabeling(ts: Graph[Int, NullValue, String]) {

  /**
   * Assigns a `Long` id to every distinct edge label of `ts` and rebuilds the graph with
   * those ids as edge values.
   *
   * The label equal to `tauStr` is mapped to `tau`. Every other label is mapped to
   * `index + tau + 10`, where `index` is the number handed out by `zipWithIndex`.
   *
   * @param env    execution environment passed on to `Graph.fromTupleDataSet`
   * @param tauStr label that receives the id `tau`
   * @param tau    id reserved for `tauStr`
   * @return the relabeled graph (vertices taken unchanged from `ts`) together with the
   *         `(id, label)` pairs used for the relabeling
   */
  def compute(env: ExecutionEnvironment, tauStr: String, tau: Long): (Graph[Int, NullValue, Long], DataSet[(Long, String)]) = {
    val stringLabeledEdges = ts.getEdgesAsTuple3()

    // One entry per distinct edge label, each paired with an index.
    val actions: DataSet[String] = stringLabeledEdges.map(_._3).distinct()
    val preliminaryActionIds: DataSet[(Long, String)] = DataSetUtils(actions).zipWithIndex // alternative: .zipWithUniqueId

    // Declared with an explicit function type so `map` receives a plain Scala function.
    // NOTE(review): presumably required because `DataSet.map` is overloaded - confirm.
    // The function only closes over the method parameters `tauStr` and `tau`.
    val assignId: ((Long, String)) => (Long, String) = {
      case (index, name) =>
        // Assuming indices are non-negative, every non-tau id is >= tau + 10 and
        // therefore cannot coincide with `tau` itself.
        val id = if (name == tauStr) tau else index + tau + 10
        (id, name)
    }
    val actionIds: DataSet[(Long, String)] = preliminaryActionIds.map(assignId)

    // Join on the label: field 2 of an edge tuple, field 1 of an (id, label) pair.
    val relabeledEdges: DataSet[(Int, Int, Long)] =
      stringLabeledEdges
        .join(actionIds)
        .where(2).equalTo(1) { (edge, actionId) =>
          (edge._1, edge._2, actionId._1)
        }

    (Graph.fromTupleDataSet(ts.getVerticesAsTuple2(), relabeledEdges, env), actionIds)
  }

}