#random

Tim-Oliver

04/04/2022, 2:39 PM
Hi! Thanks for Prefect! I was wondering when we could expect map support in Prefect 2? No worries if this can't be answered. Just curious!

Kevin Kho

04/04/2022, 2:43 PM
Even without mapping, you can achieve parallelism by using a for loop and submitting tasks. Is there something you are trying to do that needs mapping specifically? Currently, you can do:
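```python
from typing import List

from prefect import flow

# task_one, task_two and task_three are @task-decorated functions defined elsewhere.
# In Prefect 2.0b2, calling a task inside a flow submits it and returns a future
# right away, so this loop fans the work out much like map would.
@flow
def myflow(items: List[str]):
    a = []
    b = []
    c = []
    for x in items:
        a.append(task_one(x))
        b.append(task_two(x))
        c.append(task_three(x))
```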
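In plain-Python terms, the pattern Kevin describes is "submit in a loop, keep the futures". A rough analogy with the standard library's `concurrent.futures` (not Prefect's API; `task_one`, `task_two` and `task_three` are hypothetical stand-ins that just tag their input):

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List

# Hypothetical stand-ins for three independent tasks.
def task_one(x: str) -> str:
    return f"one:{x}"

def task_two(x: str) -> str:
    return f"two:{x}"

def task_three(x: str) -> str:
    return f"three:{x}"

def myflow(items: List[str]) -> List[List[str]]:
    a: List[Future] = []
    b: List[Future] = []
    c: List[Future] = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        # submit() returns a future immediately, so every item's work is
        # scheduled before any result is awaited, like a map would do.
        for x in items:
            a.append(pool.submit(task_one, x))
            b.append(pool.submit(task_two, x))
            c.append(pool.submit(task_three, x))
        # Collect the results in submission order.
        return [[f.result() for f in futures] for futures in (a, b, c)]

print(myflow(["x", "y"]))
# [['one:x', 'one:y'], ['two:x', 'two:y'], ['three:x', 'three:y']]
```

The key point carries over: the loop only schedules work, and results are gathered afterwards.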

Tim-Oliver

04/04/2022, 2:52 PM
I am trying to convert this Prefect 1.0 flow to Prefect 2.0:
```python
# Prefect 1.0 functional API: these .map() calls run inside a `with Flow(...)` block.
files = list_files(pattern)
# Apply the preprocessing function to all files.
preprocessed = preprocess.map(files)
# Apply the segmentation function to all preprocessed files.
segmented = segment.map(preprocessed)
# Take all raw files and apply the segmentation masks to them.
masked_bg = mask_bg.map(files, segmented)
```
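For comparison, Prefect 1.0's `.map` runs a task once per element, and when several mapped arguments are passed (as in `mask_bg.map(files, segmented)`) the i-th call receives the i-th element of each list. Python's builtin `map` pairs arguments the same way, so this sequential, framework-free sketch (the placeholder functions and file names are hypothetical) shows the data flow the 2.0 loop has to reproduce:

```python
from typing import List

# Hypothetical placeholders for the Prefect 1.0 tasks; each just rewrites a path.
def preprocess(path: str) -> str:
    return path.replace(".tif", "_preprocessed.tif")

def segment(path: str) -> str:
    return path.replace("_preprocessed.tif", "_SEG.tif")

def mask_bg(raw: str, mask: str) -> str:
    return f"{raw} masked by {mask}"

files: List[str] = ["a.tif", "b.tif"]
preprocessed = list(map(preprocess, files))   # one call per file
segmented = list(map(segment, preprocessed))  # one call per preprocessed file
# Two mapped arguments are consumed pairwise: (files[0], segmented[0]), ...
masked_bg = list(map(mask_bg, files, segmented))
print(masked_bg)  # ['a.tif masked by a_SEG.tif', 'b.tif masked by b_SEG.tif']
```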

Kevin Kho

04/04/2022, 2:54 PM
I think the structure above can work for it:
```python
@flow
def myflow(items: List[str]):
    a = []
    b = []
    c = []
    for x in items:
        a.append(task_one(x))
        _ = task_two(x)
        b.append(_)
        c.append(task_three(x, _))
```

Tim-Oliver

04/04/2022, 2:55 PM
trying it out 🙂

Kevin Kho

04/04/2022, 2:56 PM
Ah, one sec. This won't be concurrent because of `_ = task_two(x)` being reassigned:
```python
@flow
def myflow(items: List[str]):
    a = []
    b = []
    c = []
    for x in items:
        a.append(task_one(x))
        b.append(task_two(x))
        # b[-1] is the future returned by task_two for this same item.
        c.append(task_three(x, b[-1]))
```
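Here each item's `task_two` future is passed straight into `task_three`, and Prefect 2 resolves a future passed as an argument before the downstream task runs. The standard library does no such resolution, so in this rough `concurrent.futures` analogy (not Prefect's implementation; the task functions are hypothetical) the downstream function waits on the upstream future itself:

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List, Tuple

# Hypothetical stand-ins for the tasks in the flow above.
def task_one(x: str) -> str:
    return f"one:{x}"

def task_two(x: str) -> str:
    return f"two:{x}"

def task_three(x: str, upstream: "Future[str]") -> str:
    # Block this worker until the upstream result for the same item exists.
    return f"three:{x}<-{upstream.result()}"

def myflow(items: List[str]) -> Tuple[List[str], List[str], List[str]]:
    a: List[Future] = []
    b: List[Future] = []
    c: List[Future] = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        for x in items:
            a.append(pool.submit(task_one, x))
            b.append(pool.submit(task_two, x))
            # b[-1] was submitted first, so the FIFO work queue starts it
            # before the task that waits on it (no deadlock).
            c.append(pool.submit(task_three, x, b[-1]))
        return (
            [f.result() for f in a],
            [f.result() for f in b],
            [f.result() for f in c],
        )

print(myflow(["x", "y"])[2])  # ['three:x<-two:x', 'three:y<-two:y']
```

Indexing with `b[-1]` right after the append always refers to the current item's upstream future, which is why no `enumerate` is needed.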

Tim-Oliver

04/04/2022, 3:00 PM
why is enumerate necessary?

Kevin Kho

04/04/2022, 3:01 PM
Oops, I was using an index instead of `[-1]` and then changed it. It's not necessary; I'll edit the message above.

Tim-Oliver

04/04/2022, 3:05 PM
I am running into `sqlite3.OperationalError: database is locked`.
Sorry, more context:
It runs through (at least all the outputs are there), but I get this "database is locked" exception.
And more context:
I am using `@task(cache_key_fn=task_input_hash)`.

Kevin Kho

04/04/2022, 3:07 PM
OK, this is just Orion without Cloud, right? I can try to replicate it.

Tim-Oliver

04/04/2022, 3:07 PM
Correct, all local.
These are my tasks:
```python
from glob import glob

import numpy as np
from prefect import task
from prefect.tasks import task_input_hash
from skimage.filters import median, threshold_otsu
from skimage.io import imread, imsave
from skimage.measure import label

# create_output(in_file, target_dir, suffix) is a helper from the original
# project (not shown) that builds the output path for a processed file.

@task
def list_files(pattern: str):
    files = glob(pattern)
    return files

@task(cache_key_fn=task_input_hash)
def preprocess(in_file: str, target_dir: str):
    img = imread(in_file)
    img = median(img).astype(np.float32)
    # Rescale intensities to the 0..255 range.
    img = np.clip((img - img.min())*255/(img.max() - img.min()), 0, 255)

    target_path = create_output(in_file, target_dir, suffix="_preprocessed")
    imsave(target_path, img.astype(np.uint8), check_contrast=False)
    return target_path

@task(cache_key_fn=task_input_hash)
def segment(in_file: str, target_dir: str):
    img = imread(in_file)
    ts = threshold_otsu(img)
    labels = label(img > ts)

    target_path = create_output(in_file, target_dir, suffix="_SEG")
    imsave(target_path, labels.astype(np.int16), check_contrast=False)
    return target_path

@task(cache_key_fn=task_input_hash)
def mask_bg(in_file: str, mask: str, target_dir: str):
    img = imread(in_file)
    mask = imread(mask)
    # Zero out every pixel outside the segmented objects.
    img = img * (mask > 0).astype(np.int8)

    target_path = create_output(in_file, target_dir, suffix="_MASKBG")
    imsave(target_path, img.astype(img.dtype), check_contrast=False)
    return target_path
```
Basically every task loads the image data, does some computation and saves the file back to disk.
(`suffix` should probably also become a task argument, just to be safe with the hashing.)
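The concern about `suffix` is that an input-hash cache key is derived from the arguments a task receives, so a value hard-coded inside the function body does not show up in it. A minimal stdlib sketch of an argument-only key (the `input_hash` helper is hypothetical and only conceptually similar to `task_input_hash`; Prefect's real helper may also fold in the task's identity) shows the effect:

```python
import hashlib
import json
from typing import Any, Dict

def input_hash(arguments: Dict[str, Any]) -> str:
    # Hypothetical: hash only the call arguments, serialized deterministically.
    payload = json.dumps(arguments, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()

# suffix hard-coded in the body: editing it does not change the arguments,
# so the key (and therefore any cache hit) stays the same.
key_before = input_hash({"in_file": "a.tif", "target_dir": "/tmp/out/"})
key_after = input_hash({"in_file": "a.tif", "target_dir": "/tmp/out/"})

# suffix passed as an argument: a different suffix gives a different key.
key_seg = input_hash({"in_file": "a.tif", "target_dir": "/tmp/out/", "suffix": "_SEG"})
key_mask = input_hash({"in_file": "a.tif", "target_dir": "/tmp/out/", "suffix": "_MASKBG"})

print(key_before == key_after, key_seg == key_mask)  # True False
```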
This is the flow:
```python
from prefect import flow

@flow(name="Background Masking")
def background_masking(pattern: str):
    files = list_files(pattern)

    preprocessed = []
    segmented = []
    masked = []
    # In 2.0b2, list_files() returns a future; wait for it to get the list of paths.
    for f in files.wait().result():
        preprocessed.append(preprocess(f, target_dir="/tmp/1k_blobs_preprocessed/"))
        segmented.append(segment(preprocessed[-1], target_dir="/tmp/1k_blobs_segmented/"))
        masked.append(mask_bg(preprocessed[-1], segmented[-1], target_dir="/tmp/1k_blobs_masked/"))
```

Kevin Kho

04/04/2022, 3:10 PM
I think there will be an issue here though for the caching (not related to the flow structure)

Tim-Oliver

04/04/2022, 3:10 PM
Sorry for dropping so many things 😄
If I am reading this correctly, I could get around it by making a for loop for each queue and adding `.wait().result()` on each queue.
Nope, this is wrong.

Kevin Kho

04/04/2022, 3:16 PM
I pared it down to a minimal example and it seems to work:
```python
from prefect import task, flow
from prefect.tasks import task_input_hash
import time

@task
def list_files():
    return ["1.txt", "2.txt", "3.txt", "4.txt", "5.txt", "6.txt"]

@task(cache_key_fn=task_input_hash)
def preprocess(in_file: str, target_dir: str):
    time.sleep(5)
    return f"{target_dir}-{in_file}"

@task(cache_key_fn=task_input_hash)
def segment(in_file: str, target_dir: str):
    return f"{target_dir}-{in_file}"

@task(cache_key_fn=task_input_hash)
def mask_bg(in_file: str, mask: str, target_dir: str):
    return f"{target_dir}-mask-{in_file}"

@flow(name="Background Masking")
def background_masking():
    files = list_files()
    preprocessed = []
    segmented = []
    masked = []
    for f in files.wait().result():
        preprocessed.append(preprocess(f, target_dir="/preprocessed/"))
        segmented.append(segment(preprocessed[-1], target_dir="/segmented/"))
        masked.append(mask_bg(preprocessed[-1], segmented[-1], target_dir="/masked/"))

background_masking()
```
This looks similar to your structure right?

Tim-Oliver

04/04/2022, 3:17 PM
I think so

Kevin Kho

04/04/2022, 3:18 PM
And the parallelism was achieved. What scale are you running at? I'm wondering if the DB gets affected at large numbers of tasks.

Tim-Oliver

04/04/2022, 3:19 PM
Interesting, works on my system as well!
I am using 10 files currently.
Running it a second time with this change:
```python
@task
def list_files():
    return ["1.txt", "2.txt", "3.txt", "4.txt", "5.txt", "6.txt", "7.txt", "8.txt", "9.txt", "10.txt"]
```
fails for me 😕

Kevin Kho

04/04/2022, 3:21 PM
WHAT!? Ok let me try
Ok that works for me but I presume it will fail at some scale. One sec will blow up this list
t

Tim-Oliver

04/04/2022, 3:23 PM
I am running this from a jupyter notebook, is that a problem?
k

Kevin Kho

04/04/2022, 3:23 PM
```python
@task
def list_files():
    return 5*["1.txt", "2.txt", "3.txt", "4.txt", "5.txt", "6.txt", "7.txt", "8.txt", "9.txt", "10.txt"]
```
This works for me, though. What Orion version are you on?
Ah, I dunno. Let me try.
I can't replicate it in a notebook. Can you try it in a script? If you are on `prefect==2.0b2`, then we should open an issue.
t

Tim-Oliver

04/04/2022, 3:25 PM
I am on 2.0b2; I will try it with a script.
My orion.db is stored on a file server, maybe that is the problem.
I will provide a different orion.db directory, which is hopefully faster.
I have started the Orion UI in parallel and now see this error there:
```
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
[SQL: PRAGMA journal_mode = WAL;]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
```
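The failing statement in that traceback is `PRAGMA journal_mode = WAL;`. One quick way to see what a given directory does with that pragma is to issue it from Python's built-in `sqlite3` module against a scratch database. The `probe_journal_mode` helper below is hypothetical, and a `"wal"` result on a network share does not by itself prove that file locking there is reliable:

```python
import os
import sqlite3
import tempfile

def probe_journal_mode(directory: str) -> str:
    # Hypothetical helper: create a scratch database in `directory`, request
    # WAL mode, and report the journal mode SQLite actually switched to.
    path = os.path.join(directory, "wal_probe.db")
    conn = sqlite3.connect(path, timeout=5)
    try:
        (mode,) = conn.execute("PRAGMA journal_mode = WAL;").fetchone()
        return mode
    except sqlite3.OperationalError as exc:
        return f"error: {exc}"
    finally:
        conn.close()
        # Remove the scratch database and any WAL side files.
        for suffix in ("", "-wal", "-shm"):
            if os.path.exists(path + suffix):
                os.remove(path + suffix)

print(probe_journal_mode(tempfile.mkdtemp()))  # typically "wal" on a local disk
```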

Kevin Kho

04/04/2022, 3:37 PM
Ok will need to ask someone on the team and get back to you

Tim-Oliver

04/04/2022, 3:37 PM
Thanks a lot for your time and help! Much appreciated 🙂

Kevin Kho

04/04/2022, 3:38 PM
Of course!
So the database lock does happen at large scale, but you should not be seeing it at this scale. Something is pretty weird. Maybe resetting the database can help. For large loads, though, you can spin up a Postgres database too. More info here. But something does seem wrong with your current SQLite DB.

Tim-Oliver

04/04/2022, 4:06 PM
Thanks!
I will try out a couple more things over the next few days. I'm also trying it locally on my laptop instead. It might be something else in the setup here.
Gotta run. Thanks again.

Zanie

04/04/2022, 6:05 PM
If you’re storing a SQLite database on a remote file system then the file-locking necessary to prevent concurrent writes is likely to fail.
The list of WAL disadvantages at https://sqlite.org/wal.html may be helpful
@terrence we should highlight this somewhere in the documentation
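The `database is locked` message itself is SQLite's normal answer when one connection tries to write while another connection holds the write lock; the file locks that enforce this are what may not work on a remote file system. A minimal stdlib reproduction on a local temporary file (the `demonstrate_lock` helper, table and values are made up for illustration):

```python
import os
import sqlite3
import tempfile

def demonstrate_lock() -> str:
    """Hold a write lock on one connection and try to write from a second one."""
    path = os.path.join(tempfile.mkdtemp(), "demo.db")
    # isolation_level=None: autocommit mode, transactions are managed explicitly.
    writer = sqlite3.connect(path, isolation_level=None)
    writer.execute("CREATE TABLE runs (id INTEGER PRIMARY KEY, state TEXT)")
    writer.execute("BEGIN IMMEDIATE")  # take the write lock and keep it open
    writer.execute("INSERT INTO runs (state) VALUES ('RUNNING')")

    # timeout=0: fail immediately instead of retrying while the lock is held.
    other = sqlite3.connect(path, timeout=0, isolation_level=None)
    try:
        other.execute("INSERT INTO runs (state) VALUES ('PENDING')")
        message = "no error"
    except sqlite3.OperationalError as exc:
        message = str(exc)
    finally:
        writer.execute("COMMIT")  # release the lock
        other.close()
        writer.close()
    return message

print(demonstrate_lock())  # database is locked
```

On a local disk the lock works as designed and the second writer is refused; the risk on a network share is that this refusal silently fails to happen.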

Anna Geller

04/04/2022, 6:18 PM
@Marvin open "Orion: add docs on concurrent writes to SQLite DB"

Tim-Oliver

04/05/2022, 6:25 AM
Thanks for looking into this. What would be the fix for this? Is there something I can do about that? In my case all our data is stored on a remote file system, including our home directories. Which would mean that any local flow execution will run into this problem on our system.

Anna Geller

04/05/2022, 9:32 AM
You can switch to Postgres or even directly to Cloud 2.0 (there is a free tier, no credit card required). SQLite doesn't handle concurrent writes well at scale in remote environments, so you would need to switch to Postgres.

Tim-Oliver

04/05/2022, 10:08 AM
I think this would be the solution for production deployment and probably also the way we would go. But for local quick test runs it would still fail if it defaults to sqlite. Or am I missing something? Thanks for your help.

Anna Geller

04/05/2022, 10:34 AM
Yes, exactly. That's why you may want to switch to Postgres, even locally. Here is how you can do that: https://orion-docs.prefect.io/concepts/database/

Tim-Oliver

04/05/2022, 1:53 PM
I am back 😅 I set up a Postgres DB and set the env variable accordingly. Now when I start Prefect Orion, I get:
```
sqlalchemy.exc.ProgrammingError: (sqlalchemy.dialects.postgresql.asyncpg.ProgrammingError) <class 'asyncpg.exceptions.UndefinedFunctionError'>: function gen_random_uuid() does not exist
HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
[SQL:
CREATE TABLE flow (
        id UUID DEFAULT (GEN_RANDOM_UUID()) NOT NULL,
        created TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
        updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
        name VARCHAR NOT NULL,
        tags JSONB DEFAULT '[]' NOT NULL,
        CONSTRAINT pk_flow PRIMARY KEY (id),
        CONSTRAINT uq_flow__name UNIQUE (name)
)

]
(Background on this error at: https://sqlalche.me/e/14/f405)
```
Did I mess up my postgres setup or is it a problem with orion?
It is very likely that my postgres setup is wrong, but just in case I wanted to ask here as well.

Kevin Kho

04/05/2022, 1:55 PM
I think this is postgres version related. What version did you use?

Anna Geller

04/05/2022, 2:07 PM
What can help here is removing the SQLite DB, then pointing to the Postgres DB, and then resetting the DB so that Prefect can rebuild the schema:
```bash
rm ~/.prefect/orion.db
export PREFECT_ORION_DATABASE_CONNECTION_URL="postgresql+asyncpg://postgres:yourTopSecretPassword@localhost:5432/orion"

prefect orion database reset -y
prefect orion start
```
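A stray character in the connection URL (for example chat link markup such as surrounding `<...>` copied along with it) makes the value unparseable. A small stdlib check can catch that before starting Orion; the `check_db_url` helper is hypothetical, and only the `postgresql+asyncpg` scheme and the example URL come from the command above:

```python
from urllib.parse import urlsplit

def check_db_url(url: str) -> dict:
    # Hypothetical sanity check for a PREFECT_ORION_DATABASE_CONNECTION_URL value.
    parts = urlsplit(url)
    if parts.scheme != "postgresql+asyncpg":
        raise ValueError(f"unexpected scheme {parts.scheme!r}")
    if not parts.hostname or not parts.path.strip("/"):
        raise ValueError("URL needs a host and a database name")
    return {
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }

print(check_db_url("postgresql+asyncpg://postgres:yourTopSecretPassword@localhost:5432/orion"))
# {'user': 'postgres', 'host': 'localhost', 'port': 5432, 'database': 'orion'}
```

Passing the same URL wrapped in `<...>` raises `ValueError`, because the leading `<` prevents the scheme from being recognized.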

Tim-Oliver

04/05/2022, 2:09 PM
(PostgreSQL) 10.19, maybe a bit old?
@Anna Geller tried to reset the db, but got the same error.
k

Kevin Kho

04/05/2022, 3:02 PM
I think you need at least postgres 13
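For context on why the version matters: the failing `CREATE TABLE` uses `gen_random_uuid()` as a column default, and that function is built into PostgreSQL core only from version 13 (older releases provide it through the `pgcrypto` extension). The server's version can be read with `SHOW server_version_num;` in `psql`; a hypothetical helper that checks the returned value:

```python
def has_builtin_gen_random_uuid(server_version_num: str) -> bool:
    # server_version_num is an integer string: major * 10000 + minor for
    # PostgreSQL 10 and later, e.g. "100019" for 10.19 and "130000" for 13.0.
    return int(server_version_num) >= 130000

print(has_builtin_gen_random_uuid("100019"))  # False (10.19, as in this thread)
print(has_builtin_gen_random_uuid("130004"))  # True
```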

Tim-Oliver

04/06/2022, 3:36 PM
FYI: This issue is resolved by using a postgres (>= 13) DB as you suggested.