Skip to main content

@enclave-vm/stream

Streaming protocol implementation for the EnclaveJS runtime. Provides NDJSON parsing, end-to-end encryption, and automatic reconnection with event buffering.

Installation

npm install @enclave-vm/stream

NDJSON Protocol

EnclaveJS uses NDJSON (Newline-Delimited JSON) for streaming events. Each line is a complete JSON object representing a stream event.

Serialization

import {
  serializeEvent,
  serializeEvents,
} from '@enclave-vm/stream';

// Serialize a single event
const line = serializeEvent({
  type: 'stdout',
  sessionId: 'ses_...',
  seq: 1,
  timestamp: Date.now(),
  payload: { chunk: 'Hello\n' },
});
// '{"type":"stdout",...}\n'

// Serialize multiple events
const lines = serializeEvents([event1, event2, event3]);
// '{"type":"stdout",...}\n{"type":"log",...}\n{"type":"final",...}\n'

Parsing

import {
  parseLine,
  parseLines,
  NdjsonStreamParser,
} from '@enclave-vm/stream';

// Parse a single line
const result = parseLine('{"type":"stdout",...}');
if (result.success) {
  console.log(result.data);  // Typed StreamEvent
}

// Parse multiple lines
const events = parseLines(ndjsonString);
// Returns array of parse results

// Incremental stream parsing
const parser = new NdjsonStreamParser();

parser.on('event', (event) => {
  console.log('Received event:', event.type);
});

parser.on('error', (error) => {
  console.error('Parse error:', error);
});

// Feed chunks as they arrive
parser.write('{"type":"stdout"');
parser.write(',...}\n{"type":');
parser.write('"log",...}\n');

Node.js Streams

import {
  createNdjsonParseStream,
  createNdjsonSerializeStream,
  parseNdjsonStream,
} from '@enclave-vm/stream';

// Create a transform stream for parsing
const parseStream = createNdjsonParseStream();

// Create a transform stream for serialization
const serializeStream = createNdjsonSerializeStream();

// Parse an async iterable of chunks
const events = parseNdjsonStream(responseBody);
for await (const event of events) {
  console.log(event.type, event.payload);
}

End-to-End Encryption

EnclaveJS supports optional encryption using ECDH key exchange and AES-256-GCM.

Key Exchange (ECDH)

import {
  generateKeyPair,
  exportPublicKey,
  importPublicKey,
  deriveSharedSecret,
  createClientHello,
  createServerHello,
  processClientHello,
  processServerHello,
} from '@enclave-vm/stream';

// Client: Generate key pair and create hello
const clientKeys = await generateKeyPair('P-256');
const clientHello = createClientHello(clientKeys);

// Server: Process client hello and respond
const serverKeys = await generateKeyPair('P-256');
const { sharedSecret } = await processClientHello(clientHello, serverKeys);
const serverHello = createServerHello(serverKeys, sessionId);

// Client: Process server hello
const { sharedSecret: clientSecret } = await processServerHello(
  serverHello,
  clientKeys
);

// Both now have the same shared secret

Key Derivation (HKDF)

import {
  deriveKey,
  deriveSessionKeys,
  deriveSessionCryptoKeys,
} from '@enclave-vm/stream';

// Derive encryption key from shared secret
const key = await deriveKey(sharedSecret, {
  info: 'enclavejs-session',
  length: 32,  // AES-256
});

// Derive both client and server keys
const { clientKey, serverKey } = await deriveSessionKeys(
  sharedSecret,
  sessionId
);

// Derive as CryptoKey objects for Web Crypto API
const cryptoKeys = await deriveSessionCryptoKeys(sharedSecret, sessionId);

Encryption/Decryption (AES-GCM)

import {
  encrypt,
  decrypt,
  encryptJson,
  decryptJson,
  createEncryptedEnvelope,
  generateNonce,
  generateCounterNonce,
  toBase64,
  fromBase64,
  SessionEncryptionContext,
} from '@enclave-vm/stream';

// Low-level encryption
const nonce = generateNonce();
const encrypted = await encrypt(key, nonce, plaintext);
const decrypted = await decrypt(key, nonce, encrypted);

// JSON encryption helpers
const encryptedJson = await encryptJson(key, { data: 'secret' });
const decryptedJson = await decryptJson(key, encryptedJson);

// Create encrypted envelope for transmission
const envelope = await createEncryptedEnvelope(key, event, nonceCounter);

// High-level session encryption context
const ctx = new SessionEncryptionContext(key);

// Encrypt outgoing events
const encrypted = await ctx.encrypt(event);

// Decrypt incoming events
const decrypted = await ctx.decrypt(envelope);

Encryption Constants

import {
  AES_GCM_NONCE_SIZE,   // 12 bytes
  AES_GCM_TAG_SIZE,     // 16 bytes
  AES_256_KEY_SIZE,     // 32 bytes
  MAX_MESSAGES_PER_KEY, // Max messages before key rotation
} from '@enclave-vm/stream';

Reconnection Handling

Built-in support for automatic reconnection with event buffering and sequence tracking.

Connection State Machine

import { ConnectionState, ReconnectionStateMachine } from '@enclave-vm/stream';

// Connection states
ConnectionState.DISCONNECTED  // Not connected
ConnectionState.CONNECTING    // Connection in progress
ConnectionState.CONNECTED     // Active connection
ConnectionState.RECONNECTING  // Attempting reconnection

// State machine
const stateMachine = new ReconnectionStateMachine({
  maxAttempts: 5,
  initialDelay: 1000,
  maxDelay: 30000,
  backoffMultiplier: 2,
});

stateMachine.on('stateChange', (newState, oldState) => {
  console.log(`Connection: ${oldState} -> ${newState}`);
});

stateMachine.on('reconnect', (attempt) => {
  console.log(`Reconnection attempt ${attempt}`);
});

// Trigger state transitions
stateMachine.connect();
stateMachine.disconnect();
stateMachine.connectionFailed();
stateMachine.connectionSucceeded();

Reconnection Configuration

import {
  DEFAULT_RECONNECTION_CONFIG,
  type ReconnectionConfig,
} from '@enclave-vm/stream';

const config: ReconnectionConfig = {
  enabled: true,           // Enable auto-reconnection
  maxAttempts: 5,          // Max reconnection attempts
  initialDelay: 1000,      // Initial delay (ms)
  maxDelay: 30000,         // Max delay between attempts (ms)
  backoffMultiplier: 2,    // Exponential backoff multiplier
  jitter: 0.1,             // Random jitter (0-1)
};

console.log(DEFAULT_RECONNECTION_CONFIG);

Sequence Tracking

import { SequenceTracker } from '@enclave-vm/stream';

const tracker = new SequenceTracker();

// Track received events
tracker.received(event);

// Check for gaps
if (tracker.hasGap()) {
  console.log('Missing events:', tracker.getMissingSequences());
}

// Get last received sequence
const lastSeq = tracker.getLastSequence();

// Request replay from sequence
const replayFrom = tracker.getReplaySequence();

Event Buffering

import { EventBuffer } from '@enclave-vm/stream';

const buffer = new EventBuffer({
  maxSize: 1000,       // Max events to buffer
  maxAge: 60000,       // Max age in ms
});

// Buffer events during disconnection
buffer.add(event);

// Get buffered events for replay
const events = buffer.drain();

// Check buffer status
console.log(buffer.size);
console.log(buffer.isEmpty);

Heartbeat Monitoring

import { HeartbeatMonitor } from '@enclave-vm/stream';

const monitor = new HeartbeatMonitor({
  interval: 30000,   // Expected interval (ms)
  timeout: 45000,    // Timeout before considered dead (ms)
});

monitor.on('timeout', () => {
  console.log('Connection appears dead, reconnecting...');
});

// Call when heartbeat received
monitor.heartbeatReceived();

// Start/stop monitoring
monitor.start();
monitor.stop();

Re-exported Types

This package re-exports all types from @enclave-vm/types for convenience:
import {
  // Protocol
  SessionId,
  CallId,
  generateSessionId,

  // Events
  StreamEvent,
  EventType,
  isStdoutEvent,

  // Schemas
  StreamEventSchema,
  parseStreamEvent,
} from '@enclave-vm/stream';