Skip to content
This repository has been archived by the owner on Sep 26, 2024. It is now read-only.

Windowing

Riccardo Tommasini edited this page Aug 29, 2018 · 4 revisions

Window

A window is an interval [o,c) where o is the opening time intant, and c is the closing time instant. In Yasper, a window can be implemented using the Window Interface.

public interface Window {
    //opening
    long getO();
    
    //closing
    long getC();
}

Window Operator - cfr. operators

A window operator is a declarative specification that provides the necessary information for defining windows. In RSP-QL, a window operator is defined by the triple (a,b,t0), where a=window width, b=sliding parameter, t0 is the initial window point.

public interface WindowOperator<E,T> {

    String iri();

    boolean named();

    WindowAssigner<E,T> apply(RegisteredStream<E> s);
}

In Yasper, the WindowOperator<E,T> interface can be used to collect this declarative definition. It is worth to note that WindowOperator is more generic than RSP-QL one. The two parameters E and T`` characterize the related window assigner, which decouples the type of the data received from the stream to the type of the data maintained internally as Content. Indeed, the specification remains internal to the operator definition, and its application to a stream is captured in the homonyms method, where a WindowAssigner ``` is instantiated.

NB: the WindowOperator's apply method requires a RegisteredStream<E> , more details about it can be found here.

Window Assigner

The WindowAssigner<E,T> is the component that implements the windowing logic evaluating the four functional definition presented in the SECRET MODEL as well as in RSP-QL for RDF Streams.

public interface WindowAssigner<E, T> {

    Report report();

    Tick tick();

    Time time();

//retrieve the active content at t
    Content<T> getContent(long t);
//retrieve the list of all the contents active until t (included) 
    List<Content<T>> getContents(long t);

    TimeVarying<T> set(ContinuousQueryExecution content);

    void notify(E arg, long ts);

    String iri();

    boolean named();

    Content<T> compute(long t_e, Window w);

}

The WindowAssinger has two parameters:

  • E corresponds to the data type of the stream items pushed into through the notify method
  • T corresponds to the data type of the content maintained by the WindowAssigner internally.

Indeed, the WindowAssigner receives the data from the streams, assigned each element to one or more windows, evaluating the scope w.r.t. the Tick, checks whether is necessary to report w.r.t the Report implementation and, if necessary, signals the need to for the Content evaluation.

However, the stream types can be different from the type at which it is possible to handle the content.

For instance

In Yasper, and abstract WindowAssigner that extends java.util.Observable is provided. This class, as shown below, implements a method to evaluate the tick and offers two methods to be furtherly implemented.

  • windowing which implements the actual windowing algorithm;
  • compute which calculates the Content.

Notably, WindowAssigner is a parametric interface. It is up the developer to specify the type of the data that flows within the assigner, which is generically data-type agnostic (RDF Graphs, Triples, JSON Documents or Tuples).

public abstract class ObservableWindowAssigner<E> extends Observable implements WindowAssigner<E> {

    protected Tick tick;
    protected Report report;
    protected final Time time;
    protected final IRI iri;

    @Override
    public void report_grain(ReportGrain aw) {
        this.aw = aw;
    }

    @Override
    public void notify(E arg, long ts) {
        windowing(arg, ts);
    }

    protected Content<E> setVisible(long t_e, Window w, Content<E> c) {
        log.debug("Report [" + w.getO() + "," + w.getC() + ") with Content " + c + "");
        setChanged();
        notifyObservers(t_e);
        return c;
    }

    protected abstract Content<E> compute(long t_e, Window w);

    protected abstract void windowing(E arg, long ts);

}

A full example implementing WindowOperator and WindowAssigner for the C-SPARQL can be found here.

Clone this wiki locally