fs2-nats brings the full NATS surface to the Typelevel stack: core pub/sub and request/reply, JetStream, Key-Value and Object Store — all modelled as fs2 Streams and cats-effect Resources, so subscriptions are values and lifecycles are safe by construction.
A functional, streaming NATS client for Scala 3 — built on Cats Effect 3 and FS2.
Every subscription is a Stream. Every connection is a Resource. Nothing happens until you ask it to.
fs2-nats lets you talk to NATS the functional way. Subscriptions are native fs2.Streams,
the client is a Resource that cleans up after itself, reconnection and backpressure are
built in, and the whole surface — core pub/sub, request/reply, JetStream, Key-Value, and
Object Store — stays inside Cats Effect from the first byte to the last. No callbacks, no
hidden threads, no surprises. 🎉
| Component | Version |
|---|---|
| Scala | 3.3.7 (Scala 3.3 LTS) |
| JDK | 11, 17, 21, 25 — minimum JDK 11 |
| Cats Effect | 3.7.x |
| FS2 | 3.13.x |
| NATS Server | 2.9+ recommended |
NATS server versions: core messaging, headers, and JetStream work against NATS Server 2.2+. The Key-Value and Object Store features use the JetStream Direct Get API, which requires NATS Server 2.9+. The test suite runs against the latest nats Docker image.
Latest release: 0.2.0, published to Maven Central for Scala 3.
sbt
libraryDependencies += "de.thatscalaguy" %% "fs2-nats" % "0.2.0"
Mill
ivy"de.thatscalaguy::fs2-nats:0.2.0"
scala-cli
//> using dep de.thatscalaguy::fs2-nats:0.2.0
Start a NATS server:
docker run -p 4222:4222 nats:latest
import cats.effect.{IO, IOApp, ExitCode}
import com.comcast.ip4s.{Host, Port}
import fs2.Chunk
import fs2.nats.client.{ClientConfig, NatsClient}

object Main extends IOApp:
  override def run(args: List[String]): IO[ExitCode] =
    val config = ClientConfig(
      host = Host.fromString("localhost").get,
      port = Port.fromInt(4222).get
    )
    NatsClient.connect[IO](config).use { client =>
      for
        // Subscribe to a subject
        _ <- client.subscribe("hello.world").use { messages =>
          for
            // Publish a message
            _ <- client.publish(
              "hello.world",
              Chunk.array("Hello, NATS!".getBytes)
            )
            // Receive the message
            msg <- messages.take(1).compile.lastOrError
            _ <- IO.println(s"Received: ${msg.payloadAsString}")
          yield ()
        }
      yield ExitCode.Success
    }
import fs2.nats.protocol.Headers

val headers = Headers(
  "X-Request-Id" -> "abc123",
  "X-Timestamp" -> System.currentTimeMillis().toString
)

client.publish(
  "events.created",
  Chunk.array("""{"id": 1}""".getBytes),
  headers
)
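For orientation, the NATS protocol carries headers as a `NATS/1.0` block in front of the payload, announced by an `HPUB` control line that states the header length and the total length. The sketch below encodes that wire format using only the standard library. The names `encodeHeaders` and `hpubControlLine` are hypothetical helpers invented for this illustration and say nothing about how fs2-nats' `Headers` or `SerializationUtils` are implemented.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helper (not part of fs2-nats): a version line, one
// "Key: Value" line per header, and a terminating empty line.
def encodeHeaders(headers: List[(String, String)]): Array[Byte] =
  val lines = headers.map((k, v) => s"$k: $v\r\n").mkString
  s"NATS/1.0\r\n$lines\r\n".getBytes(UTF_8)

// Hypothetical helper: the HPUB control line carries the header block size
// and the combined size of header block plus payload.
def hpubControlLine(subject: String, headerBlock: Array[Byte], payload: Array[Byte]): String =
  s"HPUB $subject ${headerBlock.length} ${headerBlock.length + payload.length}\r\n"
```

Because both sizes are byte counts, they have to be computed from the encoded bytes rather than from string lengths.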
// Subscribe to events exactly one level below events (`*` matches a single token)
client.subscribe("events.*").use { messages =>
  messages.evalMap { msg =>
    IO.println(s"${msg.subject}: ${msg.payloadAsString}")
  }.compile.drain
}

// Subscribe to everything below events (`>` matches one or more trailing tokens)
client.subscribe("events.>").use { messages =>
  // Handles events.a, events.a.b, events.a.b.c, etc.
  messages.compile.drain
}
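The difference between the two wildcards is easy to get wrong, so here is a minimal sketch of NATS subject matching as a pure function. `subjectMatches` is a hypothetical helper written for this illustration. In practice the server does the matching, and nothing here is part of the fs2-nats API.

```scala
// Hypothetical helper (not part of fs2-nats). Subjects are dot-separated
// tokens: `*` matches exactly one token, `>` matches one or more trailing
// tokens and is only meaningful as the last token of the pattern.
def subjectMatches(pattern: String, subject: String): Boolean =
  def loop(p: List[String], s: List[String]): Boolean =
    (p, s) match
      case (">" :: Nil, _ :: _) => true                        // needs at least one remaining token
      case ("*" :: pt, _ :: st) => loop(pt, st)                // consume exactly one token
      case (ph :: pt, sh :: st) => ph == sh && loop(pt, st)    // literal token must match
      case (Nil, Nil)           => true
      case _                    => false
  loop(pattern.split('.').toList, subject.split('.').toList)
```

With this definition `events.*` accepts `events.created` but not `events.a.b`, while `events.>` accepts both and rejects the bare subject `events`.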
// Multiple subscribers in the same queue group share messages
client.subscribe("work.queue", queueGroup = Some("workers")).use { messages =>
  messages.evalMap { msg =>
    processWork(msg)
  }.compile.drain
}
import fs2.nats.client.ClientEvent

client.events.evalMap {
  case ClientEvent.Connected(info) =>
    IO.println(s"Connected to ${info.serverId}")
  case ClientEvent.Disconnected(reason, willReconnect) =>
    IO.println(s"Disconnected: $reason, reconnecting: $willReconnect")
  case ClientEvent.Reconnected(info, attempt) =>
    IO.println(s"Reconnected after $attempt attempts")
  case ClientEvent.SlowConsumer(sid, subject, dropped) =>
    IO.println(s"Slow consumer on $subject, dropped $dropped messages")
  case other =>
    IO.println(s"Event: $other")
}.compile.drain
// Send a request and await a single reply (shared response inbox).
val reply = client.request("service.echo", Chunk.array("ping".getBytes))
// Fails fast with NatsError.NoResponders if nobody is listening (503),
// or NatsError.Timeout if no reply arrives within the timeout.
JetStream is obtained as a Resource over a connected client (requires a
JetStream-enabled server, e.g. nats-server -js). It owns the publish window
and supervised fibers, and releases them with the Resource.
import cats.syntax.all.*              // traverse_
import scala.concurrent.duration.*    // 2.seconds
import fs2.nats.jetstream.*
import fs2.nats.jetstream.protocol.*

client.jetStream().use { js =>
  for
    // Stream management
    _ <- js.addStream(StreamConfig(name = "ORDERS", subjects = List("orders.>")))
    // Persistent publish with PubAck (+ dedup via Nats-Msg-Id)
    ack <- js.publish(
      "orders.new",
      Chunk.array("order #1".getBytes),
      opts = PublishOptions(msgId = Some("order-1"))
    )
    _ <- IO.println(s"stored seq=${ack.seq} duplicate=${ack.duplicate}")
    // Pull consumer: create + fetch + ack
    c <- js.createConsumer(
      "ORDERS",
      ConsumerConfig(durable = Some("workers"), filterSubject = Some("orders.new"))
    )
    msgs <- c.fetch(batch = 10, maxWait = 2.seconds)
    _ <- msgs.traverse_(m => process(m.payload) *> m.ack)
  yield ()
}
Continuous pull consumption (background pull loop owned by the Resource):
c.consume().use { stream =>
  stream.evalMap(m => process(m.payload) *> m.ack).compile.drain
}
Push consumption (durable or ephemeral, optional queue group). Idle heartbeats are filtered and flow-control requests answered automatically; ephemeral consumers are deleted on release:
js.subscribePush(
  "ORDERS",
  ConsumerConfig(durable = Some("push-workers"), deliverGroup = Some("workers"))
)
  .use(_.evalMap(m => process(m.payload) *> m.ack).compile.drain)
Ack semantics: ack (fire-and-forget), ackSync (double-ack, awaits server
confirmation), nak / nakWithDelay, inProgress (resets the ack-wait timer),
and term / termWith. Finalizing acks take effect once; inProgress is
repeatable.
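The "finalizing acks take effect once" rule can be pictured with a small standard-library model. This is a sketch under assumptions: `AckHandle` and its `send` callback are hypothetical and only stand in for publishing an ack payload to a message's reply subject. The payload strings are the ones defined by the JetStream ack protocol, and the fs2-nats message type itself is not modelled.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical model (not the fs2-nats message type). `send` stands in for
// publishing the ack payload to the message's reply subject.
final class AckHandle(send: String => Unit):
  private val finalized = AtomicBoolean(false)

  // Only the first finalizing call wins; later ones are no-ops.
  // Returns true if this call was the one that took effect.
  private def finalizeWith(payload: String): Boolean =
    val first = finalized.compareAndSet(false, true)
    if first then send(payload)
    first

  def ack(): Boolean  = finalizeWith("+ACK")
  def nak(): Boolean  = finalizeWith("-NAK")
  def term(): Boolean = finalizeWith("+TERM")

  // Work-in-progress signals may be sent any number of times.
  def inProgress(): Unit = send("+WPI")
```

The compare-and-set makes the once-only guarantee hold even if two fibers race to acknowledge the same message.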
Reconnect: push and pull subscriptions ride the client's automatic
subscription replay on reconnect; the pull consume loop additionally re-issues
its request on a cadence so it resumes after a dropped connection.
A Key-Value bucket is an opinionated JetStream stream (KV_<bucket>, subjects
$KV.<bucket>.>). KV handles are obtained from the JetStream context. Reads use
JetStream Direct Get when the bucket allows it (allow_direct, the default),
so a get returns the raw message payload with no JSON/base64 decoding on the
hot path; writes ride the JetStream publish/coalescing window.
import fs2.nats.kv.*

client.jetStream().use { js =>
  for
    // Create a bucket keeping the last 5 revisions of each key
    kv <- js.createKeyValue(KvConfig(bucket = "config", history = 5))
    // Put returns the new revision (the entry's stream sequence)
    rev <- kv.put("db.url", Chunk.array("postgres://localhost".getBytes))
    cur <- kv.get("db.url") // Option[KvEntry] (Direct Get)
    // Optimistic concurrency: only writes if the revision still matches
    rev2 <- kv.update("db.url", Chunk.array("postgres://prod".getBytes), rev)
    // create fails if the key exists; delete writes a tombstone, purge
    // collapses a key's history
    _ <- kv.delete("legacy")
    keys <- kv.keys.compile.toList // live keys (excludes deletes)
  yield ()
}
Watch the snapshot and live updates. The stream delivers the current entries,
then a single KvWatchEvent.EndOfData marker, then live changes:
kv.watch(">").use { stream =>
  stream.evalMap {
    case KvWatchEvent.Entry(e)  => onChange(e.key, e.value, e.operation)
    case KvWatchEvent.EndOfData => IO.println("caught up")
  }.compile.drain
}
Bucket management lives on the JetStream context: createKeyValue, keyValue
(bind to an existing bucket), deleteKeyValue, keyValueStatus, and
keyValueNames. create/update raise NatsError.KeyValueWrongLastSequence
when their optimistic-concurrency precondition fails.
keys/history/watch stream from a gap-resetting ordered consumer, so a
reconnect mid-watch recovers in order rather than missing updates.
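To make the optimistic-concurrency rule concrete, here is a toy in-memory model of a bucket whose revisions come from a single increasing sequence, as described above. `ToyBucket` and `Entry` are hypothetical names for this illustration. The model is single-threaded, returns an `Either` instead of raising `NatsError.KeyValueWrongLastSequence`, and does not reflect how fs2-nats talks to the server.

```scala
// Hypothetical in-memory model (not the fs2-nats KeyValue API, not thread-safe).
final case class Entry(value: String, revision: Long)

final class ToyBucket:
  private var seq = 0L                          // stands in for the stream sequence
  private var entries = Map.empty[String, Entry]

  def get(key: String): Option[Entry] = entries.get(key)

  // Unconditional write; returns the new revision.
  def put(key: String, value: String): Long =
    seq += 1
    entries = entries.updated(key, Entry(value, seq))
    seq

  // Writes only if the key's current revision still equals `expected`.
  def update(key: String, value: String, expected: Long): Either[String, Long] =
    if entries.get(key).exists(_.revision == expected) then Right(put(key, value))
    else Left(s"wrong last sequence for $key: expected $expected")

  // Writes only if the key does not exist yet.
  def create(key: String, value: String): Either[String, Long] =
    if entries.contains(key) then Left(s"key $key already exists")
    else Right(put(key, value))
```

A writer that read revision 1 and loses a race sees its `update` rejected, re-reads the entry, and retries against the newer revision.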
An Object Store bucket is an opinionated JetStream stream (OBJ_<bucket>,
subjects $O.<bucket>.C.> for chunks and $O.<bucket>.M.> for per-object
meta). It stores arbitrarily large binary objects by chunking them across the
stream, with a rolled-up meta message recording each object's size, chunk count,
and SHA-256 digest. Both put and get are fully streaming — neither
materializes a whole object in memory.
import fs2.nats.objectstore.*

client.jetStream().use { js =>
  for
    os <- js.createObjectStore(ObjConfig(bucket = "assets"))
    // Stream bytes in (here from a file); chunks are pipelined through the
    // publish window and coalesced into socket writes. Nothing is buffered whole.
    info <- os.put(
      ObjectMeta("logo.png", maxChunkSize = 128 * 1024),
      fs2.io.file.Files[IO].readAll(fs2.io.file.Path("logo.png"))
    )
    // Stream bytes out; the SHA-256 digest is verified once all chunks are read.
    _ <- os.get("logo.png").flatMap {
      case Some(r) => r.data.through(sink).compile.drain
      case None    => IO.unit
    }
    // Convenience for small objects and files
    _ <- os.putBytes(ObjectMeta("readme.txt"), Chunk.array("hi".getBytes))
    txt <- os.getBytes("readme.txt") // Option[Chunk[Byte]]
    _ <- os.putFile("backup.tar", fs2.io.file.Path("backup.tar"))
    _ <- os.getToFile("backup.tar", fs2.io.file.Path("restored.tar"))
  yield ()
}
Objects support links (addLink/addBucketLink, transparently resolved on
get/info), metadata updates and rename (no re-upload), delete, list,
watch (snapshot + EndOfData + live updates), and seal (make the bucket
read-only). Reads of object meta use the JetStream Direct Get fast path when
the bucket allows it; chunk reads use the gap-resetting ordered consumer, so a
get recovers in order across a reconnect. Bucket management lives on the
JetStream context: createObjectStore, objectStore, deleteObjectStore,
objectStoreStatus, objectStoreNames.
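The chunk-and-digest bookkeeping behind a streaming put can be sketched with the standard library alone. Everything named here is hypothetical: `chunkAndDigest`, `ChunkSummary`, and the `publish` callback are stand-ins, and the `SHA-256=` plus URL-safe Base64 digest encoding is an assumption about the Object Store convention rather than something taken from fs2-nats.

```scala
import java.security.MessageDigest
import java.util.Base64

// Hypothetical summary type; field names are illustrative only.
final case class ChunkSummary(size: Long, chunks: Int, digest: String)

// Consumes the input one chunk at a time, so memory use is bounded by
// `chunkSize` rather than by the size of the object.
def chunkAndDigest(data: Iterator[Byte], chunkSize: Int)(publish: Array[Byte] => Unit): ChunkSummary =
  val sha = MessageDigest.getInstance("SHA-256")
  var size = 0L
  var chunks = 0
  data.grouped(chunkSize).foreach { group =>
    val bytes = group.toArray
    sha.update(bytes)      // the digest is updated incrementally, chunk by chunk
    publish(bytes)         // stands in for publishing one chunk message
    size += bytes.length
    chunks += 1
  }
  // Assumed encoding: "SHA-256=" followed by URL-safe Base64 of the digest.
  ChunkSummary(size, chunks, "SHA-256=" + Base64.getUrlEncoder.encodeToString(sha.digest()))
```

A reader can mirror the same loop on the way out, feeding each received chunk into a digest and comparing the result with the recorded value after the last chunk.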
fs2-nats supports every client-side NATS authentication mechanism. Choose one
by setting ClientConfig.credentials.
ClientConfig(host = host, port = port, credentials = Some(NatsCredentials.Token("s3cr3t")))
ClientConfig(host = host, port = port, credentials = Some(NatsCredentials.UserPassword("user", "pass")))
Provide the NKey seed (an S... string); the client signs the server's nonce
and derives the public key from it:
ClientConfig(host = host, port = port, credentials = Some(NatsCredentials.NKey("SUAB...seed...")))
Operator-mode deployments (NGS / Synadia Cloud / self-hosted with nsc) issue a
.creds file bundling a user JWT and an NKey seed. Load it directly:
import fs2.io.file.Path
import fs2.nats.client.{ClientConfig, NatsClient, NatsCredentials}
NatsCredentials.fromCredsFile[IO](Path("user.creds")).flatMap { creds =>
  NatsClient
    .connect[IO](ClientConfig(host = host, port = port, credentials = Some(creds)))
    .use { client => /* ... */ }
}
NatsCredentials.fromCreds(content) parses an already-loaded string.
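For readers who have not looked inside a .creds file: it is a small text file with two delimited sections, one for the user JWT and one for the NKey seed. The sketch below extracts both with the standard library. `parseCreds` and `ParsedCreds` are hypothetical names, the section labels are assumed from the files that nsc generates, and the real parsing in `NatsCredentials.fromCreds` may differ.

```scala
// Hypothetical result type; fs2-nats' own NatsCredentials is not modelled here.
final case class ParsedCreds(jwt: String, seed: String)

// Hypothetical parser, assuming sections delimited by
// "-----BEGIN <label>-----" and "------END <label>------" lines.
def parseCreds(content: String): Either[String, ParsedCreds] =
  val lines = content.linesIterator.map(_.trim).toList

  // Returns the first non-empty line after the BEGIN marker for `label`.
  def section(label: String): Either[String, String] =
    lines.dropWhile(l => !(l.startsWith("-----BEGIN") && l.contains(label))) match
      case _ :: rest =>
        rest.find(_.nonEmpty).filterNot(_.startsWith("-")).toRight(s"empty $label section")
      case Nil =>
        Left(s"missing $label section")

  for
    jwt  <- section("NATS USER JWT")
    seed <- section("USER NKEY SEED")
  yield ParsedCreds(jwt, seed)
```

Returning an `Either` keeps a malformed file from turning into a connection attempt with empty credentials.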
Set useTls = true and supply a TLSContext — one is required; the client
never falls back to plaintext or a default context:
import fs2.io.net.Network
val tls = Network[IO].tlsContext.system // or .fromSSLContext(...)
NatsClient
  .connect[IO](ClientConfig(host = host, port = port, useTls = true), tlsContext = Some(tls))
  .use { client => /* ... */ }
The client follows the standard NATS handshake: it reads the plaintext INFO,
then upgrades the connection to TLS. Servers configured with
handshake_first: true (TLS before INFO) are not supported.
For mutual TLS, build the TLSContext from an SSLContext whose KeyManager
presents your client certificate (and whose TrustManager trusts the server's
CA), then pass it exactly as above:
val tls = Network[IO].tlsContext.fromSSLContext(mySslContext)
import scala.concurrent.duration._
import fs2.nats.client._
val config = ClientConfig(
  host = Host.fromString("nats.example.com").get,
  port = Port.fromInt(4222).get,
  useTls = false,
  tlsParams = None,
  name = Some("my-app"),
  credentials = Some(NatsCredentials.UserPassword("user", "pass")),
  backoff = BackoffConfig(
    baseDelay = 100.millis,
    maxDelay = 30.seconds,
    factor = 2.0,
    maxRetries = None // unlimited
  ),
  queueCapacity = 10000,
  slowConsumerPolicy = SlowConsumerPolicy.Block,
  verbose = false,
  pedantic = false,
  echo = true
)
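The `queueCapacity` and `slowConsumerPolicy` settings decide what happens when a subscriber falls behind. As a rough picture of the two dropping behaviours, here is a toy bounded inbox. `BoundedInbox` and `Policy` are hypothetical stand-ins, not the fs2-nats `SlowConsumerPolicy`, and the blocking policy is deliberately left out because it applies backpressure instead of dropping.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration (not the fs2-nats types).
enum Policy:
  case DropNew, DropOldest

final class BoundedInbox[A](capacity: Int, policy: Policy):
  private val queue = mutable.Queue.empty[A]
  private var droppedCount = 0L

  def dropped: Long = droppedCount
  def contents: List[A] = queue.toList

  def offer(a: A): Unit =
    if queue.size < capacity then queue.enqueue(a)
    else
      droppedCount += 1
      policy match
        case Policy.DropNew => ()        // discard the incoming message
        case Policy.DropOldest =>        // evict the head to make room
          queue.dequeue()
          queue.enqueue(a)
```

Dropping new messages preserves the oldest backlog, while dropping the oldest keeps the subscriber closest to the live stream. Which one fits depends on whether stale data is still useful.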
When a subscription queue fills up:
- SlowConsumerPolicy.Block - Backpressure (default)
- SlowConsumerPolicy.DropNew - Drop incoming messages
- SlowConsumerPolicy.DropOldest - Drop oldest queued messages
- SlowConsumerPolicy.ErrorAndDrop - Emit event and drop

import fs2.nats.client.Backoff
// Exponential backoff with jitter (recommended)
val policy = Backoff.exponentialWithJitter(
  base = 100.millis,
  max = 30.seconds,
  factor = 2.0,
  maxRetries = Some(10)
)
// Fixed delay
val fixed = Backoff.fixed(5.seconds, maxRetries = Some(5))
// No delay (for testing)
val immediate = Backoff.immediate(maxRetries = 3)
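To see what an exponential policy with those parameters produces, the delay schedule can be computed by hand. This is a sketch under assumptions: `cappedDelay` and `jitteredDelay` are hypothetical helpers, and the "full jitter" strategy (a uniform pick between zero and the capped delay) is one common choice, not necessarily the one `Backoff.exponentialWithJitter` uses.

```scala
import scala.concurrent.duration.*
import scala.util.Random

// Hypothetical helper (not the fs2-nats Backoff implementation).
// Delay before retry `attempt` (0-based): base * factor^attempt, capped at `max`.
def cappedDelay(attempt: Int, base: FiniteDuration, max: FiniteDuration, factor: Double): FiniteDuration =
  val raw = base.toMillis.toDouble * math.pow(factor, attempt.toDouble)
  math.min(raw, max.toMillis.toDouble).toLong.millis

// Assumed "full jitter": pick uniformly in [0, cappedDelay] so that many
// clients do not reconnect in lockstep after a server restart.
def jitteredDelay(
    attempt: Int,
    base: FiniteDuration,
    max: FiniteDuration,
    factor: Double,
    rng: Random
): FiniteDuration =
  val cap = cappedDelay(attempt, base, max, factor).toMillis
  rng.between(0L, cap + 1).millis
```

With a 100 ms base and a factor of 2.0, the un-jittered delays are 100 ms, 200 ms, 400 ms, 800 ms and so on, until the 30-second cap takes over.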
fs2.nats
├── client/
│ ├── NatsClient # Main public API
│ ├── ClientConfig # Configuration
│ ├── ConnectionManager # Connection lifecycle & reconnection
│ └── Backoff # Retry policies
├── protocol/
│ ├── ProtocolParser # Incremental NATS protocol parser
│ ├── NatsModel # Protocol data types (Info, Connect, etc.)
│ ├── Headers # NATS/1.0 headers support
│ └── NatsFrame # Parsed frame ADT
├── transport/
│ ├── Transport # Transport abstraction
│ ├── NatsSocket # TCP transport
│ └── TlsTransport # TLS transport wrapper
├── subscriptions/
│ ├── SubscriptionManager # Message routing & slow consumer handling
│ ├── SidAllocator # Subscription ID allocation
│ └── NatsMessage # User-facing message type
├── publish/
│ ├── Publisher # Publish with max_payload validation
│ └── SerializationUtils # Protocol serialization
└── errors/
└── NatsError # Error ADT
Run unit tests:
sbt test
Run integration tests (requires NATS server):
docker-compose up -d
sbt integration/test
docker-compose down
docker-compose.yml defines a broker per auth mode — nats-nkey (4223),
nats-token (4224), nats-userpass (4225), nats-creds (4226, operator mode),
nats-tls (4227) and nats-mtls (4228) — alongside the default nats (4222).
The decentralized-JWT fixtures (integration/src/test/resources/testuser.creds,
nats-creds.conf) are generated with nsc, and the TLS certificates are
regenerated with integration/src/test/resources/tls/gen-certs.sh.
A .githooks/pre-push hook runs the same gate CI enforces (githubWorkflowCheck,
scalafmtCheckAll/headerCheckAll, test, mimaReportBinaryIssues, doc).
Enable it once per clone:
git config core.hooksPath .githooks
Bypass a single push with SKIP_PREPUSH=1 git push.
See the examples/ directory for complete examples:
- Basic.scala - Simple publish/subscribe
- RequestReplyExample - Request/reply pattern
- QueueGroupExample - Load-balanced workers
- JetStreamExample.scala - Streams, persistent publish, and pull consumption (requires -js)
- KeyValueExample.scala - Key-Value buckets: put/get, optimistic concurrency, keys, and watch (requires -js)
- ObjectStoreExample.scala - Object Store: streaming put/get, links, list, watch, and seal (requires -js)

Run examples:
sbt "runMain fs2.nats.examples.Basic"
fs2-nats is built and maintained by ThatScalaGuy
Need it extended, or help on your project? Get in touch at thatscalaguy.de.
Contributions are welcome! Issues and pull requests are happily accepted over on GitHub. The pre-push hook above runs the same checks as CI, so enabling it is the quickest way to keep the build green.
Licensed under the Apache License 2.0.
Built and maintained by Sven — need it in production, extended or reviewed?