Parallel vs. Async Course Introduction you don’t know JS
Agenda
Async Patterns
Parallel vs. Async
Callbacks
Thunks
Promises
Generators/Coroutines
Event Reactive(observables)
CSP(channel-oriented concurrency)
Single Threaded JavaScript parallel(并行)既多线程同时协同工作,关键在于优化(optimization)
async(异步)是单线程的(single thread)
Concurrency 虽然js是单线程,但是可以通过event loop来更改事件的执行顺序来模拟出异步状态,在用户看来就是并发的,尽管js仍是单线程的既同一时刻只执行一个事件。
Async is manage our concurrency
Callback Callback Hell 最简单的回调函数:
1 2 3 setTimeout (function ( ) { console .log ('callback' ) }, 1000 )
Callbacks == Continuations
callback Hell:
1 2 3 4 5 6 7 8 9 setTimeout (function ( ) { console .log ('one' ) setTimeout (function ( ) { console .log ('two' ) setTimeout (function ( ) { console .log ('three' ) }, 1000 ) }, 1000 ) }, 1000 )
以上就是通常所说的回调地狱,也叫回调金字塔。回调地狱不仅仅是因为套嵌和缩进所造成的阅读问题,例如以下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 function one (cb ){ console .log ('one' ) setTimeout (cb, 1000 ) } function two (cb ){ console .log ('two' ) setTimeout (cb, 1000 ) } function three (cb ){ console .log ('three' ) } one (function ( ) { two (three) })
尽管缩进问题和套嵌问题已经改善,但是可读性仍然没有质的提高。
Exercise 1
This exercise calls for you to write some async flow-control code. To start off with, you’ll use callbacks only. 这个练习要求您编写一些异步流控制代码。首先,您将只使用回调。
Expected behavior: 期望的行为:
Request all 3 files at the same time (in “parallel”). 同时请求所有3个文件(“并行”)。
Render them ASAP (don’t just blindly wait for all to finish loading) 尽快渲染(不要盲目等待加载完成)
BUT, render them in proper (obvious) order: “file1”, “file2”, “file3”. 但是,以适当的(明显的)顺序呈现它们:“file1”、“file2”、“file3”。
After all 3 are done, output “Complete!”. 完成所有3项后,输出“Complete!”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 function fakeAjax (url, cb ) { var fake_responses = { file1 : 'The first text' , file2 : 'The middle text' , file3 : 'The last text' } var randomDelay = (Math .round (Math .random () * 1e4 ) % 8000 ) + 1000 console .log ('Requesting: ' + url) setTimeout (function ( ) { cb (fake_responses[url]) }, randomDelay) } function output (text ) { console .log (text) } function getFile (file ) { fakeAjax (file, function (text ) { handleResponse (file, text) }) } const response = {}function handleResponse (fileName, contents ) { if (!(fileName in response)) { response[fileName] = contents } const fileNames = ['file1' , 'file2' , 'file3' ] for (let i = 0 ; i < fileNames.length ; i++) { const filename = fileNames[i] if (filename in response) { if (typeof filename == 'string' ) { output (response[filename]) response[filename] = false } } else { return } } } getFile ('file1' )getFile ('file2' )getFile ('file3' )
Callback Problems: Inversion of Control 在这里的inversion of control指的是在代码中同步的部分是由我们预期的时间执行,由我们自己控制的,但是回调部分中的代码是交由event loop执行,无法预估执行时间,由event loop控制执行的。
1 2 3 4 5 6 7 tracCheckout ( purchaseInfo, function finish ( ) { chargeCreditCard (purchaseInfo) showThankYouPage } )
以上是一个用来追踪付款的函数,这里想象了一个场景,有一天顾客提出了一个问题:信用卡重复支付了5次。经过排查可以怀疑是回调函数重复执行了5次,但是回调无法追踪,也没有具体文档。这就引出了一个问题,既我们默认回调是可以信任的,没有任何问题的。但实际上,不同回调实现的算法不同,所以会有各种的Corner cases。回调会有以下一些信任问题:
not too early
not too late
not too many times
not too few times
no lost context
no swallowed errors ……
Callback Problems: Not Reason-able 一般人的大脑基本是单线程的,在同一时刻只能思考一件事,js引擎也是一样。
当我们大脑的思考模式与js引擎工作模式分离的时候,也就是bug开始产生的时候。
以下是大脑工作模式的伪代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 start task1 : do some stuff pause start task2 : do some other stuff pause resume task1 : do more stuff pause resume task2 : finish stuff resume task1 : finish stuff
使用callback时,工作模式更像下面的伪代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 start task1 : do some stuff pause resume task1 : do more stuff pause resume task1 : finish stuff start task2 : do some stuff pause resume task2 : do more stuff pause resume task2 : finish stuff
但我们需要一些函数执行完在执行里一个函数的情况可以称为Temporal Dependency,使用回调函数处理temporal dependency的话就要套嵌回调函数。但是很明显,我们的大脑并不适合这种工作模式,理解起来很困难。
1 2 3 4 5 console .log ('First half of my program' )setTimeout (() => { console .log ('Second half of my program' ) }, 1000 )
Non Fixes Async Patterns: separate callbacks 1 2 3 4 5 6 7 8 9 10 11 12 function trySomething (ok, err ) { setTimeout (function ( ) { const num = Math .random () if (num > .5 ) ok (num) else err (num) }, 1000 ) } trySomething ( num => console .log (`Success ${num} ` ), num => console .log (`Sorry ${num} ` ) )
Async Pattern: “error-first style” 1 2 3 4 5 6 7 8 9 10 11 12 13 14 function trySomething (callback ) { setTimeout (function ( ) { const num = Math .random () if (num > .5 ) callback (null , num) else callback ('Too low!' ) }) } trySomething ( (err, num ) => { if (err) console .log (err) else console .log (`Number ${num} ` ) } )
Async Pattern: nested-callback tasks 1 2 3 4 5 6 7 8 9 10 11 12 13 14 function getData (data, callback ) { setTimeout (() => { callback (data) }, 1000 ) } getData (10 , function (num1 ) { const x = 1 + num1 getData (30 , function (num2 ) { const y = 1 + num2 getData ( `Meaning of life: ${x + y} ` , answer => console .log (answer) ) }) })
Thunks Synchronous and Asynchronous Thunks Thunk is a function that has everything already that it needs to do to give you some value back.
This is a synchronous thunk:
1 2 3 4 5 6 7 8 9 function add (x, y ) { return x + y } const thunk = function ( ) { return add (10 , 15 ) } thunk ()
This is asynchronous thunk, only need to pass the callback argument:
1 2 3 4 5 6 7 8 9 function addAsync (x, y, cb ) { setTimeout (() => { cb (x + y)}, 1000 ) } const thunk = cb => addAsync (10 , 15 , cb)thunk (sum => sum )
Time is most complex factor of state in your program, managing time is most complex factor of state in your program
1 2 3 4 5 6 7 function makeThunk (fn ) { const args = [].slice .call (arguments , 1 ) return function callback (cb ) { args.push (cb) fn.apply (null , args) } }
1 2 3 4 5 6 7 8 9 function addAsync (x, y, cb ) { setTimeout (() => { cb (x + y)}, 1000 ) } const thunk = makeThunk (addAsycn, 10 , 15 )thunk (sum => { console .log (sum) })
Async Patterns: nested-thunk tasks 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 const get10 = makeThunk (getData, 10 )const get30 = makeThunk (getData, 30 )get10 (num1 => { const x = 1 + num1 get30 (num2 => { const y = 1 + num2 const getAnswer = makeThunk (getData, `Meaning of life: ${x + y} ` ) getAnswer (answer => console .log (answer) ) }) })
Exercise 2
You’ll do the same thing as the previous exercise(s), but now you should use thunks. 您将做与前面的练习相同的事情,但是现在您应该使用thunks。
Expected behavior: 期望的行为:
Request all 3 files at the same time (in “parallel”). 同时请求所有3个文件(“并行”)。
Render them ASAP (don’t just blindly wait for all to finish loading) 尽快渲染(不要盲目等待加载完成)
BUT, render them in proper (obvious) order: “file1”, “file2”, “file3”. 但是,以适当的(明显的)顺序呈现它们:“file1”、“file2”、“file3”。
After all 3 are done, output “Complete!”. 完成所有3项后,输出“Complete!”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 import { callbackify } from 'util' function fakeAjax (url, cb ) { var fake_responses = { file1 : 'The first text' , file2 : 'The middle text' , file3 : 'The last text' } var randomDelay = (Math .round (Math .random () * 1e4 ) % 8000 ) + 1000 console .log ('Requesting: ' + url) setTimeout (function ( ) { cb (fake_responses[url]) }, randomDelay) } function output (text ) { console .log (text) } function getFile (file ) { let text, fn fakeAjax (file, response => { if (fn) fn (response) else text = response }) return callback => { if (text) callback (text) else fn = callback } } const thunk1 = getFile ('file1' ), thunk2 = getFile ('file2' ), thunk3 = getFile ('file3' ) thunk1 (text1 => { output (text1) thunk2 (text2 => { output (text2) thunk3 (text3 => { output (text3) output ('Complete!' ) }) }) })
Thunks and Closures Why should thunk be something that’s part of my arsenal of weapons that I used to take down asynchronous programming issure?
In the thunk pattern by using the closure to maintain the state of something. We eliminate time as a complecting factor of state.
在上面的代码中,我们在闭包中设置变量保存异步调用返回的数据和尚未传入的回调函数,通过判断数据的状态来决定是在回调中直接处理数据还是将回调函数传入异步函数中返回处理过后的数据。通过这种方法,我们解决了数据返回的时间问题,既可以确认通过thunk,传入一个回调函数,会返回一个处理后的结果,这也是promise的基本原理之一,也是promise如此革命性的愿意之一。
Promises Native Promises Promise就像去肯德基买汉堡,把钱交给收银员后并不会立即得到一个汉堡,而是一个订单号。拿着订单号去柜台可以在汉堡准备好之后领到汉堡。在拿到订单号之后,虽然并没有立即获得汉堡,但是可以将订单号视为汉堡进行处理,可以将汉堡自己吃掉也可以交给另一个人。
promise是一个包裹future value的容器。promise提供了一种可以忽略value的时间状态来composing的方法,所以在函数式编程的理论中,promise是一个monad。
在callback模式中,我们提供一个callback给异步方法,由异步方法决定何时调用回调函数。在promise中,我们调用promise,在promise执行结束后通知我们,由我们决定对执行结果进行怎样处理。promise也可以视为一个eventListener,在执行结束后会有一个“Completion Events”。
Promis API 1 2 3 4 5 6 7 8 9 10 11 12 13 14 function finish ( ) { chargeCreditCard (purchaseInfo) showThankYouPage () } function error (err ) { logStatsError (err) finish () } const listener = trackCheckout (purchaseInfo)listener.on ('completion' , finish) listener.on ('error' , error)
1 2 3 4 5 6 7 8 9 10 function trackCheckout (info ) { return new Promise ( function (resolve, reject ) { } ) }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 function finish ( ) { chargeCreditCard (purchaseInfo) showThankYouPage () } function error (err ) { logStatsError (err) finish () } const promise = trackCheckout (purchaseInfo)promise.then ( finish, error )
通过promise,我们确认将要返回一个future value,可以自行决定对其如何进行处理,我们把控制权从异步方法转移回了我们自己,既通过消灭时间状态来消灭inversion of control。
Promise Trust
Only resloved once
Either success or error
Messages passed/kept
Exceptions becomes errors
Immutable once resloved
Promise Flow Control 1 2 3 4 5 doFirstThing then doSecondThing then doThirdThing then complete or error
通过Chainning Promises,我们可以使用promise来处理数据流。
1 2 3 4 5 6 7 doFirstThing () .then (() => doSecondThing) .then (() => doThirdThing) .then ( complete, error )
1 2 3 4 5 6 7 8 9 10 function delay (num ) { return new Promise (function (reslove, reject ) { setTimeout (resolve,num) }) } delay (100 ) .then (() => delay (50 )) .then (() => delay (200 )) .then (() => console .log ('All done' ))
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 function getData (data ) { return new Promise ((res, rej ) => { setTimeout (() => { res (data) }, 1000 ) }) } let xgetData (10 ) .then (num1 => { x = 1 + num1 return getData (30 ) }) .then (num2 => { const y = 1 + num2 return getData (`Meaning of life: ${x + y} ` ) }) .then (answer => { console .log (answer) })
Exercise 3
You’ll do the same thing as the previous exercise(s), but now you should use thunks. 您将做与前面的练习相同的事情,但是现在您应该使用thunks。
Expected behavior: 期望的行为:
Request all 3 files at the same time (in “parallel”). 同时请求所有3个文件(“并行”)。
Render them ASAP (don’t just blindly wait for all to finish loading) 尽快渲染(不要盲目等待加载完成)
BUT, render them in proper (obvious) order: “file1”, “file2”, “file3”. 但是,以适当的(明显的)顺序呈现它们:“file1”、“file2”、“file3”。
After all 3 are done, output “Complete!”. 完成所有3项后,输出“Complete!”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 function fakeAjax (url, cb ) { var fake_responses = { file1 : 'The first text' , file2 : 'The middle text' , file3 : 'The last text' } var randomDelay = (Math .round (Math .random () * 1e4 ) % 8000 ) + 1000 console .log ('Requesting: ' + url) setTimeout (function ( ) { cb (fake_responses[url]) }, randomDelay) } function output (text ) { console .log (text) } function getFile (file ) { return new Promise ((resolve, reject ) => { fakeAjax (file, resolve) }) } const promise1 = getFile ('file1' ), promise2 = getFile ('file2' ), promise3 = getFile ('file3' ) promise1 .then (output) .then (() => promise2) .then (output) .then (() => promise3) .then (output) .then (() => output ('complete' ))
Exercise 4 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 function fakeAjax (url, cb ) { var fake_responses = { file1 : 'The first text' , file2 : 'The middle text' , file3 : 'The last text' } var randomDelay = (Math .round (Math .random () * 1e4 ) % 8000 ) + 1000 console .log ('Requesting: ' + url) setTimeout (function ( ) { cb (fake_responses[url]) }, randomDelay) } function output (text ) { console .log (text) } function getFile (file ) { return new Promise (function (resolve ) { fakeAjax (file, resolve) }) } const files = ['file1' , 'file2' , 'file3' ]const parallel = files .map (file => getFile (file)) .reduce ((chain, nextPromise ) => { return chain .then (function nextFile ( ) { return nextPromise }) .then (output) }, Promise .resolve ()) .then (function complete ( ) { output ('complete' ) })
Abstractions Promise.all() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 Promise .all ([ doTaks1a (), doTaks1b (), doTaks1c () ]) .then (function (results ){ return doTask2 ( Math .max ( results[0 ], results[1 ], results[2 ] ) ) })
Promise.all()将多个promise组合成的数组作为参数,参数中的promise状态更新后更新本身的状态。
Promise.race() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 const p = trySomeAsyncThing ()Promise .race ([ p, new Promise (function (_, reject ) { setTimeout (function ( ) { reject ('Timeout' ) }, 3000 ) }) ]) .then ( success, error )
Promise.race()将多个promise组合成的数组作为参数,并返回最先执行完毕的那个promise对象。
github.com/getify/native-promise-only
Sequences & Gates sequence = automatically chained promises
Github Library Asynquence
1 2 3 4 5 6 7 8 9 10 11 const delay1Second = done => { setTimeout (done, 1000 ) }ASQ () .then (delay1Second) .gate ( delay1Second, delay1Second ) .then (done => { console .log ('2 seconds passed!' ) })
Exercise 5 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 function fakeAjax (url, cb ) { var fake_responses = { file1 : 'The first text' , file2 : 'The middle text' , file3 : 'The last text' } var randomDelay = (Math .round (Math .random () * 1e4 ) % 8000 ) + 1000 console .log ('Requesting: ' + url) setTimeout (function ( ) { cb (fake_responses[url]) }, randomDelay) } function output (text ) { console .log (text) } function getFile (file ) { return ASQ (function (done ) { fakeAjax (file, done) }) } ASQ () .seq (getFile ('file1' )) .val (output) .seq (getFile ('file2' )) .val (output) .seq (getFile ('file3' )) .val (output) .val (function ( ) { output ('Complete!' ) })
Exercise 6 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 function fakeAjax (url, cb ) { var fake_responses = { file1 : 'The first text' , file2 : 'The middle text' , file3 : 'The last text' } var randomDelay = (Math .round (Math .random () * 1e4 ) % 8000 ) + 1000 console .log ('Requesting: ' + url) setTimeout (function ( ) { cb (fake_responses[url]) }, randomDelay) } function output (text ) { console .log (text) } function getFile (file ) { return ASQ (function (done ) { fakeAjax (file, done) }) } ASQ () .seq (...['file1' , 'file2' , 'file3' ].map (getFile).map (seq => () => seq.val (output))) .val (function ( ) { output ('Complete!' ) })
Generator Generator Example If promises were about solving the inversion of control issue in callback, generators are about solving the non-local, non-sequential reasonability problem.
A generator is a syntactic form of a state machine, is a pauseable function.
1 2 3 4 5 6 7 8 9 function *gen ( ) { console .log ('hello' ) yield console .log ('world' ) } const it = gen () it.next () it.next ()
Messaging 1 2 3 4 5 6 7 8 9 10 11 12 function *main ( ) { yield 1 yield 2 yield 3 } const iterator = main ()iterator.next () iterator.next () iterator.next () iterator.next ()
1 2 3 4 5 6 function coroutine (generator ) { const iterator = generator () return function ( ){ return iterator.next .apply (iterator, arguments ) } }
1 2 3 4 5 6 7 8 9 const run = coroutine (function *(){ const x = 1 + (yield ), y = 1 + (yield ) yield (x + y) }) run () run (10 ) console .log (`Meaning of life: ${run(30 ).value} ` )
Messaging Quertions generator can never finish. Sometimes you have to design a generator never finish.
可以在generator内部使用while(true)这个无限循环来实现生成unique ID之类的功能。
Async Generators 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 function coroutine (generator ) { const iterator = generator () return function ( ){ return iterator.next .apply (iterator, arguments ) } } function getData (data ) { setTimeout (function ( ) { run (data) }, 1000 ) } const run = coroutine (function *() { const x = 1 + (yield getData (10 )) const y = 1 + (yield getData (30 )) const answer = (yield getData ( `Meaning of life: ${x + y} ` )) console .log (answer) console .log ('This will be first?' ) }) run ()
通过使用generator,我们拥有了暂停函数执行的能力,这使得我们可以以同步的模式来编写异步代码,同时Error handler也可以以同步的方式来进行处理。
Promises + Generator 1 2 3 4 5 6 7 8 9 10 11 12 13 14 function getData (data ) { return ASQ (done => { setTimeout (function ( ) {done (data)}, 1000 )} ) } ASQ () .runner (function *() { const x = 1 + (yield getData (10 )) const y = 1 + (yield getData (30 )) const answer = (yield getData ( `Meaning of life: ${x + y} ` )) yield answer }) .val (answer => {console .log (answer)})
原生的promise和generator代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 function doWhenDataReecived (data ) { returnNextElement.next (data) } function *createFlow ( ) { const data = yield fetch ('http://twitter.com/will/tweets/1' ) console .log (data) } const returnNextElement = createFlow ()const futureData = returnNextElement.next ()futureData.then (doWhenDataReceived)
使用es6的async await语法,代码类似如下:
1 2 3 4 5 6 7 8 9 async function createFlow ( ) { console .log ('Me first!' ) const data = await fetch ('https://twitter.com/will/tweets/1' ) console .log (data) } createFlow ()console .log ('Me second!' )
因为目前promise是无法取消的,所以最新的async await语法也是无法取消的。但是使用generator和promise结合的语法是,可以直接使用generator.next来跳过promise执行。
Exercise 7 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 function fakeAjax (url, cb ) { var fake_responses = { file1 : 'The first text' , file2 : 'The middle text' , file3 : 'The last text' } var randomDelay = (Math .round (Math .random () * 1e4 ) % 8000 ) + 1000 console .log ('Requesting: ' + url) setTimeout (function ( ) { cb (fake_responses[url]) }, randomDelay) } function output (text ) { console .log (text) } function getFile (file ) { return ASQ (function (done ) { fakeAjax (file, done) }) } ASQ () .runner (function * loadFiles ( ) { var p1 = getFile ('file1' ) var p2 = getFile ('file2' ) var p3 = getFile ('file3' ) output (yield p1) output (yield p2) output (yield p3) }) .val (function ( ) { output ('Complete!' ) })
Quiz
What is “callback hell”? Why do callbacks suffer from “inversion of control” and “unreasonability”?
What is a Promise? How does it solve inversion of control issues?
How do you pause a generator? How do you resume it?
How do we combine generators and promises for flow control?
davidwalsh.name/es6-generatros
Observables Events + Promises Concurrency: Events (+ Promises)?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 const p1 = new Promise (function executor (resolve, reject ){ $('#btn' ).click (event => { const className = event.target .className if (/foobar/ .test (className)) { resolve (className) } else { reject () } }) }) p1.then (className => { console .log (className) })
以上代码的功能为点击按钮,测试按钮的className是否符合正则,然后输出className,因为使用了promise,所以以上代码只能执行一次。所以我们可以改为如下,以多次运行:
1 2 3 4 5 6 7 8 9 10 11 12 13 $('#btn' ).click (event => { const className = event.target .className , p1 = new Promise (function executor (resolve, reject ){ if (/foobar/ .test (className)) { resolve (className) } else { reject () } }) p1.then (className => {console .log (className)}) })
但是以上代码又变的没有什么意义,我们创建了一个promise后又在函数内直接使用了.then方法消费了promise。
1 2 3 4 5 6 $('#btn' ).click (event => { [event] .map (event => event.target .className ) .filter (className => /foobar/ .test (className)) .forEach (className => {console .log (className)}) })
以上代码我们还是将consumer和producer耦合在了一起,我们需要一种模式将producer和consumer解耦。
Enter Observables Observable在课程中主要指的是微软的rx.js。
Observable is kind of like a chain of calculated fields in a spreadsheet.
在Excel表格中,单元格可以依据其他单元格的数据进行计算的结果做为数据保存,这就像是一个数据流,当我们更改其中一个单元格数据时,其他单元格的数据也会重新进行计算并更改。
Observable是一个producer,产生数据流,经过一系列计算转换后由consumer进行使用。
An observable is an adapter hooked onto an event source, that produces a promise, every time a new event comes through.
1 2 3 4 5 6 7 8 9 const observable = Rx .Observable .formEvent (btn, 'click' )observable .map (clickEvent => clickEvent.target .className ) .filter (className => /foobar/ .test (className)) .distinctUntilChanged () .subscribe (data => { console .log (data[1 ]) })
可以通过rxmarbles.com 来观察rx.js的各种api的输出结果,理解操作符作用。
Reactive Sequences 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 function fromEvent (element, eventType ) { return ASQ .react (proceed => { $(element).bind (eventType, proceed) }) } const rsq = fromEvent (btn, 'click' ) rsq .val (event => event.target .className ) .then ((done, className) { if (/foobar/ .test (className)) { done (className) } }) .val (calssName => {console .log (className)})
1 2 3 4 5 function fromEvent (el, eventType ) { const rsq = ASQ .react .of () $(el).bind (eventType, rsq.push ) return rsq }
1 2 3 4 5 6 7 8 9 10 11 const rsq1 = ASQ .react .of (), rsq2 = ASQ .react .of (1 , 2 , 3 ) let x = 10 setInterval (() => { rsq1.push (x++) rsq2.push (x++) }, 500 ) rsq1.val (v => {console .log (`1:${v} ` )}) rsq2.val (v => {console .log (`2:${v} ` )})
Exercise 8
In this exercise, we’re going to practice with “reactive sequences” programming (aka “observables”) using asynquence.
Set up a reactive sequence for the stream of click events from the button. Hint: ASQ.react.of()
and rsq.push(..)
Set up another reactive sequence that samples the sequence stream from (2) once per second. In other words, if you click the button multiple times in a second, you only get one event message. Hint: setInterval(..)
The sampled sequence should add a “clicked!” message to the list.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 $(document ).ready (function ( ) { var $btn = $('#btn' ), $list = $('#list' ), clicks = ASQ .react .of (), msgs = ASQ .react .of (), latest $btn.click (function (evt ) { clicks.push (evt) }) setInterval (function ( ) { if (latest) { msgs.push ('clicked!' ) latest = null } }, 1000 ) clicks.val (function (evt ) { latest = evt }) msgs.val (function (msg ) { $list.append ($('<div>' + msg + '</div>' )) }) })
CSP Concurrency + Channels Channel is kind of like a stream, kind of like a pipe.
Channel提供了consumer向producer发出信息的机制。
CSP: Communicating Sequential Processes
Conmunicating Sequential Processes is all about modeling your application with lots of tiny independent pieces that we call processes, and they get to a point where they say we need to coordinate.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 const channel = chan ()function *process1 ( ) { yield put (channel, 'Hello' ) const msg = yield take (channel) console .log (msg) } function *process2 ( ) { const greeting = yield take (channel) yield put (channel, greeting + 'World' ) console .log ('Done' ) }
Blocking Channels 1 2 3 4 5 6 7 8 9 10 11 12 13 csp.go (function *() { while (true ) { yield csp.put (ch, Math .random ()) } }) csp.go (function *() { while (true ) { yield csp.take ( csp.timeout (500 ) ) let num = yield csp.take (ch) console .log (num) } })
1 2 3 4 5 6 csp.go (function *(){ while (true ) { const msg = yield csp.alts (ch1, ch2, ch3) console .log (msg) } })
Event Channels 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 function fromEvent (el, eventType ) { const ch = csp.chan () $(el).bind (eventType, event => { csp.putAsync (ch, event) }) return ch } csp.go (function *() { const ch = fromEvent (el, 'mousemove' ) while (true ) { const evt = yield csp.take (ch) console .log ( evt.clientX + '.' + evt.clientY ) } })
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ASQ ().runner ( ASQ .csp .go (function *process1 (ch ) { yield ASQ .csp .put (ch, 'Hello' ) const msg = yield ASQ .csp .take (ch) console .log (msg) }), ASQ .csp .go (function *process2 (ch ) { const greeting = yield ASQ .csp .take (ch) yield ASQ .csp .put (ch, greeting + " World" ) console .log ('done!' ) }) )
Exercise 9 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 $(document ).ready (function ( ) { var $btn = $('#btn' ), $list = $('#list' ), clicks = ASQ .csp .chan (), msgs = ASQ .csp .chan (), queuedClick $btn.click (listenToClicks) ASQ ().runner (ASQ .csp .go (sampleClicks), ASQ .csp .go (logClick)) function listenToClicks (evt ) { if (!queuedClick) { queuedClick = ASQ .csp .putAsync (clicks, evt) queuedClick.then (function ( ) { queuedClick = null }) } } function * sampleClicks ( ) { while (true ) { yield ASQ .csp .take (ASQ .csp .timeout (1000 )) yield ASQ .csp .take (clicks) yield ASQ .csp .put (msgs, 'clicked!' ) } } function * logClick ( ) { while (true ) { var msg = yield ASQ .csp .take (msgs) $list.append ($('<div>' + msg + '</div>' )) } } })
Recap
Callbacks / Thunks
Promises
Generators
Observables
CSP go-routines