This GitLab instance has reached the end of its service life. It is no longer possible to create new users or projects.

Please read the deprecation notice for more information concerning the deprecation timeline.

Visit migration.git.tu-berlin.de (internal network only) to import your old projects to the new GitLab platform 📥

CoupledSimulationGame.scala 5.61 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
package de.bbisping.coupledsim.flink

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.DataSet
import org.apache.flink.graph.scala.Graph
import org.apache.flink.types.NullValue
import de.bbisping.coupledsim.util.Coloring
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.common.functions.JoinFunction


/**
 * Builds the nodes and moves of the coupled simulation game for a transition system.
 *
 * Game nodes are triples `(kind, p, q)` of a marker/action and two vertex ids. The first
 * component is `ATTACK` for attacker nodes, `COUPLING` for nodes inside a coupling
 * challenge, and otherwise the action the defender still has to answer (`TAU` once the
 * visible action has been answered or for tau challenges).
 */
class CoupledSimulationGame {

  import CoupledSimulationFlink.Action
  import CoupledSimulationGame._

  /** Signature attached to a vertex id: a set of colour pairs. */
  type Signature = Set[(Coloring.Color, Coloring.Color)]

  /**
   * Computes the game graph for `ts`.
   *
   * @param ts            transition system; vertex ids are `Int`, edges carry an action label
   * @param signaturesOpt optional signature per vertex id; when given, an attacker node
   *                      `(ATTACK, p, q)` is only generated if the signature of `p` is a
   *                      subset of the signature of `q`; when absent, all pairs are used
   * @param TAU           the action label that denotes an internal step
   * @return the pair `(gameNodes, gameMoves)`, where `gameNodes` is the union of attacker
   *         nodes and defender simulation nodes and `gameMoves` is the union of all move
   *         relations built below
   */
  def compute(
      ts: Graph[Int, NullValue, CoupledSimulationFlink.Action],
      signaturesOpt: Option[DataSet[(Int, Set[(Coloring.Color, Coloring.Color)])]],
      TAU: CoupledSimulationFlink.Action)
  : (DataSet[(CoupledSimulationFlink.Action, Int, Int)],
     DataSet[((CoupledSimulationFlink.Action, Int, Int), (CoupledSimulationFlink.Action, Int, Int))]) = {

    // build the vertex-id and edge-triple data sets once and reuse them in every step below
    val vertexIds: DataSet[Int] = ts.getVertexIds
    val edges: DataSet[(Int, Int, Action)] = ts.getEdgesAsTuple3()

    // every pair of vertices, regardless of signatures
    val possibleAttackerNodes: DataSet[(Action, Int, Int)] =
      (vertexIds cross vertexIds) {
        (p, q) => (ATTACK, p, q)
      }

    val attackerNodes: DataSet[(Action, Int, Int)] = signaturesOpt match {
      case Some(signatures) =>
        // only generate attacker nodes where there is a chance of the defender winning
        (signatures cross signatures).flatMap(
          new FlatMapFunction[((Int, Signature), (Int, Signature)), (Action, Int, Int)] {
            def flatMap(pqSig: ((Int, Signature), (Int, Signature)),
                out: Collector[(Action, Int, Int)]): Unit = pqSig match {
              case ((p, pSig), (q, qSig)) =>
                // the size comparison is a cheap pre-check before the subset test
                if (pSig.size <= qSig.size && (pSig subsetOf qSig)) {
                  out.collect((ATTACK, p, q))
                }
            }
          })
      case None =>
        possibleAttackerNodes
    }

    // the attacker challenges with a step p -a-> tar on the left-hand side
    // NOTE(review): the original author marked the use of possibleAttackerNodes (rather than
    // attackerNodes) here with "?" -- confirm which node set is intended.
    val simulationChallenges: DataSet[((Action, Int, Int), (Action, Int, Int))] =
      (edges join possibleAttackerNodes)
        .where(0/*src*/).equalTo(1/*p*/) { (edge, an) =>
          (an, (edge._3/*a*/, edge._2/*tar*/, an._3/*q*/))
        }

    // targets of simulation challenges plus a TAU-labelled node for every vertex pair
    val defenderSimulationNodes: DataSet[(Action, Int, Int)] =
      simulationChallenges
        .map(challenge => challenge._2)
        .union(possibleAttackerNodes map (an => (TAU, an._2, an._3)))
        .distinct()

    // only allow "real" (non-stuttering) tau-steps (because otherwise this could be used
    // by the defender to go into infinite loops and win) (we assume that tau cycles have been compressed)
    val tauSteps: DataSet[(Int, Int, Action)] =
      edges.filter(new FilterFunction[(Int, Int, Action)] {
        def filter(edge: (Int, Int, Action)): Boolean = edge match {
          case (p0, p1, a) => a == TAU && p0 != p1
        }
      })

    // the simulation answer can be postponed by internal steps on the right hand side
    val simulationWeakSteps: DataSet[((Action, Int, Int), (Action, Int, Int))] =
      (defenderSimulationNodes join tauSteps)
        .where(2/*q*/).equalTo(0/*p0*/) { (dn, edge) =>
          (dn, (dn._1, dn._2, edge._2))
        }

    // at some point the defender has to decide that this is the right place to perform the visible action
    val simulationAnswers: DataSet[((Action, Int, Int), (Action, Int, Int))] =
      (defenderSimulationNodes join edges)
        .where(2/*q*/, 0/*a*/).equalTo(0/*src*/, 2/*a*/)(
          new JoinFunction[(Action, Int, Int), (Int, Int, Action), ((Action, Int, Int), (Action, Int, Int))] {
            def join(dn: (Action, Int, Int), edge: (Int, Int, Action)): ((Action, Int, Int), (Action, Int, Int)) = {
              // after answering, the node is relabelled with TAU
              (dn, (TAU, dn._2, edge._2))
            }
          })

    // defender nodes whose pending action is TAU
    val tauLabelledDefenderNodes: DataSet[(Action, Int, Int)] =
      defenderSimulationNodes.filter(new FilterFunction[(Action, Int, Int)] {
        def filter(challenge: (Action, Int, Int)): Boolean = challenge._1 == TAU
      })

    // afterwards (or directly on tau challenges) the defender may yield the initiative back to the attacker
    // NOTE(review): the original author marked the join with attackerNodes with "??" -- confirm.
    val simulationAnswerTauResolves: DataSet[((Action, Int, Int), (Action, Int, Int))] =
      (tauLabelledDefenderNodes join attackerNodes)
        .where(1, 2).equalTo(1, 2)

    // every attacker node can be the entry or exit of a coupling challenge
    // NOTE(review): entries use possibleAttackerNodes, exits use attackerNodes with p and q
    // swapped; the original author marked both lines with question marks -- confirm.
    val couplingChallengesEntrysExits: DataSet[((Action, Int, Int), (Action, Int, Int))] =
      (possibleAttackerNodes map (an => (an, (COUPLING, an._2, an._3)))) union
      (attackerNodes map (an => ((COUPLING, an._3, an._2), an)))

    // during a coupling challenge, the defender may move with tau steps on the right-hand side.
    val couplingMoves: DataSet[((Action, Int, Int), (Action, Int, Int))] =
      (possibleAttackerNodes join tauSteps)
        .where(2/*q*/).equalTo(0/*src*/)(
          new JoinFunction[(Action, Int, Int), (Int, Int, Action), ((Action, Int, Int), (Action, Int, Int))] {
            def join(an: (Action, Int, Int), edge: (Int, Int, Action)): ((Action, Int, Int), (Action, Int, Int)) = {
              ((COUPLING, an._2, an._3), (COUPLING, an._2, edge._2))
            }
          })

    // NOTE(review): COUPLING nodes only occur as endpoints of moves and are not part of
    // gameNodes -- confirm that callers expect this.
    val gameNodes = attackerNodes union defenderSimulationNodes
    val gameMoves = simulationChallenges union
      simulationWeakSteps union simulationAnswers union simulationAnswerTauResolves union
      couplingChallengesEntrysExits union couplingMoves

    (gameNodes, gameMoves)
  }

}

/**
 * Marker values stored in the first component of a game node in place of an action.
 *
 * NOTE(review): both are negative, presumably so they cannot collide with real action
 * ids -- confirm against how actions are numbered.
 */
object CoupledSimulationGame {

  /** First component of attacker nodes `(ATTACK, p, q)`. */
  val ATTACK: Long = -1L

  /** First component of nodes inside a coupling challenge `(COUPLING, p, q)`. */
  val COUPLING: Long = -2L
}