Reactive all the things
@mgonto
@benlesh
@mgonto
software developer
Auth0 Inc.
Javascript
Developer Advocate
@benlesh
- Great culture (no really)
- Huge fans of Reactive Programming and RxJS
- We love developers with framework experience
http://jobs.netflix.com
Reactive Frontend
Functional Reactive Programming?
- This talk is NOT about FRP
- This talk is about integrating Functional Programming
- This talk is about Reactive Programming
- FRP is a different animal
Why go functional?
I have nice, familiar, imperative code:
function oddlyExcited(nums) {
var result = [];
nums.forEach(function(n) {
if(n % 2 === 1) {
result.push(n + '!!!');
}
});
return result;
}
JavaScript, as you know it, is going to change
Cores aren't getting much faster
(Now you can't scale by waiting a month)
(source: csgillespie.wordpress.com)
Real concurrency is coming to JavaScript
(but not yet)
Learn Functional Programming
- immutable state
- no side effects
- work could easily be scaled across threads
Array map(), filter(), reduce()
function oddlyExcited(nums) {
return nums.filter(function(x) {
return x % 2 === 1;
}).map(function(x) {
return x + '!!!';
});
}
Working on my app at Netflix, I encountered a problem with this approach
Argus
- dozens of graphs
- realtime multiplexed data over WebSockets
- lots of rich user interactions
(It's an Ember app)
The first big demo
- Feeling confident
- Created reusable, composable graphs
- Got all of our arrays of real-time data scrubbed into graphs
- We got our mouse interactions working
- Used test data that mirrored production data volume
But we used production data for the demo
Too much Array map, filter, reduce
- iterates over the entire array at each step
- creates new arrays at each step
- those intermediary arrays need to be garbage collected
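To make the cost above concrete, here is a minimal sketch (not the Argus code) comparing the eager array chain with a lazy pipeline built on generators. The helper names filterLazy and mapLazy are made up for this illustration. Generators are pull-based while Observables are push-based, but both pass one value at a time through every step without building an intermediate array.

```javascript
// Eager version: filter() and map() each walk their input and allocate a new array.
function oddlyExcitedEager(nums) {
  return nums
    .filter(function (x) { return x % 2 === 1; })
    .map(function (x) { return x + '!!!'; });
}

// Lazy version (hypothetical helpers): each value is pulled through the whole
// pipeline before the next one is read, so no intermediate array is created.
function* filterLazy(source, predicate) {
  for (const x of source) {
    if (predicate(x)) yield x;
  }
}

function* mapLazy(source, project) {
  for (const x of source) {
    yield project(x);
  }
}

function oddlyExcitedLazy(nums) {
  const odds = filterLazy(nums, function (x) { return x % 2 === 1; });
  return mapLazy(odds, function (x) { return x + '!!!'; });
}

const nums = [1, 2, 3, 4, 5];
const eager = oddlyExcitedEager(nums);
// Collected into an array here only so the two results can be compared.
const lazy = Array.from(oddlyExcitedLazy(nums));
console.log(eager, lazy);
```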
I need stream processing!
RxJS Observables enable that
array.filter(x => x % 2 === 1).map(x => x + '!!!');
observable.filter(x => x % 2 === 1).map(x => x + '!!!');
What else can Observables do?
Observables
any collection of values
any amount of time
Observables
merged, concatenated and zipped
collection
Observables are a pattern to
- Start a data stream
- Emit 0 to N messages
- Teardown the data stream
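The three steps above can be sketched without any library. This is an illustrative toy, not the RxJS implementation: createObservable is a hypothetical helper, and the observer method is called onNext only to match the naming used elsewhere in these slides.

```javascript
// Hypothetical helper: an "observable" wraps a start function that receives an
// observer, pushes 0 to N values into it, and returns a teardown function.
function createObservable(start) {
  const self = {
    subscribe: function (observer) {
      return start(observer);
    },
    // Each operator returns a new observable that subscribes to this one.
    filter: function (predicate) {
      return createObservable(function (observer) {
        return self.subscribe({
          onNext: function (x) { if (predicate(x)) observer.onNext(x); },
        });
      });
    },
    map: function (project) {
      return createObservable(function (observer) {
        return self.subscribe({
          onNext: function (x) { observer.onNext(project(x)); },
        });
      });
    },
  };
  return self;
}

const log = [];
const numbers = createObservable(function (observer) {
  log.push('start');                                   // 1. start the stream
  [1, 2, 3, 4, 5].forEach(function (n) { observer.onNext(n); }); // 2. emit
  return function teardown() { log.push('teardown'); };          // 3. teardown
});

const received = [];
const stop = numbers
  .filter(function (x) { return x % 2 === 1; })
  .map(function (x) { return x + '!!!'; })
  .subscribe({ onNext: function (x) { received.push(x); } });

stop(); // runs the teardown returned by the source
console.log(received, log);
```

Nothing runs until subscribe is called, and each value flows through filter and map before the next one is emitted.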
What are "data streams?"
- Arrays of data
- Mouse/Keyboard Interactions
- DOM Events
- Network I/O (e.g. Ajax, WebSockets)
- Animations
- Speech Recognition
- Joystick Input
- Anything, really
Meanwhile, back at Netflix...
Sockets Die
When our users closed the lid and walked away, or lost network connectivity, the socket would disconnect.
Multiplexed socket reconnection
- Need to resend real-time data subscription requests
- That means maintaining state on each recent request
- Lots of complicated logic in our event handlers
Socket Observable
- connect the socket on subscription
- emit all messages that arrive on the socket
- error the observable on errors and bad closes
- disconnect the socket on disposal
Multiplexed Data Observable
- sends sub request on subscription
- mapped from the socket observable
- filters to only pertinent messages
- sends an unsub request on disposal
Observables can retry!
// a socket that is singleton
var socket = Rx.DOM.fromWebSocket('ws://socket-server.com').
singleInstance();
function getMultiplexData(id) {
return Observable.create((observer) => {
socket.onNext(JSON.stringify({ id: id, type: 'sub' }));
var disposable = socket.map(e => JSON.parse(e.data)).
filter(d => d.id === id).
forEach(observer);
return function(){
socket.onNext(JSON.stringify({ id: id, type: 'unsub' }));
disposable.dispose();
}
}).
retry(10);
}
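Why does retry solve the reconnection problem? Retrying means subscribing again, and subscribing again re-runs the setup code that sends the sub request. A rough sketch of that idea follows, without RxJS. The retry helper, its maxAttempts parameter (total subscriptions, an assumption of this sketch rather than a statement about RxJS semantics) and the flakySubscribe source are all hypothetical.

```javascript
// Hypothetical helper: wraps a subscribe function and subscribes again when it
// errors, allowing at most maxAttempts subscriptions in total.
function retry(subscribeFn, maxAttempts) {
  return function (observer) {
    let attempts = 0;
    let teardown = function () {};

    function attempt() {
      attempts += 1;
      let failed = false;
      let currentTeardown = null;
      function cleanUp() {
        if (currentTeardown) {
          currentTeardown();
          currentTeardown = null;
        }
      }
      currentTeardown = subscribeFn({
        onNext: function (value) { observer.onNext(value); },
        onError: function (err) {
          failed = true;
          cleanUp(); // tear down the failed attempt if it finished subscribing
          if (attempts < maxAttempts) attempt();
          else observer.onError(err);
        },
      });
      // The source may have failed synchronously, before returning its teardown.
      if (failed) cleanUp();
      else teardown = cleanUp;
    }

    attempt();
    return function () { teardown(); };
  };
}

// Fake source standing in for the multiplexed socket: the first two
// subscriptions fail, the third one delivers a message.
const sent = [];
let connections = 0;
function flakySubscribe(observer) {
  connections += 1;
  sent.push('sub'); // the sub request is re-sent on every subscription
  if (connections < 3) {
    observer.onError(new Error('socket closed'));
  } else {
    observer.onNext({ id: 1, value: 42 });
  }
  return function () { sent.push('unsub'); };
}

const received = [];
const dispose = retry(flakySubscribe, 10)({
  onNext: function (d) { received.push(d.value); },
  onError: function () { received.push('gave up'); },
});
dispose();
console.log(received, connections, sent);
```

The consumer never sees the two failed connections. It only receives the data, and no request state has to be tracked in event handlers.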
Fair Warning
- RxJS has a decent learning curve
- You need to change how you think about problems
- There are a lot of operators to learn
- sometimes sync, sometimes async
Reactive Programming
var c = a + b;
// Do something with c
Reactive Programming
var cStream = Observable.combineLatest(aStream, bStream, function(a, b) {
return a + b;
});
cStream.forEach(function(c) {
// Do something with c
});
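To show what "c follows a and b" means in practice, here is a small library-free sketch of the combineLatest idea. createSubject and this combineLatest are hypothetical stand-ins written for this example, not the RxJS functions. The combined stream stays silent until both inputs have produced a value, then recomputes whenever either one changes.

```javascript
// Hypothetical minimal subject: remembers its listeners and pushes values to them.
function createSubject() {
  const listeners = [];
  return {
    onNext: function (value) {
      listeners.slice().forEach(function (listener) { listener(value); });
    },
    subscribe: function (listener) {
      listeners.push(listener);
      return function () {
        const i = listeners.indexOf(listener);
        if (i !== -1) listeners.splice(i, 1);
      };
    },
  };
}

// Hypothetical helper: calls listener with combine(latestA, latestB) each time
// either input emits, once both have emitted at least one value.
function combineLatest(a, b, combine, listener) {
  let latestA, latestB;
  let hasA = false, hasB = false;
  function emit() {
    if (hasA && hasB) listener(combine(latestA, latestB));
  }
  const stopA = a.subscribe(function (v) { latestA = v; hasA = true; emit(); });
  const stopB = b.subscribe(function (v) { latestB = v; hasB = true; emit(); });
  return function () { stopA(); stopB(); };
}

const aStream = createSubject();
const bStream = createSubject();
const sums = [];
const stop = combineLatest(
  aStream,
  bStream,
  function (a, b) { return a + b; },
  function (c) { sums.push(c); }
);

aStream.onNext(1);   // nothing yet: b has no value
bStream.onNext(2);   // 1 + 2
aStream.onNext(10);  // 10 + 2
bStream.onNext(5);   // 10 + 5
stop();
aStream.onNext(100); // ignored after teardown
console.log(sums);
```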
In Angular 2.X, Observables will be first class
But how can we use this today?
Using RxJS: Reactive Extensions for JavaScript
In this case, we'll use the Angular Toolkit
Let's start simple :)
Let's imagine a counter app
Current counter value {{counter}}
<input type="button" value="Click me to increase counter"
ng-click="increaseCounter()" />
<span>Current counter value {{counter}}</span>
angular.module('counter', []);
angular.module('counter').controller('MainCtrl',
function(ApiServer, $scope) {
$scope.counter = 0;
$scope.increaseCounter = function() {
ApiServer.getCounterAmount(new Date())
.then(function(newCount) {
return ApiServer.logCounter(newCount).then(function() {
return newCount;
});
})
.then(function(newCount) {
$scope.counter = $scope.counter + newCount;
});
}
});
How we'd regularly do it
angular.module('counter', ['rx']);
angular.module('counter').controller('MainCtrl',
function(ApiServer, rx, $scope) {
var disposable = $scope.$createObservableFunction('increaseCounter')
.flatMap(function() {
return rx.Observable.fromPromise(ApiServer.getCounterAmount(new Date()));
})
.do(ApiServer.logCounter)
.scan(function(acc, val) {return acc + val;})
.subscribe(function(counter) {
$scope.counter = counter;
console.log("Total counter is", counter);
}, function(error) {
console.error("There was an error");
});
$scope.$on('$destroy', function() {
disposable.dispose();
})
});
We depend on rx module
We get the clicks stream
For each new value we get the amount to sum
As a side effect, we log each of them
We sum each new value
We subscribe to listen to changes and handle ALL errors
We dispose the observable
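The "sum each new value" step uses scan rather than reduce because the counter must update after every click, not once at the end. A small array-based sketch of the difference follows. This scan is a hypothetical helper for illustration, written to mirror an unseeded scan where the first value is passed through unchanged.

```javascript
function add(acc, val) {
  return acc + val;
}

// Hypothetical array version of scan: like reduce, but it keeps every
// intermediate accumulator instead of only the final one.
function scan(values, accumulate) {
  const out = [];
  let acc;
  values.forEach(function (val, i) {
    acc = i === 0 ? val : accumulate(acc, val); // no seed: first value as-is
    out.push(acc);
  });
  return out;
}

const amounts = [1, 2, 5];          // amounts returned for three clicks
const total = amounts.reduce(add);  // one value, only after the last click
const running = scan(amounts, add); // one value per click
console.log(total, running);
```

Here total is 8, while running is [1, 3, 8], which is what $scope.counter would show after each click.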
Clicking a counter produces very few events
Let's actually try with more events!
<moving-text class="moving-text" text="ngConf is awesome!!!"></moving-text>
angular.module('mover').directive('movingText', function() {
return {
restrict: 'E',
replace: true,
templateUrl: '/js/movingText.html',
scope: {
text: '@'
},
controller: MovingController
}
});
<div ng-mousemove="mouseMoved()">
<div class="text-container">
<span ng-repeat="letter in letters" ng-style="{top: letter.top, left: letter.left}" style="position: absolute;">
{{letter.text}}
</span>
</div>
</div>
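// Named to match `controller: MovingController` in the directive definition.
// Assumption: $scope and rx are injected, and getOffset/textContainer are defined elsewhere.
function MovingController($scope, rx) {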
$scope.letters = [];
var mouseMoved = $scope.$createObservableFunction('mouseMoved')
.map(function (e) {
var offset = getOffset(textContainer);
return {
offsetX : e.clientX - offset.left,
offsetY : e.clientY - offset.top
};
})
.flatMap(function(delta) {
return rx.Observable.fromArray(_.map($scope.text, function(letter, index) {
return {
letter: letter,
delta: delta,
index: index
};
}));
})
// ...
// ...
.flatMap(function(letterConfig) {
return rx.Observable.timer(letterConfig.index * 100)
.map(function() {
return {
text: letterConfig.letter,
top: letterConfig.delta.offsetY,
left: letterConfig.delta.offsetX + letterConfig.index * 20 + 15,
index: letterConfig.index
};
});
})
.subscribe(function(letterConfig) {
$scope.letters[letterConfig.index] = letterConfig;
});
$scope.$on('$destroy', function(){
mouseMoved.dispose();
});
}
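The arithmetic behind the trailing effect is easier to see in isolation. In this sketch layoutLetters is a hypothetical pure function that computes, for one mouse position, what the pipeline above produces for each letter: every letter is shifted 20px further right, and the timer fires 100ms later per index. The delayMs field is added here for illustration only.

```javascript
// Hypothetical helper: per-letter position and delay for one mouse position.
function layoutLetters(text, delta) {
  return text.split('').map(function (letter, index) {
    return {
      text: letter,
      top: delta.offsetY,
      left: delta.offsetX + index * 20 + 15, // same formula as the slide code
      index: index,
      delayMs: index * 100,                  // argument passed to the timer
    };
  });
}

const letters = layoutLetters('ng!', { offsetX: 100, offsetY: 50 });
console.log(letters);
```

For 'ng!' at offset (100, 50), the letters land at left 115, 135 and 155, after 0, 100 and 200 milliseconds.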