Usuwanie przedawnionych danych z mapy przy pomocy RxScala

 Początkowo planowałem wykorzystać do realizacji tytułowego celu framework Akka Streams. Okazało się jednak, że w zestawie wbudowanych operatorów brakuje odpowiednika switchMap z RxJava/RxScala. Jest on o tyle ważny, że stanowi kluczową część zaproponowanego przeze mnie algorytmu.

 Postanowiłem zacząć od rozszerzenia definicji interfejsu struktury, która faktycznie przechowuje nasze dane, tak aby istniała możliwość przejrzenia jej wszystkich elementów:

trait SimpleMap[Id,Val] extends Traversable[(Id,Val)]{
    /** Gets a key from a map
      *  @param aKey map's key
      *  @return Some(Value) if map contains aKey or None otherwise
      */
    def get(aKey:Id):Option[Val]

    /** Puts a value under a specified key
      *  @param aKey a key under which aVal should be available
      *  @param aVal a value to associate with aKey
      *  @return Some(Value) if aKey was already associate with other value or None otherwise
      */
    def put(aKey:Id,aVal:Val): Option[Val]

    /** Remove a specified key and a value associated with it
      *  @param aKey a key which should be removed
      *  @return Some(Value) if aKey was associate with value or None otherwise
      */
    def remove(aKey:Id): Option[Val]
  }

Wykorzystałem w tym celu standardowy interfejs z bibliotek scala Traversable. Nowa funkcjonalność umożliwi nam przeiterowanie po elemantach kolekcji i znalezienie danych, które najszybciej ulegną przedawnieniu. ExpirableMap zbudowane na bazie, utworzonej wcześniej, ConcurrentMap wygląda następująco:

class ExpirableMap[Id,Val](
  aMap    : SimpleMap[Id,(DateTime,Val)],
  aContext: ExecutionContext
) extends ConcurrentMap[Id,(DateTime,Val)](aMap, aContext) with AutoCloseable {

  implicit def toDuration(aTime: DateTime) = {
    if(DateTime.now >= aTime) Zero
    else Try{(aTime.getMillis - DateTime.now.getMillis) millis}.getOrElse(365 days)
  }

  protected val mMaxTime      = new DateTime(Long.MaxValue)
  protected val mSubject      = BehaviorSubject[DateTime](mMaxTime)
  protected val mScheduler    = ExecutionContextScheduler(aContext)
  protected val mSubscription =
    mSubject
      .slidingBuffer(2,1)
      .filter{
        case h1+:h2+:_ if h2 != h1 => true
        case _                     => false
      }.switchMap{
      case _+:h2+:_ => timer{h2}.map{_=>h2}
      case _        => empty
    }.subscribeOn(mScheduler)
      .observeOn(mScheduler)
      .subscribe{x=>
        aMap.filterNot{_._2._1 > x}
          .foldLeft(().point[Result]){
            case (acc,(id,_))=> for {
              _<-acc
              _<-remove(id)
            } yield ()
          } |> run
      }

  override def run[T](aAction: Result[T]): Future[T] = super.run(aAction).andThen{
    case f@Failure(_) => f
    case s@Success(_) =>
      if(aMap.nonEmpty) mSubject onNext {aMap.map{_._2._1}.minBy(_.millis)}
      s
  } (aContext)

  override def close(): Unit = mSubscription.unsubscribe()
}

 Pierwszą modyfikacją, na którą warto zwrócić uwagę, jest metoda run[T]. W przypadku, gdy operacje na ExpirableMap zakończą się sukcesem, przeszukujemy listę zapisanych wartości w celu znalezienia tej posiadającej najbliższy termin przedawnienia. Następnie wartość tę emitujemy do strumienia utworzonego przy pomocy RxScala. Na naszym strumieniu danych używamy operatora slidingBuffer(2,1), dzięki któremu mamy dostęp do dwóch ostatnich dat. Możemy teraz sprawdzić, czy nowa data przedawnienia jest inna niż poprzednia. Jeśli tak to tworzymy licznik, który wygeneruje zdarzenie w odpowiednim momencie. Dzięki operatorowi switchMap po wygenerowaniu nowego licznika, stary zostanie zatrzymany. Stosując operator subscribeOn, observeOn zapewniamy, że kod przekazany do funkcji subscribe wykona się na tym samym wątku, co operacje wywołane poprzez metodę run. Jest to bardzo istotne, ponieważ w sekcji subscribe przeglądamy strukturę, która bezpośrednio przechowuje nasze dane. Elementy te usuwamy stosując interfejs run, w ten sposób zapewnimy przekazanie nowej daty przedawnienia do naszego strumienia.

 W zasadzie nasza kompaktowa implementacja jest “prawie” dobra. Nie uwzględnia ona jednego przypadku: data przedawnienia nowego elementu jest identyczna jak poprzedniego i dodatkowo owa data już minęła. W takim przypadku każdy użytkownik oczekiwałby, że dodanie przedawnionego elementu spowoduje jego natychmiastowe usunięcie. Niestety, wyeliminowanie tego niedopatrzenia przy zachowaniu jednocześniej dobrej wydajności, generuje dość skomplikowaną implementację:

    mSubject
      .slidingBuffer(2,1)
      .scan(Observable.empty: Observable[DateTime]){
        case (o,h1+:h2+:_) if h2 != h1 => timer{h2}.merge{never}.map{_=>h2}.replay.refCount
        case (o,_)                     => o
     }.switchMap{x=>using(x.subscribe)(_=>just(x),_.unsubscribe)}
      .switchMap{x=>x}
      .subscribeOn(mScheduler)
      .observeOn(mScheduler)
      .subscribe{x=>
        aMap.filterNot{_._2._1 > x}
            .foldLeft(().point[Result]){
              case (acc,(id,_))=> for {
                _<-acc
                _<-remove(id)
              } yield ()
            } |> run
      }

 Operator scan wykorzystujemy do zapamiętania licznika. Nowo utworzony licznik, który też jest strumieniem, łączymy operatorem merge ze strumieniem never, nie generuje on żadnych zdarzeń, ale jednocześnie nigdy się nie kończy. Dzięki temu strumień licznika nigdy nie zostaje zamknięty. Operator replay zapewnia, że nowi obserwatorzy strumienia otrzymają zdarzenie licznika, nawet jeżli zadana data już minęła. Z kolei operator refCount umożliwi zwolnienie zasobów związanych z licznikiem, gdy już nie będzie on przez nikogo obserwowany. Tym razem zastosowaliśmy switchMap dwa razy, jest to pewna sztuczka. switchMap jest obserwatorem wewnętrznego stumienia – kiedy przychodzą nowe dane, przestaje on obserwować stary strumień i przestawia się na nowy. W tym wypadku liczba odniesień do naszego licznika wynosiłaby zero i zegar zostałby wyzerowany. Przy pomocy operatora using i switchMap tworzymy jedno dodatkowe odwołanie do licznika.

Tworzenie tablicy asocjacyjnej bezpiecznej wątkowo

 Jednym z zadań serwera dla gry CatChow będzie tworzenie pokojów, w których gracze będą mogli dobierać się w drużyny. W tym celu z każdym użytkownikiem należy powiązać identyfikator, który musi być unikatowy dla wszystkich aktualnie dostępnych graczy. Wedle moich zamierzeń, gra nie będzie wymagała rejestracji, więc nie ma potrzeby stosowania bazdy danych.

Rozważany przypadek użycia wygląda następująco:

  • Gracz odwiedza witrynę CatChow
  • Następuje sprawdzenie, czy gracz nie posiada aktywnej sesji z serwerem
  • Serwer wyświetla okno dialogowe w celu wprowadzenia nowego identyfikatora
  • Następuję weryfikacja unikalności identyfikatora
  • Następuje przejście do witryny pokoi gry
  • 2a. Gracz posiada aktywną sesję z serwerem
    • 2a1. Przejdź do kroku 5
  • 4a. Identyfikator jest już zajęty
    • 4a1. Wyświetl komunikat o konieczności wyboru innego identyfikatora
    • 4a2. Przejdź do kroku 3

 W celu rozwiązania tego problemu postanowiłem wykorzystać tablicę asocjacyjną, której kluczami będą identyfikatory graczy. Istotnym aspektem jest tutaj przejście między krokiem 3 i 4, jeśli jednocześnie kilku użytkowników próbowałoby użyc tego samego identyfikatora, mogłoby wystąpić zjawisko tzw. wyścigu wątków. Można w tym celu po prostu wykonywać te operacje na jednym wątku. W języku scala mogłoby wyglądać to mniej więcej tak:

def storeUserId(aId: String) = Future {
    
    if(mMap.get(aId).isDefined)
        throw new Exception()
    else mMap.put(aId, new SomeUserData())

} (MapStorageExecutionContext)

Mimo wszystko, bardziej elegancko wyglądałoby zamknięcie tego w pojedyńczą strukturę np.:

mConcurrentMap.run{(m:Map[String,SomeUserData]) => 
    if(m.get(aId).isDefined)
        throw new Exception()
    else m.put(aId, new SomeUserData()) 
}

 Załóżmy jednak, że przekazaliśmy program innemu deweloperowi do utrzymania. Nic nie stoi na przeszkodzie, żeby napisał on kod:

mConcurrentMap.run{(m:Map[String,SomeUserData]) => 
    if(m.get(aId).isDefined)
        throw new Exception()
    else Future{ 
        m.put(aId, new SomeUserData())
    } (SomeExecutionContext)    
}

 W celu zabezpieczenia się przed niepoprawnym użyciem naszej struktury można posłużyć się monadą Reader z biblioteki ScalaZ. Monadę Reader[T,U] można interpretować po prostu jako funkcję z T->U, jednak podniesienie jej do typu monadycznego umożliwia zostosowanie jej w tzw. for comprehension.

object ConcurrentMap {
  trait SimpleMap[Id,Val] {
    /** Gets a key from a map
      *  @param aKey map's key
      *  @return Some(Value) if map contains aKey or None otherwise
      */
    def get(aKey:Id):Option[Val]

    /** Puts a value under a specified key
      *  @param aKey a key under which aVal should be available
      *  @param aVal a value to associate with aKey
      *  @return Some(Value) if aKey was already associate with other value or None otherwise
      */
    def put(aKey:Id,aVal:Val): Option[Val]

    /** Remove a specified key and a value associated with it
      *  @param aKey a key which should be removed
      *  @return Some(Value) if aKey was associate with value or None otherwise
      */
    def remove(aKey:Id): Option[Val]
  }
  type Action[Id,Val,T] = Reader[SimpleMap[Id,Val],T]

  def get[Id,Val](aKey:Id): Action[Id,Val,Option[Val]] =
    Reader(m=>m.get(aKey))

  def put[Id,Val](aKey:Id,aVal:Val): Action[Id,Val,Option[Val]] =
    Reader(m=>m.put(aKey,aVal))

  def remove[Id,Val](aKey:Id): Action[Id,Val,Option[Val]] =
    Reader(m=>m.remove(aKey))


  def apply[Id,Val](aMap: SimpleMap[Id,Val])(implicit aContext: ExecutionContext) =
    new ConcurrentMap[Id,Val](aMap,aContext)
}


class ConcurrentMap[Id,Val](aMap:SimpleMap[Id,Val], aContext: ExecutionContext) {
  type Result[T] = Action[Id, Val, T]

  def run[T](aAction: Result[T]): Future[T] = Future {
    aAction.run(aMap)
  }(aContext)
}

Teraz ConcurrentMap może być stosowana w następujący sposób:

def storeUserId(aId: String) = mConcurrentMap.run{
  for{
    u<-get(aId)
    _<-if(u.isDefined) throw new Exception() else put(aId, new SomeUserData())
  } yield u
}

Pierwsze kroki – Przygotowanie projektu wykorzystującego Play Framework

 Jak to zazwyczaj bywa, podczas tworzenia back-endu należy zdecydować się na framework, który ułatwi tworzenie implementacji. Ze względu na to, że w projekcie postanowiłem wykorzystać język Scala, wybór był oczywisty – Play Framework (będę go nazywał skrótowo PF). W zasadnie nie ma on w tej chwili żadnych poważnych konkurentów oraz warto dodać, że oficjalna strona tego frameworka zawiera imponującą liczbę tutoriali.

 IDE IntelliJ posiada wbudowaną obsługę frameworku Play oraz zawiera kilka szablonów projektów, które go wykorzystują. Można więc praktycznie natychmiastowo rzucić się w wir pracy.

 Pierwszą kwestią, jaką postanowiłem się zająć, był wybór biblioteki do Dependency Injection. PF domyślnie wykorzystuje Guice, osobiście nie jestem jednak fanem adnotacji, czasem znajdują się one w kodzie, który nie powinien mieć świadomości istnienia DI. Moim osobistym faworytem jest Autofac, niestety jest on dostępny jedynie dla języków programowania platformy .NET. Po chwili szukania mój wybór padł na Scaldi, wykorzystuje on DSL, który jest bardzo czytelny oraz posiada wsparcie injekcji aktorów frameworku Akka, który zamierzam wykorzystać.

 Pojawił się jednak pewien problem – istnieje wprawdzie plugin, który umożliwia integrację PF i Scaldi, jednak (na chwilę obecną) nie istnieje wersja kompatybilna ze Scala 2.12. Postanowiłem spiąć PF i Scaldi własnoręcznie, strona PF okazała się tutaj bardzo pomocna, cała implementacja nie była specjalnie skomplikowana, to duży plus. Scaldi umożliwia tworzenie modułów z dependencjami, a następnie łączenie za pomocą operatora ‘::’ lub ‘++’. Jest to bardzo przydatna funkcjonalność; jeśli jakaś dependencja nie zostanie rozwikłana w jednym z modułów, to następują próby rozstrzygnięcia jej w pozostałych modułach. Jeśli jakieś wiązanie jest zdefiniowane w kilku modułach, to zostanie wykorzystane pierwsze z nich (kolejność kompozycji modułów ma więc znaczenie). Zachęcony ową funkcjonalnością, utworzyłem dwa moduły:

  • BaseAppModule – zawiera wiązania do typowych obiektów aplikacji utworzonej w PF (kontekst aplikacji, konfiguracja http itd.)
  • AppModule – zawiera wiązania do obiektów specyficznych dla mojej aplikacji (np. kontrolery, modele itd.)

 Nie obyło się bez drobnych problemów, ale ostatecznie wszystko zadziałało. Teraz można już zabrać się do dalszej pracy.