@enclave-vm/stream
Streaming protocol implementation for the EnclaveJS runtime. Provides NDJSON parsing, end-to-end encryption, and automatic reconnection with event buffering.Installation
npm install @enclave-vm/stream
NDJSON Protocol
EnclaveJS uses NDJSON (Newline-Delimited JSON) for streaming events. Each line is a complete JSON object representing a stream event.Serialization
import {
serializeEvent,
serializeEvents,
} from '@enclave-vm/stream';
// Serialize a single event
const line = serializeEvent({
type: 'stdout',
sessionId: 'ses_...',
seq: 1,
timestamp: Date.now(),
payload: { chunk: 'Hello\n' },
});
// '{"type":"stdout",...}\n'
// Serialize multiple events
const lines = serializeEvents([event1, event2, event3]);
// '{"type":"stdout",...}\n{"type":"log",...}\n{"type":"final",...}\n'
Parsing
import {
parseLine,
parseLines,
NdjsonStreamParser,
} from '@enclave-vm/stream';
// Parse a single line
const result = parseLine('{"type":"stdout",...}');
if (result.success) {
console.log(result.data); // Typed StreamEvent
}
// Parse multiple lines
const events = parseLines(ndjsonString);
// Returns array of parse results
// Incremental stream parsing
const parser = new NdjsonStreamParser();
parser.on('event', (event) => {
console.log('Received event:', event.type);
});
parser.on('error', (error) => {
console.error('Parse error:', error);
});
// Feed chunks as they arrive
parser.write('{"type":"stdout"');
parser.write(',...}\n{"type":');
parser.write('"log",...}\n');
Node.js Streams
import {
createNdjsonParseStream,
createNdjsonSerializeStream,
parseNdjsonStream,
} from '@enclave-vm/stream';
// Create a transform stream for parsing
const parseStream = createNdjsonParseStream();
// Create a transform stream for serialization
const serializeStream = createNdjsonSerializeStream();
// Parse an async iterable of chunks
const events = parseNdjsonStream(responseBody);
for await (const event of events) {
console.log(event.type, event.payload);
}
End-to-End Encryption
EnclaveJS supports optional encryption using ECDH key exchange and AES-256-GCM.Key Exchange (ECDH)
import {
generateKeyPair,
exportPublicKey,
importPublicKey,
deriveSharedSecret,
createClientHello,
createServerHello,
processClientHello,
processServerHello,
} from '@enclave-vm/stream';
// Client: Generate key pair and create hello
const clientKeys = await generateKeyPair('P-256');
const clientHello = createClientHello(clientKeys);
// Server: Process client hello and respond
const serverKeys = await generateKeyPair('P-256');
const { sharedSecret } = await processClientHello(clientHello, serverKeys);
const serverHello = createServerHello(serverKeys, sessionId);
// Client: Process server hello
const { sharedSecret: clientSecret } = await processServerHello(
serverHello,
clientKeys
);
// Both now have the same shared secret
Key Derivation (HKDF)
import {
deriveKey,
deriveSessionKeys,
deriveSessionCryptoKeys,
} from '@enclave-vm/stream';
// Derive encryption key from shared secret
const key = await deriveKey(sharedSecret, {
info: 'enclavejs-session',
length: 32, // AES-256
});
// Derive both client and server keys
const { clientKey, serverKey } = await deriveSessionKeys(
sharedSecret,
sessionId
);
// Derive as CryptoKey objects for Web Crypto API
const cryptoKeys = await deriveSessionCryptoKeys(sharedSecret, sessionId);
Encryption/Decryption (AES-GCM)
import {
encrypt,
decrypt,
encryptJson,
decryptJson,
createEncryptedEnvelope,
generateNonce,
generateCounterNonce,
toBase64,
fromBase64,
SessionEncryptionContext,
} from '@enclave-vm/stream';
// Low-level encryption
const nonce = generateNonce();
const encrypted = await encrypt(key, nonce, plaintext);
const decrypted = await decrypt(key, nonce, encrypted);
// JSON encryption helpers
const encryptedJson = await encryptJson(key, { data: 'secret' });
const decryptedJson = await decryptJson(key, encryptedJson);
// Create encrypted envelope for transmission
const envelope = await createEncryptedEnvelope(key, event, nonceCounter);
// High-level session encryption context
const ctx = new SessionEncryptionContext(key);
// Encrypt outgoing events
const encrypted = await ctx.encrypt(event);
// Decrypt incoming events
const decrypted = await ctx.decrypt(envelope);
Encryption Constants
import {
AES_GCM_NONCE_SIZE, // 12 bytes
AES_GCM_TAG_SIZE, // 16 bytes
AES_256_KEY_SIZE, // 32 bytes
MAX_MESSAGES_PER_KEY, // Max messages before key rotation
} from '@enclave-vm/stream';
Reconnection Handling
Built-in support for automatic reconnection with event buffering and sequence tracking.Connection State Machine
import { ConnectionState, ReconnectionStateMachine } from '@enclave-vm/stream';
// Connection states
ConnectionState.DISCONNECTED // Not connected
ConnectionState.CONNECTING // Connection in progress
ConnectionState.CONNECTED // Active connection
ConnectionState.RECONNECTING // Attempting reconnection
// State machine
const stateMachine = new ReconnectionStateMachine({
maxAttempts: 5,
initialDelay: 1000,
maxDelay: 30000,
backoffMultiplier: 2,
});
stateMachine.on('stateChange', (newState, oldState) => {
console.log(`Connection: ${oldState} -> ${newState}`);
});
stateMachine.on('reconnect', (attempt) => {
console.log(`Reconnection attempt ${attempt}`);
});
// Trigger state transitions
stateMachine.connect();
stateMachine.disconnect();
stateMachine.connectionFailed();
stateMachine.connectionSucceeded();
Reconnection Configuration
import {
DEFAULT_RECONNECTION_CONFIG,
type ReconnectionConfig,
} from '@enclave-vm/stream';
const config: ReconnectionConfig = {
enabled: true, // Enable auto-reconnection
maxAttempts: 5, // Max reconnection attempts
initialDelay: 1000, // Initial delay (ms)
maxDelay: 30000, // Max delay between attempts (ms)
backoffMultiplier: 2, // Exponential backoff multiplier
jitter: 0.1, // Random jitter (0-1)
};
console.log(DEFAULT_RECONNECTION_CONFIG);
Sequence Tracking
import { SequenceTracker } from '@enclave-vm/stream';
const tracker = new SequenceTracker();
// Track received events
tracker.received(event);
// Check for gaps
if (tracker.hasGap()) {
console.log('Missing events:', tracker.getMissingSequences());
}
// Get last received sequence
const lastSeq = tracker.getLastSequence();
// Request replay from sequence
const replayFrom = tracker.getReplaySequence();
Event Buffering
import { EventBuffer } from '@enclave-vm/stream';
const buffer = new EventBuffer({
maxSize: 1000, // Max events to buffer
maxAge: 60000, // Max age in ms
});
// Buffer events during disconnection
buffer.add(event);
// Get buffered events for replay
const events = buffer.drain();
// Check buffer status
console.log(buffer.size);
console.log(buffer.isEmpty);
Heartbeat Monitoring
import { HeartbeatMonitor } from '@enclave-vm/stream';
const monitor = new HeartbeatMonitor({
interval: 30000, // Expected interval (ms)
timeout: 45000, // Timeout before considered dead (ms)
});
monitor.on('timeout', () => {
console.log('Connection appears dead, reconnecting...');
});
// Call when heartbeat received
monitor.heartbeatReceived();
// Start/stop monitoring
monitor.start();
monitor.stop();
Re-exported Types
This package re-exports all types from@enclave-vm/types for convenience:
import {
// Protocol
SessionId,
CallId,
generateSessionId,
// Events
StreamEvent,
EventType,
isStdoutEvent,
// Schemas
StreamEventSchema,
parseStreamEvent,
} from '@enclave-vm/stream';