Reactive all the things

@mgonto


@benlesh

@mgonto

software developer

Auth0 Inc.

Javascript

Developer Advocate

auth0.com/signup

@benlesh

- Great culture (no really)

- Huge fans of Reactive Programming and RxJS

- We love developers with framework experience

http://jobs.netflix.com

Reactive Frontend

Functional Reactive Programming?

- This talk is NOT about FRP

- This talk is about integrating Functional Programming

- This talk is about Reactive Programming

- FRP is a different animal

Why go functional?

I have nice, familiar, imperative code:

function oddlyExcited(nums) {
  var result = [];
  nums.forEach(function(n) {
    if(n % 2 === 1) {
      result.push(n + '!!!');
    }
  });
  return result;
}
          

JavaScript, as you know it, is going to change

Cores aren't getting much faster

(Now you can't scale by waiting a month)
(source: csgillespie.wordpress.com)

Real concurrency is coming to JavaScript

(but not yet)

Learn Functional Programming

- immutable state

- no side effects

- work could easily be scaled across threads

(* pun!)

Array map(), filter(), reduce()

function oddlyExcited(nums) {
  return nums.filter(function(x) {
    return x % 2 === 1;
  }).map(function(x) {
    return x + '!!!';
  });
}
          
working on my app at Netflix, I encountered a problem with this approach

Argus

- dozens of graphs

- realtime multiplexed data over WebSockets

- lots of rich user interactions

(It's an Ember app)

The first big demo

- Feeling confident

- Created reusable, composable graphs

- Got all of our arrays of real-time data scrubbed into graphs

- We got our mouse interactions working

- Used test data that mirrored production data volume

But we used production data for the demo

Too much Array map, filter, reduce

- iterates over the entire array at each step

- creates new arrays at each step

- those intermediary arrays need to be garbage collected

I need stream processing!

RxJS Observables enable that
array.filter(x => x % 2 === 1).map(x => x + '!!!');
observable.filter(x => x % 2 === 1).map(x => x + '!!!');

What else can Observables do?

Observables

any collection of values

any amount of time

Observables

merged, concatenated and zipped

collection

Observables are a pattern to

- Start a data stream

- Emit 0 to N messages

- Teardown the data stream

What are "data streams?"

- Arrays of data

- Mouse/Keyboard Interactions

- DOM Events

- Network I/O (e.g. Ajax, WebSockets)

- Animations

- Speech Recognition

- Joystick Input

- Anything, really

Meanwhile, back at Netflix...

Sockets Die

When our users closed the lid and walked away, or lost network connectivity, the socket would disconnect.

multiplexed socket reconnection

- Need to resend real-time data subscription requests

- That means maintaining state on each recent request

- Lots of complicated logic in our event handlers

Socket Observable

- connect the socket on subscription

- emit all messages that arrive on the socket

- error the observable on errors and bad closes

- diconnect the socket on disposal

Multiplexed Data Observable

- sends sub request on subscription

- mapped from the socket observable

- filters to only pertinent messages

- sends an unsub request on disposal

Observables can retry!

// a socket that is singleton
var socket = Rx.DOM.fromWebSocket('ws://socket-server.com').
singleInstance();

function getMultiplexData(id) {
  return Observable.create((observer) => {
    socket.onNext(JSON.stringify({ id: id, type: 'sub' }));

    var disposable = socket.map(e => JSON.parse(e.data)).
      filter(d => d.id === id).
      forEach(observer);

    return function(){
      socket.onNext(JSON.stringify({ id: id, type: 'unsub' }));
      disposable.dispose();
    }
  }).
  retry(10);
}
          

Fair Warning

- RxJS has a decent learning curve

- You need to change how you think about problems

- There are a lot of operators to learn

- sometimes sync, sometimes async

Reactive Programming

var c = a + b;

// Do something with c

Reactive Programming

var cStream = Observable.combineLatest(aStream, bStream, function(a, b) {
  return a + b
});

cStream.forEach(function(c) {
  // Do something with c
});

In Angular 2.X, Observables will be first class

But how can we use this today?

Using RxJS: Reactive Extension for Javascript

In this case, we'll use the Angular Toolkit

Let's start simple :)

Let's imagine a counter app

Current counter value {{counter}}
<input type="button" value="Click me to increase counter"
ng-click="increaseCounter()" />
<span>Current counter value {{counter}}</span>
<input type="button" value="Click me to increase counter"
ng-click="increaseCounter()" />
<span>Current counter value {{counter}}</span>
angular.module('counter', []);

angular.module('counter').controller('MainCtrl',
  function(ApiServer, $scope) {
      $scope.counter = 0;

      $scope.increaseCounter = function() {
        ApiServer.getCounterAmount(new Date())
          .then(function(newCount) {
            return ApiServer.logCounter(newCount).then(function() {
              return newCount;
            });
          })
          .then(function(newCount) {
            $scope.counter = $scope.counter + newCount;
          });
      }
});

How we'd regularly do it

angular.module('counter', ['rx']);

angular.module('counter').controller('MainCtrl',
  function(ApiServer, rx, $scope) {
      var disposable = $scope.$createObservableFunction('increaseCounter')
      .flatMap(function() {
          return rx.Observable.fromPromise(ApiServer.getCounterAmount(new Date()));
      })
      .do(ApiServer.logCounter)
      .scan(function(acc, val) {return acc + val;})
      .subscribe(function(counter) {
          $scope.counter = counter;
          console.log("Total counter is", counter);
      }, function(error) {
          console.error("There was an error");
      });

      $scope.$on('$destroy', function() {
        disposable.dispose();
      })
});

We depend on rx module

angular.module('counter', ['rx']);

angular.module('counter').controller('MainCtrl',
  function(ApiServer, rx, $scope) {
      var disposable = $scope.$createObservableFunction('increaseCounter')
      .flatMap(function() {
          return rx.Observable.fromPromise(ApiServer.getCounterAmount(new Date()));
      })
      .do(ApiServer.logCounter)
      .scan(function(acc, val) {return acc + val;})
      .subscribe(function(counter) {
          $scope.counter = counter;
          console.log("Total counter is", counter);
      }, function(error) {
          console.error("There was an error");
      });

      $scope.$on('$destroy', function() {
        disposable.dispose();
      })
});

We get the clicks stream

angular.module('counter', ['rx']);

angular.module('counter').controller('MainCtrl',
  function(ApiServer, rx, $scope) {
      var disposable = $scope.$createObservableFunction('increaseCounter')
      .flatMap(function() {
          return rx.Observable.fromPromise(ApiServer.getCounterAmount(new Date()));
      })
      .do(ApiServer.logCounter)
      .scan(function(acc, val) {return acc + val;})
      .subscribe(function(counter) {
          $scope.counter = counter;
          console.log("Total counter is", counter);
      }, function(error) {
          console.error("There was an error");
      });

      $scope.$on('$destroy', function() {
        disposable.dispose();
      })
});

For each new value we get the amount to sum

angular.module('counter', ['rx']);

angular.module('counter').controller('MainCtrl',
  function(ApiServer, rx, $scope) {
      var disposable = $scope.$createObservableFunction('increaseCounter')
      .flatMap(function() {
          return rx.Observable.fromPromise(ApiServer.getCounterAmount(new Date()));
      })
      .do(ApiServer.logCounter)
      .scan(function(acc, val) {return acc + val;})
      .subscribe(function(counter) {
          $scope.counter = counter;
          console.log("Total counter is", counter);
      }, function(error) {
          console.error("There was an error");
      });

      $scope.$on('$destroy', function() {
        disposable.dispose();
      })
});

As a side effect, we log each of them

angular.module('counter', ['rx']);

angular.module('counter').controller('MainCtrl',
  function(ApiServer, rx, $scope) {
      var disposable = $scope.$createObservableFunction('increaseCounter')
      .flatMap(function() {
          return rx.Observable.fromPromise(ApiServer.getCounterAmount(new Date()));
      })
      .do(ApiServer.logCounter)
      .scan(function(acc, val) {return acc + val;})
      .subscribe(function(counter) {
          $scope.counter = counter;
          console.log("Total counter is", counter);
      }, function(error) {
          console.error("There was an error");
      });

      $scope.$on('$destroy', function() {
        disposable.dispose();
      })
});

We sum each new value

angular.module('counter', ['rx']);

angular.module('counter').controller('MainCtrl',
  function(ApiServer, rx, $scope) {
      var disposable = $scope.$createObservableFunction('increaseCounter')
      .flatMap(function() {
          return rx.Observable.fromPromise(ApiServer.getCounterAmount(new Date()));
      })
      .do(ApiServer.logCounter)
      .scan(function(acc, val) {return acc + val;})
      .subscribe(function(counter) {
          $scope.counter = counter;
          console.log("Total counter is", counter);
      }, function(error) {
          console.error("There was an error");
      });

      $scope.$on('$destroy', function() {
        disposable.dispose();
      })
});

We subscribe to listen to changes and handle ALL errors

angular.module('counter', ['rx']);

angular.module('counter').controller('MainCtrl',
  function(ApiServer, rx, $scope) {
      var disposable = $scope.$createObservableFunction('increaseCounter')
      .flatMap(function() {
          return rx.Observable.fromPromise(ApiServer.getCounterAmount(new Date()));
      })
      .do(ApiServer.logCounter)
      .scan(function(acc, val) {return acc + val;})
      .subscribe(function(counter) {
          $scope.counter = counter;
          console.log("Total counter is", counter);
      }, function(error) {
          console.error("There was an error");
      });

      $scope.$on('$destroy', function() {
        disposable.dispose();
      })
});

We dispose the observable

Clicking on a counter are very few events

Let's actually try with more events!

<moving-text class="moving-text" text="ngConf is awesome!!!"></moving-text>
angular.module('mover').directive('movingText', function() {
    return {
        restrict: 'E',
        replace: 'true',
        templateUrl: '/js/movingText.html',
        scope: {
            text: '@'
        },
        controller: MovingController
    }
});
<div ng-mousemove="mouseMoved()">
  <div class="text-container">
    <span ng-repeat="letter in letters" ng-style="{top: letter.top, left: letter.left}" style="position: absolute;">
      {{letter.text}}
    </span>
  </div>
</div>
<div ng-mousemove="mouseMoved()">
  <div class="text-container">
    <span ng-repeat="letter in letters" ng-style="{top: letter.top, left: letter.left}" style="position: absolute;">
      {{letter.text}}
    </span>
  </div>
</div>
function directiveController() {
  $scope.letters = [];

   var mouseMoved = $scope.$createObservableFunction('mouseMoved')
      .map(function (e) {
        var offset = getOffset(textContainer);
        return {
          offsetX : e.clientX - offset.left,
          offsetY : e.clientY - offset.top
        };
      })
      .flatMap(function(delta) {
        return rx.Observable.fromArray(_.map($scope.text, function(letter, index) {
          return {
            letter: letter,
            delta: delta,
            index: index
          };
        }));
      })
      // ...
          
function directiveController() {
  $scope.letters = [];

   var mouseMoved = $scope.$createObservableFunction('mouseMoved')
      .map(function (e) {
        var offset = getOffset(textContainer);
        return {
          offsetX : e.clientX - offset.left,
          offsetY : e.clientY - offset.top
        };
      })
      .flatMap(function(delta) {
        return rx.Observable.fromArray(_.map($scope.text, function(letter, index) {
          return {
            letter: letter,
            delta: delta,
            index: index
          };
        }));
      })
      // ...
          
// ...
  .flatMap(function(letterConfig) {
    return rx.Observable.timer(letterConfig.index * 100)
        .map(function() {
          return {
            text: letterConfig.letter,
            top: letterConfig.delta.offsetY,
            left: letterConfig.delta.offsetX + letterConfig.index * 20 + 15,
            index: letterConfig.index
          };
      });
  })
  .subscribe(function(letterConfig) {
    $scope.letters[letterConfig.index] = letterConfig;
  });

 $scope.$on('$destroy', function(){
   mouseMoved.dispose();
 });
}
// ...
  .flatMap(function(letterConfig) {
    return rx.Observable.timer(letterConfig.index * 100)
        .map(function() {
          return {
            text: letterConfig.letter,
            top: letterConfig.delta.offsetY,
            left: letterConfig.delta.offsetX + letterConfig.index * 20 + 15,
            index: letterConfig.index
          };
      });
  })
  .subscribe(function(letterConfig) {
    $scope.letters[letterConfig.index] = letterConfig;
  });

 $scope.$on('$destroy', function(){
   mouseMoved.dispose();
 });
}
// ...
  .flatMap(function(letterConfig) {
    return rx.Observable.timer(letterConfig.index * 100)
        .map(function() {
          return {
            text: letterConfig.letter,
            top: letterConfig.delta.offsetY,
            left: letterConfig.delta.offsetX + letterConfig.index * 20 + 15,
            index: letterConfig.index
          };
      });
  })
  .subscribe(function(letterConfig) {
    $scope.letters[letterConfig.index] = letterConfig;
  });

 $scope.$on('$destroy', function(){
   mouseMoved.dispose();
 });
}

This is just the beginning

Let's react to everything!

T-Shirt time

Thank you for listening

links.gon.to/ngconf-ama

Reactive All the things

@mgonto


@benlesh