diff --git a/maelzel/core/chain.py b/maelzel/core/chain.py index fa05632..9ae0977 100644 --- a/maelzel/core/chain.py +++ b/maelzel/core/chain.py @@ -160,6 +160,7 @@ def __init__(self, if not isinstance(items, list): items = list(items) for item in items: + assert isinstance(item, MEvent) if item.parent is not None: # We need to make a copy in this case item = item.copy() @@ -1804,6 +1805,33 @@ def absorbInitialOffset(self, removeRedundantOffsets=True): self.removeRedundantOffsets() self._changed() + def _cropped(self, startbeat: F, endbeat: F, absorbOffset=False + ) -> Self: + items = [] + # frame = chain.absOffset() + for item, offset in self.itemsWithOffset(): + if offset > endbeat or (offset == endbeat and item.dur > 0): + break + + if item.dur == 0 and startbeat <= offset: + items.append(item.clone(offset=offset - startbeat)) + elif offset + item.dur > startbeat: + # Add a cropped part or the entire item? + if startbeat <= offset and offset + item.dur <= endbeat: + items.append(item.clone(offset=offset - startbeat)) + else: + if isinstance(item, MEvent): + item2 = item.cropped(startbeat, endbeat) + items.append(item2.clone(offset=item2.offset - startbeat)) + else: + # TODO: combine these two operations, if needed + self = item._cropped(startbeat, endbeat, absorbOffset=True) + items.append(self.clone(offset=self.offset - startbeat)) + out = self.clone(items=items, offset=startbeat) + if absorbOffset: + out.absorbInitialOffset() + return out + def cropped(self, start: beat_t, end: beat_t) -> Self | None: """ Returns a copy of this chain, cropped to the given beat range @@ -1821,41 +1849,13 @@ def cropped(self, start: beat_t, end: beat_t) -> Self | None: sco = self.activeScorestruct() startbeat = sco.asBeat(start) endbeat = sco.asBeat(end) - cropped = _cropped(self, startbeat=startbeat, endbeat=endbeat) - if not cropped: + cropped = self._cropped(startbeat=startbeat, endbeat=endbeat) + if not cropped.items: return None cropped.removeRedundantOffsets() return cropped -def _cropped(chain: Chain, startbeat: F, endbeat: F, absorbOffset=False - ) -> Chain: - items = [] - # frame = chain.absOffset() - for item, offset in chain.itemsWithOffset(): - if offset > endbeat or (offset == endbeat and item.dur > 0): - break - - if item.dur == 0 and startbeat <= offset: - items.append(item.clone(offset=offset - startbeat)) - elif offset + item.dur > startbeat: - # Add a cropped part or the entire item? - if startbeat <= offset and offset + item.dur <= endbeat: - items.append(item.clone(offset=offset - startbeat)) - else: - if isinstance(item, MEvent): - item2 = item.cropped(startbeat, endbeat) - items.append(item2.clone(offset=item2.offset - startbeat)) - else: - # TODO: combine these two operations, if needed - chain = _cropped(item, startbeat, endbeat, absorbOffset=True) - items.append(chain.clone(offset=chain.offset - startbeat)) - out = chain.clone(items=items, offset=startbeat) - if absorbOffset: - out.absorbInitialOffset() - return out - - class PartGroup: """ This class represents a group of parts diff --git a/maelzel/core/offline.py b/maelzel/core/offline.py index 6aa6fd1..b9f8a65 100644 --- a/maelzel/core/offline.py +++ b/maelzel/core/offline.py @@ -8,6 +8,8 @@ import numpy as np import csoundengine +import csoundengine.instr +from csoundengine import Event from csoundengine.sessionhandler import SessionHandler from maelzel.core import renderer @@ -44,6 +46,25 @@ def __init__(self, renderer: OfflineRenderer): def sched(self, event: csoundengine.event.Event): return self.renderer._schedSessionEvent(event) + def schedEvent(self, event: Event) -> csoundengine.schedevent.SchedEvent: + return self.renderer.schedEvent(event) + + def makeTable(self, + data: np.ndarray | list[float] | None = None, + size: int | tuple[int, int] = 0, + sr: int = 0, + ) -> TableProxy: + return self.renderer.makeTable(data=data, size=size, sr=sr) + + def readSoundfile(self, + path: str, + chan=0, + skiptime=0., + delay=0., + force=False, + ) -> TableProxy: + return self.renderer.readSoundfile(soundfile=path, chan=chan, skiptime=skiptime) + class OfflineRenderer(renderer.Renderer): """ @@ -159,8 +180,8 @@ def __init__(self, self.showAtExit = False """Display the results at exit if running in jupyter""" - self.csoundRenderer: csoundengine.offline.Renderer = self._makeCsoundRenderer() - """The actual csoundengine.Renderer""" + self.csoundRenderer: csoundengine.offline.OfflineSession = self._makeCsoundRenderer() + """The actual csoundengine.OfflineSession""" def isRealtime(self) -> bool: """Is this a realtime renderer?""" @@ -177,12 +198,12 @@ def getSession(self) -> csoundengine.session.Session | None: return self._session return None - def _makeCsoundRenderer(self) -> csoundengine.offline.Renderer: + def _makeCsoundRenderer(self) -> csoundengine.offline.OfflineSession: """ - Construct a :class:`csoundengine.Renderer` from this OfflineRenderer + Construct a :class:`csoundengine.OfflineSession` from this OfflineRenderer Returns: - the corresponding :class:`csoundengine.Renderer` + the corresponding :class:`csoundengine.offline.OfflineSession` """ from maelzel.core import playback renderer = self.presetManager.makeRenderer(self.sr, ksmps=self.ksmps, @@ -237,7 +258,7 @@ def getInstr(self, instrname: str) -> csoundengine.instr.Instr | None: return instr @property - def scheduledEvents(self) -> dict[int, csoundengine.offline.SchedEvent]: + def scheduledEvents(self) -> dict[int, csoundengine.schedevent.SchedEvent]: """The scheduled events""" return self.csoundRenderer.scheduledEvents @@ -362,7 +383,7 @@ def registerInstr(self, name: str, instrdef: csoundengine.instr.Instr self.instrs[name] = instrdef self.csoundRenderer.registerInstr(instrdef) - def play(self, obj: mobj.MObj, **kws) -> csoundengine.offline.SchedEventGroup: + def play(self, obj: mobj.MObj, **kws) -> csoundengine.schedevent.SchedEventGroup: """ Schedule the events generated by this obj to be renderer offline @@ -448,7 +469,7 @@ def schedEvents(self, coreevents: list[SynthEvent], sessionevents: list[csoundengine.event.Event] = None, whenfinished: Callable = None - ) -> csoundengine.offline.SchedEventGroup: + ) -> csoundengine.schedevent.SchedEventGroup: """ Schedule multiple events as returned by :meth:`MObj.events() ` @@ -474,7 +495,7 @@ def schedEvents(self, scoreEvents = [self.schedEvent(ev) for ev in coreevents] if sessionevents: scoreEvents.extend(self._schedSessionEvent(ev) for ev in sessionevents) - return csoundengine.offline.SchedEventGroup(scoreEvents) + return csoundengine.schedevent.SchedEventGroup(scoreEvents) def definedInstrs(self) -> dict[str, csoundengine.instr.Instr]: """ @@ -505,7 +526,7 @@ def playSample(self, skip=0., fade: float | tuple[float, float] | None = None, crossfade=0.02, - ) -> csoundengine.offline.SchedEvent: + ) -> csoundengine.schedevent.SchedEvent: """ Play a sample through this renderer @@ -540,7 +561,7 @@ def sched(self, args: list[float] | dict[str, float] = None, whenfinished=None, relative=True, - **kws) -> csoundengine.offline.SchedEvent: + **kws) -> csoundengine.schedevent.SchedEvent: """ Schedule a csound event @@ -778,11 +799,6 @@ def __enter__(self): if session: session.setHandler(_OfflineSessionHandler(self)) - # if playback.isSessionActive(): - # session = self.getSession() - # if not session: - # self._session = session = playback.playSession() - # self._oldSessionSchedCallback = session.setSchedCallback(self._schedSessionEvent) return self def __exit__(self, exc_type, exc_value, traceback): @@ -883,8 +899,6 @@ def render(outfile='', run=True, endtime=0., show=False, - tempfolder='', - fmt='wav', **kws ) -> OfflineRenderer: """ @@ -919,6 +933,7 @@ def render(outfile='', endtime: if given, sets the end time of the rendered segment. A value of 0. indicates to render everything. A value is needed if there are endless events + show: display the resulting OfflineRenderer when running inside jupyter workspace: if given, this workspace overrides the active workspace Returns: diff --git a/maelzel/core/playback.py b/maelzel/core/playback.py index c3dbed5..2062382 100644 --- a/maelzel/core/playback.py +++ b/maelzel/core/playback.py @@ -26,6 +26,8 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: import csoundengine.baseschedevent + import csoundengine.tableproxy + import csoundengine.instr from typing import Sequence, Callable from .mobj import MObj from maelzel.snd import audiosample @@ -349,7 +351,7 @@ def _playEngine(numchannels: int = None, """ config = Workspace.getActive().config engineName = config['play.engineName'] - if engine := csoundengine.getEngine(engineName): + if engine := csoundengine.Engine.activeEngines.get(engineName): if any(_ is not None for _ in (numchannels, backend, outdev, verbose, buffersize, latency)): prettylog('WARNING', "\nThe sound engine has been started already. Any configuration passed " @@ -439,7 +441,7 @@ def isSessionActive() -> bool: Returns True if the sound engine is active """ name = getConfig()['play.engineName'] - return csoundengine.getEngine(name) is not None + return name in csoundengine.Engine.activeEngines def _dummySynth(dur=0.001, engine: csoundengine.Engine = None) -> csoundengine.synth.Synth: @@ -708,7 +710,7 @@ def __init__(self, self.session: csoundengine.session.Session = playSession() """The corresponding Session, can be used to access the session during the context""" - self.engine: csoundengine.Engine = self.session.engine + self.engine: csoundengine.engine.Engine = self.session.engine """The play engine, can be used during the context""" self.synthgroup: csoundengine.synth.SynthGroup | None = None @@ -895,17 +897,17 @@ def schedEvents(self, def registerPreset(self, presetdef: PresetDef) -> bool: return self.session.registerInstr(presetdef.getInstr()) - def _automate(self, - token: int, - param: str, - pairs: Sequence[float] | np.ndarray, - delay=0., - overtake=False): - future = self._tokenToFuture.get(token) - if not future: - raise ValueError(f"Invalid token {token}. Known tokens: {self._tokenToFuture}") - event = SynthAutomation(token=token, param=param, data=pairs, delay=delay, overtake=overtake) - self._automationEvents.append(event) + # def _automate(self, + # token: int, + # param: str, + # pairs: Sequence[float] | np.ndarray, + # delay=0., + # overtake=False): + # future = self._tokenToFuture.get(token) + # if not future: + # raise ValueError(f"Invalid token {token}. Known tokens: {self._tokenToFuture}") + # event = SynthAutomation(token=token, param=param, data=pairs, delay=delay, overtake=overtake) + # self._automationEvents.append(event) def _presetFromToken(self, token: int) -> PresetDef | None: future = self._tokenToFuture.get(token) @@ -918,15 +920,15 @@ def _presetFromToken(self, token: int) -> PresetDef | None: presetdef = self.presetManager.getPreset(event.instr) return presetdef - def _set(self, token: int, param: str, value: float, delay: float): - presetdef = self._presetFromToken(token) - if presetdef is None: - raise RuntimeError(f"Unknown token {token}") - params = presetdef.dynamicParams(aliases=True, aliased=True) - if param not in params: - raise KeyError(f"Parameter {param} not known. Possible parameters: {params}") - event = SynthAutomation(token=token, param=param, data=[0, value], delay=delay) - self._automationEvents.append(event) + # def _set(self, token: int, param: str, value: float, delay: float): + # presetdef = self._presetFromToken(token) + # if presetdef is None: + # raise RuntimeError(f"Unknown token {token}") + # params = presetdef.dynamicParams(aliases=True, aliased=True) + # if param not in params: + # raise KeyError(f"Parameter {param} not known. Possible parameters: {params}") + # event = SynthAutomation(token=token, param=param, data=[0, value], delay=delay) + # self._automationEvents.append(event) def getSynth(self, token: int) -> csoundengine.synth.Synth | None: """ @@ -943,34 +945,6 @@ def getSynth(self, token: int) -> csoundengine.synth.Synth | None: raise RuntimeError("Synths are only accessible after render") return self._tokenToSynth.get(token) - # def _scheduleAutomations(self, synths: list[csoundengine.synth.Synth] - # ) -> None: - # """ - # This is called as callback with the synths generated from the future events - # - # There should be a 1:1 correspondence between the scheduled core events, - # the tokens and the synths. - # - # """ - # for event in self._automationEvents: - # if event.token is None: - # logger.error(f"Automation event {event} has no valid token (token is None)") - # continue - # if event.token < 0 or event.token >= len(synths): - # logger.error(f"Token out of range in automation event {event}, " - # f"token={event.token}, number of synths: {len(synths)}, " - # f"synths: {synths}, tokens: {self._tokenToSynth}") - # continue - # synth = synths[event.token] - # if isinstance(event.data, float): - # synth.set(param=event.param, value=event.data, delay=event.delay) - # elif len(event.data) == 2: - # t, v = event.data - # delay = event.delay + t - # synth.set(param=event.param, value=v, delay=delay) - # else: - # synth.automate(param=event.param, pairs=event.data, delay=event.delay) - def __enter__(self): """ Performs initialization of the context @@ -1047,7 +1021,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.show() def playSample(self, - source: int | str | TableProxy | tuple[np.ndarray, int] | audiosample.Sample, + source: int | str | csoundengine.tableproxy.TableProxy | tuple[np.ndarray, int] | audiosample.Sample, delay=0., dur=-1, chan=1, @@ -1080,6 +1054,7 @@ def playSample(self, """ if isinstance(source, audiosample.Sample): source = (source.samples, source.sr) + # TODO: make a FutureSynth event instead return self.session.playSample(source=source, delay=delay, dur=dur, chan=chan, gain=gain, speed=speed, loop=loop, pan=pos, skip=skip, fade=fade, crossfade=crossfade) @@ -1259,7 +1234,10 @@ def instr(self) -> csoundengine.instr.Instr: if isinstance(self.event, SynthEvent): return self.event.getPreset().getInstr() else: - return self.session.getInstr(self.event.instrname) + instr = self.session.getInstr(self.event.instrname) + if instr is None: + raise RuntimeError(f"Could not find this event's instr. {self=}") + return instr def set(self, param='', value: float = 0., delay=0., **kws) -> None: """ @@ -1302,7 +1280,6 @@ def automate(self, else: # A Session event self.event.automate(param=param, pairs=pairs, delay=delay, interpolation=mode, overtake=overtake) - # self.parent._automate(token=self.token, param=param, pairs=pairs, delay=delay, overtake=overtake) def stop(self, delay=0.) -> None: """ Stop this synth """ @@ -1370,6 +1347,7 @@ def automate(self, raise ValueError(f"Parameter '{param}' not known by any synth in this " f"group. Possible parameters: {possibleparams}.\n " f"Synths: {self.synths}") + return 0. def _setPfield(self, param: str, value: float, delay=0.) -> None: for synth in self.synths: diff --git a/setup.py b/setup.py index e4aefbc..ffa3790 100644 --- a/setup.py +++ b/setup.py @@ -68,7 +68,7 @@ def package_files(directory): "configdict>=2.10.0", "sndfileio>=1.9.1", "numpyx>=1.3.1", - "csoundengine>=2.10.0", + "csoundengine>=2.10.1", "pitchtools>=1.14.0", "risset>=2.8.0", "loristrck>=1.6.1",