Skip to content

Commit

Permalink
Integrating zeromq
Browse files Browse the repository at this point in the history
  • Loading branch information
luke-marshall committed Feb 20, 2019
1 parent 1d00a57 commit cd4f40d
Show file tree
Hide file tree
Showing 18 changed files with 619 additions and 251 deletions.
Binary file modified .DS_Store
Binary file not shown.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
marketsim/io/examples
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"python.pythonPath": "/Users/lukemarshall/.local/share/virtualenvs/deeplearning-ZwAlzCIh/bin/python"
}
6 changes: 2 additions & 4 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ name = "pypi"

[packages]
keras-rl = "*"
gym = {version = "*", extras = ["atari"]}
gym = {version = "*",extras = ["atari"]}
"h5py" = "*"
pillow = "*"
tensorflow = "==1.10.0"
keras = "*"
pyzmq = "*"
colored = "*"

[dev-packages]

Expand Down
314 changes: 72 additions & 242 deletions Pipfile.lock

Large diffs are not rendered by default.

151 changes: 151 additions & 0 deletions electricity_market.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# An events-based electricity market simulator.
import config
import generators

class Electricity_Market():
# Called when a new electricity market object is created.
def __init__(self, dispatch_callback):
# Keep track of the current time period.
self.time_period = 0
# Store the number of minutes per time period
self.period_length_mins = 30
# generators is a list of generator objects.
self.generators = {g:generators.Coal_Generator(label=g, capacity_MW = config.generators[g]['capacity_MW'], ramp_rate_MW_per_min = config.generators[g]['ramp_rate_MW_per_min'], srmc = config.generators[g]['srmc'], lrmc = config.generators[g]['lrmc']) for g in config.generators}
# bids is a dictthat stores a bid value for each generator
self.bidstack = {g : None for g in config.generators}
# Function that is called when dispatch is ready
self.dispatch_callback = dispatch_callback
# Store the latest state
self.latest_state = None

print("SIM NEM initialised.")

# Adds a generator object to the list of generators.
def add_generator(self, generator):
# self.generators.append(generator)
# bids is a dictthat stores a bid value for each generator
# self.bidstack[generator.label] = None
print("SIM Generator added", generator.label)

# Add a bid for this time period.
def add_bid(self, gen_label, price, volume):
print("SIM Adding bid", gen_label, price, volume)
# Add the bid
self.bidstack[gen_label] = {'price':price, 'volume':volume}
# Check if all bids have been submitted (ie. its time for dispatch)
all_submitted = True
for g in self.bidstack:
# If we find that a generator hasn't yet submitted a bid set all_submitted to false.
if not self.bidstack[g]:
all_submitted = False
# If all bids are submitted
if all_submitted:
self.dispatch()

def MW_to_MWh(self, MW):
return MW * (float(self.period_length_mins / 60.0))

def MWh_to_MW(self, MWh):
return MWh * (60.0 / float(self.period_length_mins))

def dispatch(self):
print("SIM Dispatching")
demand = self.get_current_demand_MWh()
# Convert bidstack dict to a list of dicts containing names and dicts.
bids = [{'gen_label':gen_label, 'price': self.bidstack[gen_label]['price'], 'volume':self.bidstack[gen_label]['volume']} for gen_label in self.bidstack]
# Sort the list of bids from smallest to largest price
bids = sorted(bids, key=lambda bid: bid['price'])
# Perform economic dispatch
dispatch = {} #variable to store dispatch volume
unmet_demand_MWh = demand #variable to store remaining unment demand
price = -1000.0
for bid in bids: #iterate through each bid in the bidstack

amount_dispatch_requested = max(min(bid['volume'], unmet_demand_MWh),0) #calculate amount to requet from generator model
required_generator_power_MW = self.MWh_to_MW(amount_dispatch_requested) #Calculate required power.
amount_dispatched_MW = self.generators[bid['gen_label']].request_output_MW(required_generator_power_MW, self.period_length_mins) #Request that the gen dispatch at this power.
amount_dispatched_MWh = self.MW_to_MWh(amount_dispatched_MW)
dispatch[bid['gen_label']] = amount_dispatched_MWh #record the generator's dispatch
unmet_demand_MWh = max(unmet_demand_MWh - amount_dispatched_MWh, 0) #recalc unmet demand
price = bid['price']
print("bid", bid, "dispatched", amount_dispatched_MWh, "unmet", unmet_demand_MWh)
print("Finished Economic Dispatch")
# Price floor
price = max(0, price)
# Price ceiling
if price > 14200:
price = 0

done = False
if self.time_period % 250 == 0:
done = True

print("Gen states:", [self.generators[g].current_output_MW for g in self.generators])
self.latest_state = {
'price':price,
'unmet_demand_MWh':unmet_demand_MWh,
'demand':demand,
'next_demand':self.get_next_demand_MWh(),
'dispatch':dispatch,
'minimum_next_output_MWh':{label : g.get_minimum_next_output_MWh(self.period_length_mins) for label, g in self.generators.iteritems()}, #dict comprehension, see: https://www.datacamp.com/community/tutorials/python-dictionary-comprehension
'maximum_next_output_MWh':{label : g.get_maximum_next_output_MWh(self.period_length_mins) for label, g in self.generators.iteritems()},
'lrmc':{label : float(g.get_lrmc()) for label, g in self.generators.iteritems()},
'srmc':{label : float(g.get_srmc()) for label, g in self.generators.iteritems()},
'done':False,
'fresh_reset':False,
'bids':{b['gen_label']: b for b in bids},
}

# Reset the bidstack
self.bidstack = {g : None for g in config.generators}
# Step time forward by 1
self.time_period += 1
# Return the Dispatch object by passing it to the calback for emission via websockets.
print("SIM Calling Dispatch Callback")
self.dispatch_callback(self.latest_state)



def reset(self, reset_callback):


if not self.latest_state:
self.latest_state = {
'price':0,
'unmet_demand_MWh':0,
'demand':self.get_current_demand_MWh(),
'next_demand':self.get_next_demand_MWh(),
'dispatch': { label : 0 for label, g in self.generators.iteritems()},
'minimum_next_output_MWh':{label : g.get_minimum_next_output_MWh(self.period_length_mins) for label, g in self.generators.iteritems()}, #dict comprehension, see: https://www.datacamp.com/community/tutorials/python-dictionary-comprehension
'maximum_next_output_MWh':{label : g.get_maximum_next_output_MWh(self.period_length_mins) for label, g in self.generators.iteritems()},
'lrmc':{label : float(g.get_lrmc()) for label, g in self.generators.iteritems()},
'srmc':{label : float(g.get_srmc()) for label, g in self.generators.iteritems()},
'done':False,
'fresh_reset':True
}

# Call the callbacks to notify of a reset,
reset_callback(self.latest_state)
# Also send a dispatch callback to provide a response to the next decision.
# self.dispatch_callback(new_state)

def get_current_demand_MWh(self):
hour = float(self.time_period % 48) / 2
if hour < 7 or hour > 18:
return 80
elif hour > 12 and hour < 16:
return 120
else:
return 100

def get_next_demand_MWh(self):

next_period = self.time_period + 1
next_hour = float(next_period % 48) / 2
if next_hour < 7 or next_hour > 18:
return 80
elif next_hour > 12 and next_hour < 16:
return 120
else:
return 100

Binary file added marketsim/.DS_Store
Binary file not shown.
Binary file added marketsim/io/.DS_Store
Binary file not shown.
58 changes: 58 additions & 0 deletions marketsim/io/clients/asyncclient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Adapted from the ZMQ example code by Felipe Cruz <felipecruz@loogica.net> - released originally under the MIT/X11 License

import zmq
import sys

from random import randint, random
import threading
from colored import fg, bg, attr, stylize

def tprint(msg, color=1):
"""like print, but won't get newlines confused with multiple threads"""
sys.stdout.write(stylize( msg + '\n', fg(color)))
sys.stdout.flush()

class ClientTask():
"""ClientTask"""
def __init__(self, id):
self.id = id

def run(self):
context = zmq.Context()
socket = context.socket(zmq.DEALER)
identity = u'worker-%d' % self.id
socket.identity = identity.encode('ascii')
socket.connect('tcp://localhost:5570')
tprint('Client %s started' % (identity),color=self.id+1)
poll = zmq.Poller()
poll.register(socket, zmq.POLLIN)
reqs = 0
while True:
reqs = reqs + 1
tprint('Req #%d sent..' % (reqs),color=self.id+1)
socket.send_string(u'request #%d' % (reqs))
for i in range(5):
tprint("Polling Attempt: "+str(i), color=self.id+1)
sockets = dict(poll.poll(1000))
if socket in sockets:
msg = socket.recv()
tprint('Client %s received: %s' % (identity, msg),color=self.id+1)

socket.close()
context.term()

def main():
"""main function"""

for i in range(10):
client = ClientTask(i)
client.start()


if __name__ == "__main__":
# Runs a multithreaded test set of clients. Not suitable for tensorflow - testing only.
for i in range(3):
client = ClientTask(i)
t = threading.Thread(target=client.run)
t.start()

3 changes: 2 additions & 1 deletion marketsim/io/clients/hello.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
# Do 10 requests, waiting each time for a response
for request in range(10):
print("Sending request %s …" % request)

data = {
'id':'Nyngan',
'bids':[50,50,50,50,50,50,50,50,50,50],
Expand All @@ -29,4 +30,4 @@

# Get the reply.
message = socket.recv()
print("Received reply %s [ %s ]" % (request, message))
print("Received reply %s: %s " % (request, message))
32 changes: 32 additions & 0 deletions marketsim/io/clients/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@


#
# Hello World client in Python
# Connects REQ socket to tcp://localhost:5555
# Sends "Hello" to server, expects "World" back
#

import zmq
import json

context = zmq.Context()

# Socket to talk to server
print("Connecting to hello world server…")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

# Do 10 requests, waiting each time for a response
for request in range(10):
print("Sending request %s …" % request)
data = {
'id':'Holsworthy',
'bids':[50,50,50,50,50,50,50,50,50,50],
}
data_str = json.dumps(data)
socket.send_string(data_str)
# socket.send(b"Hello")

# Get the reply.
message = socket.recv()
print("Received reply %s: %s " % (request, message))
38 changes: 38 additions & 0 deletions marketsim/io/clients/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@


# Task worker
# Connects PULL socket to tcp://localhost:5557
# Collects workloads from ventilator via that socket
# Connects PUSH socket to tcp://localhost:5558
# Sends results to sink via that socket
#
# Author: Lev Givon <lev(at)columbia(dot)edu>

import sys
import time
import zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

# Process tasks forever
while True:
s = receiver.recv()
print(s)

# Simple progress indicator for the viewer
sys.stdout.write('.')
sys.stdout.flush()

# Do the work
time.sleep(int(s)*0.001)

# Send results to sink
sender.send(b'')
2 changes: 2 additions & 0 deletions marketsim/io/run_servers.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pipenv run python -m marketsim.io.clients.asyncclient &
pipenv run python -m marketsim.io.servers.asyncserver &
Loading

0 comments on commit cd4f40d

Please sign in to comment.