Source code for mesonic.processor

import logging
import warnings
from collections import defaultdict
from copy import copy
from typing import Callable, Dict, List, Optional

from mesonic.backend.bases import EventHandler
from mesonic.events import Event
from mesonic.timeline import TimeBundle

_LOGGER = logging.getLogger(__name__)


[docs] class BundleProcessor: """The BundleProcessor preprocesses the TimeBundles and sends them to multiple event_handlers. Parameters ---------- event_handlers : List[EventHandler] List of EventHandlers that should handle the TimeBundle contents. latency : float, optional the latency that will be added to the scheduled time, by default 0.2 """ def __init__(self, event_handlers: List[EventHandler], latency=0.2) -> None: super().__init__() self.latency = latency self.event_handlers = { handler.get_etype(): handler for handler in event_handlers } self.selected_tracks: Optional[List[int]] = None """Optional list of selected tracks. If this is not None all Event with a track not in selected_tracks won't be passed.""" self.event_filters: List[Callable[[Event], Optional[Event]]] = [] """Optional event info filter function. Should return None for filtering. If this is not None all Event will be filtered using this function."""
[docs] def add_handler(self, handler: EventHandler): """Add an EventHandler This will overwrite an old EventHandler if they handle the same Type. Parameters ---------- handler : EventHandler The new EventHandler. """ self.event_handlers[handler.get_etype()] = handler
[docs] def process_bundle( self, bundle: TimeBundle, scheduled_time: float, reversed: bool, **kwargs ) -> None: """Process the Bundle at the scheduled time according to reversed bool. Parameters ---------- bundle : TimeBundle TimeBundle to process. scheduled_time : float Timepoint for which this TimeBundle is scheduled. reversed : bool Wether this TimeBundle should be reversed. """ events = self.prepare_events(bundle.events, self.selected_tracks) splitted_events = self.split_events(events) # call the event handler for each type for etype, events_of_type in splitted_events.items(): if etype in self.event_handlers: self.event_handlers[etype].handle( time=scheduled_time + self.latency, events=events_of_type, reversed=reversed, **kwargs, )
[docs] def prepare_events( self, events: List[Event], selected_tracks: Optional[List[int]] = None, ) -> List[Event]: """Filter and transform Events using the provided filters. Note that the filters are applied before selecting the Tracks. This means that the function could be used to change the track of an Event prior to the selection of tracks. Parameters ---------- events : List[Event] A list of Events that should be prepared. filters : List[Callable[[Event], Optional[Event]]] An a list of filter functions that take an Event and return a transformed Event or None to discard the Event. selected_tracks : Optional[List[int]] List of selected tracks, default None If this is not None all Event with a track not in selected_tracks won't be passed. Returns ------- List[Event] The list of Events after the transformation. """ events = copy(events) def event_selection(event: Event) -> Optional[Event]: """Filter events with a combined function.""" bad_filters = [] for filter_fun in self.event_filters: try: filtered_event = filter_fun(event) except Exception as exception: bad_filters.append(filter_fun) warnings.warn( f"event_filter {filter_fun} raised exception of type {type(exception)} ({exception}), removing it" ) else: if filtered_event is None: return None event = filtered_event if bad_filters: self.event_filters = [ event_filter for event_filter in self.event_filters if event_filter not in bad_filters ] # select the tracks if selected_tracks is not None: return event if event and event.track in selected_tracks else None return event return list(filter(None, map(event_selection, events)))
[docs] @classmethod def split_events(cls, events: List[Event]) -> Dict[type, List[Event]]: """Split events by type Parameters ---------- events : List[Event] A list of Events. Returns ------- Dict[type, List[Event]] Dictionary with types as keys and belonging Events as value. """ splitted_events = defaultdict(list) for event in events: splitted_events[type(event)].append(event) return splitted_events