Skip to content

Commit

Permalink
Add executionStore (#827)
Browse files Browse the repository at this point in the history
* Extract execution store

* Fix executing nodes highlight

* nit

* nit

* nit
  • Loading branch information
huchenlei authored Sep 14, 2024
1 parent fef780a commit f983f42
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 100 deletions.
9 changes: 9 additions & 0 deletions src/App.vue
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { useQueuePendingTaskCountStore } from './stores/queueStore'
import type { ToastMessageOptions } from 'primevue/toast'
import { useToast } from 'primevue/usetoast'
import { i18n } from './i18n'
import { useExecutionStore } from './stores/executionStore'
const isLoading = computed<boolean>(() => useWorkspaceStore().spinner)
const theme = computed<string>(() =>
Expand Down Expand Up @@ -123,10 +124,17 @@ const onReconnected = () => {
})
}
const executionStore = useExecutionStore()
app.workflowManager.executionStore = executionStore
watchEffect(() => {
app.menu.workflows.buttonProgress.style.width = `${executionStore.executionProgress}%`
})
onMounted(() => {
api.addEventListener('status', onStatus)
api.addEventListener('reconnecting', onReconnecting)
api.addEventListener('reconnected', onReconnected)
executionStore.bindExecutionEvents()
try {
init()
} catch (e) {
Expand All @@ -138,6 +146,7 @@ onUnmounted(() => {
api.removeEventListener('status', onStatus)
api.removeEventListener('reconnecting', onReconnecting)
api.removeEventListener('reconnected', onReconnected)
executionStore.unbindExecutionEvents()
})
</script>

Expand Down
18 changes: 0 additions & 18 deletions src/scripts/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1591,36 +1591,18 @@ export class ComfyApp {
)

api.addEventListener('progress', ({ detail }) => {
if (
this.workflowManager.activePrompt?.workflow &&
this.workflowManager.activePrompt.workflow !==
this.workflowManager.activeWorkflow
)
return
this.progress = detail
this.graph.setDirtyCanvas(true, false)
})

api.addEventListener('executing', ({ detail }) => {
if (
this.workflowManager.activePrompt?.workflow &&
this.workflowManager.activePrompt.workflow !==
this.workflowManager.activeWorkflow
)
return
this.progress = null
this.runningNodeId = detail
this.graph.setDirtyCanvas(true, false)
delete this.nodePreviewImages[this.runningNodeId]
})

api.addEventListener('executed', ({ detail }) => {
if (
this.workflowManager.activePrompt?.workflow &&
this.workflowManager.activePrompt.workflow !==
this.workflowManager.activeWorkflow
)
return
const output = this.nodeOutputs[detail.display_node || detail.node]
if (detail.merge && output) {
for (const k in detail.output ?? {}) {
Expand Down
3 changes: 2 additions & 1 deletion src/scripts/changeTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ export class ChangeTracker {

// Store node outputs
api.addEventListener('executed', ({ detail }) => {
const prompt = app.workflowManager.queuedPrompts[detail.prompt_id]
const prompt =
app.workflowManager.executionStore.queuedPrompts[detail.prompt_id]
if (!prompt?.workflow) return
const nodeOutputs = (prompt.workflow.changeTracker.nodeOutputs ??= {})
const output = nodeOutputs[detail.node]
Expand Down
20 changes: 0 additions & 20 deletions src/scripts/ui/menu/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,6 @@ export class ComfyWorkflowsMenu {
})
}

#updateProgress = () => {
const prompt = this.app.workflowManager.activePrompt
let percent = 0
if (this.app.workflowManager.activeWorkflow === prompt?.workflow) {
const total = Object.values(prompt.nodes)
const done = total.filter(Boolean)
percent = (done.length / total.length) * 100
}
this.buttonProgress.style.width = percent + '%'
}

#updateActive = () => {
const active = this.app.workflowManager.activeWorkflow
this.button.tooltip = active.path
Expand All @@ -93,8 +82,6 @@ export class ComfyWorkflowsMenu {
this.#first = false
this.content.load()
}

this.#updateProgress()
}

#bindEvents() {
Expand All @@ -109,10 +96,6 @@ export class ComfyWorkflowsMenu {
this.unsaved = this.app.workflowManager.activeWorkflow.unsaved
})

this.app.workflowManager.addEventListener('execute', (e) => {
this.#updateProgress()
})

api.addEventListener('graphChanged', () => {
this.unsaved = true
})
Expand Down Expand Up @@ -371,9 +354,6 @@ export class ComfyWorkflowsContent {
app.workflowManager.addEventListener(e, () => this.updateOpen())
}
this.app.workflowManager.addEventListener('rename', () => this.load())
this.app.workflowManager.addEventListener('execute', (e) =>
this.#updateActive()
)
}

async load() {
Expand Down
70 changes: 9 additions & 61 deletions src/scripts/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,80 +17,32 @@ export function trimJsonExt(path: string) {
}

export class ComfyWorkflowManager extends EventTarget {
#activePromptId: string | null = null
executionStore: any = null

#unsavedCount = 0
#activeWorkflow: ComfyWorkflow

workflowLookup: Record<string, ComfyWorkflow> = {}
workflows: Array<ComfyWorkflow> = []
openWorkflows: Array<ComfyWorkflow> = []
queuedPrompts: Record<
string,
{ workflow?: ComfyWorkflow; nodes?: Record<string, boolean> }
> = {}
app: ComfyApp

get activeWorkflow() {
return this.#activeWorkflow ?? this.openWorkflows[0]
}

get activePromptId() {
return this.#activePromptId
return this.executionStore?.activePromptId
}

get activePrompt() {
return this.queuedPrompts[this.#activePromptId]
return this.executionStore?.activePrompt
}

constructor(app: ComfyApp) {
super()
this.app = app
ChangeTracker.init(app)

this.#bindExecutionEvents()
}

#bindExecutionEvents() {
// TODO: on reload, set active prompt based on the latest ws message

const emit = () =>
this.dispatchEvent(
new CustomEvent('execute', { detail: this.activePrompt })
)
let executing = null
api.addEventListener('execution_start', (e) => {
this.#activePromptId = e.detail.prompt_id

// This event can fire before the event is stored, so put a placeholder
this.queuedPrompts[this.#activePromptId] ??= { nodes: {} }
emit()
})
api.addEventListener('execution_cached', (e) => {
if (!this.activePrompt) return
for (const n of e.detail.nodes) {
this.activePrompt.nodes[n] = true
}
emit()
})
api.addEventListener('executed', (e) => {
if (!this.activePrompt) return
this.activePrompt.nodes[e.detail.node] = true
emit()
})
api.addEventListener('executing', (e) => {
if (!this.activePrompt) return

if (executing) {
// Seems sometimes nodes that are cached fire executing but not executed
this.activePrompt.nodes[executing] = true
}
executing = e.detail
if (!executing) {
delete this.queuedPrompts[this.#activePromptId]
this.#activePromptId = null
}
emit()
})
}

async loadWorkflows() {
Expand Down Expand Up @@ -174,15 +126,11 @@ export class ComfyWorkflowManager extends EventTarget {
}

storePrompt({ nodes, id }) {
this.queuedPrompts[id] ??= {}
this.queuedPrompts[id].nodes = {
...nodes.reduce((p, n) => {
p[n] = false
return p
}, {}),
...this.queuedPrompts[id].nodes
}
this.queuedPrompts[id].workflow = this.activeWorkflow
this.executionStore?.storePrompt({
nodes,
id,
workflow: this.activeWorkflow
})
}

/**
Expand Down
116 changes: 116 additions & 0 deletions src/stores/executionStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import { ref, computed } from 'vue'
import { defineStore } from 'pinia'
import { api } from '../scripts/api'

export interface QueuedPrompt {
nodes: Record<string, boolean>
workflow?: any // TODO: Replace 'any' with the actual type of workflow
}

export const useExecutionStore = defineStore('execution', () => {
const activePromptId = ref<string | null>(null)
const queuedPrompts = ref<Record<string, QueuedPrompt>>({})
const executing = ref<string | null>(null)

const activePrompt = computed(() => queuedPrompts.value[activePromptId.value])

const totalNodesToExecute = computed(() => {
if (!activePrompt.value) return 0
return Object.values(activePrompt.value.nodes).length
})

const nodesExecuted = computed(() => {
if (!activePrompt.value) return 0
return Object.values(activePrompt.value.nodes).filter(Boolean).length
})

const executionProgress = computed(() => {
if (!activePrompt.value) return 0
const total = totalNodesToExecute.value
const done = nodesExecuted.value
return Math.round((done / total) * 100)
})

function bindExecutionEvents() {
api.addEventListener('execution_start', handleExecutionStart)
api.addEventListener('execution_cached', handleExecutionCached)
api.addEventListener('executed', handleExecuted)
api.addEventListener('executing', handleExecuting)
}

function unbindExecutionEvents() {
api.removeEventListener('execution_start', handleExecutionStart)
api.removeEventListener('execution_cached', handleExecutionCached)
api.removeEventListener('executed', handleExecuted)
api.removeEventListener('executing', handleExecuting)
}

function handleExecutionStart(e: CustomEvent) {
activePromptId.value = e.detail.prompt_id
queuedPrompts.value[activePromptId.value] ??= { nodes: {} }
}

function handleExecutionCached(e: CustomEvent) {
if (!activePrompt.value) return
for (const n of e.detail.nodes) {
activePrompt.value.nodes[n] = true
}
}

function handleExecuted(e: CustomEvent) {
if (!activePrompt.value) return
activePrompt.value.nodes[e.detail.node] = true
}

function handleExecuting(e: CustomEvent) {
if (!activePrompt.value) return

if (executing.value) {
// Seems sometimes nodes that are cached fire executing but not executed
activePrompt.value.nodes[executing.value] = true
}
executing.value = e.detail
if (!executing.value) {
delete queuedPrompts.value[activePromptId.value]
activePromptId.value = null
}
}

function storePrompt({
nodes,
id,
workflow
}: {
nodes: string[]
id: string
workflow: any
}) {
queuedPrompts.value[id] ??= { nodes: {} }
const queuedPrompt = queuedPrompts.value[id]
queuedPrompt.nodes = {
...nodes.reduce((p, n) => {
p[n] = false
return p
}, {}),
...queuedPrompt.nodes
}
queuedPrompt.workflow = workflow

console.debug(
`queued task ${id} with ${Object.values(queuedPrompt.nodes).length} nodes`
)
}

return {
activePromptId,
queuedPrompts,
executing,
activePrompt,
totalNodesToExecute,
nodesExecuted,
executionProgress,
bindExecutionEvents,
unbindExecutionEvents,
storePrompt
}
})

0 comments on commit f983f42

Please sign in to comment.