"""Implementation of the backend using sc3nb."""
from __future__ import annotations
import tempfile
import warnings
from abc import abstractmethod
from functools import reduce
from operator import iconcat
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Sequence, Union
from weakref import WeakKeyDictionary
from mesonic.backend.bases import (
Backend,
BufferManager,
EventHandler,
RecordEventHandler,
RecordManager,
SynthEventHandler,
SynthManager,
)
from mesonic.buffer import Buffer, BufferInfo
from mesonic.events import (
Event,
RecordEvent,
RecordEventType,
SynthEvent,
SynthEventType,
)
from mesonic.record import Record
from mesonic.synth import ParameterInfo, Synth
if TYPE_CHECKING:
from typing import List
from mesonic.context import Context
from sc3nb.sc_objects.server import ServerOptions
import sc3nb.osc.osc_communication as scosc
import sc3nb.sc_objects.buffer as scbuf
import sc3nb.sc_objects.node as scnode
import sc3nb.sc_objects.recorder as screcorder
from sc3nb.sc import startup
from sc3nb.sc_objects.score import Score
from sc3nb.sc_objects.synthdef import SynthDef
[docs]
class EventHandlerSC3NB(EventHandler):
"""A abstract EventHandler as basis sc3nb EventHandlers.
Breaks down the handling of mutliple Events to just single events by using
the sc3nb.Bundler.
"""
backend: "BackendSC3NB"
@abstractmethod
def _handle_event(self, event: Event):
...
[docs]
def handle(self, time, events: Iterable[Event], reversed: bool, **kwargs) -> None:
if time is None:
time = 0
with self.backend.sc.server.bundler(time):
for event in events:
if reversed:
event = event.reverse()
self._handle_event(event)
[docs]
class BufferManagerSC3NB(BufferManager):
"""BufferManager for sc3nb"""
def _create_buffer(self, buf: scbuf.Buffer) -> Buffer:
assert buf.allocated
buf_info = BufferInfo(buf.samples, buf.channels, buf.sr) # type: ignore
buffer = Buffer(self._context, buf_info)
self._buffers[buffer] = buf
return buffer
[docs]
def from_data(self, data, sr, **kwargs) -> Buffer:
buf = scbuf.Buffer(server=self._backend.sc.server).load_data(data, sr, **kwargs)
return self._create_buffer(buf)
[docs]
def from_file(
self,
path,
starting_frame: int = 0,
num_frames: int = -1,
channels: Optional[Union[int, Sequence[int]]] = None,
**kwargs,
) -> Buffer:
buf = scbuf.Buffer(server=self._backend.sc.server).read(
path, starting_frame, num_frames, channels, **kwargs
)
return self._create_buffer(buf)
[docs]
class SynthEventHandlerSC3NB(EventHandlerSC3NB, SynthEventHandler):
"""SynthEventHandler for sc3nb"""
def __init__(
self, backend: "BackendSC3NB", synths: WeakKeyDictionary[Synth, scnode.Synth]
) -> None:
self.backend = backend
self.synths = synths
def _handle_event(self, event: Event):
assert isinstance(event, SynthEvent), "wrong Event type"
synth = event.synth
try:
backend_part = self.synths[synth]
except KeyError:
warnings.warn(f"Received event for unkown Synth: {synth}")
return
etype = event.etype
if synth.mutable:
node = backend_part
# TODO use pattern matching when `python_requires = >= 3.10`
if etype is SynthEventType.START:
node.new(controls=event.data)
elif etype is SynthEventType.STOP:
node.free()
elif etype is SynthEventType.PAUSE:
node.run(False)
elif etype is SynthEventType.RESUME:
node.run(True)
elif etype is SynthEventType.SET:
data = event.data
node.set(data["name"], data["new_value"])
else: # immutable synth
if etype not in [SynthEventType.START, SynthEventType.STOP]:
raise RuntimeError(
f"Immutable Synth should only produce {SynthEventType.START}"
f" and {SynthEventType.STOP} but got {etype}"
)
new_immutable_synth = backend_part
new_immutable_synth(event=event)
[docs]
class SynthManagerSC3NB(SynthManager):
_backend: "BackendSC3NB"
buffer_synthdefs: Dict[str, str] = {
"playbuf": """
{ |out=0, bufnum={{BUFNUM}}, rate=1, loop=0, pan=0, amp=0.3 |
var sig = PlayBuf.ar({{NUM_CHANNELS}}, bufnum,
rate*BufRateScale.kr(bufnum),
loop: loop,
doneAction: Done.freeSelf);
Out.ar(out, Pan2.ar(sig, pan, amp))
}""",
}
buffer_synthdefs_slots = {
"NUM_CHANNELS": lambda scbuffer: scbuffer.channels,
"BUFNUM": lambda scbuffer: scbuffer.bufnum,
}
sent_synthdefs = set()
def _create_event_handler(
self, backend: "BackendSC3NB", synths: WeakKeyDictionary[Synth, scnode.Synth]
) -> SynthEventHandler:
return SynthEventHandlerSC3NB(backend, synths)
[docs]
def create(
self, name: str, track: int = 0, *, mutable: bool = True, **kwargs
) -> Synth:
# check if synth name is known to sc3nb
synth_desc = SynthDef.get_description(name, self._backend.sc.lang)
if synth_desc is None:
raise ValueError(
f"sc3nb backend does not know a Synth with the name '{name}'"
)
# collect the parameters
# Parameter(self, info.name, info.default)
params_info: List[ParameterInfo] = []
for param_name, info in synth_desc.items():
params_info.append(ParameterInfo(param_name, info.default))
context_group: scnode.Group = self._backend.get_context_vars(self._context)[
CONTEXT_GROUP
]
# create the backend part
if mutable: # create sc3nb Synth as backend part.
node = scnode.Synth(
name,
new=False,
target=context_group.nodeid,
server=self._backend.sc.server,
**kwargs,
)
backend_part = node
else: # immutable Synth -> create helper method as backend part
def new_immutable_synth(event: SynthEvent):
msg = scosc.OSCMessage(
scnode.SynthCommand.NEW,
[
name,
-1,
scnode.AddAction.TO_HEAD.value,
context_group.nodeid,
]
+ reduce(iconcat, event.data.items(), []),
)
self._backend.sc.server.send(msg, bundle=True, await_reply=False)
backend_part = new_immutable_synth
# create the frontend / mesonic Synth
synth = Synth(
context=self._context,
name=name,
mutable=mutable,
param_info=params_info,
track=track,
)
# save the frontend backend pair
self._synths[synth] = backend_part
return synth
def _create_buffer_synthdef(self, scbuffer: scbuf.Buffer, synth_name) -> str:
try:
synth_def_code = SynthManagerSC3NB.buffer_synthdefs[synth_name]
except KeyError as error:
raise ValueError(
f"'{synth_name}' is not a known buffer SynthDef pattern"
) from error
gap_values = {
gap: fun(scbuffer)
for gap, fun in SynthManagerSC3NB.buffer_synthdefs_slots.items()
}
identifier = f"sc3nb_{synth_name}_{scbuffer.bufnum}"
if identifier not in SynthManagerSC3NB.sent_synthdefs:
SynthDef(
name=f"sc3nb_{synth_name}_{scbuffer.bufnum}", definition=synth_def_code
).set_contexts(gap_values).add()
SynthManagerSC3NB.sent_synthdefs.add(identifier)
return identifier
[docs]
def from_buffer(
self, buffer: Buffer, synth_name: str = "playbuf", **synth_kwargs
) -> Synth:
buffer_manager = self._context.managers.get("buffers", None)
if buffer_manager is None:
raise RuntimeError("Context does not have a valid BufferManager.")
scbuffer = buffer_manager._buffers[buffer]
synth_name = self._create_buffer_synthdef(scbuffer, synth_name)
synth = self.create(synth_name, **synth_kwargs)
synth.metadata.update({"from_buffer": buffer})
return synth
[docs]
def add_buffer_synth_def(self, name, code=None, **kwargs):
if code is None:
raise ValueError('Must provide "code"')
identifier_start = f"sc3nb_{name}"
if name in SynthManagerSC3NB.buffer_synthdefs:
SynthManagerSC3NB.sent_synthdefs = {
synth_def
for synth_def in SynthManagerSC3NB.sent_synthdefs
if not synth_def.startswith(identifier_start)
}
SynthManagerSC3NB.buffer_synthdefs[name] = code
[docs]
def add_synth_def(self, name, code=None, **kwargs):
if code is None:
raise ValueError('Must provide "code"')
SynthDef(name, code).add()
[docs]
class RecordEventHandlerSC3NB(EventHandlerSC3NB, RecordEventHandler):
def __init__(
self,
backend: "BackendSC3NB",
records: WeakKeyDictionary[Record, screcorder.Recorder],
):
self.backend = backend
self.records = records
def _handle_event(self, event: Event):
assert isinstance(event, RecordEvent), "wrong Event type"
record = event.record
sc_recorder = self.records[record]
if record.finished:
warnings.warn("Record already finished.")
return
try:
# TODO: use pattern matching when `python_requires = >= 3.10`
if event.etype is RecordEventType.START:
if sc_recorder._state != screcorder.RecorderState.RECORDING:
sc_recorder.start()
elif event.etype is RecordEventType.STOP:
sc_recorder.stop()
record._finished = True # pylint: disable=W0212
elif event.etype is RecordEventType.PAUSE:
sc_recorder.pause()
elif event.etype is RecordEventType.RESUME:
sc_recorder.resume()
except RuntimeError as error:
warnings.warn(f"Illegal backend recorder state: {error}")
[docs]
class RecordManagerSC3NB(RecordManager):
"""RecordManager for sc3nb"""
def _create_event_handler(
self, backend: "BackendSC3NB", records: WeakKeyDictionary["Record", Any]
) -> RecordEventHandler:
return RecordEventHandlerSC3NB(backend, records)
[docs]
def create(
self,
path,
track=0,
nr_channels: int = 2,
rec_header: str = "wav",
rec_format: str = "int16",
bufsize: int = 65536,
**kwargs,
) -> "Record":
sc_recorder = screcorder.Recorder(
path=path,
nr_channels=nr_channels,
rec_header=rec_header,
rec_format=rec_format,
bufsize=bufsize,
server=self._backend.sc.server,
**kwargs,
)
record = Record(context=self._context, path=path, track=track)
self._records[record] = sc_recorder
return record
# context data keys
# group for the context
CONTEXT_GROUP = "context_group"
# init hook id of the context group
CONTEXT_GROUP_HOOK = "context_group_hook"
[docs]
class BackendSC3NB(Backend):
"""Backend for sc3nb"""
def __init__(self, **kwargs) -> None:
super().__init__()
self.sc = startup(**kwargs)
@property
def sampling_rate(self):
return self.sc.server.nominal_sr
[docs]
def register_context(self, context: "Context") -> None:
# Create a group for this context
main_group = scnode.Group(server=self.sc.server)
# Make sure that this group will be created at each server init.
main_group_hook = self.sc.server.add_init_hook(main_group.new)
# Save the objects as context variables.
self._contexts[context] = {
CONTEXT_GROUP: main_group,
CONTEXT_GROUP_HOOK: main_group_hook,
}
[docs]
def unregister_context(self, context: "Context") -> None:
# free context group and the hook
self._contexts[context][CONTEXT_GROUP].deep_free()
self.sc.server.remove_init_hook(self._contexts[context][CONTEXT_GROUP_HOOK])
# remove context from the context lists
del self._contexts[context]
if len(self._contexts) == 0:
self.sc.exit()
[docs]
def stop(self, context: "Context") -> None:
# clear server schedule and free the context group.
self.sc.server.clear_schedule()
self._contexts[context][CONTEXT_GROUP].deep_free()
[docs]
def create_synth_manager(self, context: "Context") -> SynthManagerSC3NB:
return SynthManagerSC3NB(self, context)
[docs]
def create_buffer_manager(self, context: "Context") -> BufferManagerSC3NB:
return BufferManagerSC3NB(self, context)
[docs]
def create_record_manager(self, context: "Context") -> RecordManagerSC3NB:
return RecordManagerSC3NB(self, context)
[docs]
def render_nrt(
self,
context: "Context",
output_path,
options: Optional["ServerOptions"] = None,
**backend_kwargs,
) -> None:
assert (
context.backend is self
), "Provided a Context that not belongs to this Backend"
# Get the timeline
timeline = context.timeline
# Determine the time offset so that negative timestamps are positiv
time_offset = min(timeline.first_timestamp, 0)
with self.sc.server.bundler(0, send_on_exit=False) as bundler:
# Add required SynthDefs to this Bundler
for synth_def_name in context.synths.names:
try:
synth_def_blob = SynthDef.synth_defs[synth_def_name]
except KeyError as e:
raise RuntimeError(
"The SynthDef {synth_def} is not saved in SynthDef.synth_defs"
" as blob. Has it been added using SynthDef.add ?"
) from e
SynthDef.send(synth_def_blob, server=context.backend.sc.server)
# Add the context group (parent of all the Synths) to this Bundler
self._contexts[context][CONTEXT_GROUP].new(target=0)
# Add the SynthEvents from the timeline to this Bundler
for timepoint, events in timeline.to_dict().items():
synth_events = [
event for event in events if isinstance(event, SynthEvent)
]
context.synths.event_handler.handle(
timepoint + time_offset, synth_events, reversed=False
)
# Add end message to Bundler with the end_time of the Timeline
bundler.add(timeline.end_time, "/c_set", [0, 0])
# use a tempfile for the .osc file
with tempfile.TemporaryDirectory() as osc_path:
osc_file = Path(osc_path) / "nrtscore.osc"
# Use the sc3nb.Score class with the bundler messages to render
Score.record_nrt(
bundler.messages(),
osc_file.as_posix(),
output_path,
options=options,
**backend_kwargs,
)
[docs]
def quit(self) -> None:
super().quit()
self.sc.exit()