-
Notifications
You must be signed in to change notification settings - Fork 150
Added lambda that integrates Forklift #411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
5e9f7e2
f057066
0d1cf1b
da7ecb3
e24ea7a
ebadfec
cdd4cd8
89c5e65
565ecab
c60ab34
9b10bfc
b4ce803
e4a2cc1
9300d3d
88f7adb
5c5e310
92e4ff9
46a5197
29c5f94
adec419
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,230 @@ | ||
| ''' | ||
| This Lambda function implements the Forklift algorithm to generate zygote trees for Python workloads. | ||
| https://pages.cs.wisc.edu/~yuanzhuo/assets/pdf/forklift.pdf | ||
| ''' | ||
|
|
||
| import heapq | ||
| import traceback | ||
| from collections import defaultdict, namedtuple | ||
| import pandas as pd | ||
| from flask import Flask, request, jsonify | ||
|
|
||
| Candidate = namedtuple('Candidate', ['parent', 'child_pkgV', 'utility']) | ||
| QueueEntry = namedtuple('QueueEntry', ['neg_utility', 'uid', 'candidate']) | ||
|
|
||
| app = Flask(__name__) | ||
|
|
||
|
|
||
def _strip_requirement_comment(line):
    '''
    Return a requirements.txt line without surrounding whitespace or comments.

    A line starting with "#" is a comment and yields "".  A "#" preceded by
    whitespace starts an inline comment; it and the rest of the line are
    dropped.  A "#" inside a token (e.g. a URL fragment) is left alone.
    '''
    line = line.strip()
    if line.startswith("#"):
        return ""
    for marker in (" #", "\t#"):
        cut = line.find(marker)
        if cut != -1:
            line = line[:cut]
    return line.rstrip()


def parse_workload(workload):
    '''
    Convert a workload description into a call/package matrix.

    Args:
        workload: dict with a "funcs" list (each entry has a "name" and a
            "meta" dict whose "requirements_txt" holds one requirement per
            line) and a "calls" list (each entry has the "name" of the
            function that was invoked).

    Returns:
        A pandas DataFrame of ints with one row per call and one column per
        "package==version" string (columns sorted by name).  A cell is 1 when
        the function behind that call pins the package and 0 otherwise.
        Rows are labelled "<func_name>_<i>" when the workload lists at least
        one named call, or just "<func_name>" when it lists none.  An empty
        DataFrame is returned when no function pins any package.
    '''
    # func_packages: function name -> set of "package==version" strings
    func_packages = {}
    for func in workload.get("funcs", []):
        func_name = func.get("name", "")
        if not func_name:
            # NOTE(review): unnamed functions are skipped silently rather than
            # rejected -- confirm whether malformed input should fail loudly.
            continue

        packages = set()
        req_txt = func.get("meta", {}).get("requirements_txt", "")
        for line in req_txt.split("\n"):
            requirement = _strip_requirement_comment(line)
            # only pinned requirements ("name==version") become columns
            if "==" in requirement:
                packages.add(requirement)

        # functions without any pinned package contribute no rows
        if packages:
            func_packages[func_name] = packages

    # count how many times each function was called
    call_counts = defaultdict(int)
    for call in workload.get("calls", []):
        name = call.get("name", "")
        if name:
            call_counts[name] += 1

    # rows: row label -> set of packages used by that call
    rows = {}
    for func_name, packages in func_packages.items():
        if call_counts:
            # a function that never appears in "calls" still gets one row
            count = max(1, call_counts.get(func_name, 1))
            for i in range(count):
                rows[f"{func_name}_{i}"] = packages.copy()
        else:
            # no call information at all: one row per function
            rows[func_name] = packages.copy()

    if not rows:
        return pd.DataFrame(dtype=int)

    # sorted union of every package seen, so column order is deterministic
    all_pkgs = set()
    for packages in rows.values():
        all_pkgs.update(packages)
    columns = sorted(all_pkgs)

    # dense 0/1 matrix: rows=calls, columns=package==version
    data = [
        [1 if pkg in packages else 0 for pkg in columns]
        for packages in rows.values()
    ]
    return pd.DataFrame(data, index=list(rows), columns=columns, dtype=int)
|
|
||
|
|
||
def parse_deps(deps_json):
    '''
    Flatten the nested dependency description into a per-version lookup.

    Args:
        deps_json: dict of package name -> version -> mapping whose keys are
            comma-separated "package==version" dependency strings.  Only the
            keys are read; the mapped values are ignored here.

    Returns:
        dict mapping "package==version" to a list of sets, one set per
        dependency string in input order.  Whitespace around each dependency
        is stripped and empty fragments are dropped, so an empty dependency
        string yields an empty set.
    '''
    deps = {}
    for pkg_name, versions in deps_json.items():
        for version, dep_strings in versions.items():
            deps[f"{pkg_name}=={version}"] = [
                {dep.strip() for dep in dep_str.split(",") if dep.strip()}
                for dep_str in dep_strings
            ]
    return deps
|
|
||
|
|
||
class ZygoteTree:
    '''
    Greedy builder for a zygote tree (Forklift algorithm).

    calls: DataFrame with one row per call and one 0/1 column per
        "package==version" (see parse_workload).
    deps: dict "package==version" -> list of dependency sets (see parse_deps).
    '''

    def __init__(self, calls, deps):
        self.calls = calls
        self.deps = deps
        self.root = None
        self.candidateQ = []
        # Monotonic tie-breaker for heap entries.  Using id() here would order
        # equal-utility candidates by memory address, making the resulting
        # tree differ from run to run.
        self._next_uid = 0

    def check_candidate_validity(self, parent, child_pkgV):
        '''
        Return True if child_pkgV may be loaded in a new child of parent.

        A candidate is rejected when any version of the same package is
        already loaded at parent or one of its ancestors, or when one of its
        dependencies (other than itself) has not been loaded yet.
        '''
        loaded_pkgs = parent.all_packages()
        loaded_names = {p.split("==")[0] for p in loaded_pkgs}
        # covers both "already loaded" and "other version loaded by ancestor"
        if child_pkgV.split("==")[0] in loaded_names:
            return False

        dep_sets = self.deps.get(child_pkgV, [])
        if not dep_sets:
            return True
        # NOTE(review): only the first recorded dependency set is consulted;
        # confirm that ignoring the remaining sets is intended.
        return all(
            dep == child_pkgV or dep in loaded_pkgs
            for dep in dep_sets[0]
        )

    def enqueue_top_child_candidate(self, parent):
        '''
        Push parent's highest-utility valid child candidate onto the queue.

        Utility is the column sum, i.e. the number of parent's remaining calls
        that use the package.  Nothing is pushed when no valid package has a
        positive utility.  Ties within one parent go to the first column.
        '''
        best_candidate = None
        for child_pkgV in parent.calls.columns:
            if not self.check_candidate_validity(parent, child_pkgV):
                continue
            utility = int(parent.calls[child_pkgV].sum())
            if utility > 0 and (best_candidate is None or utility > best_candidate.utility):
                best_candidate = Candidate(parent, child_pkgV, utility)

        if best_candidate is not None:
            # heapq is a min-heap, so the utility is negated; the uid keeps
            # comparisons from ever reaching the (unorderable) candidate.
            entry = QueueEntry(-best_candidate.utility, self._next_uid, best_candidate)
            self._next_uid += 1
            heapq.heappush(self.candidateQ, entry)

    def add_child_node(self, candidate):
        '''
        Materialize candidate as a new child node and refresh the queue.

        Calls that use the candidate's package move from the parent to the
        child; afterwards a new best candidate is enqueued for both nodes.
        '''
        parent = candidate.parent
        child_pkgV = candidate.child_pkgV

        # rows that import child_pkgV move to the child, the rest stay
        child_calls = parent.calls[parent.calls[child_pkgV] != 0].copy()

        child = Node(calls=child_calls, packages={child_pkgV})
        child.parent = parent
        parent.children.append(child)

        parent.calls = parent.calls.drop(child_calls.index).copy()

        self.enqueue_top_child_candidate(parent)
        self.enqueue_top_child_candidate(child)

    def build_tree(self, desired_nodes):
        '''
        Build the tree, adding at most desired_nodes nodes below the root.

        Stops early when no valid candidate remains.
        '''
        # priority queue of candidates, highest utility first
        self.candidateQ = []
        self._next_uid = 0

        # start from a root with all calls and no preloaded packages
        self.root = Node(self.calls, set())
        self.enqueue_top_child_candidate(self.root)

        while desired_nodes > 0 and self.candidateQ:
            best_candidate = heapq.heappop(self.candidateQ).candidate
            self.add_child_node(best_candidate)
            desired_nodes -= 1

    def to_dict(self):
        '''Return the tree as nested dicts; build_tree must be called first.'''
        return self.root.to_dict()
|
|
||
|
|
||
class Node:
    '''
    One zygote in the tree.

    calls: the calls currently assigned to this node.
    packages: packages pre-loaded at this node itself (not inherited ones).
    children: child nodes, in the order they were added.
    parent: the parent node, or None for the root.
    '''

    def __init__(self, calls, packages=None, parent=None):
        self.calls = calls
        self.packages = packages or set()  # packages pre-loaded at this node
        self.children = []
        self.parent = parent

    def all_packages(self):
        '''
        Return a new set with this node's packages plus every ancestor's.

        Walks the parent chain iteratively so that deep trees neither copy a
        set per level nor depend on the recursion limit.
        '''
        loaded = set()
        node = self
        while node is not None:
            loaded.update(node.packages)
            node = node.parent
        return loaded

    def to_dict(self):
        '''Return this subtree as nested dicts with sorted package lists.'''
        return {
            "packages": sorted(self.packages),
            "children": [child.to_dict() for child in self.children],
        }
|
|
||
|
|
||
def _build_zygote_tree(event):
    '''
    Core logic: parse the request payload and return the zygote tree as dicts.

    Raises ValueError with a descriptive message when the payload is not a
    JSON object or lacks one of the required fields.
    '''
    if not isinstance(event, dict):
        raise ValueError("request body must be a JSON object")

    missing = [
        key for key in ("workload", "deps", "num_nodes")
        if event.get(key) is None
    ]
    if missing:
        raise ValueError(f"missing required field(s): {', '.join(missing)}")

    # parse inputs
    calls = parse_workload(event["workload"])
    deps = parse_deps(event["deps"])

    # build tree
    tree = ZygoteTree(calls, deps)
    tree.build_tree(event["num_nodes"])
    return tree.to_dict()


@app.route("/", methods=["POST"])
def f():
    r'''
    HTTP entry point: build a zygote tree from the posted JSON.

    Responds with the tree ({"packages": [...], "children": [...]}) and
    status 200, or with {"error": ..., "traceback": ...} and status 500 when
    anything goes wrong.

    Expected input format:

    {
        "workload": {
            "funcs": [
                {
                    "name": "function_name",
                    "meta": {
                        "requirements_txt": "package1==version\npackage2==version\n..."
                    }
                },
                ...
            ],
            "calls": [
                {"name": "function_name"},
                ...
            ]
        },
        "deps": {
            <package_name>: {
                <version>: {
                    <comma_separated_deps>: <call_frequency>,
                    ...
                },
                ...
            },
            ...
        },
        "num_nodes": <int>
    }
    '''
    try:
        result = _build_zygote_tree(request.get_json())
        return jsonify(result), 200
    except Exception as e:
        # Top-level boundary: report every failure to the caller.
        # NOTE(review): the traceback is sent to the client -- confirm this is
        # acceptable outside of debugging.
        return jsonify({
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| flask |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| # | ||
| # This file is autogenerated by pip-compile with Python 3.12 | ||
| # | ||
| # | ||
| blinker==1.9.0 | ||
| # via flask | ||
| click==8.3.2 | ||
| # via flask | ||
| flask==3.1.3 | ||
| # via -r requirements.in | ||
| itsdangerous==2.2.0 | ||
| # via flask | ||
| jinja2==3.1.6 | ||
| # via flask | ||
| markupsafe==3.0.3 | ||
| # via | ||
| # flask | ||
| # jinja2 | ||
| # werkzeug | ||
| werkzeug==3.1.8 | ||
| # via flask |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is it going to return? describe the pandas df, and what the rows+columns mean