Commit 80d4598e by benkeks

Update public repo with upgraded Flink and Isabelle/HOL code

parent 37bcb2de
......@@ -21,3 +21,4 @@ code/node_modules
stuff
eclipse.sh
talks/talk_2017-may
.idea
......@@ -2,7 +2,7 @@ name := "CoupledSim"
version := "0.1.0"
scalaVersion := "2.11.11"
scalaVersion := "2.11.12"
val scalacOpts = Seq(
"-Xmax-classfile-name", "140",
......@@ -13,7 +13,7 @@ val scalacOpts = Seq(
)
lazy val web = (project in file("web")).settings(
scalaVersion := "2.11.11",
scalaVersion := "2.11.12",
scalaJSProjects := Seq(jsClient),
isDevMode in scalaJSPipeline := false,
pipelineStages in Assets := Seq(scalaJSPipeline),
......@@ -26,24 +26,24 @@ lazy val web = (project in file("web")).settings(
).enablePlugins(SbtWeb)
lazy val shared = (project in file("shared")).settings(
scalaVersion := "2.11.11",
scalaVersion := "2.11.12",
name := "shared",
scalacOptions ++= scalacOpts,
test in assembly := {},
libraryDependencies ++= Seq(
"org.scalaz" %%% "scalaz-core" % "7.2.16"
"org.scalaz" %%% "scalaz-core" % "7.2.26"
)
)
lazy val jsClient = (project in file("js-client")).settings(
scalaVersion := "2.11.11",
scalaVersion := "2.11.12",
name := "coupledsim-client",
parallelExecution in ThisBuild := false,
scalacOptions ++= scalacOpts,
testFrameworks += new TestFramework("utest.runner.Framework"),
resolvers += sbt.Resolver.bintrayRepo("denigma", "denigma-releases"),
libraryDependencies ++= Seq(
"org.scalaz" %%% "scalaz-core" % "7.2.16",
"org.scalaz" %%% "scalaz-core" % "7.2.26",
"com.lihaoyi" %%% "utest" % "0.5.4",
"org.singlespaced" %%% "scalajs-d3" % "0.3.4",
"org.denigma" %%% "codemirror-facade" % "5.13.2-0.8",
......@@ -62,22 +62,25 @@ lazy val jsClient = (project in file("js-client")).settings(
baseDirectory.value / ".." / "shared" / "src" / "main" / "scala-2.11"
).aggregate(shared).dependsOn(shared).enablePlugins(ScalaJSPlugin, ScalaJSWeb)
val flinkVersion = "1.4.2"
val flinkVersion = "1.6.1"
val flinkDependencies = Seq(
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-table" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-gelly-scala" % "1.4.1")
"org.apache.flink" %% "flink-gelly-scala" % "1.6.1")
lazy val flink = (project in file("flink")).
settings(
scalaVersion := "2.11.11",
scalaVersion := "2.11.12",
resolvers ++= Seq(
"Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
Resolver.mavenLocal
),
libraryDependencies ++= flinkDependencies ++ Seq("org.scalatest" %% "scalatest" % "3.0.5" % "test"),
libraryDependencies ++= flinkDependencies ++ Seq(
"org.scalatest" %% "scalatest" % "3.0.5" % "test"
//,"org.slf4j" % "slf4j-simple" % "1.7.+"
),
//fork in run := true,
test in assembly := {},
run in Compile := Defaults.runTask(fullClasspath in Compile,
......
......@@ -2,7 +2,7 @@ package de.bbisping.coupledsim.flink
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph
import org.apache.flink.api.scala.utils.`package`.DataSetUtils
import org.apache.flink.api.scala.utils.DataSetUtils
import org.apache.flink.api.scala.DataSet
import org.apache.flink.types.NullValue
......@@ -14,7 +14,7 @@ class ActionsStringToLongRelabeling(ts: Graph[Int, NullValue, String]) {
val actions: DataSet[String] = stringLabeledEdges.map(_._3).distinct()
val preliminaryActionIds: DataSet[(Long, String)] = DataSetUtils(actions).zipWithIndex//.zipWithUniqueId
val actionIds: DataSet[(Long, String)] =
preliminaryActionIds map ({ case ((id, name)) =>
preliminaryActionIds map ({ case (id, name) =>
if (name == tauStr) {
(tau, name)
} else {
......
......@@ -39,11 +39,15 @@ object CoupledSimulationFlinkBenchmark {
"shared/src/test/assets/vlts/vasy_10_56.csv",
// "shared/src/test/assets/vlts/vasy_18_73.csv" // memory ran out (in discovery)
"shared/src/test/assets/vlts/vasy_25_25.csv"
// "shared/src/test/assets/vlts/vasy_40_60.csv" // weak bisim takes forever (also 15 secs in the optimized [BGR2016] implementation)
// "shared/src/test/assets/vlts/vasy_40_60.csv"
/*
transitive closure on vasy_40_60 takes forever (also 15 secs in the optimized [BGR2016] implementation);
weak bisim result probably wrong due to hash collisions
*/
)
def runSizeMark(cfgPreminimization: String, cfgOverApproximation: String) = {
val samples =
def runSizeMark(cfgPreminimization: String, cfgOverApproximation: String): Unit = {
val samples = // List("shared/src/test/assets/vlts/vasy_40_60.csv")
smallSamples ++
vltsSamplesSmall ++
vltsSamplesMedium
......@@ -81,7 +85,7 @@ object CoupledSimulationFlinkBenchmark {
}
}
def runTimeMark(cfgPreminimization: String, cfgOverApproximation: String) = {
def runTimeMark(cfgPreminimization: String, cfgOverApproximation: String): Unit = {
val samples =
smallSamples ++ // warmup
smallSamples ++
......@@ -95,7 +99,7 @@ object CoupledSimulationFlinkBenchmark {
val begin = System.currentTimeMillis()
val csResult = CoupledSimulationFlink.executeAlgorithm(env,
CoupledSimulationFlink.executeAlgorithm(env,
cfgPath = s,
cfgPreminimization = cfgPreminimization,
cfgOverApproximation = cfgOverApproximation)
......
......@@ -35,7 +35,7 @@ class CoupledSimulationGame {
// only generate attacker nodes where there is a chance of the defender winning
(signatures cross signatures) flatMap new FlatMapFunction[((Int, Signature), (Int, Signature)), (Action, Int, Int)] {
def flatMap(pqSig: ((Int, Signature), (Int, Signature)),
out: Collector[(Action, Int, Int)]) = pqSig match {
out: Collector[(Action, Int, Int)]): Unit = pqSig match {
case ((p, pSig), (q, qSig)) =>
if (pSig.size <= qSig.size && (pSig subsetOf qSig)) {
out.collect((ATTACK, p, q))
......@@ -46,8 +46,6 @@ class CoupledSimulationGame {
possibleAttackerNodes
}
//println(attackerNodes.collect())
val simulationChallenges: DataSet[((Action, Int, Int), (Action, Int, Int))] =
(ts.getEdgesAsTuple3() join possibleAttackerNodes) // ?
.where(0/*src*/).equalTo(1/*p*/) { (edge, an) =>
......@@ -57,24 +55,20 @@ class CoupledSimulationGame {
val defenderSimulationNodes: DataSet[(Action, Int, Int)] =
((simulationChallenges flatMap new FlatMapFunction[((Action, Int, Int), (Action, Int, Int)), (Action, Int, Int)] {
def flatMap(simChallenge: ((Action, Int, Int), (Action, Int, Int)),
out: Collector[(Action, Int, Int)]) = simChallenge match {
case ((_, rhs)) =>
out: Collector[(Action, Int, Int)]): Unit = simChallenge match {
case (_, rhs) =>
out.collect(rhs)
//out.collect((TAU, rhs._2, rhs._3))
}
})
union (possibleAttackerNodes map (an => (TAU, an._2, an._3)))
).distinct()
// Seq(rhs, (TAU, rhs._2, rhs._3))
// }: (((Action, Int, Int), (Action, Int, Int))) => TraversableOnce[(Action, Int, Int)]))
// .distinct()
// only allow "real" (non-stuttering) tau-steps (because otherwise this could be used
// by the defender to go into infinite loops and win) (we assume that tau cycles have been compressed)
val tauSteps = ts.getEdgesAsTuple3() filter new FilterFunction[((Int, Int, Action))] {
def filter(edge: ((Int, Int, Action))) = edge match {
case ((p0, p1, a)) => a == TAU && p0 != p1
val tauSteps: DataSet[(Int, Int, Action)] = ts.getEdgesAsTuple3() filter new FilterFunction[(Int, Int, Action)] {
def filter(edge: (Int, Int, Action)): Boolean = edge match {
case (p0, p1, a) => a == TAU && p0 != p1
}
}
......@@ -89,7 +83,7 @@ class CoupledSimulationGame {
val simulationAnswers: DataSet[((Action, Int, Int), (Action, Int, Int))] =
(defenderSimulationNodes join ts.getEdgesAsTuple3())
.where(2/*q*/,0/*a*/).equalTo(0/*src*/,2/*a*/) (new JoinFunction[(Action, Int, Int), (Int, Int, Action), ((Action, Int, Int), (Action, Int, Int))] {
def join(dn: (Action, Int, Int), edge: (Int, Int, Action)) = {
def join(dn: (Action, Int, Int), edge: (Int, Int, Action)): ((Action, Int, Int), (Action, Int, Int)) = {
(dn, (TAU, dn._2, edge._2))
}
})
......@@ -98,29 +92,20 @@ class CoupledSimulationGame {
val simulationAnswerTauResolves: DataSet[((Action, Int, Int), (Action, Int, Int))] =
(defenderSimulationNodes
.filter(new FilterFunction[(Action, Int, Int)] {
def filter(challenge: (Action, Int, Int)) = challenge._1 == TAU})
def filter(challenge: (Action, Int, Int)): Boolean = challenge._1 == TAU})
join attackerNodes) // ??
.where(1,2).equalTo(1,2)/* { dn =>
(dn, (ATTACK, dn._2, dn._3))//TODO: Restrict this to attacker nodes which are in the over-approximation (otherwise this may generate spurious victories for the defender!!)
}*/
.where(1,2).equalTo(1,2)
// every attacker node can be the entry or exit of a coupling challenge
val couplingChallengesEntrysExits: DataSet[((Action, Int, Int), (Action, Int, Int))] =
(possibleAttackerNodes map (an => (an, (COUPLING, an._2, an._3)))) union // ??
(attackerNodes map (an => ((COUPLING, an._3, an._2), an))) // ????
//
// attackerNodes flatMap new FlatMapFunction[(Action, Int, Int), ((Action, Int, Int), (Action, Int, Int))] {
// def flatMap(an: (Action, Int, Int), out: Collector[((Action, Int, Int), (Action, Int, Int))]) = {
// out.collect((an, (COUPLING, an._2, an._3)))
// out.collect(((COUPLING, an._3, an._2), an))// note the reversed order of p and q!!!
// }
// }
// during a coupling challenge, the defender may move with tau steps on the right-hand side.
val couplingMoves: DataSet[((Action, Int, Int), (Action, Int, Int))] =
(possibleAttackerNodes join tauSteps)
.where(2/*q*/).equalTo(0/*src*/) (new JoinFunction[(Action, Int, Int), (Int, Int, Action), ((Action, Int, Int), (Action, Int, Int))] {
def join(an: (Action, Int, Int), edge: (Int, Int, Action)) = {
def join(an: (Action, Int, Int), edge: (Int, Int, Action)): ((Action, Int, Int), (Action, Int, Int)) = {
((COUPLING, an._2, an._3), (COUPLING, an._2, edge._2))
}
})
......@@ -132,18 +117,6 @@ class CoupledSimulationGame {
(gameNodes, gameMoves)
}
//
// def genAttack(pqWithSig: ((Int, Set[(Coloring.Color, Coloring.Color)]), (Int, Set[(Coloring.Color, Coloring.Color)]))) = pqWithSig match {
// case ((p, pSig), (q, qSig)) =>
// //if (pSig.size <= qSig.size && (pSig subsetOf qSig)) {
// (ATTACK, p, q)
//// } else {
//// (ATTACK, p, q)
//// }
// }
// (Int, Set[(Coloring.Color, Coloring.Color)]), (Int, Set[(Coloring.Color, Coloring.Color)]))) => Seq[(Action, Int, Int)]
}
......
......@@ -9,8 +9,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.common.functions.JoinFunction
import scala.reflect.ClassTag
import org.apache.flink.api.common.typeinfo.TypeInformation
/**
......@@ -46,7 +44,7 @@ class CoupledSimulationGameDiscovery {
// only generate attacker nodes where there is a chance of the defender winning
(signatures cross signatures) flatMap new FlatMapFunction[((Int, Signature), (Int, Signature)), (Action, Int, Int)] {
def flatMap(pqSig: ((Int, Signature), (Int, Signature)),
out: Collector[(Action, Int, Int)]) = pqSig match {
out: Collector[(Action, Int, Int)]): Unit = pqSig match {
case ((p, pSig), (q, qSig)) =>
if (pSig.size <= qSig.size && (pSig subsetOf qSig)) {
out.collect((ATTACK, p, q))
......@@ -59,9 +57,9 @@ class CoupledSimulationGameDiscovery {
// only allow "real" (non-stuttering) tau-steps (because otherwise this could be used
// by the defender to go into infinite loops and win) (we assume that tau cycles have been compressed)
val tauSteps = ts.getEdgesAsTuple3() filter new FilterFunction[((Int, Int, Action))] {
def filter(edge: ((Int, Int, Action))) = edge match {
case ((p0, p1, a)) => a == TAU && p0 != p1
val tauSteps = ts.getEdgesAsTuple3() filter new FilterFunction[(Int, Int, Action)] {
def filter(edge: (Int, Int, Action)): Boolean = edge match {
case (p0, p1, a) => a == TAU && p0 != p1
}
}
......@@ -88,14 +86,15 @@ class CoupledSimulationGameDiscovery {
}
// at some point the defender has to decide that this is the right place to perform the visible action
// and yield back to the attacker (that the defender may not postpone the yielding, means that we use delay steps and not weak steps!)
val newSimulationAnswersUnfiltered =
(newDefenderSimulationNodes join ts.getEdgesAsTuple3())
.where(2/*q*/,0/*a*/).equalTo(0/*src*/,2/*a*/) ((dn, edge) => (dn, (ATTACK, dn._2, edge._2))) // TAU
.where(2/*q*/,0/*a*/).equalTo(0/*src*/,2/*a*/) ((dn, edge) => (dn, (ATTACK, dn._2, edge._2)))
val newSimulationAnswers: DataSet[((Action, Int, Int), (Action, Int, Int))] =
(attackerNodes join newSimulationAnswersUnfiltered).where(n => n).equalTo(1)((a, mv) => mv)
// afterwards (or directly on tau challenges) the defender may yield the initiative back to the attacker
(attackerNodes join newSimulationAnswersUnfiltered).where(n => n).equalTo(1)((_, mv) => mv)
// on tau challenges the defender may yield the initiative back to the attacker directly
val newSimulationAnswerTauResolves: DataSet[((Action, Int, Int), (Action, Int, Int))] =
(newDefenderSimulationNodes
.filter(_._1 == TAU)
......@@ -104,7 +103,7 @@ class CoupledSimulationGameDiscovery {
// every attacker node can be the entry or exit of a coupling challenge
val newCouplingChallengesEntrys: DataSet[((Action, Int, Int), (Action, Int, Int))] =
(newAttackerNodes map (an => (an, (COUPLING, an._2, an._3))))
newAttackerNodes map (an => (an, (COUPLING, an._2, an._3)))
val newDefenderCouplingNodes: DataSet[(Action, Int, Int)] = deltaNodes.filter(n => n._1 == COUPLING)
......@@ -115,7 +114,7 @@ class CoupledSimulationGameDiscovery {
val newCouplingMoves: DataSet[((Action, Int, Int), (Action, Int, Int))] =
(newDefenderCouplingNodes join tauSteps)
.where(2/*q*/).equalTo(0/*src*/) (new JoinFunction[(Action, Int, Int), (Int, Int, Action), ((Action, Int, Int), (Action, Int, Int))] {
def join(cn: (Action, Int, Int), edge: (Int, Int, Action)) = {
def join(cn: (Action, Int, Int), edge: (Int, Int, Action)): ((Action, Int, Int), (Action, Int, Int)) = {
(cn, (COUPLING, cn._2, edge._2))
}
})
......@@ -129,7 +128,10 @@ class CoupledSimulationGameDiscovery {
newCouplingMoves
val reallyNewGameMoves = (newGameMoves coGroup discoveredMoves)
.where(0,1).equalTo(0,1).apply(fun = { (mv1: Iterator[((Action, Int, Int), (Action, Int, Int))], mv2: Iterator[((Action, Int, Int), (Action, Int, Int))], out: Collector[((Action, Int, Int), (Action, Int, Int))]) =>
.where(0,1).equalTo(0,1).apply(fun = {
(mv1: Iterator[((Action, Int, Int), (Action, Int, Int))],
mv2: Iterator[((Action, Int, Int), (Action, Int, Int))],
out: Collector[((Action, Int, Int), (Action, Int, Int))]) =>
if (mv2.isEmpty) {
for (nm <- mv1) out.collect(nm)
}
......@@ -138,7 +140,7 @@ class CoupledSimulationGameDiscovery {
(reallyNewGameMoves, reallyNewGameMoves)
}
val gameNodes = attackerNodes //union defenderSimulationNodes
val gameNodes = attackerNodes
(gameNodes, gameMoves)
}
......
package de.bbisping.coupledsim.flink
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.DataSet
import java.lang.{Long => JavaLong}
import de.bbisping.coupledsim.util.Coloring
import org.apache.flink.api.scala.{DataSet, _}
import org.apache.flink.api.scala.utils.DataSetUtils
import org.apache.flink.graph.scala.Graph
import org.apache.flink.types.NullValue
import de.bbisping.coupledsim.util.Coloring
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.common.functions.JoinFunction
import scala.reflect.ClassTag
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.utils.`package`.DataSetUtils
class SignatureRefinement {
import CoupledSimulationFlink.Action
import SignatureRefinement._
type Signature = Set[(Coloring.Color, Coloring.Color)]
def compute(
ts: Graph[Int, NullValue, CoupledSimulationFlink.Action])
: (DataSet[(Int, Coloring.Color)], DataSet[(Int, Set[(Coloring.Color, Coloring.Color)])]) = {
val initialColoring: DataSet[(Int, Coloring.Color)] = ts.getVertexIds().map((_, 0))
val verts = ts.getVertexIds()
val initialColoring: DataSet[(Int, Color)] = verts.map((_, Long.MinValue))
val coloring = initialColoring.iterateWithTermination(CoupledSimulationFlink.MAX_ITERATIONS) { coloring =>
val oldSigs: DataSet[(Int, Int)] = coloring.map(_._2).distinct.map(s => (1,1))
val oldSigCount: DataSet[(Int, Int)] = oldSigs.sum(0)
val oldSigCount: DataSet[Tuple1[Int]] = coloring.map(_._2).distinct.map(_ => Tuple1(1)).sum(0)
val signatureEdges: DataSet[(Int, (Coloring.Color, Coloring.Color))] = ts.getEdgesAsTuple3().join(coloring).where(1).equalTo(0) {
val signatureEdges: DataSet[(Int, (Color, Color))] = ts.getEdgesAsTuple3().join(coloring).where(1).equalTo(0) {
(edge, color) => (edge._1, (edge._3.toInt, color._2))
}
val signatures: DataSet[(Int, Set[(Coloring.Color, Coloring.Color)])] = signatureEdges.groupBy(0).reduceGroup { g =>
val head = g.next()
(head._1, g.map(_._2).toSet + head._2)
val signatures: DataSet[(Int, Color)] =
signatureEdges.groupBy(0).reduceGroup { g =>
val head = g.next
(head._1, longHashing(g.map(_._2).toSet + head._2))
}
// new colors for the vertices, still keeping color for vertices without out-edges!
val newColoring: DataSet[(Int, Color)] = coloring.leftOuterJoin(signatures)
.where(0).equalTo(0) { (oldColoring, newColor) =>
if (newColor == null) (oldColoring._1, Long.MinValue) else newColor
}
val newColoring: DataSet[(Int, Coloring.Color)] = coloring.leftOuterJoin(signatures)
.where(0).equalTo(0) ((oldColoring, newSig) => if (newSig == null) oldColoring else (newSig._1, newSig._2.hashCode()))
val newSigCount: DataSet[(Int, Int)] = newColoring.map(_._2).distinct.map(s => (1,1)).sum(0)
val newSigCount: DataSet[Tuple1[Int]] = newColoring.map(_._2).distinct.map(_ => Tuple1(1)).sum(0)
val terminationSet = (oldSigCount cross newSigCount).filter(on => on._1._1 < on._2._1)
(newColoring, terminationSet)
}
val compactColorIds: DataSet[(Long, Color)] = DataSetUtils(coloring.map(_._2).distinct).zipWithIndex
val compactColoring = (coloring join compactColorIds).where(1).equalTo(1) {
(vertSig, sigId) => (vertSig._1, sigId._1.toInt)
}
// recompute the last signatures
val signatureEdges: DataSet[(Int, (Coloring.Color, Coloring.Color))] = ts.getEdgesAsTuple3().join(coloring).where(1).equalTo(0) {
val signatureEdges: DataSet[(Int, (Coloring.Color, Coloring.Color))] = ts.getEdgesAsTuple3().join(compactColoring).where(1).equalTo(0) {
(edge, color) => (edge._1, (edge._3.toInt, color._2))
}
val signatures: DataSet[(Int, Set[(Coloring.Color, Coloring.Color)])] = signatureEdges.groupBy(0).reduceGroup { g =>
......@@ -58,7 +63,73 @@ class SignatureRefinement {
(head._1, g.map(_._2).toSet + head._2)
}
(coloring, signatures)
(compactColoring, signatures)
}
}
/**
 * Companion utilities for signature refinement: a 64-bit `Color` alias and a
 * Long-widened adaptation of MurmurHash3 used to collapse signature sets into
 * a single colour value per vertex.
 *
 * NOTE(review): the surrounding commit mentions "weak bisim result probably
 * wrong due to hash collisions" — the mixing constants below are the 32-bit
 * MurmurHash3 constants sign-extended to Long, so the upper 32 bits are not
 * mixed as thoroughly as a native 64-bit hash would; confirm whether that is
 * the collision source.
 */
object SignatureRefinement {
// Colours are 64-bit so that hashed signatures can be used directly as colours.
type Color = Long
/**
 * Wraps a signature set and orders it by the lexicographic order of its
 * comma-joined string rendering. (No uses visible in this hunk — TODO confirm
 * whether this is dead code or used elsewhere in the file.)
 */
class FullSignature(entries: Set[(Coloring.Color, Coloring.Color)]) extends Ordered[FullSignature] {
// Cached string rendering; entry order comes from the Set's iteration order —
// NOTE(review): two equal Sets iterate identically, so equal sets compare equal.
lazy private val str = entries.mkString(",")
def compare(that: FullSignature): Int = str compare that.str
}
/**
 * Hashes a collection into a Long, combining element hashes with
 * order-insensitive operations (sum, xor, product) before mixing — analogous
 * to MurmurHash3's unordered hash, so equal Sets hash equally regardless of
 * iteration order. Return type is inferred as Long from `finalizeHash`.
 */
def longHashing(xs: TraversableOnce[Any]) = {
// adapted from https://github.com/scala/scala/blob/v2.12.5/src/library/scala/util/hashing/MurmurHash3.scala
// a = sum of hashes, b = xor of hashes, c = product of non-zero hashes, n = element count.
var a, b, n = 0L
var c = 1L
xs foreach { x =>
// Element hash is the 32-bit hashCode, widened to Long by the += / ^= ops.
val h = x.hashCode
a += h
b ^= h
if (h != 0) c *= h
n += 1
}
// 23L is the seed; the three accumulators are mixed in, then finalized with n.
var h = 23L
h = mix(h, a)
h = mix(h, b)
h = mixLast(h, c)
finalizeHash(h, n)
}
/** Mix in a block of data into an intermediate hash value. */
final def mix(hash: Long, data: Long): Long = {
var h = mixLast(hash, data)
h = JavaLong.rotateLeft(h, 13) //Integer.rotateLeft(h, 13)
// NOTE(review): 0xe6546b64 is a 32-bit Int literal, sign-extended to a
// negative Long here — differs from a true 64-bit MurmurHash variant.
h * 5 + 0xe6546b64
}
/** May optionally be used as the last mixing step. Is a little bit faster than mix,
 * as it does no further mixing of the resulting hash. For the last element this is not
 * necessary as the hash is thoroughly mixed during finalization anyway. */
final def mixLast(hash: Long, data: Long): Long = {
var k = data
// NOTE(review): both multipliers are the 32-bit MurmurHash3 c1/c2 constants,
// sign-extended to Long.
k *= 0xcc9e2d51
k = JavaLong.rotateLeft(k, 15)
k *= 0x1b873593
hash ^ k
}
/** Finalize a hash to incorporate the length and make sure all bits avalanche. */
final def finalizeHash(hash: Long, length: Long): Long = avalanche(hash ^ length)
/** Force all bits of the hash to avalanche. Used for finalizing the hash. */
private final def avalanche(hash: Long): Long = {
var h = hash
h ^= h >>> 16
h *= 0x85ebca6b
h ^= h >>> 13
h *= 0xc2b2ae35
h ^= h >>> 16
h
}
}
\ No newline at end of file
package de.bbisping.coupledsim.flink
import de.bbisping.coupledsim.flink.CoupledSimulationFlink.Action
import de.bbisping.coupledsim.util.Coloring
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.scala.{DataSet, _}
import org.apache.flink.api.scala.utils.DataSetUtils
import org.apache.flink.graph.scala.Graph
import org.apache.flink.types.NullValue
import org.apache.flink.util.Collector
/**
* Not yet working properly. (Because of unstable hashes in graph cycles)
*/
class SignatureRefinementDelta {
import SignatureRefinement._
type Signature = Set[(Coloring.Color, Coloring.Color)]
def compute(
ts: Graph[Int, NullValue, CoupledSimulationFlink.Action])
: (DataSet[(Int, Coloring.Color)], DataSet[(Int, Set[(Coloring.Color, Coloring.Color)])]) = {
val verts = ts.getVertexIds()
val initialColoring: DataSet[(Int, Color)] = verts.map((_, Long.MinValue))
val coloring = initialColoring.iterateDelta(initialColoring, CoupledSimulationFlink.MAX_ITERATIONS, Array(0)) { (currentColoring, lastChanges) =>
// we only have to update predecessors of changed vertices
val activeVertices: DataSet[Int] =
(lastChanges join ts.getEdgeIds() where (v => v._1) equalTo 1) { (_, edge) => println(edge._1); edge._1 }.distinct
val oldSignatures: DataSet[(Int, Color)] =
(activeVertices join currentColoring where (v => v) equalTo 0) { (_, oldSig) => oldSig }
// the edges originating from active vertices have to be considered to compute new signatures
val activeEdges: DataSet[(Int, Int, Action)] =
(activeVertices join ts.getEdgesAsTuple3() where (v => v) equalTo 0) { (_, edge) => edge }
val signatureEdges: DataSet[(Int, (Color, Color))] =
(activeEdges join currentColoring where 1 equalTo 0) { (edge, color) =>
(edge._1, (edge._3, color._2))
}
val signatureUpdates: DataSet[(Int, Color)] =
signatureEdges.groupBy(0).reduceGroup { g =>
val head = g.next
(head._1, longHashing(g.map(_._2).toSet + head._2))
}
val vertexColorComparison: DataSet[(Int, Color, Color)] =
(signatureUpdates join currentColoring where 0 equalTo 0) { (newSig, oldSig) =>
(newSig._1, newSig._2, oldSig._2)
}
val reallyUpdatedVertices = vertexColorComparison flatMap new FlatMapFunction[(Int, Color, Color), (Int, Color)] {
def flatMap(change: (Int, Color, Color),
out: Collector[(Int, Color)]): Unit = change match {
case (id: Int, newColor: Color, oldColor: Color) =>
if (newColor != oldColor) {
println(id, oldColor, newColor)
out.collect((id, newColor))
}
}
}
//
//
// filter new FilterFunction[(Int, Color, Color)] {
// def filter(update: (Int, Color, Color)): Boolean = update match {
// case (id: Int, newColor: Color, oldColor: Color) => newColor != oldColor
// }
// }
//
// val reallyUpdatedVertices = realUpdates map (change => change._1)
(reallyUpdatedVertices, reallyUpdatedVertices)
}
/*
val coloring = initialColoring.iterateWithTermination(CoupledSimulationFlink.MAX_ITERATIONS) { coloring =>
//val oldSigMaxId: DataSet[Color] = coloring.max(1).map(_._2)
//val oldSigs: DataSet[(Int, Int)] = coloring.map(_._2).distinct.map(_ => (1,1))
val oldSigCount: DataSet[(Int, Int)] = coloring.map(_._2).distinct.map(_ => (1,1)).sum(0)
val signatureEdges: DataSet[(Int, (Color, Color))] = ts.getEdgesAsTuple3().join(coloring).where(1).equalTo(0) {
(edge, color) => (edge._1, (edge._3.toInt, color._2))
}
val signatures: DataSet[(Int, Color)] =
signatureEdges.groupBy(0).reduceGroup { g =>
val head = g.next
(head._1, longHashing((g.map(_._2)).toSet + head._2))
}
/*
// defragment colors (new IDs on LHS)
val signatureColors: DataSet[(Color, Color)] =
DataSetUtils(signatures.map(s => s._2).distinct).zipWithIndex
val verticesColored: DataSet[(Int, Color)] =
(signatures join signatureColors).where(1).equalTo(1) {
(vertSig, sigId) => (vertSig._1, sigId._1)
}*/
// new colors for the vertices, still keeping color for vertices without out-edges!
val newColoring: DataSet[(Int, Color)] = coloring.leftOuterJoin(signatures)
.where(0).equalTo(0) { (oldColoring, newColor) =>
if (newColor == null) (oldColoring._1, Long.MinValue) else newColor
}
//val newSigMaxId: DataSet[Color] = newColoring.max(1).map(_._2)
val newSigCount: DataSet[(Int, Int)] = newColoring.map(_._2).distinct.map(_ => (1,1)).sum(0)
// val terminationSet = (oldSigMaxId cross newSigMaxId) filter { on =>
// println(Console.GREEN + " {Old Count, new Count} " + on)
// on._1 < on._2
// }
val terminationSet = (oldSigCount cross newSigCount).filter(on => on._1._1 < on._2._1)
(newColoring, terminationSet)
}
*/
val compactColorIds: DataSet[(Long, Color)] = DataSetUtils(coloring.map(_._2).distinct).zipWithIndex
val compactColoring = (coloring join compactColorIds).where(1).equalTo(1) {
(vertSig, sigId) => (vertSig._1, sigId._1.toInt)
}
// recompute the last signatures
val signatureEdges: DataSet[(Int, (Coloring.Color, Coloring.Color))] = ts.getEdgesAsTuple3().join(compactColoring).where(1).equalTo(0) {
(edge, color) => (edge._1, (edge._3.toInt, color._2))
}
val signatures: DataSet[(Int, Set[(Coloring.Color, Coloring.Color)])] = signatureEdges.groupBy(0).reduceGroup { g =>
val head = g.next()
(head._1, g.map(_._2).toSet + head._2)
}
(compactColoring, signatures)