Skip to content

Commit cb219aa

Browse files
pranavjoshi001Pranav Joshi
andauthored
Add per-conversation streaming switch (deliveryMode='stream') (#443)
* add deliveryMode support * deliveryMode support to enable streaming from ABS side * comment resolved --------- Co-authored-by: Pranav Joshi <pranavjoshi@microsoft.com>
1 parent 13f7a42 commit cb219aa

File tree

2 files changed

+169
-2
lines changed

2 files changed

+169
-2
lines changed

src/directLine.test.ts

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,4 +243,151 @@ describe('MockSuite', () => {
243243
expect(actualError.status).toStrictEqual(429);
244244
expect(endTime - startTime).toStrictEqual(10);
245245
});
246+
247+
describe('StreamingMode', () => {
248+
249+
test('Setting streaming adds deliveryMode=stream to outgoing activity', () => {
250+
// override directline with streaming enabled
251+
directline = new DirectLineExport.DirectLine({ ...services, streaming: true });
252+
253+
const streamingActivity = DirectLineMock.mockActivity('streaming-test');
254+
const scenario = function* (): IterableIterator<Observable<unknown>> {
255+
yield Observable.timer(200, scheduler);
256+
yield directline.postActivity(streamingActivity);
257+
};
258+
259+
subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe());
260+
261+
const actual: Array<DirectLineExport.Activity> = [];
262+
subscriptions.push(directline.activity$.subscribe(a => actual.push(a)));
263+
264+
scheduler.flush();
265+
266+
expect(streamingActivity.deliveryMode).toStrictEqual('stream');
267+
expect(actual[0].deliveryMode).toStrictEqual('stream');
268+
});
269+
270+
test('Not setting streaming does not add deliveryMode at all to outgoing activity', () => {
271+
const normalActivity = DirectLineMock.mockActivity('normal-test');
272+
const scenario = function* (): IterableIterator<Observable<unknown>> {
273+
yield Observable.timer(200, scheduler);
274+
yield directline.postActivity(normalActivity);
275+
};
276+
277+
subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe());
278+
279+
const actual: Array<DirectLineExport.Activity> = [];
280+
subscriptions.push(directline.activity$.subscribe(a => actual.push(a)));
281+
282+
scheduler.flush();
283+
284+
expect(normalActivity.deliveryMode).toBeUndefined();
285+
expect(actual[0].deliveryMode).toBeUndefined();
286+
});
287+
288+
test('Setting streaming overrides passed deliveryMode "normal" in activity to "stream"', () => {
289+
directline = new DirectLineExport.DirectLine({ ...services, streaming: true });
290+
291+
const presetActivity: DirectLineExport.Message = {
292+
type: 'message',
293+
from: { id: 'sender' },
294+
text: 'preset',
295+
deliveryMode: 'normal'
296+
};
297+
298+
const scenario = function* (): IterableIterator<Observable<unknown>> {
299+
yield Observable.timer(200, scheduler);
300+
yield directline.postActivity(presetActivity);
301+
};
302+
303+
subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe());
304+
305+
const actual: Array<DirectLineExport.Activity> = [];
306+
subscriptions.push(directline.activity$.subscribe(a => actual.push(a)));
307+
308+
scheduler.flush();
309+
310+
expect(presetActivity.deliveryMode).toStrictEqual('stream');
311+
expect(actual[0].deliveryMode).toStrictEqual('stream');
312+
});
313+
314+
test('Not setting streaming preserves passed deliveryMode "normal" in activity', () => {
315+
const presetActivity: DirectLineExport.Message = {
316+
type: 'message',
317+
from: { id: 'sender' },
318+
text: 'preset-nonstream',
319+
deliveryMode: 'normal'
320+
};
321+
322+
const scenario = function* (): IterableIterator<Observable<unknown>> {
323+
yield Observable.timer(200, scheduler);
324+
yield directline.postActivity(presetActivity);
325+
};
326+
327+
subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe());
328+
329+
const actual: Array<DirectLineExport.Activity> = [];
330+
subscriptions.push(directline.activity$.subscribe(a => actual.push(a)));
331+
332+
scheduler.flush();
333+
334+
expect(presetActivity.deliveryMode).toStrictEqual('normal');
335+
expect(actual[0].deliveryMode).toStrictEqual('normal');
336+
});
337+
338+
test.each([
339+
{ streaming: true, expectedDeliveryMode: 'stream', testName: 'Streaming' },
340+
{ streaming: false, expectedDeliveryMode: undefined, testName: 'Non-streaming' }
341+
])('$testName + 403 post returns retry and preserves deliveryMode', ({ streaming, expectedDeliveryMode }) => {
342+
services.ajax = DirectLineMock.mockAjax(server, (urlOrRequest) => {
343+
if (typeof urlOrRequest === 'string') {
344+
throw new Error();
345+
}
346+
347+
if (urlOrRequest.url && urlOrRequest.url.indexOf('/conversations') > 0 && !/activities/u.test(urlOrRequest.url)) {
348+
// start conversation
349+
const response: Partial<AjaxResponse> = {
350+
response: server.conversation,
351+
status: 201,
352+
xhr: { getResponseHeader: () => 'n/a' } as unknown as XMLHttpRequest
353+
};
354+
return response as AjaxResponse;
355+
}
356+
357+
if (urlOrRequest.url && /activities/u.test(urlOrRequest.url)) {
358+
const response: Partial<AjaxResponse> = {
359+
status: 403,
360+
xhr: { getResponseHeader: () => 'n/a' } as unknown as XMLHttpRequest
361+
};
362+
const error = new Error('Forbidden');
363+
throw Object.assign(error, response);
364+
}
365+
366+
throw new Error();
367+
});
368+
369+
directline = new DirectLineExport.DirectLine({
370+
...services,
371+
...(streaming ? { streaming: true } : {})
372+
});
373+
374+
const retryActivity = DirectLineMock.mockActivity('will-retry');
375+
const scenario = function* (): IterableIterator<Observable<unknown>> {
376+
yield Observable.timer(200, scheduler);
377+
yield directline.postActivity(retryActivity);
378+
};
379+
380+
let postResult: string | undefined;
381+
subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe({
382+
next: v => { postResult = v as string; },
383+
error: () => {},
384+
complete: () => {}
385+
}));
386+
387+
scheduler.flush();
388+
389+
expect(retryActivity.deliveryMode).toStrictEqual(expectedDeliveryMode);
390+
expect(postResult).toStrictEqual('retry');
391+
});
392+
});
246393
});

src/directLine.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,8 @@ export interface User {
305305
role?: UserRole
306306
}
307307

308+
export type DeliveryMode = "normal" | "stream";
309+
308310
export interface IActivity {
309311
type: string,
310312
channelData?: any,
@@ -313,7 +315,8 @@ export interface IActivity {
313315
eTag?: string,
314316
from: User,
315317
id?: string,
316-
timestamp?: string
318+
timestamp?: string,
319+
deliveryMode?: DeliveryMode
317320
}
318321

319322
export type AttachmentLayout = "list" | "carousel";
@@ -372,7 +375,13 @@ export interface DirectLineOptions {
372375
timeout?: number,
373376
// Attached to all requests to identify requesting agent.
374377
botAgent?: string,
375-
conversationStartProperties?: any
378+
conversationStartProperties?: any,
379+
/**
380+
* Per-conversation switch for streaming delivery mode.
381+
* If true, every outgoing activity will include deliveryMode: 'stream'.
382+
* If false/omitted, deliveryMode is not sent (defaults to 'normal' in ABS).
383+
*/
384+
streaming?: boolean
376385
}
377386

378387
export interface Services {
@@ -477,12 +486,17 @@ export class DirectLine implements IBotConnection {
477486
private pollingInterval: number = 1000; //ms
478487

479488
private tokenRefreshSubscription: Subscription;
489+
private streaming: boolean;
480490

481491
constructor(options: DirectLineOptions & Partial<Services>) {
482492
this.secret = options.secret;
483493
this.token = options.secret || options.token;
484494
this.webSocket = (options.webSocket === undefined ? true : options.webSocket) && typeof WebSocket !== 'undefined' && WebSocket !== undefined;
485495

496+
if (options.streaming) {
497+
this.streaming = options.streaming;
498+
}
499+
486500
if (options.conversationStartProperties && options.conversationStartProperties.locale) {
487501
if (Object.prototype.toString.call(options.conversationStartProperties.locale) === '[object String]') {
488502
this.localeOnStartConversation = options.conversationStartProperties.locale;
@@ -754,6 +768,12 @@ export class DirectLine implements IBotConnection {
754768
}
755769

756770
postActivity(activity: Activity) {
771+
// If streaming is enabled for this DirectLine instance, always set deliveryMode to 'stream'
772+
// default would be 'normal' and not passing anything meaning 'normal' as well on ABS side
773+
if (this.streaming) {
774+
activity.deliveryMode = 'stream';
775+
}
776+
757777
// If user id is set, check if it match activity.from.id and always override it in activity
758778
if (this.userIdOnStartConversation && activity.from && activity.from.id !== this.userIdOnStartConversation) {
759779
console.warn('DirectLineJS: Activity.from.id does not match with user id, ignoring activity.from.id');

0 commit comments

Comments
 (0)