This GitLab instance reached the end of its service life. It won't be possible to create new users or projects.

Please read the deprecation notice for more information concerning the deprecation timeline

Visit migration.git.tu-berlin.de (internal network only) to import your old projects to the new GitLab platform 📥

CoupledSimulationGameDiscovery.scala 6.98 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
package de.bbisping.coupledsim.flink

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.DataSet
import org.apache.flink.graph.scala.Graph
import org.apache.flink.types.NullValue
import de.bbisping.coupledsim.util.Coloring
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.common.functions.JoinFunction


/**
 * A variant of the coupled simulation game computation enhanced with a gradual
 * discovery of the game space.
 */
class CoupledSimulationGameDiscovery {
  
  import CoupledSimulationFlink.Action
  import CoupledSimulationGame._
  
  type Signature = Set[(Coloring.Color, Coloring.Color)]
  
  // due to some strange behavior of flink, we unfortunately cannot use these type synonyms
  type GameNode = (Action, Int, Int)
  type GameMove = ((Action, Int, Int), (Action, Int, Int))
  
  def compute(
      ts: Graph[Int, NullValue, CoupledSimulationFlink.Action],
      signaturesOpt: Option[DataSet[(Int, Set[(Coloring.Color, Coloring.Color)])]],
      TAU: CoupledSimulationFlink.Action,
      env: ExecutionEnvironment)
  : (DataSet[(CoupledSimulationFlink.Action, Int, Int)],
     DataSet[((CoupledSimulationFlink.Action, Int, Int), (CoupledSimulationFlink.Action, Int, Int))]) = {
    
    val possibleAttackerNodes: DataSet[(Action, Int, Int)] =
      (ts.getVertexIds cross ts.getVertexIds) {
        (p, q) => (ATTACK, p, q)
    }
    
    val attackerNodes: DataSet[(Action, Int, Int)] = signaturesOpt match {
      case Some(signatures) =>
        // only generate attacker nodes where there is a chance of the defender winning
        (signatures cross signatures) flatMap new FlatMapFunction[((Int, Signature), (Int, Signature)), (Action, Int, Int)] {
          def flatMap(pqSig: ((Int, Signature), (Int, Signature)),
47
              out: Collector[(Action, Int, Int)]): Unit = pqSig match {
48 49 50 51 52 53 54 55 56 57 58 59
            case ((p, pSig), (q, qSig)) =>
              if (pSig.size <= qSig.size && (pSig subsetOf qSig)) {
                out.collect((ATTACK, p, q))
              }
          }
        }
      case None =>
        possibleAttackerNodes
    }
    
    // only allow "real" (non-stuttering) tau-steps (because otherwise this could be used
    // by the defender to go into infinite loops and win) (we assume that tau cycles have been compressed)
60 61 62
    val tauSteps = ts.getEdgesAsTuple3() filter new FilterFunction[(Int, Int, Action)] {
      def filter(edge: (Int, Int, Action)): Boolean = edge match {
        case (p0, p1, a) => a == TAU && p0 != p1
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
      }
    }
    
    val initialAttacks: DataSet[((Action, Int, Int), (Action, Int, Int))] = attackerNodes.map(a => (a, a))
    
    val gameMoves: DataSet[((Action, Int, Int), (Action, Int, Int))] = initialAttacks.
      iterateDelta(initialAttacks, CoupledSimulationFlink.MAX_ITERATIONS, Array(0,1)) { (discoveredMoves: DataSet[((Action, Int, Int), (Action, Int, Int))], deltaMoves: DataSet[((Action, Int, Int), (Action, Int, Int))]) =>
        val deltaNodes: DataSet[(Action, Int, Int)] = deltaMoves.map(_._2).distinct
        
        val newAttackerNodes: DataSet[(Action, Int, Int)] = deltaNodes.filter(_._1 == ATTACK)
        val newSimulationChallenges: DataSet[((Action, Int, Int), (Action, Int, Int))] =
          (ts.getEdgesAsTuple3() join newAttackerNodes)
            .where(0/*src*/).equalTo(1/*p*/) { (edge, an) =>
              (an, (edge._3/*a*/, edge._2/*tar*/, an._3 /*q*/))
        }
        
        val newDefenderSimulationNodes: DataSet[(Action, Int, Int)] = deltaNodes.filter(n => n._1 != ATTACK && n._1 != COUPLING)
        
         // the simulation answer can be postponed by internal steps on the right hand side
        val newSimulationWeakSteps: DataSet[((Action, Int, Int), (Action, Int, Int))] =
          (newDefenderSimulationNodes join tauSteps)
          .where(2/*q*/).equalTo(0/*p0*/) { (dn, edge) =>
            (dn, (dn._1, dn._2, edge._2))
        }
        
        // at some point the defender has to decide that this is the right place to perform the visible action
89
        // and yield back to the attacker (that the defender may not postpone the yielding, means that we use delay steps and not weak steps!)
90 91
        val newSimulationAnswersUnfiltered =
          (newDefenderSimulationNodes join ts.getEdgesAsTuple3())
92 93
          .where(2/*q*/,0/*a*/).equalTo(0/*src*/,2/*a*/) ((dn, edge) => (dn, (ATTACK, dn._2, edge._2)))

94
        val newSimulationAnswers: DataSet[((Action, Int, Int), (Action, Int, Int))] =
95 96 97
          (attackerNodes join newSimulationAnswersUnfiltered).where(n => n).equalTo(1)((_, mv) => mv)

        // on tau challenges the defender may yield the inititiative back to the attacker directly
98 99 100 101 102 103 104 105
        val newSimulationAnswerTauResolves: DataSet[((Action, Int, Int), (Action, Int, Int))] =
          (newDefenderSimulationNodes
              .filter(_._1 == TAU)
              join attackerNodes) // ??
              .where(1,2).equalTo(1,2)
          
        // every attacker node can be the entry or exit of a coupling challenge
        val newCouplingChallengesEntrys: DataSet[((Action, Int, Int), (Action, Int, Int))]  =
106
          newAttackerNodes map (an => (an, (COUPLING, an._2, an._3)))
107 108 109 110 111 112 113 114 115 116
          
        val newDefenderCouplingNodes: DataSet[(Action, Int, Int)] = deltaNodes.filter(n => n._1 == COUPLING)
          
        val newCouplingChallengesExits: DataSet[((Action, Int, Int), (Action, Int, Int))] =
          (newDefenderCouplingNodes join attackerNodes).where(1,2).equalTo(2,1)// map (cn => (cn, (ATTACK, cn._3, cn._2)))) //join attackerNodes).where(1).equalTo(2)
        
        // during a coupling challenge, the defender may move with tau steps on the right-hand side.
        val newCouplingMoves: DataSet[((Action, Int, Int), (Action, Int, Int))] =
          (newDefenderCouplingNodes join tauSteps)
          .where(2/*q*/).equalTo(0/*src*/) (new JoinFunction[(Action, Int, Int), (Int, Int, Action), ((Action, Int, Int), (Action, Int, Int))] {
117
            def join(cn: (Action, Int, Int), edge: (Int, Int, Action)): ((Action, Int, Int), (Action, Int, Int)) = {
118 119 120 121 122 123 124 125 126 127 128 129 130
              (cn, (COUPLING, cn._2, edge._2))
            }
        })
          
        val newGameMoves = newSimulationChallenges union
          newSimulationWeakSteps union
          newSimulationAnswers union
          newSimulationAnswerTauResolves union
          newCouplingChallengesEntrys union
          newCouplingChallengesExits union
          newCouplingMoves
          
        val reallyNewGameMoves = (newGameMoves coGroup discoveredMoves)
131 132 133 134
          .where(0,1).equalTo(0,1).apply(fun = {
              (mv1: Iterator[((Action, Int, Int), (Action, Int, Int))],
              mv2: Iterator[((Action, Int, Int), (Action, Int, Int))],
              out: Collector[((Action, Int, Int), (Action, Int, Int))]) =>
135 136 137 138 139 140 141 142
            if (mv2.isEmpty) {
              for (nm <- mv1) out.collect(nm)
            }
        })

        (reallyNewGameMoves, reallyNewGameMoves)
    }
    
143
    val gameNodes = attackerNodes
144 145 146 147 148
      
    (gameNodes, gameMoves)
  }
  
}