Skip to content
Dan Debrunner edited this page May 31, 2017 · 1 revision

Examples of Java source functions

Supplier implementations returning tuples.

Samples where each get() call returns a tuple to be submitted to a stream.

Simple lambda with capture

import java.util.Random;

import com.ibm.streamsx.topology.TStream;
import com.ibm.streamsx.topology.Topology;
import com.ibm.streamsx.topology.context.StreamsContext.Type;
import com.ibm.streamsx.topology.context.StreamsContextFactory;

/**
 * Declares a topology whose source is an endless stream of random
 * doubles, prints the stream, and submits the topology using the
 * {@code BUNDLE} context type.
 */
public class Sources {

    public static void main(String[] args) throws Exception {

        // The generator is captured by the Supplier lambda below.
        // Random is serializable, so the lambda (and therefore the
        // Supplier) can be serialized and used within the Streams
        // runtime environment.
        //
        // Consequence: the serialized generator carries its initial
        // state (the seed) with it. If the resulting application
        // bundle is submitted several times, every job deserializes
        // a generator with that same state, so every job produces
        // the same sequence of numbers.
        Random rng = new Random();

        Topology topology = new Topology();

        TStream<Double> randomValues = topology.endlessSource(() -> rng.nextDouble());
        randomValues.print();

        StreamsContextFactory.getStreamsContext(Type.BUNDLE).submit(topology).get();
    }
}

Broken source with non-serializable capture

This is an incorrect example in which the Supplier, written as a lambda function, attempts to capture a local variable that is not serializable. The subsequent examples show how such state can be set up at runtime.

Such non-serializable fields in a real Supplier might be an OutputStream or a socket etc.

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.concurrent.TimeUnit;

import com.ibm.streamsx.topology.TStream;
import com.ibm.streamsx.topology.Topology;
import com.ibm.streamsx.topology.context.StreamsContext.Type;
import com.ibm.streamsx.topology.context.StreamsContextFactory;

/**
 * Deliberately incorrect example: the source Supplier is a lambda
 * that captures a non-serializable local variable.
 */
public class SourcesFail {

    public static void main(String[] args) throws Exception {

        // MXBean for the JVM that is declaring the topology.
        MemoryMXBean heapMonitor = ManagementFactory.getMemoryMXBean();

        Topology topology = new Topology();

        // The lambda below holds a reference to heapMonitor, which is
        // not serializable, so the Supplier cannot be serialized for
        // execution in a Streams environment. That is to be expected:
        // the bean monitors the JVM declaring the topology, not the
        // JVM that will eventually run it.
        //
        // Hence this example fails. To get the intended behaviour the
        // MXBean has to be obtained at runtime (see next examples).
        TStream<Long> usedHeap = topology.periodicSource(
                () -> heapMonitor.getHeapMemoryUsage().getUsed(),
                1, TimeUnit.SECONDS);
        usedHeap.print();

        StreamsContextFactory.getStreamsContext(Type.STANDALONE).submit(topology).get();
    }
}

Source setting transient field with runtime check

Non-serializable state is maintained in a transient field that is set on first access.

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.concurrent.TimeUnit;

import com.ibm.streamsx.topology.TStream;
import com.ibm.streamsx.topology.Topology;
import com.ibm.streamsx.topology.context.StreamsContext.Type;
import com.ibm.streamsx.topology.context.StreamsContextFactory;
import com.ibm.streamsx.topology.function.Supplier;

/**
 * Declares a topology that polls used heap memory once a second
 * through {@link UsedMem}, prints the values, and submits the
 * topology using the {@code STANDALONE} context type.
 */
public class SourcesMem {

    public static void main(String[] args) throws Exception {

        Topology topology = new Topology();

        TStream<Long> usedHeap = topology.periodicSource(new UsedMem(), 1, TimeUnit.SECONDS);
        usedHeap.print();

        StreamsContextFactory.getStreamsContext(Type.STANDALONE).submit(topology).get();
    }

    /**
     * Supplier of used heap memory that keeps its MXBean in a
     * transient field, populated lazily: each call checks whether
     * the field is still null and, if so, fetches the bean.
     */
    public static class UsedMem implements Supplier<Long> {
        private static final long serialVersionUID = 1L;

        /** Not serialized; obtained on first use in {@link #get()}. */
        private transient MemoryMXBean mem;

        /**
         * @return the used heap size reported by the memory MXBean
         */
        @Override
        public Long get() {
            MemoryMXBean bean = mem;
            if (bean == null) {
                // First access: look the bean up and remember it.
                bean = ManagementFactory.getMemoryMXBean();
                mem = bean;
            }
            return bean.getHeapMemoryUsage().getUsed();
        }
    }
}

Source setting transient field using Java serialization's readObject

Non-serializable state is maintained in a transient field that is set using readObject when the object is deserialized.

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.concurrent.TimeUnit;

import com.ibm.streamsx.topology.TStream;
import com.ibm.streamsx.topology.Topology;
import com.ibm.streamsx.topology.context.StreamsContext.Type;
import com.ibm.streamsx.topology.context.StreamsContextFactory;
import com.ibm.streamsx.topology.function.Supplier;

/**
 * Declares a topology that polls used heap memory once a second
 * through {@link UsedMemReadObject}, prints the values, and submits
 * the topology using the {@code STANDALONE} context type.
 */
public class SourcesMemReadObject {

    public static void main(String[] args) throws Exception {

        Topology topology = new Topology();

        TStream<Long> usedHeap =
                topology.periodicSource(new UsedMemReadObject(), 1, TimeUnit.SECONDS);
        usedHeap.print();

        StreamsContextFactory.getStreamsContext(Type.STANDALONE).submit(topology).get();
    }

    /**
     * Supplier of used heap memory whose transient MXBean field is
     * populated by the standard serialization hook {@code readObject}.
     */
    public static class UsedMemReadObject implements Supplier<Long> {
        private static final long serialVersionUID = 1L;

        /** Not serialized; assigned in {@code readObject}. */
        private transient MemoryMXBean mem;

        /**
         * Standard Java serialization hook used to restore transient
         * fields once the object has been deserialized.
         *
         * @param stream the stream the object is being read from
         * @throws IOException if reading the default fields fails
         * @throws ClassNotFoundException if a serialized class cannot be found
         */
        private void readObject(java.io.ObjectInputStream stream)
                throws IOException, ClassNotFoundException {

            // Restore the non-transient state first ...
            stream.defaultReadObject();

            // ... then obtain the bean that was not serialized.
            mem = ManagementFactory.getMemoryMXBean();
        }

        /**
         * Relies on {@code mem} having been assigned by
         * {@code readObject}.
         *
         * @return the used heap size reported by the memory MXBean
         */
        @Override
        public Long get() {
            return mem.getHeapMemoryUsage().getUsed();
        }
    }
}

Source setting transient field using Initializable

Non-serializable state is maintained in a transient field that is set using the initialize method. This has the benefit of being able to set state based upon the environment the function is executing in.

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.concurrent.TimeUnit;

import com.ibm.streamsx.topology.TStream;
import com.ibm.streamsx.topology.Topology;
import com.ibm.streamsx.topology.context.StreamsContext.Type;
import com.ibm.streamsx.topology.context.StreamsContextFactory;
import com.ibm.streamsx.topology.function.FunctionContext;
import com.ibm.streamsx.topology.function.Initializable;
import com.ibm.streamsx.topology.function.Supplier;

/**
 * Declares a topology that polls used heap memory once a second
 * through {@link UsedMemInitializable}, prints the values, and
 * submits the topology using the {@code STANDALONE} context type.
 */
public class SourcesMemInitializable {

    public static void main(String[] args) throws Exception {
        
        Topology topo = new Topology();              
        TStream<Long> memusage = topo.periodicSource(new UsedMemInitializable(), 1, TimeUnit.SECONDS);    
        memusage.print();
        
        StreamsContextFactory.getStreamsContext(Type.STANDALONE).submit(topo).get();
    }
    
    /**
     * Example of Supplier that sets a transient field in its
     * {@link Initializable#initialize(FunctionContext) initialize}
     * method rather than through serialization's readObject.
     */
    public static class UsedMemInitializable implements Supplier<Long>, Initializable {
        private static final long serialVersionUID = 1L;
        
        /** Not serialized; assigned in {@link #initialize(FunctionContext)}. */
        private transient MemoryMXBean mem;

        /**
         * Relies on {@code mem} having been assigned by
         * {@link #initialize(FunctionContext)}.
         *
         * @return the used heap size reported by the memory MXBean
         */
        @Override
        public Long get() {                  
            return mem.getHeapMemoryUsage().getUsed();
        }
        
        /**
         * Set a transient field using an initialize function.
         * 
         * This allows setting transient fields based upon the
         * context the function is running in.
         *
         * @param functionContext context the function is running in
         *        (not used by this example)
         */
        @Override
        public void initialize(FunctionContext functionContext) throws Exception {
            mem = ManagementFactory.getMemoryMXBean();
        }   
    }
}