Skip to main content
API reference for the @enclave-vm/stream package - streaming protocol implementation including NDJSON parsing, encryption, and reconnection handling.

Installation

npm install @enclave-vm/stream

NDJSON Parsing

serializeEvent(event)

Serialize an event to NDJSON format (single line).
function serializeEvent(event: MaybeEncrypted<StreamEvent>): string
Example:
import { serializeEvent } from '@enclave-vm/stream';

const line = serializeEvent({
  type: 'stdout',
  sessionId: 'sess_123',
  seq: 1,
  payload: { data: 'Hello' }
});
// '{"type":"stdout","sessionId":"sess_123","seq":1,"payload":{"data":"Hello"}}'

serializeEvents(events)

Serialize multiple events to NDJSON format.
function serializeEvents(events: MaybeEncrypted<StreamEvent>[]): string
Example:
import { serializeEvents } from '@enclave-vm/stream';

const ndjson = serializeEvents([event1, event2, event3]);
// Each event on its own line

parseLine(line)

Parse a single NDJSON line into an event.
function parseLine(line: string): ParseResult<ParsedStreamEvent | ParsedEncryptedEnvelope>

type ParseResult<T> =
  | { success: true; data: T }
  | { success: false; error: string; line: string };
Example:
import { parseLine } from '@enclave-vm/stream';

const result = parseLine('{"type":"stdout","sessionId":"sess_123","seq":1,"payload":{"data":"Hi"}}');
if (result.success) {
  console.log(result.data.type); // 'stdout'
}

parseLines(data)

Parse multiple NDJSON lines into events.
function parseLines(data: string): {
  events: (ParsedStreamEvent | ParsedEncryptedEnvelope)[];
  errors: Array<{ line: number; error: string; content: string }>;
}
Example:
import { parseLines } from '@enclave-vm/stream';

const { events, errors } = parseLines(ndjsonData);
console.log(`Parsed ${events.length} events, ${errors.length} errors`);

NdjsonStreamParser

Incremental NDJSON parser for streaming data. Handles partial lines across chunks.
class NdjsonStreamParser {
  constructor(options: {
    onEvent: (event: ParsedStreamEvent | ParsedEncryptedEnvelope) => void;
    onError: (error: { line: number; error: string; content: string }) => void;
  });

  feed(chunk: string): void;
  flush(): void;
  reset(): void;
  getLineNumber(): number;
  hasPendingData(): boolean;
}
Example:
import { NdjsonStreamParser } from '@enclave-vm/stream';

const parser = new NdjsonStreamParser({
  onEvent: (event) => console.log('Event:', event.type),
  onError: (error) => console.error('Parse error:', error.error),
});

// Feed chunks as they arrive
parser.feed('{"type":"stdout"');
parser.feed(',"sessionId":"sess_123","seq":1,"payload":{"data":"Hi"}}\n');

// Flush any remaining data when stream ends
parser.flush();

createNdjsonParseStream()

Create a transform stream that parses NDJSON. Works with browser fetch() and Node.js streams.
function createNdjsonParseStream(): TransformStream<string, ParsedStreamEvent | ParsedEncryptedEnvelope>
Example:
import { createNdjsonParseStream } from '@enclave-vm/stream';

const response = await fetch('/api/stream');
const reader = response.body
  .pipeThrough(new TextDecoderStream())
  .pipeThrough(createNdjsonParseStream())
  .getReader();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  console.log('Event:', value.type);
}

createNdjsonSerializeStream()

Create a transform stream that serializes events to NDJSON.
function createNdjsonSerializeStream(): TransformStream<MaybeEncrypted<StreamEvent>, string>

parseNdjsonStream(stream)

Async generator that parses NDJSON from a ReadableStream.
async function* parseNdjsonStream(
  stream: ReadableStream<Uint8Array>
): AsyncGenerator<ParsedStreamEvent | ParsedEncryptedEnvelope>
Example:
import { parseNdjsonStream } from '@enclave-vm/stream';

const response = await fetch('/api/stream');

for await (const event of parseNdjsonStream(response.body)) {
  console.log('Event:', event.type);
}

ECDH Key Exchange

generateKeyPair(curve?)

Generate an ephemeral ECDH key pair.
async function generateKeyPair(curve?: SupportedCurve): Promise<EcdhKeyPair>

interface EcdhKeyPair {
  publicKey: CryptoKey;
  privateKey: CryptoKey;
}
ParameterTypeDefaultDescription
curveSupportedCurve'P-256'Elliptic curve to use
Example:
import { generateKeyPair } from '@enclave-vm/stream';

const keyPair = await generateKeyPair();
// Or with specific curve
const keyPair384 = await generateKeyPair('P-384');

exportPublicKey(publicKey)

Export a public key to base64 format.
async function exportPublicKey(publicKey: CryptoKey): Promise<SerializedPublicKey>

interface SerializedPublicKey {
  publicKeyB64: string;
  curve: SupportedCurve;
}

importPublicKey(publicKeyB64, curve?)

Import a public key from base64 format.
async function importPublicKey(
  publicKeyB64: string,
  curve?: SupportedCurve
): Promise<CryptoKey>

deriveSharedSecret(privateKey, peerPublicKey)

Derive shared secret from private key and peer’s public key.
async function deriveSharedSecret(
  privateKey: CryptoKey,
  peerPublicKey: CryptoKey
): Promise<Uint8Array>
Example:
import { generateKeyPair, deriveSharedSecret, importPublicKey } from '@enclave-vm/stream';

// Client side
const clientKeyPair = await generateKeyPair();
const serverPubKey = await importPublicKey(serverPublicKeyB64);
const sharedSecret = await deriveSharedSecret(clientKeyPair.privateKey, serverPubKey);

createClientHello(keyPair)

Create a client hello message for the encryption handshake.
async function createClientHello(keyPair: EcdhKeyPair): Promise<ClientHello>

createServerHello(keyPair, keyId)

Create a server hello message for the encryption handshake.
async function createServerHello(keyPair: EcdhKeyPair, keyId: string): Promise<ServerHello>

processClientHello(clientHello)

Process a client hello and generate server response.
async function processClientHello(clientHello: ClientHello): Promise<{
  serverKeyPair: EcdhKeyPair;
  peerPublicKey: CryptoKey;
  serverHello: ServerHello;
  keyId: string;
}>

processServerHello(serverHello)

Process a server hello and extract peer’s public key.
async function processServerHello(serverHello: ServerHello): Promise<{
  peerPublicKey: CryptoKey;
  keyId: string;
}>

EcdhError

Error class for ECDH operations.
class EcdhError extends Error {
  readonly code: string;
  constructor(message: string, code: string);
}

Key Derivation (HKDF)

deriveKey(sharedSecret, salt, info, keyLength?)

Derive a key using HKDF-SHA256.
async function deriveKey(
  sharedSecret: Uint8Array,
  salt: Uint8Array | null,
  info: string,
  keyLength?: number
): Promise<Uint8Array>
ParameterTypeDefaultDescription
sharedSecretUint8ArrayRequiredShared secret from ECDH
saltUint8Array | nullnullOptional salt (defaults to zeros)
infostringRequiredContext info string
keyLengthnumber32Output key length in bytes

deriveSessionKeys(sharedSecret, sessionId)

Derive session keys for bidirectional communication.
async function deriveSessionKeys(
  sharedSecret: Uint8Array,
  sessionId: string
): Promise<{
  clientToServerKey: Uint8Array;
  serverToClientKey: Uint8Array;
}>

importAesGcmKey(keyBytes)

Import raw key bytes as a CryptoKey for AES-GCM.
async function importAesGcmKey(keyBytes: Uint8Array): Promise<CryptoKey>

deriveSessionCryptoKeys(sharedSecret, sessionId)

Derive and import session keys as CryptoKeys.
async function deriveSessionCryptoKeys(
  sharedSecret: Uint8Array,
  sessionId: string
): Promise<{
  clientToServerKey: CryptoKey;
  serverToClientKey: CryptoKey;
}>
Example:
import { deriveSessionCryptoKeys, deriveSharedSecret } from '@enclave-vm/stream';

const sharedSecret = await deriveSharedSecret(privateKey, peerPublicKey);
const { clientToServerKey, serverToClientKey } = await deriveSessionCryptoKeys(
  sharedSecret,
  'sess_123'
);

HkdfError

Error class for HKDF operations.
class HkdfError extends Error {
  readonly code: string;
  constructor(message: string, code: string);
}

AES-GCM Encryption

encrypt(key, plaintext, nonce, additionalData?)

Encrypt data using AES-GCM.
async function encrypt(
  key: CryptoKey,
  plaintext: Uint8Array,
  nonce: Uint8Array,
  additionalData?: Uint8Array
): Promise<Uint8Array>

decrypt(key, ciphertext, nonce, additionalData?)

Decrypt data using AES-GCM.
async function decrypt(
  key: CryptoKey,
  ciphertext: Uint8Array,
  nonce: Uint8Array,
  additionalData?: Uint8Array
): Promise<Uint8Array>

encryptJson(key, keyId, data, nonce?)

Encrypt a JSON object and create an encrypted envelope payload.
async function encryptJson(
  key: CryptoKey,
  keyId: string,
  data: unknown,
  nonce?: Uint8Array
): Promise<EncryptedEnvelopePayload>

decryptJson(key, payload)

Decrypt an encrypted envelope payload and parse as JSON.
async function decryptJson<T>(
  key: CryptoKey,
  payload: EncryptedEnvelopePayload
): Promise<T>

createEncryptedEnvelope(key, keyId, sessionId, seq, innerEvent, nonce?)

Create an encrypted envelope from an event.
async function createEncryptedEnvelope(
  key: CryptoKey,
  keyId: string,
  sessionId: SessionId,
  seq: number,
  innerEvent: unknown,
  nonce?: Uint8Array
): Promise<EncryptedEnvelope>

generateNonce()

Generate a random 12-byte nonce for AES-GCM.
function generateNonce(): Uint8Array

generateCounterNonce(prefix, counter)

Generate a counter-based nonce (8 bytes prefix + 4 bytes counter).
function generateCounterNonce(prefix: Uint8Array, counter: bigint): Uint8Array

toBase64(bytes)

Encode bytes to base64.
function toBase64(bytes: Uint8Array): string

fromBase64(base64)

Decode base64 to bytes.
function fromBase64(base64: string): Uint8Array

AesGcmError

Error class for AES-GCM operations.
class AesGcmError extends Error {
  readonly code: string;
  constructor(message: string, code: string);
}

SessionEncryptionContext

Manages encryption key state for a session.
class SessionEncryptionContext {
  constructor(key: CryptoKey, keyInfo: SessionKeyInfo);

  // Properties
  readonly keyId: string;

  // Methods
  needsRotation(): boolean;
  encrypt(plaintext: Uint8Array): Promise<{ ciphertext: Uint8Array; nonce: Uint8Array }>;
  encryptJson(data: unknown): Promise<EncryptedEnvelopePayload>;
  createEnvelope(sessionId: SessionId, seq: number, innerEvent: unknown): Promise<EncryptedEnvelope>;
  decrypt(payload: EncryptedEnvelopePayload): Promise<Uint8Array>;
  decryptJson<T>(payload: EncryptedEnvelopePayload): Promise<T>;
  getNonceCounter(): bigint;
  toKeyInfo(): SessionKeyInfo;

  // Static methods
  static fromKeyBytes(keyBytes: Uint8Array, keyId: string): Promise<SessionEncryptionContext>;
}
Example:
import { SessionEncryptionContext } from '@enclave-vm/stream';

// Create from key bytes
const ctx = await SessionEncryptionContext.fromKeyBytes(keyBytes, 'key_123');

// Encrypt an event
const envelope = await ctx.createEnvelope('sess_123', 1, {
  type: 'stdout',
  payload: { data: 'Hello' }
});

// Check if key rotation is needed
if (ctx.needsRotation()) {
  // Perform key rotation
}

// Decrypt
const decrypted = await ctx.decryptJson(envelope.payload);

Reconnection

ConnectionState

Connection state enumeration.
const ConnectionState = {
  Disconnected: 'disconnected',
  Connecting: 'connecting',
  Connected: 'connected',
  Reconnecting: 'reconnecting',
  Failed: 'failed',
  Closed: 'closed',
} as const;

type ConnectionState = (typeof ConnectionState)[keyof typeof ConnectionState];

DEFAULT_RECONNECTION_CONFIG

Default reconnection configuration.
const DEFAULT_RECONNECTION_CONFIG: ReconnectionConfig = {
  maxRetries: 5,
  initialDelayMs: 1000,
  maxDelayMs: 30000,
  backoffMultiplier: 2,
  jitter: true,
  jitterFactor: 0.3,
};

interface ReconnectionConfig {
  maxRetries: number;
  initialDelayMs: number;
  maxDelayMs: number;
  backoffMultiplier: number;
  jitter: boolean;
  jitterFactor: number;
}

ReconnectionStateMachine

Manages connection state and automatic reconnection.
class ReconnectionStateMachine {
  constructor(options: {
    config?: Partial<ReconnectionConfig>;
    onEvent: (event: ReconnectionEvent) => void;
  });

  getState(): ConnectionState;
  getRetryCount(): number;
  connect(): void;
  onConnected(): void;
  onDisconnected(reason?: string): void;
  onFatalError(reason: string): void;
  close(): void;
  reset(): void;
  canReconnect(): boolean;
}

type ReconnectionEvent =
  | { type: 'state_change'; state: ConnectionState; previousState: ConnectionState }
  | { type: 'retry_scheduled'; attempt: number; delayMs: number }
  | { type: 'retry_started'; attempt: number }
  | { type: 'connected' }
  | { type: 'disconnected'; reason?: string }
  | { type: 'failed'; reason: string };
Example:
import { ReconnectionStateMachine, ConnectionState } from '@enclave-vm/stream';

const reconnect = new ReconnectionStateMachine({
  config: { maxRetries: 3 },
  onEvent: (event) => {
    switch (event.type) {
      case 'state_change':
        console.log(`State: ${event.previousState} -> ${event.state}`);
        break;
      case 'retry_scheduled':
        console.log(`Retry ${event.attempt} in ${event.delayMs}ms`);
        break;
      case 'connected':
        console.log('Connected!');
        break;
      case 'failed':
        console.error('Connection failed:', event.reason);
        break;
    }
  },
});

reconnect.connect();
// ... when connection succeeds
reconnect.onConnected();
// ... when connection drops
reconnect.onDisconnected('Network error');

SequenceTracker

Tracks sequence numbers and detects gaps for replay.
class SequenceTracker {
  constructor(maxGaps?: number);

  receive(seq: number): { gap: boolean; missingStart?: number; missingEnd?: number };
  getLastSeq(): number;
  getGaps(): Array<{ start: number; end: number }>;
  clearGap(start: number, end: number): void;
  hasGaps(): boolean;
  reset(): void;
}
Example:
import { SequenceTracker } from '@enclave-vm/stream';

const tracker = new SequenceTracker();

tracker.receive(1); // { gap: false }
tracker.receive(2); // { gap: false }
tracker.receive(5); // { gap: true, missingStart: 3, missingEnd: 4 }

console.log(tracker.getGaps()); // [{ start: 3, end: 4 }]

EventBuffer

Buffer for storing events during reconnection.
class EventBuffer {
  constructor(maxSize?: number);

  add(event: StreamEvent | EncryptedEnvelope): boolean;
  getAll(): (StreamEvent | EncryptedEnvelope)[];
  drain(): (StreamEvent | EncryptedEnvelope)[];
  size(): number;
  isFull(): boolean;
  clear(): void;
}
Example:
import { EventBuffer } from '@enclave-vm/stream';

const buffer = new EventBuffer(100);

buffer.add(event1);
buffer.add(event2);

// Get all events and clear buffer
const events = buffer.drain();

HeartbeatMonitor

Monitors heartbeats to detect stale connections.
class HeartbeatMonitor {
  constructor(options: { timeoutMs: number; onTimeout: () => void });

  start(): void;
  stop(): void;
  reset(): void;
  onHeartbeat(): void;
  getTimeSinceLastHeartbeat(): number;
}
Example:
import { HeartbeatMonitor } from '@enclave-vm/stream';

const monitor = new HeartbeatMonitor({
  timeoutMs: 30000,
  onTimeout: () => {
    console.log('Connection stale, reconnecting...');
    reconnect();
  },
});

monitor.start();

// When heartbeat received
monitor.onHeartbeat();

// When done
monitor.stop();

Re-exported Types

This package re-exports all types from @enclave-vm/types:
export * from '@enclave-vm/types';
See @enclave-vm/types API for the complete type reference.