Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ final case class Storage(
cache: String = "tmp",
dir: String = "notebooks",
mounts: Map[String, Mount] = Map.empty,
lockNotebooks: Boolean = false,
wal: Wal = Wal()
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package polynote.kernel


import polynote.messages.{CellID, Notebook, NotebookUpdate}
import zio.blocking.Blocking
import zio.{IO, RIO, Task, UIO, ZIO}

/**
Expand Down Expand Up @@ -54,7 +55,7 @@ trait NotebookRef {
/**
* Close the notebook.
*/
def close(): Task[Unit]
def close(): RIO[BaseEnv, Unit]

def isOpen: UIO[Boolean]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ object NotebookSession {
def stream(path: String, input: Stream[Throwable, Frame]): ZIO[SessionEnv with NotebookManager, HTTPError, Stream[Throwable, Frame]] = {
for {
_ <- NotebookManager.assertValidPath(path)
publisher <- NotebookManager.open(path).orElseFail(NotFound(path))
publisher <- NotebookManager.open(path).tapError(Logging.error(s"Failed to open $path", _))
output <- ZQueue.unbounded[Take[Nothing, Message]]
publishMessage <- Env.add[SessionEnv with NotebookManager](Publish(output): Publish[Task, Message])
subscriber <- publisher.subscribe().orDie
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ package object server {

private val maxRetryDelay = Duration(8, TimeUnit.SECONDS)


override def open(path: String): RIO[BaseEnv with GlobalEnv, KernelPublisher] = openNotebooks.getOrCreate(path) {
for {
notebookRef <- repository.openNotebook(path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ import polynote.kernel.logging.Logging
import polynote.kernel.util.LongRef
import polynote.messages._
import polynote.server.repository.format.NotebookFormat
import polynote.server.repository.fs.{LocalFilesystem, NotebookFilesystem, WAL}, WAL.WALWriter
import polynote.server.repository.fs.{LocalFilesystem, NotebookFile, NotebookFilesystem, WAL}
import WAL.WALWriter
import scodec.Codec
import zio.{Fiber, IO, Promise, Queue, RIO, Ref, Schedule, Semaphore, Task, UIO, URIO, ZIO}
import zio.ZIO.effectTotal
import zio.blocking.effectBlocking
import zio.blocking.{Blocking, effectBlocking}
import zio.clock.currentDateTime
import zio.duration.Duration
import zio.stream.Take
Expand All @@ -41,6 +42,7 @@ class FileBasedRepository(
log: Logging.Service,
renameLock: Semaphore,
process: Promise[Nothing, Fiber[Nothing, Unit]],
notebookFile: NotebookFile,
wal: Task[WALWriter],
saveIntervalMillis: Long = 5000L,
maxSaveFails: Int = 12
Expand All @@ -52,9 +54,8 @@ class FileBasedRepository(
private val saveIntervalDuration = Duration(saveIntervalMillis, TimeUnit.MILLISECONDS)
private val consecutiveSaveFails = LongRef.zeroSync


private val setNeedSave = effectTotal(needSave.lazySet(true))
private val save = (renameLock.withPermit(get.flatMap(saveNotebook)) *> effectTotal(needSave.lazySet(false)))
private val save = (renameLock.withPermit(get >>= encodeNotebook >>= notebookFile.overwrite) *> effectTotal(needSave.lazySet(false)))
.catchAll {
err => consecutiveSaveFails.incrementAndGet.flatMap {
case count if count >= maxSaveFails =>
Expand Down Expand Up @@ -128,11 +129,12 @@ class FileBasedRepository(
} yield newPath
} <* setNeedSave

override def close(): Task[Unit] =
override def close(): RIO[BaseEnv, Unit] =
closed.succeed(()) *>
pending.offer(Take.End) *>
pending.awaitShutdown *>
process.await.flatMap(_.join) *>
notebookFile.close() *>
closed.await

override val isOpen: UIO[Boolean] = closed.isDone.map(!_)
Expand Down Expand Up @@ -232,32 +234,36 @@ class FileBasedRepository(
case false => ZIO.succeed(WALWriter.NoWAL)
}

def apply(notebook: Notebook, path: Path): RIO[BaseEnv with GlobalEnv, FileNotebookRef] = for {
def apply(path: String): RIO[BaseEnv with GlobalEnv, FileNotebookRef] = for {
log <- Logging.access
nbFile <- fs.openNotebookFile(pathOf(path))
notebook <- nbFile.readContent() >>= (decodeNotebook(path, _))
current <- Ref.make(0 -> notebook)
closed <- Promise.make[Throwable, Unit]
pending <- Queue.unbounded[Take[Nothing, (NotebookUpdate, Option[Promise[Nothing, (Int, Notebook)]])]]
renameLock <- Semaphore.make(1L)
process <- Promise.make[Nothing, Fiber[Nothing, Unit]]
env <- ZIO.environment[BaseEnv with GlobalEnv]
wal <- openWAL(notebook.path.stripPrefix("/")).tap(_.writeHeader(notebook)).provide(env).memoize
ref = new FileNotebookRef(current, pending, closed, log, renameLock, process, wal)
ref = new FileNotebookRef(current, pending, closed, log, renameLock, process, nbFile, wal)
_ <- ref.init().onError(_ => ref.close().orDie)
} yield ref
}

override def loadNotebook(path: String): RIO[BaseEnv with GlobalEnv, Notebook] = for {
fmt <- NotebookFormat.getFormat(pathOf(path))
content <- fs.readPathAsString(pathOf(path))
(noExtPath, _) = extractExtension(path)
nb <- fmt.decodeNotebook(noExtPath, content)
override def loadNotebook(pathStr: String): RIO[BaseEnv with GlobalEnv, Notebook] = for {
path <- effectTotal(pathOf(pathStr))
content <- fs.readPathAsString(path)
nb <- decodeNotebook(pathStr, content)
} yield nb

override def openNotebook(pathStr: String): RIO[BaseEnv with GlobalEnv, NotebookRef] = for {
nb <- loadNotebook(pathStr)
path = pathOf(pathStr)
ref <- FileNotebookRef(nb, path)
} yield ref
override def openNotebook(pathStr: String): RIO[BaseEnv with GlobalEnv, NotebookRef] =
FileNotebookRef(pathStr)

private def decodeNotebook(path: String, string: String) = for {
fmt <- NotebookFormat.getFormat(Paths.get(path))
(noExtPath, _) = extractExtension(path)
nb <- fmt.decodeNotebook(noExtPath, string)
} yield nb

private def encodeNotebook(nb: Notebook) = for {
fmt <- NotebookFormat.getFormat(Paths.get(nb.path))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ class TreeRepository (
}
}

override def close(): Task[Unit] = currentRef.get.flatMap(_.ref.close()) <* closed.succeed(())
override def close(): RIO[BaseEnv, Unit] = currentRef.get.flatMap(_.ref.close()) <* closed.succeed(())

override def awaitClosed: Task[Unit] = closed.await

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package polynote.server.repository.fs
import java.io.{FileNotFoundException, InputStream, OutputStream}
import java.io.{FileNotFoundException, IOException, InputStream, OutputStream}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.charset.StandardCharsets
import java.nio.file.{FileVisitOption, Files, Path, StandardOpenOption}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

import fs2.Chunk
import polynote.kernel.BaseEnv
import polynote.kernel.{BaseEnv, GlobalEnv}
import zio.blocking.{Blocking, effectBlocking}
import zio.interop.catz._
import zio.{RIO, Semaphore, Task, ZIO}
import zio.{Fiber, RIO, RManaged, Semaphore, Task, ZIO}
import zio.ZIO.effect

import scala.collection.JavaConverters._
import LocalFilesystem.FileChannelWALWriter
import polynote.kernel.environment.Config

class LocalFilesystem(maxDepth: Int = 4) extends NotebookFilesystem {

Expand All @@ -22,6 +24,8 @@ class LocalFilesystem(maxDepth: Int = 4) extends NotebookFilesystem {
content <- readBytes(is).ensuring(ZIO.effectTotal(is.close()))
} yield new String(content.toArray, StandardCharsets.UTF_8)

override def openNotebookFile(path: Path): RIO[BaseEnv with GlobalEnv, NotebookFile] = LocalFilesystem.FileChannelNotebookFile(path)

override def writeStringToPath(path: Path, content: String): RIO[BaseEnv, Unit] = for {
_ <- createDirs(path)
_ <- effectBlocking(Files.write(path, content.getBytes(StandardCharsets.UTF_8))).uninterruptible
Expand Down Expand Up @@ -65,6 +69,7 @@ class LocalFilesystem(maxDepth: Int = 4) extends NotebookFilesystem {
}

object LocalFilesystem {
import StandardOpenOption._

final class FileChannelWALWriter private (mkChannel: => FileChannel, lock: Semaphore) extends WAL.WALWriter {

Expand All @@ -90,11 +95,56 @@ object LocalFilesystem {
}

object FileChannelWALWriter {
import StandardOpenOption._
def apply(path: Path): RIO[Blocking, FileChannelWALWriter] = for {
lock <- Semaphore.make(1L)
channel <- effectBlocking(FileChannel.open(path, WRITE, CREATE_NEW))
} yield new FileChannelWALWriter(channel, lock)
}

final class FileChannelNotebookFile private (channel: FileChannel, writeLock: Semaphore) extends NotebookFile {
override def overwrite(content: String): RIO[BaseEnv, Unit] = writeLock.withPermit {
val bytes = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))
effect(channel.position(0L)) *>
effectBlocking(channel.write(bytes)).doWhile(_ => bytes.hasRemaining) *>
effectBlocking(channel.truncate(bytes.position()))
}

override def readContent(): RIO[BaseEnv, String] =
effectBlocking {
val size = channel.size() match {
case s if s > Int.MaxValue => throw new IOException(s"Notebook file size ($s) exceeds limit of 2GB")
case s => s.toInt
}
new Array[Byte](size)
}.flatMap {
bytes =>
val buf = ByteBuffer.wrap(bytes)
effectBlocking(channel.read(buf, buf.position())).doWhile(_ => buf.hasRemaining) *>
effect(new String(bytes, StandardCharsets.UTF_8))
}

override def close(): RIO[BaseEnv, Unit] = writeLock.withPermit {
effectBlocking(channel.force(true)) *> effectBlocking(channel.close())
}
}

object FileChannelNotebookFile {

// TODO: should allow other polynote instances to read the file? We'd need UX for read-only mode.
def apply(path: Path): RIO[Blocking with Config, FileChannelNotebookFile] = {

// file lock will be released when channel is closed
def lockFile(channel: FileChannel) =
effectBlocking(Option(channel.tryLock(0L, 0L, false)))
.someOrFail(new IOException(s"Couldn't obtain a lock on $path – is another instance of Polynote using this file?"))
.whenM(Config.access.map(_.storage.lockNotebooks))

for {
channel <- effectBlocking(FileChannel.open(path, READ, WRITE, CREATE))
_ <- lockFile(channel)
semaphore <- Semaphore.make(1L)
} yield new FileChannelNotebookFile(channel, semaphore)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import java.io.OutputStream
import java.nio.ByteBuffer
import java.nio.file.Path

import polynote.kernel.BaseEnv
import polynote.kernel.{BaseEnv, GlobalEnv}
import polynote.messages.Message
import scodec.bits.BitVector
import zio.blocking.Blocking
import zio.{RIO, Task, ZIO}
import zio.{RIO, RManaged, Task, ZIO}

/**
* Encapsulates useful Notebook-related FS stuff.
Expand All @@ -20,6 +20,9 @@ trait NotebookFilesystem {

def readPathAsString(path: Path): RIO[BaseEnv, String]

// TODO: this really should be Managed
def openNotebookFile(path: Path): RIO[BaseEnv with GlobalEnv, NotebookFile]

def writeStringToPath(path: Path, content: String): RIO[BaseEnv, Unit]

def createLog(path: Path): RIO[BaseEnv, WAL.WALWriter]
Expand All @@ -38,3 +41,9 @@ trait NotebookFilesystem {

def init(path: Path): RIO[BaseEnv, Unit]
}

trait NotebookFile {
def overwrite(content: String): RIO[BaseEnv, Unit]
def readContent(): RIO[BaseEnv, String]
def close(): RIO[BaseEnv, Unit]
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package polynote.server.repository

import java.io.{ByteArrayOutputStream, File, OutputStream}
import java.io.{ByteArrayOutputStream, File, FileNotFoundException, OutputStream}
import java.nio.channels.SeekableByteChannel
import java.nio.file.{Files, Path, Paths}
import java.util.concurrent.ConcurrentHashMap
Expand All @@ -11,8 +11,8 @@ import polynote.kernel.BaseEnv
import polynote.messages.{Notebook, ShortList}
import polynote.server.MockServerSpec
import polynote.server.repository.format.NotebookFormat
import polynote.server.repository.fs.{NotebookFilesystem, WAL}
import zio.{RIO, ZIO}
import polynote.server.repository.fs.{NotebookFile, NotebookFilesystem, WAL}
import zio.{RIO, RManaged, ZIO, ZManaged}
import zio.blocking.effectBlocking

import scala.collection.JavaConverters._
Expand All @@ -25,8 +25,16 @@ class FileBasedRepositorySpec extends FreeSpec with Matchers with BeforeAndAfter

private val notebooks = new ConcurrentHashMap[Path, String]()

private class File(path: Path) extends NotebookFile {
override def overwrite(content: String): RIO[BaseEnv, Unit] = ZIO.effectTotal(notebooks.put(path, content))
override def readContent(): RIO[BaseEnv, String] = ZIO.effectTotal(Option(notebooks.get(path))).someOrFail(new FileNotFoundException())
override def close(): RIO[BaseEnv, Unit] = ZIO.unit
}

override def readPathAsString(path: Path): RIO[BaseEnv, String] = withKeyContent(path)((_, content) => content)

override def openNotebookFile(path: Path): RIO[BaseEnv, NotebookFile] = ZIO.succeed(new File(path))

override def writeStringToPath(path: Path, content: String): RIO[BaseEnv, Unit] = ZIO(notebooks.put(tmpDir.resolve(path), content))

override def createLog(path: Path): RIO[BaseEnv, WAL.WALWriter] = ZIO.succeed(WAL.WALWriter.NoWAL)
Expand Down