CoupledSimulationGameDiscovery.scala 6.98 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
package de.bbisping.coupledsim.flink

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.DataSet
import org.apache.flink.graph.scala.Graph
import org.apache.flink.types.NullValue
import de.bbisping.coupledsim.util.Coloring
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.common.functions.JoinFunction


/**
 * A variant of the coupled simulation game computation enhanced with a gradual
 * discovery of the game space.
 */
class CoupledSimulationGameDiscovery {
  
  import CoupledSimulationFlink.Action
  import CoupledSimulationGame._
  
  type Signature = Set[(Coloring.Color, Coloring.Color)]
  
  // due to some strange behavior of flink, we unfortunately cannot use these type synonyms
  type GameNode = (Action, Int, Int)
  type GameMove = ((Action, Int, Int), (Action, Int, Int))
  
  def compute(
      ts: Graph[Int, NullValue, CoupledSimulationFlink.Action],
      signaturesOpt: Option[DataSet[(Int, Set[(Coloring.Color, Coloring.Color)])]],
      TAU: CoupledSimulationFlink.Action,
      env: ExecutionEnvironment)
  : (DataSet[(CoupledSimulationFlink.Action, Int, Int)],
     DataSet[((CoupledSimulationFlink.Action, Int, Int), (CoupledSimulationFlink.Action, Int, Int))]) = {
    
    val possibleAttackerNodes: DataSet[(Action, Int, Int)] =
      (ts.getVertexIds cross ts.getVertexIds) {
        (p, q) => (ATTACK, p, q)
    }
    
    val attackerNodes: DataSet[(Action, Int, Int)] = signaturesOpt match {
      case Some(signatures) =>
        // only generate attacker nodes where there is a chance of the defender winning
        (signatures cross signatures) flatMap new FlatMapFunction[((Int, Signature), (Int, Signature)), (Action, Int, Int)] {
          def flatMap(pqSig: ((Int, Signature), (Int, Signature)),
47
              out: Collector[(Action, Int, Int)]): Unit = pqSig match {
48 49 50 51 52 53 54 55 56 57 58 59
            case ((p, pSig), (q, qSig)) =>
              if (pSig.size <= qSig.size && (pSig subsetOf qSig)) {
                out.collect((ATTACK, p, q))
              }
          }
        }
      case None =>
        possibleAttackerNodes
    }
    
    // only allow "real" (non-stuttering) tau-steps (because otherwise this could be used
    // by the defender to go into infinite loops and win) (we assume that tau cycles have been compressed)
60 61 62
    val tauSteps = ts.getEdgesAsTuple3() filter new FilterFunction[(Int, Int, Action)] {
      def filter(edge: (Int, Int, Action)): Boolean = edge match {
        case (p0, p1, a) => a == TAU && p0 != p1
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
      }
    }
    
    val initialAttacks: DataSet[((Action, Int, Int), (Action, Int, Int))] = attackerNodes.map(a => (a, a))
    
    val gameMoves: DataSet[((Action, Int, Int), (Action, Int, Int))] = initialAttacks.
      iterateDelta(initialAttacks, CoupledSimulationFlink.MAX_ITERATIONS, Array(0,1)) { (discoveredMoves: DataSet[((Action, Int, Int), (Action, Int, Int))], deltaMoves: DataSet[((Action, Int, Int), (Action, Int, Int))]) =>
        val deltaNodes: DataSet[(Action, Int, Int)] = deltaMoves.map(_._2).distinct
        
        val newAttackerNodes: DataSet[(Action, Int, Int)] = deltaNodes.filter(_._1 == ATTACK)
        val newSimulationChallenges: DataSet[((Action, Int, Int), (Action, Int, Int))] =
          (ts.getEdgesAsTuple3() join newAttackerNodes)
            .where(0/*src*/).equalTo(1/*p*/) { (edge, an) =>
              (an, (edge._3/*a*/, edge._2/*tar*/, an._3 /*q*/))
        }
        
        val newDefenderSimulationNodes: DataSet[(Action, Int, Int)] = deltaNodes.filter(n => n._1 != ATTACK && n._1 != COUPLING)
        
         // the simulation answer can be postponed by internal steps on the right hand side
        val newSimulationWeakSteps: DataSet[((Action, Int, Int), (Action, Int, Int))] =
          (newDefenderSimulationNodes join tauSteps)
          .where(2/*q*/).equalTo(0/*p0*/) { (dn, edge) =>
            (dn, (dn._1, dn._2, edge._2))
        }
        
        // at some point the defender has to decide that this is the right place to perform the visible action
89
        // and yield back to the attacker (that the defender may not postpone the yielding, means that we use delay steps and not weak steps!)
90 91
        val newSimulationAnswersUnfiltered =
          (newDefenderSimulationNodes join ts.getEdgesAsTuple3())
92 93
          .where(2/*q*/,0/*a*/).equalTo(0/*src*/,2/*a*/) ((dn, edge) => (dn, (ATTACK, dn._2, edge._2)))

94
        val newSimulationAnswers: DataSet[((Action, Int, Int), (Action, Int, Int))] =
95 96 97
          (attackerNodes join newSimulationAnswersUnfiltered).where(n => n).equalTo(1)((_, mv) => mv)

        // on tau challenges the defender may yield the inititiative back to the attacker directly
98 99 100 101 102 103 104 105
        val newSimulationAnswerTauResolves: DataSet[((Action, Int, Int), (Action, Int, Int))] =
          (newDefenderSimulationNodes
              .filter(_._1 == TAU)
              join attackerNodes) // ??
              .where(1,2).equalTo(1,2)
          
        // every attacker node can be the entry or exit of a coupling challenge
        val newCouplingChallengesEntrys: DataSet[((Action, Int, Int), (Action, Int, Int))]  =
106
          newAttackerNodes map (an => (an, (COUPLING, an._2, an._3)))
107 108 109 110 111 112 113 114 115 116
          
        val newDefenderCouplingNodes: DataSet[(Action, Int, Int)] = deltaNodes.filter(n => n._1 == COUPLING)
          
        val newCouplingChallengesExits: DataSet[((Action, Int, Int), (Action, Int, Int))] =
          (newDefenderCouplingNodes join attackerNodes).where(1,2).equalTo(2,1)// map (cn => (cn, (ATTACK, cn._3, cn._2)))) //join attackerNodes).where(1).equalTo(2)
        
        // during a coupling challenge, the defender may move with tau steps on the right-hand side.
        val newCouplingMoves: DataSet[((Action, Int, Int), (Action, Int, Int))] =
          (newDefenderCouplingNodes join tauSteps)
          .where(2/*q*/).equalTo(0/*src*/) (new JoinFunction[(Action, Int, Int), (Int, Int, Action), ((Action, Int, Int), (Action, Int, Int))] {
117
            def join(cn: (Action, Int, Int), edge: (Int, Int, Action)): ((Action, Int, Int), (Action, Int, Int)) = {
118 119 120 121 122 123 124 125 126 127 128 129 130
              (cn, (COUPLING, cn._2, edge._2))
            }
        })
          
        val newGameMoves = newSimulationChallenges union
          newSimulationWeakSteps union
          newSimulationAnswers union
          newSimulationAnswerTauResolves union
          newCouplingChallengesEntrys union
          newCouplingChallengesExits union
          newCouplingMoves
          
        val reallyNewGameMoves = (newGameMoves coGroup discoveredMoves)
131 132 133 134
          .where(0,1).equalTo(0,1).apply(fun = {
              (mv1: Iterator[((Action, Int, Int), (Action, Int, Int))],
              mv2: Iterator[((Action, Int, Int), (Action, Int, Int))],
              out: Collector[((Action, Int, Int), (Action, Int, Int))]) =>
135 136 137 138 139 140 141 142
            if (mv2.isEmpty) {
              for (nm <- mv1) out.collect(nm)
            }
        })

        (reallyNewGameMoves, reallyNewGameMoves)
    }
    
143
    val gameNodes = attackerNodes
144 145 146 147 148
      
    (gameNodes, gameMoves)
  }
  
}