This GitLab instance reached the end of its service life. It won't be possible to create new users or projects.

Please read the deprecation notice for more information concerning the deprecation timeline

Visit migration.git.tu-berlin.de (internal network only) to import your old projects to the new GitLab platform 📥

SimpleGameDecider.scala 3.14 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
package de.bbisping.coupledsim.flink

import scala.reflect.ClassTag

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.graph.scala.Graph
import org.apache.flink.types.NullValue
import org.apache.flink.graph.spargel.ScatterGatherConfiguration
import org.apache.flink.graph.spargel.ScatterFunction
import org.apache.flink.types.LongValue
import org.apache.flink.graph.EdgeDirection
import org.apache.flink.graph.Vertex
import org.apache.flink.graph.spargel.MessageIterator
import org.apache.flink.graph.spargel.GatherFunction

class SimpleGameDecider {
  
  import CoupledSimulationFlink.Action
  
  def compute(
      gameGraph: Graph[(CoupledSimulationFlink.Action, Int, Int), Int, NullValue],
      env: ExecutionEnvironment): Graph[(CoupledSimulationFlink.Action, Int, Int), Int, NullValue] = {
25 26
    implicit val tupleType: TypeInformation[((Action, Int, Int), Int)] = TypeInformation.of(classOf[((Action, Int, Int), Int)])
    implicit val tupleClassTag: ClassTag[Nothing] = ClassTag.apply(classOf[((Action, Int, Int), Int)])
27 28 29
    
    val successorCount = gameGraph.outDegrees().map(
        (nodeOut: ((Action, Int, Int), LongValue)) => nodeOut match {
30
        case (v: (Action, Int, Int), d: LongValue) =>
31 32 33 34 35 36 37 38 39 40
          val num = d.getValue.toInt
          (v, if (num == 0 && v._1 != CoupledSimulationGame.ATTACK) SimpleGameDecider.ATTACKER_WIN_MAGIC_NUMBER else num)
      }
    )
    
    val winningRegionComputationGraph: Graph[(Action, Int, Int), Int, NullValue] =
      Graph.fromTupleDataSet(successorCount, gameGraph.getEdgesAsTuple3, env)
        
    val propagate = new ScatterFunction[(Action, Int, Int), Int, Int, NullValue] {
      
41
    	override def sendMessages(vertex: Vertex[(Action, Int, Int), Int]): Unit = {
42 43 44 45 46 47 48 49 50 51 52 53
    	  if (vertex.getValue == SimpleGameDecider.ATTACKER_WIN_MAGIC_NUMBER ) {
    	    val edges = getEdges.iterator()
    	    while (edges.hasNext) {
    	      val edge = edges.next
		      	sendMessageTo(edge.getSource, 1)
    	    }
    	  }
		  }
    }
    
    val propagateGather = new GatherFunction[(Action, Int, Int), Int, Int] {
      
54
    	override def updateVertex(vertex: Vertex[(Action, Int, Int), Int], inMessages: MessageIterator[Int]): Unit = {
55 56 57 58 59
    		var value = vertex.getValue
    		
    		if (value != SimpleGameDecider.ATTACKER_WIN_MAGIC_NUMBER) {
    		  var count = 0
      		while (inMessages.hasNext) {
60
      		  inMessages.next
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
      		  count += 1
      		}
      		
      		if (count > 0 && vertex.getId._1 == CoupledSimulationGame.ATTACK || count >= value ) {
      		  value = SimpleGameDecider.ATTACKER_WIN_MAGIC_NUMBER
      		} else {
      		  value -= count
      		}
      		
      		if (value < vertex.getValue) {
      		  setNewVertexValue(value)
      		}
    		}
    	}
    }
    
    val propagateCfg = new ScatterGatherConfiguration
    propagateCfg.setDirection(EdgeDirection.IN)
    
    winningRegionComputationGraph
      .runScatterGatherIteration(propagate, propagateGather, CoupledSimulationFlink.MAX_ITERATIONS, propagateCfg)
  }
  
}

object SimpleGameDecider {
87
  val ATTACKER_WIN_MAGIC_NUMBER: Int = -1
88
}