Commit 78822ee5 by benkeks

new repo without copyrighted content

parents
.log
.class
bin
*#
*~
*.bak
*.sav
*.lyx.emergency
*-lyxformat-474.lyx
target
.cache
.cache-main
.cache-tests
.history
.lib
dist/*
isabelle/output
code/node_modules
stuff
eclipse.sh
talks/talk_2017-may
MIT License
Copyright (c) 2018 Benjamin Bisping
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Coupled Sim Fiddle
==================
The Coupled Sim Fiddle is a tool for playing around with labeled transition systems and algorithms on them (especially concerning coupled similarity – a notion of equivalence for systems with internal steps.)
A live instance of the tool runs on <https://coupledsim.bbisping.de>.
The repository also contains Benjamin Bisping's master's thesis "Computing coupled similarity" (in `/thesis/`).
Building the web tool
---------------------
The web tool can be built using `sbt webStage`. After that, `web/target/web/stage/index.html` contains the tool.
Tests of the algorithms on a set of transition systems are triggered by `sbt test`. (Unfortunately, the node.js integration in the project is a little fragile. It may be that you have to manually install node.js modules for the tests to run, for example, `npm install jsdom` in the `code` directory.)
(You will need sbt. For installation instructions, go to <https://www.scala-sbt.org/download.html>.)
The Apache Flink program
------------------------
A program to compute coupled simulation relations using Apache Flink can be used via `sbt flink/run`.
The program takes the following arguments:
Switch | Effect
------------------------ | --------------------------------------
--ts PATH/TO/LTS.csv | Determine the input transition system (must be given as a CSV with format: "srcId, tarId, actionName". The internal action is denoted by "i".)
--overapproximation ARG | Which over-approximation to apply to keep the game small. (Default: bigstep, alternative: none)
--preminimization ARG | Which under-approximation to use for minimization at the beginning of the algorithm. (Default: delaybisim, alternative: weakbisim)
--outputgame | Whether to write the game to disk. (Will be written to input source path with ".game" appended)
--checksoundness | Checks that the result relation really is a coupled simulation
--parallelism N | Degree of parallelism for the Flink program
--sizemark | Output sizes of systems, games, results for predefined benchmark set (takes a lot of time and space)
--timemark | Output running times for predefined benchmark set (takes a lot of time)
(Arguments are channelled through sbt like this: `sbt "flink/run --ts myTransitionSystem.csv"`)
Tests can be run with `sbt flink/test`.
// Root build identity. The Scala version is repeated per subproject below
// because the subprojects do not inherit it via ThisBuild here.
name := "CoupledSim"

version := "0.1.0"

scalaVersion := "2.11.11"

// Compiler flags shared by the Scala-compiling subprojects (see scalacOptions below).
val scalacOpts = Seq(
  // cap generated class file name length — NOTE(review): presumably for file
  // systems with name-length limits; confirm 140 is still needed
  "-Xmax-classfile-name", "140",
  "-feature",
  "-language:implicitConversions",
  "-language:postfixOps",
  "-language:existentials"
)
// Web front end project: packages the Scala.js client (via the scalaJSPipeline)
// together with webjar assets.
lazy val web = (project in file("web")).settings(
  scalaVersion := "2.11.11",
  scalaJSProjects := Seq(jsClient),
  // always run the fully optimized (fullOptJS) pipeline, even in development
  isDevMode in scalaJSPipeline := false,
  pipelineStages in Assets := Seq(scalaJSPipeline),
  // make sure the JS client is built before/with this project's compile step
  compile in Compile := ((compile in Compile) dependsOn scalaJSPipeline).value,
  libraryDependencies ++= Seq(
    "org.webjars" % "codemirror" % "5.13",
    "org.webjars" % "jquery" % "2.1.3",
    // NOTE(review): 3.3.6 here vs. bootstrap 3.3.7 in jsClient's jsDependencies —
    // confirm whether the version mismatch is intended
    "org.webjars" % "bootstrap" % "3.3.6"
  )
).enablePlugins(SbtWeb)
// Sources shared between the JVM (Flink) build and the JS client build.
lazy val shared = (project in file("shared")).settings(
  scalaVersion := "2.11.11",
  name := "shared",
  scalacOptions ++= scalacOpts,
  // skip running tests when building an assembly jar
  test in assembly := {},
  libraryDependencies ++= Seq(
    "org.scalaz" %%% "scalaz-core" % "7.2.16"
  )
)
// The Scala.js browser client (the "Coupled Sim Fiddle" UI).
lazy val jsClient = (project in file("js-client")).settings(
  scalaVersion := "2.11.11",
  name := "coupledsim-client",
  // NOTE(review): disables parallel test execution build-wide — presumably to
  // keep the JS/DOM test environment stable; confirm
  parallelExecution in ThisBuild := false,
  scalacOptions ++= scalacOpts,
  testFrameworks += new TestFramework("utest.runner.Framework"),
  // needed for the denigma codemirror facade below
  resolvers += sbt.Resolver.bintrayRepo("denigma", "denigma-releases"),
  libraryDependencies ++= Seq(
    "org.scalaz" %%% "scalaz-core" % "7.2.16",
    "com.lihaoyi" %%% "utest" % "0.5.4",
    "org.singlespaced" %%% "scalajs-d3" % "0.3.4",
    "org.denigma" %%% "codemirror-facade" % "5.13.2-0.8",
    "com.github.karasiq" %%% "scalajs-bootstrap" % "2.3.1"
  ),
  // emit the generated JS as <target>/<moduleName>.js, and use the same path
  // for both the fast and the fully optimized build
  artifactPath in (Compile,fastOptJS) :=
    ((target in fastOptJS).value /
    ((moduleName in fastOptJS).value + ".js")),
  artifactPath in (Compile,fullOptJS) := (artifactPath in (Compile,fastOptJS)).value,
  jsDependencies ++= Seq(
    "org.webjars" % "codemirror" % "5.13" / "codemirror.js",
    "org.webjars" % "jquery" % "2.1.3" / "2.1.3/jquery.js",
    // NOTE(review): 3.3.7 here vs. bootstrap 3.3.6 in the web project's
    // webjars — confirm whether the version mismatch is intended
    "org.webjars" % "bootstrap" % "3.3.7" / "bootstrap.min.js"
  ),
  // also compile the shared sources directly into the JS client
  unmanagedSourceDirectories in Compile +=
    baseDirectory.value / ".." / "shared" / "src" / "main" / "scala-2.11"
).aggregate(shared).dependsOn(shared).enablePlugins(ScalaJSPlugin, ScalaJSWeb)
val flinkVersion = "1.4.2"

// Flink modules are "provided": a Flink cluster supplies them at runtime.
// The runTask override in the flink project below puts them back on the
// classpath for local `sbt flink/run`.
val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  // NOTE(review): pinned to 1.4.1 while the other modules use flinkVersion
  // (1.4.2), and not "provided" — confirm both differences are intentional
  "org.apache.flink" %% "flink-gelly-scala" % "1.4.1")
// The Apache Flink batch program that computes coupled simulation relations.
lazy val flink = (project in file("flink")).
  settings(
    scalaVersion := "2.11.11",
    resolvers ++= Seq(
      "Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
      Resolver.mavenLocal
    ),
    libraryDependencies ++= flinkDependencies ++ Seq("org.scalatest" %% "scalatest" % "3.0.5" % "test"),
    //fork in run := true,
    // skip running tests when building an assembly jar
    test in assembly := {},
    // run with the full (Compile) classpath so that the "provided" Flink
    // dependencies are available to local `sbt flink/run`
    run in Compile := Defaults.runTask(fullClasspath in Compile,
      mainClass in (Compile, run),
      runner in (Compile, run)
    ).evaluated
  ).aggregate(shared).dependsOn(shared)
// Aggregating root project: building/testing here builds all subprojects.
lazy val root = project.in(file(".")).settings(
  name := "coupledsim",
  testFrameworks += new TestFramework("utest.runner.Framework")
).aggregate(shared, jsClient, web)
  .dependsOn(jsClient, web)
// sbt-assembly builds self-contained (fat) jars, e.g. for submitting the Flink
// program to a cluster. NOTE(review): the build also uses ScalaJSPlugin/SbtWeb —
// their addSbtPlugin lines are presumably in a part of plugins.sbt not shown here.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")
# Pins the sbt launcher version for reproducible builds.
sbt.version=1.1.1
\ No newline at end of file
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Send all INFO-and-above log output to the console appender defined below.
log4j.rootLogger=INFO, console

# Console appender with a timestamped single-line pattern:
# HH:mm:ss,SSS LEVEL logger-name NDC - message
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
package de.bbisping.coupledsim.flink
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph
import org.apache.flink.api.scala.utils.`package`.DataSetUtils
import org.apache.flink.api.scala.DataSet
import org.apache.flink.types.NullValue
/**
 * Relabels the actions of a transition system from `String` names to `Long`
 * ids, so that later stages can work on numeric labels.
 *
 * The input graph `ts` has `Int` state ids, no vertex values, and `String`
 * edge labels (action names).
 */
class ActionsStringToLongRelabeling(ts: Graph[Int, NullValue, String]) {

  /**
   * Computes the relabeled graph together with the id-to-name dictionary.
   *
   * @param env    Flink execution environment used to rebuild the graph
   * @param tauStr name of the internal action (e.g. "i")
   * @param tau    the Long id the internal action should receive
   * @return (graph with Long edge labels, (id, name) pairs for all actions)
   */
  def compute(env: ExecutionEnvironment, tauStr: String, tau: Long): (Graph[Int, NullValue, Long], DataSet[(Long, String)]) = {
    // edges as (src, tar, actionName) tuples
    val stringLabeledEdges = ts.getEdgesAsTuple3()
    // every distinct action name occurring in the system
    val actions: DataSet[String] = stringLabeledEdges.map(_._3).distinct()
    // assign a preliminary consecutive index to every action name
    val preliminaryActionIds: DataSet[(Long, String)] = DataSetUtils(actions).zipWithIndex//.zipWithUniqueId
    // pin the internal action to the id `tau`; shift every other id above it
    val actionIds: DataSet[(Long, String)] =
      preliminaryActionIds map ({ case ((id, name)) =>
        if (name == tauStr) {
          (tau, name)
        } else {
          // the +10 gap keeps ordinary action ids clear of the tau id
          (id + tau + 10, name)
        }
      }: ((Long, String)) => (Long, String)) // explicit ascription: a pattern-match lambda alone cannot be typed here
    // join each edge (on its action-name field) with the id of that action
    val relabeledEdges: DataSet[(Int, Int, Long)] =
      (stringLabeledEdges join actionIds)
        .where(2).equalTo(1) { (edge, actionId) =>
          (edge._1, edge._2, actionId._1)
        }
    (Graph.fromTupleDataSet(ts.getVerticesAsTuple2(), relabeledEdges, env), actionIds)
  }
}
\ No newline at end of file
package de.bbisping.coupledsim.flink
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.Seq
/**
 * Benchmark driver: runs the coupled simulation Flink algorithm on a fixed
 * collection of sample transition systems (including VLTS benchmarks) and
 * prints CSV-style result lines to stdout.
 */
object CoupledSimulationFlinkBenchmark {

  /** Small hand-crafted test systems. */
  val smallSamples = Seq(
    "shared/src/test/assets/csv/alphabet.csv",
    "shared/src/test/assets/csv/bug1.csv",
    "shared/src/test/assets/csv/bug2.csv",
    "shared/src/test/assets/csv/contra-sim-1.csv",
    "shared/src/test/assets/csv/coupled-sim-1b.csv",
    "shared/src/test/assets/csv/coupled-sim-2.csv",
    "shared/src/test/assets/csv/coupled-sim-phil.csv",
    "shared/src/test/assets/csv/diamond.csv",
    "shared/src/test/assets/csv/ltbts.csv",
    "shared/src/test/assets/csv/sim-1.csv",
    "shared/src/test/assets/csv/weak-bisim-1.csv",
    "shared/src/test/assets/csv/weak-bisim-2.csv",
    "shared/src/test/assets/csv/weak-bisim-2b.csv"
  )

  /** The smallest VLTS sample. */
  val vltsSamplesSmall = Seq(
    "shared/src/test/assets/vlts/vasy_0_1.csv" // 289, 1224, no tau, 2
  )

  /** Medium-sized VLTS samples (comments: states, transitions, ...). */
  val vltsSamplesMedium = Seq(
    "shared/src/test/assets/vlts/vasy_1_4.csv", // 1183, 4464, 1213, 6
    "shared/src/test/assets/vlts/vasy_5_9.csv",
    "shared/src/test/assets/vlts/cwi_1_2.csv", // 1952, 2387, 2215, 26
    "shared/src/test/assets/vlts/cwi_3_14.csv", // 3996, 14552, 14551, 2
    "shared/src/test/assets/vlts/vasy_8_24.csv",
    "shared/src/test/assets/vlts/vasy_8_38.csv",
    "shared/src/test/assets/vlts/vasy_10_56.csv",
    // "shared/src/test/assets/vlts/vasy_18_73.csv" // memory ran out (in discovery)
    "shared/src/test/assets/vlts/vasy_25_25.csv"
    // "shared/src/test/assets/vlts/vasy_40_60.csv" // weak bisim takes forever (also 15 secs in the optimized [BGR2016] implementation)
  )

  /**
   * Runs every sample once with size bookkeeping enabled and prints one
   * comma-separated line per sample: path, system/minimization/game sizes,
   * partition class count, relation size, and wall-clock time in ms.
   */
  def runSizeMark(cfgPreminimization: String, cfgOverApproximation: String) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableSysoutLogging()
    val schedule = smallSamples ++ vltsSamplesSmall ++ vltsSamplesMedium
    schedule foreach { samplePath =>
      val startTime = System.currentTimeMillis()
      val result = CoupledSimulationFlink.executeAlgorithm(env,
        cfgPath = samplePath,
        cfgPreminimization = cfgPreminimization,
        cfgOverApproximation = cfgOverApproximation,
        cfgBenchmarkSizes = true,
        cfgReturnPartitionRelation = true)
      val elapsed = System.currentTimeMillis() - startTime
      // missing size entries are printed as empty cells
      val sizes = result.benchmarkSizes.withDefaultValue("")
      val (partition, relation) = result.partitionRelation
      val partitionClassCount = partition.values.toSet.size
      val cells = Seq(
        samplePath,
        sizes("systemStates"),
        sizes("systemTransitions"),
        sizes("systemWeakTransitions"),
        sizes("minimizedStates"),
        sizes("minimizedTransitions"),
        sizes("gameNodes"),
        sizes("gameMoves"),
        partitionClassCount,
        relation.size,
        elapsed)
      println(cells.mkString(", "))
    }
  }

  /**
   * Measures wall-clock running times and prints "path, millis" lines.
   * The small samples are scheduled twice; the first pass is a warm-up
   * (its timings are printed as well).
   */
  def runTimeMark(cfgPreminimization: String, cfgOverApproximation: String) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableSysoutLogging()
    val schedule =
      smallSamples ++ // warmup
      smallSamples ++
      vltsSamplesSmall ++
      vltsSamplesMedium
    schedule foreach { samplePath =>
      val startTime = System.currentTimeMillis()
      CoupledSimulationFlink.executeAlgorithm(env,
        cfgPath = samplePath,
        cfgPreminimization = cfgPreminimization,
        cfgOverApproximation = cfgOverApproximation)
      val elapsed = System.currentTimeMillis() - startTime
      println(s"$samplePath, $elapsed")
    }
  }
}
\ No newline at end of file
package de.bbisping.coupledsim.flink
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.DataSet
import org.apache.flink.graph.scala.Graph
import org.apache.flink.types.NullValue
import de.bbisping.coupledsim.util.Coloring
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.common.functions.JoinFunction
/**
 * Constructs the coupled simulation game arena (nodes and moves) for a
 * transition system with Long-encoded action labels.
 *
 * Game positions are (Action, Int, Int) triples: attacker positions carry the
 * tag ATTACK, coupling positions the tag COUPLING, and simulation-challenge
 * positions carry the challenged action itself; the two Ints are the
 * left-hand and right-hand state.
 */
class CoupledSimulationGame {
  import CoupledSimulationFlink.Action
  import CoupledSimulationGame._

  // per-state signature, used to prune attacker positions the defender
  // cannot possibly survive
  type Signature = Set[(Coloring.Color, Coloring.Color)]

  /**
   * Builds the game graph.
   *
   * @param ts            the transition system (tau cycles are assumed compressed)
   * @param signaturesOpt optional per-state signatures; when present, attacker
   *                      nodes are restricted to signature-compatible pairs
   * @param TAU           the action label denoting the internal step
   * @return (game nodes, game moves)
   */
  def compute(
      ts: Graph[Int, NullValue, CoupledSimulationFlink.Action],
      signaturesOpt: Option[DataSet[(Int, Set[(Coloring.Color, Coloring.Color)])]],
      TAU: CoupledSimulationFlink.Action)
    : (DataSet[(CoupledSimulationFlink.Action, Int, Int)],
       DataSet[((CoupledSimulationFlink.Action, Int, Int), (CoupledSimulationFlink.Action, Int, Int))]) = {

    // one candidate attacker position for every ordered pair of states
    val possibleAttackerNodes: DataSet[(Action, Int, Int)] =
      (ts.getVertexIds cross ts.getVertexIds) {
        (p, q) => (ATTACK, p, q)
      }

    val attackerNodes: DataSet[(Action, Int, Int)] = signaturesOpt match {
      case Some(signatures) =>
        // only generate attacker nodes where there is a chance of the defender winning
        (signatures cross signatures) flatMap new FlatMapFunction[((Int, Signature), (Int, Signature)), (Action, Int, Int)] {
          def flatMap(pqSig: ((Int, Signature), (Int, Signature)),
              out: Collector[(Action, Int, Int)]) = pqSig match {
            case ((p, pSig), (q, qSig)) =>
              // size check is a cheap pre-filter for the subset test
              if (pSig.size <= qSig.size && (pSig subsetOf qSig)) {
                out.collect((ATTACK, p, q))
              }
          }
        }
      case None =>
        possibleAttackerNodes
    }

    //println(attackerNodes.collect())

    // the attacker may challenge with any step p --a--> p' of the left-hand side,
    // moving to the defender position (a, p', q)
    val simulationChallenges: DataSet[((Action, Int, Int), (Action, Int, Int))] =
      (ts.getEdgesAsTuple3() join possibleAttackerNodes) // ?
        .where(0/*src*/).equalTo(1/*p*/) { (edge, an) =>
          (an, (edge._3/*a*/, edge._2/*tar*/, an._3 /*q*/))
        }

    // defender positions: all challenge targets, plus a tau position for
    // every candidate attacker pair
    val defenderSimulationNodes: DataSet[(Action, Int, Int)] =
      ((simulationChallenges flatMap new FlatMapFunction[((Action, Int, Int), (Action, Int, Int)), (Action, Int, Int)] {
        def flatMap(simChallenge: ((Action, Int, Int), (Action, Int, Int)),
            out: Collector[(Action, Int, Int)]) = simChallenge match {
          case ((_, rhs)) =>
            out.collect(rhs)
            //out.collect((TAU, rhs._2, rhs._3))
        }
      })
      union (possibleAttackerNodes map (an => (TAU, an._2, an._3)))
      ).distinct()
//      Seq(rhs, (TAU, rhs._2, rhs._3))
//      }: (((Action, Int, Int), (Action, Int, Int))) => TraversableOnce[(Action, Int, Int)]))
//      .distinct()

    // only allow "real" (non-stuttering) tau-steps (because otherwise this could be used
    // by the defender to go into infinite loops and win) (we assume that tau cycles have been compressed)
    val tauSteps = ts.getEdgesAsTuple3() filter new FilterFunction[((Int, Int, Action))] {
      def filter(edge: ((Int, Int, Action))) = edge match {
        case ((p0, p1, a)) => a == TAU && p0 != p1
      }
    }

    // the simulation answer can be postponed by internal steps on the right hand side
    val simulationWeakSteps: DataSet[((Action, Int, Int), (Action, Int, Int))] =
      (defenderSimulationNodes join tauSteps)
        .where(2/*q*/).equalTo(0/*p0*/) { (dn, edge) =>
          (dn, (dn._1, dn._2, edge._2))
        }

    // at some point the defender has to decide that this is the right place to perform the visible action
    val simulationAnswers: DataSet[((Action, Int, Int), (Action, Int, Int))] =
      (defenderSimulationNodes join ts.getEdgesAsTuple3())
        .where(2/*q*/,0/*a*/).equalTo(0/*src*/,2/*a*/) (new JoinFunction[(Action, Int, Int), (Int, Int, Action), ((Action, Int, Int), (Action, Int, Int))] {
          def join(dn: (Action, Int, Int), edge: (Int, Int, Action)) = {
            (dn, (TAU, dn._2, edge._2))
          }
        })

    // afterwards (or directly on tau challenges) the defender may yield the initiative back to the attacker
    val simulationAnswerTauResolves: DataSet[((Action, Int, Int), (Action, Int, Int))] =
      (defenderSimulationNodes
        .filter(new FilterFunction[(Action, Int, Int)] {
          def filter(challenge: (Action, Int, Int)) = challenge._1 == TAU})
      join attackerNodes) // ??
        // join without a projection: the result pairs are the moves themselves
        .where(1,2).equalTo(1,2)/* { dn =>
        (dn, (ATTACK, dn._2, dn._3))//TODO: Restrict this to attacker nodes which are in the over-approximation (otherwise this may generate spurious victories for the defender!!)
      }*/

    // every attacker node can be the entry or exit of a coupling challenge
    // (the exit swaps p and q)
    val couplingChallengesEntrysExits: DataSet[((Action, Int, Int), (Action, Int, Int))] =
      (possibleAttackerNodes map (an => (an, (COUPLING, an._2, an._3)))) union // ??
      (attackerNodes map (an => ((COUPLING, an._3, an._2), an))) // ????
//
//      attackerNodes flatMap new FlatMapFunction[(Action, Int, Int), ((Action, Int, Int), (Action, Int, Int))] {
//        def flatMap(an: (Action, Int, Int), out: Collector[((Action, Int, Int), (Action, Int, Int))]) = {
//          out.collect((an, (COUPLING, an._2, an._3)))
//          out.collect(((COUPLING, an._3, an._2), an))// note the reversed order of p and q!!!
//        }
//      }

    // during a coupling challenge, the defender may move with tau steps on the right-hand side.
    val couplingMoves: DataSet[((Action, Int, Int), (Action, Int, Int))] =
      (possibleAttackerNodes join tauSteps)
        .where(2/*q*/).equalTo(0/*src*/) (new JoinFunction[(Action, Int, Int), (Int, Int, Action), ((Action, Int, Int), (Action, Int, Int))] {
          def join(an: (Action, Int, Int), edge: (Int, Int, Action)) = {
            ((COUPLING, an._2, an._3), (COUPLING, an._2, edge._2))
          }
        })

    // NOTE(review): coupling positions are not part of gameNodes here —
    // confirm that downstream consumers derive them from the moves
    val gameNodes = attackerNodes union defenderSimulationNodes
    val gameMoves = simulationChallenges union
      simulationWeakSteps union simulationAnswers union simulationAnswerTauResolves union
      couplingChallengesEntrysExits union couplingMoves

    (gameNodes, gameMoves)
  }

//
//  def genAttack(pqWithSig: ((Int, Set[(Coloring.Color, Coloring.Color)]), (Int, Set[(Coloring.Color, Coloring.Color)]))) = pqWithSig match {
//    case ((p, pSig), (q, qSig)) =>
//      //if (pSig.size <= qSig.size && (pSig subsetOf qSig)) {
//        (ATTACK, p, q)
////      } else {
////        (ATTACK, p, q)
////      }
//  }
//  (Int, Set[(Coloring.Color, Coloring.Color)]), (Int, Set[(Coloring.Color, Coloring.Color)]))) => Seq[(Action, Int, Int)]
}
/**
 * Tags distinguishing game position kinds in the (Action, Int, Int) encoding.
 * Negative values — presumably so they never collide with relabeled action
 * ids; confirm action ids are always non-negative (see ActionsStringToLongRelabeling).
 */
object CoupledSimulationGame {
  val ATTACK: Long = -1
  val COUPLING: Long = -2
}
\ No newline at end of file
package de.bbisping.coupledsim.flink
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.DataSet
import org.apache.flink.graph.scala.Graph
import org.apache.flink.types.NullValue
import de.bbisping.coupledsim.util.Coloring
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.common.functions.JoinFunction
import scala.reflect.ClassTag
import org.apache.flink.api.common.typeinfo.TypeInformation
/**
* A variant of the coupled simulation game computation enhanced with a gradual
* discovery of the game space.
*/
class CoupledSimulationGameDiscovery {
import CoupledSimulationFlink.Action
import CoupledSimulationGame._
type Signature = Set[(Coloring.Color, Coloring.Color)]
// due to some strange behavior of flink, we unfortunately cannot use these type synonyms
type GameNode = (Action, Int, Int)
type GameMove = ((Action, Int, Int), (Action, Int, Int))
def compute(
ts: Graph[Int, NullValue, CoupledSimulationFlink.Action],
signaturesOpt: Option[DataSet[(Int, Set[(Coloring.Color, Coloring.Color)])]],
TAU: CoupledSimulationFlink.Action,
env: ExecutionEnvironment)
: (DataSet[(CoupledSimulationFlink.Action, Int, Int)],
DataSet[((CoupledSimulationFlink.Action, Int, Int), (CoupledSimulationFlink.Action, Int, Int))]) = {
val possibleAttackerNodes: DataSet[(Action, Int, Int)] =
(ts.getVertexIds cross ts.getVertexIds) {
(p, q) => (ATTACK, p, q)
}
val attackerNodes: DataSet[(Action, Int, Int)] = signaturesOpt match {
case Some(signatures) =>
// only generate attacker nodes where there is a chance of the defender winning
(signatures cross signatures) flatMap new FlatMapFunction[((Int, Signature), (Int, Signature)), (Action, Int, Int)] {
def flatMap(pqSig: ((Int, Signature), (Int, Signature)),
out: Collector[(Action, Int, Int)]) = pqSig match {
case ((p, pSig), (q, qSig)) =>
if (pSig.size <= qSig.size && (pSig subsetOf qSig)) {
out.collect((ATTACK, p, q))
}
}
}
case None =>
possibleAttackerNodes
}
// only allow "real" (non-stuttering) tau-steps (because otherwise this could be used
// by the defender to go into infinite loops and win) (we assume that tau cycles have been compressed)
val tauSteps = ts.getEdgesAsTuple3() filter new FilterFunction[((Int, Int, Action))] {
def filter(edge: ((Int, Int, Action))) = edge match {
case ((p0, p1, a)) => a == TAU && p0 != p1
}
}
val initialAttacks: DataSet[((Action, Int, Int), (Action, Int, Int))] = attackerNodes.map(a => (a, a))
val gameMoves: DataSet[((Action, Int, Int), (Action, Int, Int))] = initialAttacks.
iterateDelta(initialAttacks, CoupledSimulationFlink.MAX_ITERATIONS, Array(0,1)) { (discoveredMoves: DataSet[((Action, Int, Int), (Action, Int, Int))], deltaMoves: DataSet[((Action, Int, Int), (Action, Int, Int))]) =>
val deltaNodes: DataSet[(Action, Int, Int)] = deltaMoves.map(_._2).distinct
val newAttackerNodes: DataSet[(Action, Int, Int)] = deltaNodes.filter(_._1 == ATTACK)
val newSimulationChallenges: DataSet[((Action, Int, Int), (Action, Int, Int))] =
(ts.getEdgesAsTuple3() join newAttackerNodes)
.where(0/*src*/).equalTo(1/*p*/) { (edge, an) =>
(an, (edge._3/*a*/, edge._2/*tar*/, an._3 /*q*/))
}
val newDefenderSimulationNodes: DataSet[(Action, Int, Int)] = deltaNodes.filter(n => n._1 != ATTACK && n._1 != COUPLING)
// the simulation answer can be postponed by internal steps on the right hand side
val newSimulationWeakSteps: DataSet[((Action, Int, Int), (Action, Int, Int))] =
(newDefenderSimulationNodes join tauSteps)
.where(2/*q*/).equalTo(0/*p0*/) { (dn, edge) =>
(dn, (dn._1, dn._2, edge._2))
}
// at some point the defender has to decide that this is the right place to perform the visible action
val newSimulationAnswersUnfiltered =
(newDefenderSimulationNodes join ts.getEdgesAsTuple3())
.where(2/*q*/,0/*a*/).equalTo(0/*src*/,2/*a*/) ((dn, edge) => (dn, (ATTACK, dn._2, edge._2))) // TAU
val newSimulationAnswers: DataSet[((Action, Int, Int), (Action, Int, Int))] =
(attackerNodes join newSimulationAnswersUnfiltered).where(n => n).equalTo(1)((a, mv) => mv)