Skip to content

Commit

Permalink
fix: removed event listeners and move reload to CRMD
Browse files Browse the repository at this point in the history
  • Loading branch information
Arun-KumarH committed Jul 4, 2024
1 parent 010f5f8 commit 40a1727
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 111 deletions.
19 changes: 2 additions & 17 deletions src/accessControlService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import _ from 'lodash-es';
import { Server } from '@restorecommerce/chassis-srv';
import { Events } from '@restorecommerce/kafka-client';
import { CommandInterface } from '@restorecommerce/chassis-srv';
import { ResourceManager } from './resourceManager.js';
import { ResourceManager, PolicySetService } from './resourceManager.js';
import { RedisClientType } from 'redis';
import { AccessController } from './core/accessController.js';
import { loadPoliciesFromDoc } from './core/utils.js';
Expand Down Expand Up @@ -35,22 +35,7 @@ export class AccessControlService implements AccessControlServiceImplementation
}
async loadPolicies(): Promise<void> {
this.logger.info('Loading policies');

const policiesCfg = this.cfg.get('policies');
const loadType = policiesCfg?.type;
switch (loadType) {
case 'local':
const path: string = policiesCfg?.path;
this.accessController = await loadPoliciesFromDoc(this.accessController, path);
this.logger.silly('Policies from local files loaded');
break;
case 'database':
const policySetService = this.resourceManager.getResourceService('policy_set');
const policySets: Map<string, PolicySetWithCombinables> = await policySetService.load() || new Map();
this.accessController.policySets = policySets;
this.logger.silly('Policies from database loaded');
break;
}
this.accessController.loadPolicies(this.resourceManager);
}

clearPolicies(): void {
Expand Down
21 changes: 20 additions & 1 deletion src/core/accessController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import { Logger } from 'winston';
import { createClient, RedisClientType } from 'redis';
import { Topic } from '@restorecommerce/kafka-client';
import { verifyACLList } from './verifyACL.js';
import { conditionMatches } from './utils.js';
import { conditionMatches, loadPoliciesFromDoc } from './utils.js';
import { PolicySetService, ResourceManager } from '../resourceManager.js';

export class AccessController {
policySets: Map<string, PolicySetWithCombinables>;
Expand Down Expand Up @@ -75,6 +76,24 @@ export class AccessController {
this.userService = userService;
}

async loadPolicies(resourceManager: ResourceManager): Promise<void> {
const policiesCfg = this.cfg.get('policies');
const loadType = policiesCfg?.type;
switch (loadType) {
case 'local':
const path: string = policiesCfg?.path;
await loadPoliciesFromDoc(this, path);
this.logger.silly('Policies from local files loaded');
break;
case 'database':
const policySetService: PolicySetService = resourceManager.getResourceService('policy_set');
const policySets: Map<string, PolicySetWithCombinables> = await policySetService.load() || new Map();
this.policySets = policySets;
this.logger.silly('Policies from database loaded');
break;
}
}

clearPolicies(): void {
this.policySets.clear();
}
Expand Down
131 changes: 52 additions & 79 deletions src/resourceManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,22 @@ export class RuleService extends ServiceBase<RuleListResponse, RuleList> impleme
return this.getRules();
}

// async reloadRules(result: DeepPartial<RuleListResponse>): Promise<void> {
// const policySets = _.cloneDeep(_accessController.policySets);
// if (result?.items?.length > 0) {
// for (let item of result.items) {
// const rule: Rule = marshallResource(item?.payload, 'rule');
// for (let [, policySet] of policySets) {
// for (let [, policy] of (policySet).combinables) {
// if (!_.isNil(policy) && policy.combinables.has(rule.id)) {
// _accessController.updateRule(policySet.id, policy.id, rule);
// }
// }
// }
// }
// }
// }

async getRules(ruleIDs?: string[]): Promise<Map<string, Rule>> {
const filters = ruleIDs ? makeFilter(ruleIDs) : {};
const result = await super.read(ReadRequest.fromPartial({ filters }), {});
Expand Down Expand Up @@ -141,20 +157,8 @@ export class RuleService extends ServiceBase<RuleListResponse, RuleList> impleme

async superUpsert(request: RuleList, ctx: any): Promise<DeepPartial<RuleListResponse>> {
const result = await super.upsert(request, ctx);
const policySets = _.cloneDeep(_accessController.policySets);

if (result?.items?.length > 0) {
for (let item of result.items) {
const rule: Rule = marshallResource(item?.payload, 'rule');
for (let [, policySet] of policySets) {
for (let [, policy] of (policySet).combinables) {
if (!_.isNil(policy) && policy.combinables.has(rule.id)) {
_accessController.updateRule(policySet.id, policy.id, rule);
}
}
}
}
}
// const policySets: Map<string, PolicySetWithCombinables> = await policySetService.load() || new Map();
// this.policySets = policySets;
return result;
}

Expand Down Expand Up @@ -185,20 +189,7 @@ export class RuleService extends ServiceBase<RuleListResponse, RuleList> impleme
return { operation_status: acsResponse.operation_status };
}
const result = await super.create(request, ctx);
const policySets = _.cloneDeep(_accessController.policySets);

if (result?.items?.length > 0) {
for (let item of result.items) {
const rule: Rule = marshallResource(item?.payload, 'rule');
for (let [, policySet] of policySets) {
for (let [, policy] of (policySet).combinables) {
if (!_.isNil(policy) && policy.combinables.has(rule.id)) {
_accessController.updateRule(policySet.id, policy.id, rule);
}
}
}
}
}
await this.reloadRules(result);
return result;
}

Expand Down Expand Up @@ -257,6 +248,7 @@ export class RuleService extends ServiceBase<RuleListResponse, RuleList> impleme
return { operation_status: acsResponse.operation_status };
}
const result = await super.update(request, ctx);
await this.reloadRules(result);
return result;
}

Expand Down Expand Up @@ -285,7 +277,7 @@ export class RuleService extends ServiceBase<RuleListResponse, RuleList> impleme
if (acsResponse.decision != Response_Decision.PERMIT) {
return { operation_status: acsResponse.operation_status };
}
const result = await super.upsert(request, ctx);
const result = await this.superUpsert(request, ctx);
return result;
}

Expand Down Expand Up @@ -390,32 +382,35 @@ export class PolicyService extends ServiceBase<PolicyListResponse, PolicyList> i
return this.getPolicies();
}

// async reloadPolicies(result: DeepPartial<PolicyListResponse>): Promise<void> {
// const policySets = _.cloneDeep(_accessController.policySets);
// if (result?.items?.length > 0) {
// for (let item of result.items) {
// for (let [, policySet] of policySets) {
// if (policySet.combinables.has(item.payload?.id)) {
// const policy: PolicyWithCombinables = marshallResource(item.payload, 'policy');

// if (_.has(item.payload, 'rules') && !_.isEmpty(item.payload.rules)) {
// policy.combinables = await ruleService.getRules(item.payload.rules);

// if (policy.combinables.size != item?.payload?.rules?.length) {
// for (let id of item.payload.rules) {
// if (!policy.combinables.has(id)) {
// policy.combinables.set(id, null);
// }
// }
// }
// }
// _accessController.updatePolicy(policySet.id, policy);
// }
// }
// }
// }
// }

async superUpsert(request: PolicyList, ctx: any): Promise<DeepPartial<PolicyListResponse>> {
const result = await super.upsert(request, ctx);
const policySets = _.cloneDeep(_accessController.policySets);

if (result?.items?.length > 0) {
for (let item of result.items) {
for (let [, policySet] of policySets) {
if (policySet.combinables.has(item.payload?.id)) {
const policy: PolicyWithCombinables = marshallResource(item.payload, 'policy');

if (_.has(item.payload, 'rules') && !_.isEmpty(item.payload.rules)) {
policy.combinables = await ruleService.getRules(item.payload.rules);

if (policy.combinables.size != item?.payload?.rules?.length) {
for (let id of item.payload.rules) {
if (!policy.combinables.has(id)) {
policy.combinables.set(id, null);
}
}
}
}
_accessController.updatePolicy(policySet.id, policy);
}
}
}
}
await _accessController.loadPolicies();
return result;
}

Expand Down Expand Up @@ -445,30 +440,7 @@ export class PolicyService extends ServiceBase<PolicyListResponse, PolicyList> i
return { operation_status: acsResponse.operation_status };
}
const result = await super.create(request, ctx);
const policySets = _.cloneDeep(_accessController.policySets);

if (result?.items?.length > 0) {
for (let item of result.items) {
for (let [, policySet] of policySets) {
if (policySet.combinables.has(item.payload?.id)) {
const policy: PolicyWithCombinables = marshallResource(item.payload, 'policy');

if (_.has(item.payload, 'rules') && !_.isEmpty(item.payload.rules)) {
policy.combinables = await ruleService.getRules(item.payload.rules);

if (policy.combinables.size != item?.payload?.rules?.length) {
for (let id of item.payload.rules) {
if (!policy.combinables.has(id)) {
policy.combinables.set(id, null);
}
}
}
}
_accessController.updatePolicy(policySet.id, policy);
}
}
}
}
await this.reloadPolicies(result);

return result;
}
Expand Down Expand Up @@ -541,6 +513,7 @@ export class PolicyService extends ServiceBase<PolicyListResponse, PolicyList> i
return { operation_status: acsResponse.operation_status };
}
const result = await super.update(request, ctx);
await this.reloadPolicies(result);
return result;
}

Expand Down Expand Up @@ -569,7 +542,7 @@ export class PolicyService extends ServiceBase<PolicyListResponse, PolicyList> i
if (acsResponse.decision != Response_Decision.PERMIT) {
return { operation_status: acsResponse.operation_status };
}
const result = await super.upsert(request, ctx);
const result = await this.superUpsert(request, ctx);
return result;
}

Expand Down Expand Up @@ -985,7 +958,7 @@ export class PolicySetService extends ServiceBase<PolicySetListResponse, PolicyS
if (acsResponse.decision != Response_Decision.PERMIT) {
return { operation_status: acsResponse.operation_status };
}
const result = await super.upsert(request, ctx);
const result = await this.superUpsert(request, ctx);
return result;
}
}
Expand Down
16 changes: 2 additions & 14 deletions src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,6 @@ export class Worker {
_.assign({}, kafkaConfig, policySetConfig, policyConfig, ruleConfig));

kafkaConfig = this.cfg.get('events:kafka');
const acsEvents = [
'policy_setCreated',
'policy_setModified',
'policy_setDeleted',
'policyCreated',
'policyModified',
'policyDeleted',
'ruleCreated',
'ruleModified',
'ruleDeleted',
];
const hierarchicalScopesResponse = 'hierarchicalScopesResponse';
const events = new Events(kafkaConfig, this.logger); // Kafka
await events.start();
Expand Down Expand Up @@ -236,14 +225,13 @@ export class Worker {

this.logger.info('Access control service started correctly!');
await accessControlService.loadPolicies();
this.logger.info('Access control service policies loaded successfully');

const that = this;
const commandTopic = await events.topic(this.cfg.get('events:kafka:topics:command:topic'));
const eventListener = async (msg: any,
context: any, config: any, eventName: string): Promise<any> => {
if (acsEvents.indexOf(eventName) > -1) {
await accessControlService.loadPolicies();
} else if (eventName === hierarchicalScopesResponse) {
if (eventName === hierarchicalScopesResponse) {
// Add subject_id to waiting list
const hierarchical_scopes = msg?.hierarchical_scopes ? msg.hierarchical_scopes : [];
const tokenDate = msg?.token;
Expand Down

0 comments on commit 40a1727

Please sign in to comment.