# random
t
Hi! Thanks for Prefect! I was wondering when we could expect map support in Prefect 2? No worries if this can't be answered. Just curious!
✅ 1
k
Even without mapping, you can achieve parallelism by using a for loop and submitting tasks. Is there something you are trying to do that needs mapping specifically? Currently, you can do:
```python
from typing import List

from prefect import flow

# task_one, task_two and task_three are assumed to be
# @task-decorated functions defined elsewhere.
@flow
def myflow(items: List[str]):
    a = []
    b = []
    c = []
    for x in items:
        a.append(task_one(x))
        b.append(task_two(x))
        c.append(task_three(x))
```
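In the 2.0 betas, each task call inside a flow returns a `PrefectFuture` rather than the plain value, which is what lets the loop above submit work concurrently; the futures can be resolved once the values are needed. A minimal runnable sketch of that pattern (the `task_one` body here is a hypothetical stand-in):
```python
from typing import List

from prefect import flow, task

@task
def task_one(x: str) -> str:
    return x.upper()

@flow
def myflow(items: List[str]) -> List[str]:
    # Each call returns a PrefectFuture immediately, so runs can proceed concurrently.
    futures = [task_one(x) for x in items]
    # wait() blocks until the run reaches a final state; result() unwraps the value.
    return [f.wait().result() for f in futures]

myflow(["a", "b", "c"])
```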
t
I am trying to convert this Prefect 1.0 flow to Prefect 2.0:
```python
files = list_files(pattern)
# Apply the preprocessing function to all files.
preprocessed = preprocess.map(files)
# Apply the segmentation function to all preprocessed files.
segmented = segment.map(preprocessed)
# Take all raw files and apply the segmentation masks to them.
masked_bg = mask_bg.map(files, segmented)
```
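For context on what the conversion needs to reproduce: Prefect 1's `.map()` runs a task once per element and zips multiple iterables together element-wise, with `unmapped()` used to pin constant arguments. A minimal Prefect 1-style sketch (hypothetical task body, assuming prefect 1.x):
```python
from prefect import Flow, task, unmapped

@task
def preprocess(in_file: str, target_dir: str) -> str:
    return f"{target_dir}{in_file}"

with Flow("mapping-demo") as flow_v1:
    # One preprocess run per file; target_dir is held constant across runs.
    pre = preprocess.map(["a.png", "b.png"], target_dir=unmapped("/tmp/pre/"))
```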
k
The above structure can work for it I think:
```python
@flow
def myflow(items: List[str]):
    a = []
    b = []
    c = []
    for x in items:
        a.append(task_one(x))
        _ = task_two(x)
        b.append(_)
        c.append(task_three(x, _))
```
t
trying it out 🙂
k
ah one sec, this won't be concurrent because of `_ = task_two(x)` being reassigned
```python
@flow
def myflow(items: List[str]):
    a = []
    b = []
    c = []
    for x in items:
        a.append(task_one(x))
        b.append(task_two(x))
        c.append(task_three(x, b[-1]))
```
t
why is `enumerate` necessary?
k
oops, I was using an index instead of the `[-1]`, then changed it. It's not necessary. Will edit above
t
I am running into `sqlite3.OperationalError: database is locked`.
Sorry, more context: it runs through, at least all outputs are there, but I get this `database is locked` exception.
And more context: I am using `@task(cache_key_fn=task_input_hash)`.
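For reference, a minimal sketch of what that caching option does (hypothetical `double` task, assuming `prefect==2.0b2`): `task_input_hash` computes the cache key from the task's inputs, so a second call with the same arguments is served from the cache instead of rerunning.
```python
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash)
def double(x: int) -> int:
    print(f"computing double({x})")  # only printed on a cache miss
    return 2 * x

@flow
def demo():
    first = double(21)   # executes and stores the result under hash(inputs)
    second = double(21)  # same inputs -> same cache key -> cached result
    return first, second

demo()
```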
k
Ok this is just Orion without Cloud right? I can try to replicate
t
Correct, all local.
These are my tasks:
```python
# Imports assumed from context: glob from the standard library, image I/O
# and filters from scikit-image, and the Prefect task API.
from glob import glob

import numpy as np
from skimage.filters import median, threshold_otsu
from skimage.io import imread, imsave
from skimage.measure import label

from prefect import task
from prefect.tasks import task_input_hash

@task
def list_files(pattern: str):
    files = glob(pattern)
    return files

@task(cache_key_fn=task_input_hash)
def preprocess(in_file: str, target_dir: str):
    img = imread(in_file)
    img = median(img).astype(np.float32)
    img = np.clip((img - img.min()) * 255 / (img.max() - img.min()), 0, 255)

    # create_output is a user-defined helper that builds the output path.
    target_path = create_output(in_file, target_dir, suffix="_preprocessed")
    imsave(target_path, img.astype(np.uint8), check_contrast=False)
    return target_path

@task(cache_key_fn=task_input_hash)
def segment(in_file: str, target_dir: str):
    img = imread(in_file)
    ts = threshold_otsu(img)
    labels = label(img > ts)

    target_path = create_output(in_file, target_dir, suffix="_SEG")
    imsave(target_path, labels.astype(np.int16), check_contrast=False)
    return target_path

@task(cache_key_fn=task_input_hash)
def mask_bg(in_file: str, mask: str, target_dir: str):
    img = imread(in_file)
    mask = imread(mask)
    img = img * (mask > 0).astype(np.int8)

    target_path = create_output(in_file, target_dir, suffix="_MASKBG")
    imsave(target_path, img.astype(img.dtype), check_contrast=False)
    return target_path
```
Basically every task loads the image data, does some computation and saves the file back to disk.
(`suffix` should probably also become a task argument, just to be safe with the hashing)
This is the flow:
```python
@flow(name="Background Masking")
def background_masking(pattern: str):
    files = list_files(pattern)

    preprocessed = []
    segmented = []
    masked = []
    for f in files.wait().result():
        preprocessed.append(preprocess(f, target_dir="/tmp/1k_blobs_preprocessed/"))
        segmented.append(segment(preprocessed[-1], target_dir="/tmp/1k_blobs_segmented/"))
        masked.append(mask_bg(preprocessed[-1], segmented[-1], target_dir="/tmp/1k_blobs_masked/"))
```
k
I think there will be an issue here though for the caching (not related to the flow structure)
t
Sorry for dropping so many things 😄
If I am reading this correctly, I could get around it by making a for-loop for each queue and adding `.wait().result()` on each queue.
Nope, this is wrong.
k
I pared it down to a minimum example and it seems to work:
```python
import time

from prefect import task, flow
from prefect.tasks import task_input_hash

@task
def list_files():
    return ["1.txt", "2.txt", "3.txt", "4.txt", "5.txt", "6.txt"]

@task(cache_key_fn=task_input_hash)
def preprocess(in_file: str, target_dir: str):
    time.sleep(5)
    return f"{target_dir}-{in_file}"

@task(cache_key_fn=task_input_hash)
def segment(in_file: str, target_dir: str):
    return f"{target_dir}-{in_file}"

@task(cache_key_fn=task_input_hash)
def mask_bg(in_file: str, mask: str, target_dir: str):
    return f"{target_dir}-mask-{in_file}"

@flow(name="Background Masking")
def background_masking():
    files = list_files()
    preprocessed = []
    segmented = []
    masked = []
    for f in files.wait().result():
        preprocessed.append(preprocess(f, target_dir="/preprocessed/"))
        segmented.append(segment(preprocessed[-1], target_dir="/segmented/"))
        masked.append(mask_bg(preprocessed[-1], segmented[-1], target_dir="/masked/"))

background_masking()
```
This looks similar to your structure, right?
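One more note for reproducibility (an assumption about the beta defaults, not something confirmed in this thread): the concurrency here comes from the flow's task runner, which in the 2.0 betas defaults to a concurrent runner. Pinning it explicitly also gives a quick way to rule concurrency out while debugging:
```python
from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner, SequentialTaskRunner

# Run tasks concurrently (the assumed default in the betas)...
@flow(name="Background Masking", task_runner=ConcurrentTaskRunner())
def background_masking():
    ...

# ...or swap in SequentialTaskRunner() to check whether an error such as
# "database is locked" disappears once writes are no longer concurrent.
```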
t
I think so
k
And the parallelism was achieved. What scale are you running at? Wondering if the db gets affected at a large number of tasks
t
Interesting, works on my system as well!
I am using 10 files currently.
Running it a second time with this change:
```python
@task
def list_files():
    return ["1.txt", "2.txt", "3.txt", "4.txt", "5.txt",
            "6.txt", "7.txt", "8.txt", "9.txt", "10.txt"]
```
fails for me 😕
k
WHAT!? Ok let me try
Ok, that works for me, but I presume it will fail at some scale. One sec, will blow up this list
t
I am running this from a Jupyter notebook, is that a problem?
k
```python
@task
def list_files():
    return 5 * ["1.txt", "2.txt", "3.txt", "4.txt", "5.txt",
                "6.txt", "7.txt", "8.txt", "9.txt", "10.txt"]
```
this works for me though. What Orion version are you on?
Ah I dunno. Let me try.
I can't replicate on notebook. Can you try in a script? And if you are on `prefect==2.0b2`, then we should open an issue
t
I am on 2.0b2, will try it with a script.
My `orion.db` is stored on a file server, maybe that is the problem.
Will switch to a different `orion.db` directory, which is hopefully faster.
I have started the Orion UI in parallel and now see this error there:
```
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
[SQL: PRAGMA journal_mode = WAL;]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
```
k
Ok will need to ask someone on the team and get back to you
t
Thanks a lot for your time and help! Much appreciated 🙂
k
Of course!
So the database lock does happen at large scale, but you should not be seeing it at this scale. Something is pretty weird. Maybe resetting the database can help. For large loads though, you can spin up a Postgres database too; more info in the database docs. But something does seem wrong with your current SQLite DB
t
Thanks!
I will try out a couple more things over the next days. Also trying it locally on my laptop instead. Might be something else in the setup here...
Gotta run. Thanks again.
z
If you're storing a SQLite database on a remote file system, then the file locking necessary to prevent concurrent writes is likely to fail.
☝️ 1
🙏 1
The list of WAL disadvantages at https://sqlite.org/wal.html may be helpful
@terrence we should highlight this somewhere in the documentation
👍 1
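A quick way to see which journal mode an existing Orion SQLite database is actually using (a minimal sketch; the path is the default location mentioned later in this thread):
```python
import sqlite3
from pathlib import Path

# Default Orion database location; adjust if your connection URL points elsewhere.
db_path = Path.home() / ".prefect" / "orion.db"

con = sqlite3.connect(db_path)
# Returns e.g. ('wal',); WAL mode relies on shared-memory file locking,
# which is exactly what tends to break on remote file systems.
print(con.execute("PRAGMA journal_mode;").fetchone())
con.close()
```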
a
@Marvin open "Orion: add docs on concurrent writes to SQLite DB"
t
Thanks for looking into this. What would be the fix for this? Is there something I can do about that? In my case all our data is stored on a remote file system, including our home directories, which would mean that any local flow execution will run into this problem on our system.
a
You can switch to Postgres or even directly to Cloud 2.0 (there is a free tier, no credit card required). SQLite doesn't handle concurrent writes well at scale in remote environments, therefore you would need to switch to Postgres.
t
I think this would be the solution for a production deployment and probably also the way we would go. But for quick local test runs it would still fail if it defaults to SQLite. Or am I missing something? Thanks for your help.
a
Yes, exactly. That's why you may switch to Postgres, even locally; here is how you can do that: https://orion-docs.prefect.io/concepts/database/
🙏 1
t
I am back 😅 I set up a Postgres DB and set the env variable accordingly. Now if I start Prefect Orion I get:
```
sqlalchemy.exc.ProgrammingError: (sqlalchemy.dialects.postgresql.asyncpg.ProgrammingError) <class 'asyncpg.exceptions.UndefinedFunctionError'>: function gen_random_uuid() does not exist
HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
[SQL:
CREATE TABLE flow (
        id UUID DEFAULT (GEN_RANDOM_UUID()) NOT NULL,
        created TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
        updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
        name VARCHAR NOT NULL,
        tags JSONB DEFAULT '[]' NOT NULL,
        CONSTRAINT pk_flow PRIMARY KEY (id),
        CONSTRAINT uq_flow__name UNIQUE (name)
)

]
(Background on this error at: https://sqlalche.me/e/14/f405)
```
Did I mess up my Postgres setup or is it a problem with Orion?
It is very likely that my Postgres setup is wrong, but just in case I wanted to ask here as well.
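One quick check worth running here (a sketch assuming `asyncpg` is installed; the credentials reuse the example URL from later in the thread): `gen_random_uuid()` is built into PostgreSQL only from version 13, while older servers need the `pgcrypto` extension to provide it.
```python
import asyncio

import asyncpg

async def check():
    conn = await asyncpg.connect(
        "postgresql://postgres:yourTopSecretPassword@localhost:5432/orion"
    )
    print(await conn.fetchval("SHOW server_version;"))
    # Raises UndefinedFunctionError on Postgres < 13 without pgcrypto,
    # matching the error Orion hit above.
    print(await conn.fetchval("SELECT gen_random_uuid();"))
    await conn.close()

asyncio.run(check())
```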
k
I think this is Postgres version related. What version did you use?
a
What can help here is removing the SQLite DB, then pointing to Postgres DB and then resetting the DB so that Prefect can rebuild the schema:
```bash
rm ~/.prefect/orion.db
export PREFECT_ORION_DATABASE_CONNECTION_URL="postgresql+asyncpg://postgres:yourTopSecretPassword@localhost:5432/orion"

prefect orion database reset -y
prefect orion start
```
t
PostgreSQL 10.19, maybe a bit old?
@Anna Geller I tried to reset the DB, but got the same error.
k
I think you need at least Postgres 13
👍 2
t
FYI: this issue is resolved by using a Postgres (>= 13) DB as you suggested.
👍 1