Tim-Oliver
04/04/2022, 2:39 PMKevin Kho
@flow
def myflow(items: List[str]):
a = []
b = []
c = []
for x in items:
a.append(task_one(x))
b.append(task_two(x))
c.append(task_three(x))
Tim-Oliver
04/04/2022, 2:52 PMfiles = list_files(pattern)
# Apply the preprocessing function to all files.
preprocessed = preprocess.map(files)
# Apply the segmentation function to all preprocessed files.
segmented = segment.map(preprocessed)
# Take all raw files and apply the segmentation masks to them.
masked_bg = mask_bg.map(files, segmented)
Kevin Kho
@flow
def myflow(items: List[str]):
a = []
b = []
c = []
for x in items:
a.append(task_one(x))
_ = task_two(x)
b.append(_)
c.append(task_three(x, _))
Tim-Oliver
04/04/2022, 2:55 PMKevin Kho
_ = task_two(x)
being reassigned@flow
def myflow(items: List[str]):
a = []
b = []
c = []
for x in items:
a.append(task_one(x))
b.append(task_two(x))
c.append(task_three(x, b[-1]))
Tim-Oliver
04/04/2022, 3:00 PMKevin Kho
[-1]
then changed. it’s not. will edit aboveTim-Oliver
04/04/2022, 3:05 PMsqlite3.OperationalError: database is locked
@task(cache_key_fn=task_input_hash)
Kevin Kho
Tim-Oliver
04/04/2022, 3:07 PM@task
def list_files(pattern: str):
files = glob(pattern)
return files
@task(cache_key_fn=task_input_hash)
def preprocess(in_file: str, target_dir: str):
img = imread(in_file)
img = median(img).astype(np.float32)
img = np.clip((img - img.min())*255/(img.max() - img.min()), 0, 255)
target_path = create_output(in_file, target_dir, suffix="_preprocessed")
imsave(target_path, img.astype(np.uint8), check_contrast=False)
return target_path
@task(cache_key_fn=task_input_hash)
def segment(in_file: str, target_dir: str):
img = imread(in_file)
ts = threshold_otsu(img)
labels = label(img > ts)
target_path = create_output(in_file, target_dir, suffix="_SEG")
imsave(target_path, labels.astype(np.int16), check_contrast=False)
return target_path
@task(cache_key_fn=task_input_hash)
def mask_bg(in_file: str, mask: str, target_dir: str):
img = imread(in_file)
mask = imread(mask)
img = img * (mask > 0).astype(np.int8)
target_path = create_output(in_file, target_dir, suffix="_MASKBG")
imsave(target_path, img.astype(img.dtype), check_contrast=False)
return target_path
@flow(name="Background Masking")
def background_masking(pattern: str):
files = list_files(pattern)
preprocessed = []
segmented = []
masked = []
for f in files.wait().result():
preprocessed.append(preprocess(f, target_dir="/tmp/1k_blobs_preprocessed/"))
segmented.append(segment(preprocessed[-1], target_dir="/tmp/1k_blobs_segmented/"))
masked.append(mask_bg(preprocessed[-1], segmented[-1], target_dir="/tmp/1k_blobs_masked/"))
Kevin Kho
Tim-Oliver
04/04/2022, 3:10 PM.wait().results()
on each queue.Kevin Kho
from prefect import task, flow
from prefect.tasks import task_input_hash
import time
@task
def list_files():
return ["1.txt", "2.txt", "3.txt", "4.txt", "5.txt", "6.txt"]
@task(cache_key_fn=task_input_hash)
def preprocess(in_file: str, target_dir: str):
time.sleep(5)
return f"{target_dir}-{in_file}"
@task(cache_key_fn=task_input_hash)
def segment(in_file: str, target_dir: str):
return f"{target_dir}-{in_file}"
@task(cache_key_fn=task_input_hash)
def mask_bg(in_file: str, mask: str, target_dir: str):
return f"{target_dir}-mask-{in_file}"
@flow(name="Background Masking")
def background_masking():
files = list_files()
preprocessed = []
segmented = []
masked = []
for f in files.wait().result():
preprocessed.append(preprocess(f, target_dir="/preprocessed/"))
segmented.append(segment(preprocessed[-1], target_dir="/segmented/"))
masked.append(mask_bg(preprocessed[-1], segmented[-1], target_dir="/masked/"))
background_masking()
Tim-Oliver
04/04/2022, 3:17 PMKevin Kho
Tim-Oliver
04/04/2022, 3:19 PM@task
def list_files():
return ["1.txt", "2.txt", "3.txt", "4.txt", "5.txt", "6.txt", "7.txt", "8.txt", "9.txt", "10.txt"]
Kevin Kho
Tim-Oliver
04/04/2022, 3:23 PMKevin Kho
@task
def list_files():
return 5*["1.txt", "2.txt", "3.txt", "4.txt", "5.txt", "6.txt", "7.txt","8.txt","9.txt","10.txt"]
this works for me though. What Orion version are you on?prefect==2.0b2
then we should open an issueTim-Oliver
04/04/2022, 3:25 PMsqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
[SQL: PRAGMA journal_mode = WAL;]
(Background on this error at: <https://sqlalche.me/e/14/e3q8>)
Kevin Kho
Tim-Oliver
04/04/2022, 3:37 PMKevin Kho
Tim-Oliver
04/04/2022, 4:06 PMZanie
Anna Geller
Marvin
04/04/2022, 6:18 PMTim-Oliver
04/05/2022, 6:25 AMAnna Geller
Tim-Oliver
04/05/2022, 10:08 AMAnna Geller
Tim-Oliver
04/05/2022, 1:53 PMsqlalchemy.exc.ProgrammingError: (sqlalchemy.dialects.postgresql.asyncpg.ProgrammingError) <class 'asyncpg.exceptions.UndefinedFunctionError'>: function gen_random_uuid() does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
[SQL:
CREATE TABLE flow (
id UUID DEFAULT (GEN_RANDOM_UUID()) NOT NULL,
created TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
name VARCHAR NOT NULL,
tags JSONB DEFAULT '[]' NOT NULL,
CONSTRAINT pk_flow PRIMARY KEY (id),
CONSTRAINT uq_flow__name UNIQUE (name)
)
]
(Background on this error at: <https://sqlalche.me/e/14/f405>)
Kevin Kho
Anna Geller
rm ~/.prefect/orion.db
export PREFECT_ORION_DATABASE_CONNECTION_URL="<postgresql+asyncpg://postgres:yourTopSecretPassword@localhost:5432/orion>"
prefect orion database reset -y
prefect orion start
Tim-Oliver
04/05/2022, 2:09 PMKevin Kho
Tim-Oliver
04/06/2022, 3:36 PM