本文整理汇总了TypeScript中xstream.create函数的典型用法代码示例。如果您正苦于以下问题:TypeScript create函数的具体用法?TypeScript create怎么用?TypeScript create使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了create函数的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的TypeScript代码示例。
示例1: socketDriver
return function socketDriver (sink$, streamAdapter) {
let connections = {};
const {observer, stream} = streamAdapter.makeSubject();
const newConnection$ = xs.create();
const disconnection$ = xs.create();
sink$.addListener({
next (event) {
if (event.type === 'BROADCAST') {
Object.values(connections).forEach(connection => {
(connection as any).send(JSON.stringify(event.data), (err) => {
if (err) {
console.error(err);
}
});
});
} else if (event.type === 'SEND') {
connections[event.id].send(JSON.stringify(event.data), (err) => {
if (err) {
console.error(err);
}
});
} else {
throw new Error(`Unrecognized event ${event}`);
}
},
error (err) {
console.error(err);
}
});
socketServer.on('connection', (ws) => {
console.log('Connected!')
// TODO actually do ids well
const id = uuid.v4();
console.log(`id: ${id}`);
connections[id] = ws;
newConnection$.shamefullySendNext(id);
ws.on('message', (data) => {
console.log(`received message from ${id}: ${data}`);
observer.next({data: JSON.parse(data), id});
});
ws.on('close', () => {
console.log('closed', id);
delete connections[id];
disconnection$.shamefullySendNext(id);
});
});
return {
messages: stream,
connections: newConnection$,
disconnections: disconnection$
};
}
开发者ID:Widdershin,项目名称:schism,代码行数:58,代码来源:server.ts
示例2: start
return function delayOperator<T>(inputStream: Stream<T>): Stream<T> {
const stream = xs.fromObservable(inputStream);
let delayListener: any = null;
const producer = {
start(listener: Listener<T>) {
delayListener = makeDelayListener<T>(
schedule,
currentTime,
delayTime,
listener
);
stream.addListener(delayListener);
},
stop() {
if (delayListener) {
stream.removeListener(delayListener);
}
},
};
return adapt(xs.create<T>(producer));
};
开发者ID:,项目名称:,代码行数:25,代码来源:
示例3: DOMDriver
function DOMDriver(vnode$: Stream<VNode>, name = 'DOM'): MainDOMSource {
domDriverInputGuard(vnode$);
const sanitation$ = xs.create<null>();
const rootElement$ = xs
.merge(vnode$.endWhen(sanitation$), sanitation$)
.map(vnode => vnodeWrapper.call(vnode))
.fold(patch, toVNode(rootElement))
.drop(1)
.map(unwrapElementFromVNode)
.compose(dropCompletion) // don't complete this stream
.startWith(rootElement as any);
// Start the snabbdom patching, over time
const listener = {error: reportSnabbdomError};
if (document.readyState === 'loading') {
document.addEventListener('readystatechange', () => {
if (document.readyState === 'interactive') {
rootElement$.addListener(listener);
}
});
} else {
rootElement$.addListener(listener);
}
return new MainDOMSource(
rootElement$,
sanitation$,
[],
isolateModule,
delegators,
name,
);
}
开发者ID:nlarche,项目名称:cyclejs,代码行数:33,代码来源:makeDOMDriver.ts
示例4: start
return function delayByOperator<T>(stream: Stream<T>): Stream<T> {
return xs.create<T>({
start(listener) {
const {schedule, currentTime} = timeSource.createOperator();
stream.addListener({
next(t: T) {
const delay = delaySelector(t);
schedule.next(listener, currentTime() + delay, t);
},
error(err: Error) {
schedule.error(listener, currentTime(), err);
},
complete() {
schedule.complete(listener, currentTime());
},
});
},
stop() {},
});
};
开发者ID:ntilwalli,项目名称:cyclejs,代码行数:25,代码来源:time.ts
示例5: startResponseStream
export function createResponse$(reqInput: RequestInput): Stream<Response> {
return xs.create<Response>({
start: function startResponseStream(listener) {
try {
const reqOptions = normalizeRequestInput(reqInput);
this.request = optionsToSuperagent(reqOptions);
if (reqOptions.progress) {
this.request = this.request.on('progress', (res: Response) => {
res.request = reqOptions;
listener.next(res);
});
}
this.request.end((err: any, res: Response) => {
if (err) {
if (err.response) {
err.response.request = reqOptions;
}
listener.error(err);
} else {
res.request = reqOptions;
listener.next(res);
listener.complete();
}
});
} catch (err) {
listener.error(err);
}
},
stop: function stopResponseStream() {
if (this.request && this.request.abort) {
this.request.abort();
}
},
});
}
开发者ID:joeldentici,项目名称:cyclejs,代码行数:35,代码来源:http-driver.ts
示例6: setupZapping
function setupZapping([graph, zapSpeed]: [Dagre.Graph, ZapSpeed]): Diagram {
const registry: ZapRegistry = new ZapRegistry();
const sourceNodes: Array<string> = graph['sources']();
sourceNodes.forEach(id => {
zapVisit(id, 0, graph, registry);
});
const rawZap$ = xs.create<Zap>({
start(listener: Listener<Zap>) {
for (let i = 0, N = registry.records.length; i < N; i++) {
const record = registry.records[i];
const id = record.id;
record.stream.setDebugListener({
next: (value) => listener.next({ id, type: 'next', value } as Zap),
error: (err) => listener.next({ id, type: 'error', value: err } as Zap),
complete: () => listener.next({ id, type: 'complete' } as Zap),
});
}
},
stop() {},
});
const actualZaps$ = rawZap$
.compose(timeSpread(zapSpeedToMilliseconds(zapSpeed)));
const stopZaps$ = actualZaps$
.mapTo([]).compose(debounce<Array<Zap>>(200))
.startWith([]);
const zaps$ = xs.merge(actualZaps$, stopZaps$);
return { graph, zaps$ };
}
开发者ID:whitecolor,项目名称:cyclejs,代码行数:33,代码来源:graphSerializer.ts
示例7: Error
function createResponse$(url: string): ResponseStream {
const res$: ResponseStream = xs.create<any>({
start: listener => {
if (typeof url !== `string`) {
listener.error(
new Error(
`Observable of requests given to JSONP ` +
`Driver must emit URL strings.`,
),
);
}
try {
jsonp(url, (err: Error, res: any) => {
if (err) {
listener.error(err);
} else {
listener.next(res);
listener.complete();
}
});
} catch (err) {
listener.error(err);
}
},
stop: () => {},
}) as ResponseStream;
res$.request = url;
return res$;
}
开发者ID:joeldentici,项目名称:cyclejs,代码行数:31,代码来源:index.ts
示例8: startPanel
export function startPanel(graph$: Stream<string>): void {
const adHocContainer: Element = document.createElement('DIV');
adHocContainer.id = '#ad-hoc-container';
document.body.appendChild(adHocContainer);
const domDriver = makeDOMDriver(
document.querySelector('#tools-container') || adHocContainer,
);
const domSinkProxy = xs.create<VNode>();
const domSource = domDriver(domSinkProxy, xsSA);
const panelSources = {graph: graph$, DOM: domSource};
const panelSinks = Panel(panelSources);
styles.inject();
domSinkProxy.imitate(panelSinks.DOM);
panelSinks.zapSpeed.addListener({
next(s: ZapSpeed) {
// alert('PANEL posting message to graph serializer: ' + s);
window['postMessageToGraphSerializer'](s);
},
error(err: any) { },
complete() { },
});
}
开发者ID:whitecolor,项目名称:cyclejs,代码行数:25,代码来源:index.ts
示例9: adapt
return function throttleOperator<T>(inputStream: Stream<T>): Stream<T> {
const state = {lastEventTime: -Infinity}; // so that the first event is always scheduled
const stream = xs.fromObservable(inputStream);
let throttleListener: any = null;
const throttledStream = xs.create<T>({
start(listener) {
throttleListener = makeThrottleListener<T>(
schedule,
currentTime,
period,
listener,
state
);
stream.addListener(throttleListener);
},
stop() {
if (throttleListener) {
stream.removeListener(throttleListener);
}
},
});
return adapt(throttledStream);
};
开发者ID:,项目名称:,代码行数:27,代码来源:
示例10: testUnsubscription
function testUnsubscription(Time: TimeSource, operator: any, done: Mocha.Done) {
const PERIOD = 20;
const taps: Array<number> = [];
const custom = adapt(
xs.create({
start: listener => {
listener.next(0);
setTimeout(() => listener.next(1), 1 * PERIOD);
setTimeout(() => listener.next(2), 2 * PERIOD);
setTimeout(() => {
listener.next(3);
listener.complete();
}, 3 * PERIOD);
},
stop: () => {
assert.deepEqual(taps, [0]);
Time.dispose();
done();
},
})
);
const tapped = map(custom, (x: number) => {
taps.push(x);
return x;
});
const composed = compose(
tapped,
operator
);
take(composed, 1).subscribe({});
}
开发者ID:,项目名称:,代码行数:34,代码来源:
注:本文中的xstream.create函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论