Skip to content

Commit

Permalink
Make Python code work with Pulsar Function localrun (#1930)
Browse files Browse the repository at this point in the history
  • Loading branch information
massakam authored and merlimat committed Jun 8, 2018
1 parent 7b0703d commit fd47532
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ def main():
try:
source_topics_serde_classname_dict = json.loads(args.source_topics_serde_classname)
except ValueError:
log.critical("Cannot decode source_topics_serde_classname. This argument must be specifed as a JSON")
Log.critical("Cannot decode source_topics_serde_classname. This argument must be specifed as a JSON")
sys.exit(1)
if not source_topics_serde_classname_dict:
log.critical("source_topics_serde_classname cannot be empty")
Log.critical("source_topics_serde_classname cannot be empty")
for topics, serde_classname in source_topics_serde_classname_dict.items():
sourceSpec.topicsToSerDeClassName[topics] = serde_classname
function_details.source.MergeFrom(sourceSpec)
Expand Down
5 changes: 3 additions & 2 deletions pulsar-functions/instance/src/main/python/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@
import log

Log = log.Log
PULSARFUNCTIONAPIROOT = 'functions'
PULSAR_API_ROOT = 'pulsar'
PULSAR_FUNCTIONS_API_ROOT = 'functions'

def import_class(from_path, full_class_name):
kclass = import_class_from_path(from_path, full_class_name)
if kclass is None:
our_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
api_dir = os.path.join(our_dir, PULSARFUNCTIONAPIROOT)
api_dir = os.path.join(our_dir, PULSAR_API_ROOT, PULSAR_FUNCTIONS_API_ROOT)
kclass = import_class_from_path(api_dir, full_class_name)
return kclass

Expand Down

0 comments on commit fd47532

Please sign in to comment.