Powered by Linen
prefect-community
  • Emma Rizzi

    01/14/2022, 4:57 PM
    Hi! Do you know if there are any limitations on aliases in automation actions? I tried the same action with my personal address and with a Gmail alias, and the alias test never arrived in my inbox 😞
  • Henrietta Salonen

    01/14/2022, 5:06 PM
    Hey, is anyone fetching data from Salesforce using Prefect and willing to share their approach?
  • William Jevne

    01/14/2022, 5:24 PM
    Hi all, I am working on a flow of flows and am wondering if there is an easy way to pass user-provided context to the sub-flows.
  • Yusuf Khan

    01/14/2022, 5:34 PM
    Looking at this: https://github.com/PrefectHQ/prefect/issues/3375 I'm trying to get ShellTask to run PowerShell. I have the PowerShell executable on the path on the machine, as "pwsh".
    from prefect.tasks.shell import ShellTask
    run_in_pwsh = ShellTask(name="Powershell run", shell="pwsh")
    this is what I was trying to run. Then within the flow I had:
    run_in_pwsh(command='ls')
    The documentation for the `shell` argument says 'shell to run the command with; defaults to "bash"'. I assumed it would accept any shell as long as it starts correctly in the terminal. What I'm actually trying to do is run an Azure command-line utility called 'azcopy' (which is not part of the generic az CLI). I need to do it on both a Windows machine and a Linux machine; having separate scripts is fine. Any thoughts on how I could/should do this on Windows?
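A minimal sketch of one alternative, assuming it is acceptable to skip ShellTask and call the executable from a regular task: the hypothetical helper `shell_argv` picks PowerShell (`pwsh -Command`) on Windows and `bash -c` elsewhere, and the hypothetical `run_command` runs the result with `subprocess.run`. The azcopy arguments in the comment are placeholders, not a verified azcopy invocation.

```python
import subprocess
import sys


def shell_argv(command: str, platform: str = sys.platform) -> list:
    """Build an argv list that runs `command` in PowerShell on Windows, bash elsewhere."""
    if platform.startswith("win"):
        # pwsh accepts an inline script via -Command, so no temporary .ps1 file is needed
        return ["pwsh", "-NoProfile", "-Command", command]
    # bash -c runs the string as a one-off script
    return ["bash", "-c", command]


def run_command(command: str) -> str:
    """Run the command and return stdout; raises CalledProcessError on a non-zero exit."""
    result = subprocess.run(
        shell_argv(command), capture_output=True, text=True, check=True
    )
    return result.stdout


# Placeholder azcopy call (source and destination are hypothetical):
# run_command('azcopy copy "./data" "https://<account>.blob.core.windows.net/<container>" --recursive')
```

Wrapping `run_command` in an ordinary `@task` would be one way to call it from the same flow on both machines.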
  • Jason Motley

    01/14/2022, 7:02 PM
    Has anyone ever run into an issue where they get the error
    'NoneType' object has no attribute 'to_sql'
    but only in production? I've spot-checked, and my load statement is constructed identically, registration is fine, etc. Local runs go fine as well.
  • Andrey Tatarinov

    01/14/2022, 7:20 PM
    Hi! I'm trying to set up third-party (sqlalchemy) logs to output in flow logs. I'm using Docker storage and KubernetesAgent. I do something similar to https://docs.prefect.io/core/concepts/logging.html#extra-loggers in my register_flow.py script, and I see some logs while Prefect is capturing the flow object for serialization, so I know it's working. But during the run, no logs from sqlalchemy are written. It seems that the logging configuration is not serialized along with the flow. What is the preferred way to set up third-party logging in flows?
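As far as I understand Prefect 1.x, the extra-loggers setting from the linked docs page is read from Prefect's configuration by the process that actually executes the flow, so configuring it in register_flow.py only affects the registration process. One option to try is setting it as an environment variable in the run environment (the Docker image or the run config's env). A minimal sketch of building that variable, assuming the documented `PREFECT__LOGGING__EXTRA_LOGGERS` format of a Python-style list stored as a string; `extra_loggers_env` is a hypothetical helper.

```python
def extra_loggers_env(logger_names):
    """Return env vars asking Prefect 1.x to attach its log handlers to extra loggers.

    Assumes the documented format: a Python-style list literal stored as a string.
    """
    # repr() of a list of str yields "['a', 'b']", matching that literal format
    return {"PREFECT__LOGGING__EXTRA_LOGGERS": repr(list(logger_names))}


env = extra_loggers_env(["sqlalchemy"])
# Assumption: the dict is passed to the run config, e.g. KubernetesRun(env=env),
# or the same variable is set with ENV in the flow's Docker image.
```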
  • Philip MacMenamin

    01/14/2022, 8:24 PM
    I've stood up a Prefect Server instance within a network. I can reach ip:8080 and ip:4200, but the UI at 8080 tells me it cannot connect to:
    Connecting to Prefect Server at ip/graphql:4200
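The address in that message has the path and the port swapped: the GraphQL endpoint normally lives at `http://<host>:4200/graphql`. In Prefect 1.x Server the UI reads this from the `apollo_url` setting under `[server.ui]` in `~/.prefect/config.toml` (worth double-checking against your version). A small stdlib sketch that builds the URL in the right order; `graphql_endpoint` is a hypothetical helper and the IP in the test is an example.

```python
from urllib.parse import urlunsplit


def graphql_endpoint(host: str, port: int = 4200, scheme: str = "http") -> str:
    """Build the Apollo/GraphQL URL with the port attached to the host, then the path."""
    # netloc must be "host:port"; the "/graphql" path comes after it
    return urlunsplit((scheme, f"{host}:{port}", "/graphql", "", ""))
```

The resulting string, e.g. `http://10.0.0.5:4200/graphql`, is what would go into `apollo_url` on the machine running the UI.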
  • Jon Ruhnke

    01/14/2022, 9:35 PM
    I'm having a problem creating a Prefect flow that processes a large number of XML files (40k) without running out of RAM. Is this the right place to ask for help?
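One common pattern for this kind of workload, offered as a sketch rather than a fix for the specific flow: map over small batches of file paths instead of parsed documents, and stream-parse each file so only one element is held in memory at a time. `batched` and `count_records` are hypothetical helpers, and the `record` tag is a placeholder for whatever element the real files repeat.

```python
import io
import xml.etree.ElementTree as ET
from itertools import islice


def batched(paths, size):
    """Yield lists of at most `size` paths so each mapped task gets a small batch."""
    it = iter(paths)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def count_records(source, tag="record"):
    """Stream-parse one XML file (path or file object), clearing elements as we go."""
    count = 0
    for _event, elem in ET.iterparse(source, events=("end",)):
        if elem.tag == tag:
            count += 1
            # drop the element's children and text so memory does not grow with file size
            elem.clear()
    return count


# Usage with an in-memory document standing in for one of the 40k files:
sample = io.BytesIO(b"<root><record/><record/><other/></root>")
print(count_records(sample))  # prints 2
```

In a Prefect 1.x flow, the batches of paths (not the parsed data) would be what gets passed to `.map`, so each mapped task's inputs and return value stay small.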
  • Greg Adams

    01/14/2022, 9:38 PM
    Hi again! Is there a storage/deployment pattern that allows me to include custom modules for my flow at registration time, rather than rebuilding the docker image whenever I want to update them? I thought the Git storage might include the extra python files but it’s not liking it (maybe I’m doing it all wrong?)
  • Josh

    01/14/2022, 11:01 PM
    I’m running into a mypy issue with Prefect Tasks. Mypy produces the error
    error: <nothing> not callable
    for the task call when I try to test it out:
    from prefect import Flow, Task


    class MyTask(Task):
        def run(self):
            # do something
            return True


    if __name__ == "__main__":
        my_task = MyTask()
        with Flow("My Flow") as flow:
            my_task()
        flow.run()
  • Amber Papillon

    01/16/2022, 3:04 AM
    Hey guys, quick question. Has this been implemented yet? https://github.com/PrefectHQ/prefect/issues/2254
  • Philipp Eisen

    01/16/2022, 11:54 AM
    EDIT: This was because my Dask runners were not accessing the same Orion database. Hey! I was testing running Orion with a DaskCluster deployed in Kubernetes. I’m starting the flow locally and pointing it at the Dask cluster on localhost (using port-forwarding for the scheduler). With a local DaskCluster it works fine, but with the Kubernetes one I’m always getting this error:
    distributed.worker - WARNING - Compute Failed
    Function:  orchestrate_task_run
    args:      ()
    kwargs:    {'task': <prefect.tasks.Task object at 0x7f02f0f56430>, 'task_run': TaskRun(id=UUID('2acf899f-67f5-4717-9665-c91f730f3719'), created=datetime.datetime(2022, 1, 16, 11, 45, 38, 995585, tzinfo=datetime.timezone.utc), updated=datetime.datetime(2022, 1, 16, 11, 45, 39, 19000, tzinfo=datetime.timezone.utc), name='get-product-b7ee3036-0', flow_run_id=UUID('a3e75090-2e7d-42f6-8dda-b00600f70b12'), task_key='b7ee3036fbe1354fe2fbf30215a316c4', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=10, retry_delay_seconds=0.0), tags=[], state_id=UUID('36aa692f-175d-4bff-81ed-e57f2228cdfa'), task_inputs={}, state_type=StateType.PENDING, run_count=0, expected_start_time=datetime.datetime(2022, 1, 16, 11, 45, 38, 988955, tzinfo=datetime.timezone.utc), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta
    Exception: "ValueError('Invalid task run: 2acf899f-67f5-4717-9665-c91f730f3719')"
    Is there something obvious I’m missing?
  • Tao Bian

    01/16/2022, 9:39 PM
    Hi, I have a flow scheduled to run daily, and I tried to get the timestamp inside the flow. Why do I get the exact same timestamp written into the database every day?
    import datetime
    from prefect import Flow, task

    @task
    def write_timestamp_into_database(timestamp):
        ...

    # daily_schedule: the flow's existing daily Schedule, defined elsewhere
    with Flow("sample-flow", daily_schedule) as flow:
        timestamp = str(datetime.datetime.now())
        write_timestamp_into_database(timestamp)
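The likely cause: in Prefect 1.x, code inside the `with Flow(...)` block that is not a task call runs once, when the flow is built and registered, so with the default pickle-based storage `timestamp` is a constant baked into the stored flow. The usual fix is to compute the timestamp inside a task, e.g. a `@task` that returns `str(datetime.datetime.now())`, called inside the `with Flow` block. The toy model below is plain Python with no Prefect; it only contrasts capturing a value at build time with deferring it to run time, and `build_flow_eager` and `build_flow_deferred` are hypothetical names.

```python
import datetime
import time


def build_flow_eager():
    """Mimics the original flow: now() is evaluated once, while the flow is being built."""
    timestamp = str(datetime.datetime.now())
    return lambda: timestamp  # every "run" returns the same frozen string


def build_flow_deferred():
    """Mimics moving now() into a task: it is evaluated each time the flow runs."""
    return lambda: str(datetime.datetime.now())


eager, deferred = build_flow_eager(), build_flow_deferred()
first_eager, first_deferred = eager(), deferred()
time.sleep(0.05)
print(eager() == first_eager)        # True: the build-time value never changes
print(deferred() == first_deferred)  # False: a fresh timestamp on every run
```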
  • Sultan Orazbayev

    01/17/2022, 12:18 AM
    Hello, if anyone is using prefect on a SLURM cluster, I am interested in connecting to learn about the experience.
  • Noam Gal

    01/17/2022, 8:12 AM
    Hi all, I'm new to Prefect. I have created a flow that uses two `prefect.Parameter` tasks whose values are plain `str` and `int`. The flow's logic uses other tasks that consume these parameters, and those tasks call helper functions that let me reuse code and keep it readable. If I want a helper function to use one of the parameters (I just need the value, not the task itself), I need to make the helper a Prefect task itself, and when calling it from another task I need to call it with `.run`, since inside a task we are no longer in the context of a flow. For example:
    from prefect import Parameter, Flow, task

    def calc_logic_func1() -> int:
        return ...

    def calc_logic_func2() -> int:
        return ...

    @task
    def shared_task(id: int, value: int) -> int:
        return ...

    @task
    def my_task1(id, description):
        val1 = calc_logic_func1()
        return shared_task.run(id, val1)

    @task
    def my_task2(id, description):
        val2 = calc_logic_func2()
        return shared_task.run(id, val2)

    # my_reduce_task is another task, defined elsewhere
    with Flow("my flow") as my_flow:
        id = Parameter("id", required=True)  # int value
        description = Parameter("description", required=True)  # str value
        result1 = my_task1(id, description)
        result2 = my_task2(id, description)
        my_reduce_task(result1, result2)
    In the example above, I want to use the helper `shared_task` with the integer `id` value, but since `id` is a Prefect Parameter task, `shared_task` itself must be a task, and when calling it from another task (e.g. `my_task1`) it has to be called with `shared_task.run`. At least, that is how I understand it so far. Is there any other way to do this, either without making `shared_task` a task or without calling it with `.run`, given that `my_task1` is already called from the `my_flow` context? If this is the right way, are there any other effects on the flow run? (I guess `my_task1` will execute `shared_task` itself, on the same agent.) Thanks!
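As far as I understand Prefect 1.x, by the time a task's body runs, any `Parameter` passed into it has already been resolved to its plain value, so inside `my_task1` the `id` argument is just an `int`. That would mean the helper can be an ordinary function, with no `@task` and no `.run`. The sketch below shows only that plain-function shape, with Prefect left out so it runs standalone; `shared_helper` and `my_task1_body` are hypothetical names and the arithmetic is placeholder logic.

```python
def shared_helper(id_: int, value: int) -> int:
    """Ordinary function: receives plain ints, no Prefect involved."""
    return id_ * 100 + value  # placeholder logic


def calc_logic_func1() -> int:
    return 1  # placeholder


def my_task1_body(id_: int, description: str) -> int:
    """What the body of @task my_task1 would contain; id_ arrives as a plain int at run time."""
    val1 = calc_logic_func1()
    return shared_helper(id_, val1)


print(my_task1_body(7, "demo"))  # prints 701
```

Because the helper is a plain call (as is `shared_task.run`), it executes inside the same task run on whichever agent or executor runs `my_task1`; it gets no separate task state, retries, or logs of its own.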
  • Florian Kühnlenz

    01/17/2022, 9:37 AM
    Hi, we just had two flows get stuck for no apparent reason. All tasks had finished, but the flow remained in a Running state, blocking the others. Any idea how to debug what was going on? Manually setting the state resolved the problem.
  • Tom Klein

    01/17/2022, 11:49 AM
    Hey! 🙋‍♂️ We’re trying to use this (excellent) example: https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_task_library/s3_kubernetes_run_RunNamespacedJob_and_get_logs.py and we’re missing some K8s permissions on our end. However, I noticed that when I ran this flow, even though the first step (delete K8s job) failed, it proceeded to perform the next steps (e.g. create job, which it does have permissions for). Am I missing something about how this should work? Shouldn’t a failure in a task halt the entire flow by default, without explicitly playing with triggers?
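In Prefect 1.x the default trigger (`all_successful`) is evaluated along dependency edges: a task goes to `TriggerFailed` when one of its upstream tasks did not succeed, but a task with no edge to the failed one still runs. So if "create job" does not depend on "delete job" in the flow graph, it would run regardless. The toy model below is plain Python, not Prefect's implementation; it only illustrates that rule, and the task names are borrowed from the example.

```python
def run_dag(tasks, upstream):
    """Toy all_successful trigger: run tasks in order, skipping those with a failed upstream.

    tasks: dict of name -> callable, assumed to be in dependency order.
    upstream: dict of name -> list of upstream task names.
    """
    states = {}
    for name, fn in tasks.items():
        if any(states[up] != "Success" for up in upstream.get(name, [])):
            states[name] = "TriggerFailed"  # an upstream task did not succeed
            continue
        try:
            fn()
            states[name] = "Success"
        except Exception:
            states[name] = "Failed"
    return states


def delete_job():
    raise PermissionError("missing RBAC permission")  # simulates the failing first step


def create_job():
    return "created"


steps = {"delete_job": delete_job, "create_job": create_job}
print(run_dag(steps, {}))                              # no edge: create_job still runs
print(run_dag(steps, {"create_job": ["delete_job"]}))  # edge: create_job is TriggerFailed
```

In an actual Prefect 1.x flow, such an edge can be added with `upstream_tasks=[...]` when calling the task inside the flow, or with `Task.set_upstream`.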
  • Marwan Sarieddine

    01/17/2022, 1:53 PM
    Hi folks, question about the Lazarus process. Why would Lazarus try to reschedule a flow run that has reached a successful state?
  • Bruno Murino

    01/17/2022, 4:29 PM
    Hi everyone — I’m trying to pass tags to the ECS Run task, but it doesn’t look like the tags are being propagated. Is this the right way to pass tags to the ECS tasks that the Prefect Agent will create?
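One thing to check: in Prefect 1.x, `ECSRun` forwards `run_task_kwargs` to boto3's ECS `run_task` call, and that API expects tags as a list of dicts with lowercase `key`/`value` fields. A sketch of building that structure; `ecs_run_task_kwargs` and the tag names are hypothetical, and passing the result as `ECSRun(run_task_kwargs=...)` is an assumption to verify against your Prefect version.

```python
def ecs_run_task_kwargs(tags: dict) -> dict:
    """Convert a plain {name: value} mapping into the boto3 ECS run_task tag format."""
    # ECS uses lowercase "key"/"value" (unlike the EC2 "Key"/"Value" convention)
    return {"tags": [{"key": k, "value": str(v)} for k, v in tags.items()]}


kwargs = ecs_run_task_kwargs({"team": "data", "cost-center": 42})
print(kwargs)
```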
  • Miguel Angel

    01/17/2022, 6:00 PM
    Hello everyone! Has anyone worked with dask `futures` within a Prefect flow context? I basically want to perform some future computations in order to parallelize parquet reading and dataframe concatenation. The following snippet shows an MWE using dask futures and a client.
    import dask.dataframe as dd
    from dask.distributed import Client
    from s3fs import S3FileSystem
    
    s3 = S3FileSystem()
    client = Client()
    folder_list = [
        "file1",
        "file2",
        "file3",
        "file4",
        "file5",
        "file6",
        "file7",
        "file8",
    ]
    file_list = list(
        map(lambda folder: f"s3://my-bucket/parquet/{folder}/*.parquet", folder_list)
    )
    dataframe_list = client.map(dd.read_parquet, file_list, gather_statistics=False)
    
    dataframe = client.submit(dd.concat, dataframe_list)
    
    mean_value = client.submit(lambda x: x["some_data_column"].mean(), dataframe)
    
    mean_compute = client.submit(lambda x: x.compute(), mean_value)
    
    print(mean_compute.result())
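For comparison, the submit-then-gather shape of that snippet can be sketched with the standard library's `concurrent.futures`. Plain lists stand in for the parquet partitions, so this is an analogy to the dask version, not a drop-in replacement; `FAKE_PARTITIONS`, `read_partition` and `parallel_mean` are hypothetical. It may also be worth noting that `dd.read_parquet` is already lazy and accepts a list of paths, so with dask dataframes the extra futures layer might not be needed at all.

```python
from concurrent.futures import ThreadPoolExecutor
from statistics import mean

# Hypothetical stand-in for the parquet folders: each maps to that folder's column values.
FAKE_PARTITIONS = {
    "file1": [1.0, 2.0],
    "file2": [3.0],
    "file3": [4.0, 5.0, 6.0],
}


def read_partition(folder: str) -> list:
    return FAKE_PARTITIONS[folder]


def parallel_mean(folders: list) -> float:
    with ThreadPoolExecutor() as pool:
        # submit one read per folder, like client.map(...)
        futures = [pool.submit(read_partition, f) for f in folders]
        # gather results in submission order and concatenate, like dd.concat
        values = [v for fut in futures for v in fut.result()]
    return mean(values)


print(parallel_mean(list(FAKE_PARTITIONS)))  # prints 3.5
```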
  • Andreas Eisenbarth

    01/17/2022, 7:27 PM
    Hello! I have encountered some very weird behavior and have run out of ideas about what could cause it. We do batch processing and use `create_flow_run` with `map` to create multiple flow runs, each with a different dict of parameters. On one server, all created flow runs receive the same `flow_run_id`, which means they overwrite each other's logs and we only see one in the Prefect UI. (Locally I cannot reproduce it, and every child flow gets a different flow run ID. This server is running in Docker, and in that setup `create_flow_run` was working correctly previously.) Does anyone have ideas? (Example code attached)
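If the child runs are collapsing onto a single idempotency key, one way to rule that out is to give each mapped call an explicit key derived from its parameters. Prefect 1.x's `create_flow_run` accepts an `idempotency_key` argument (check this against your installed version); the hashing helper below is a hypothetical sketch, not the cause confirmed in this thread, and the parameter dicts are made-up examples.

```python
import hashlib
import json


def params_idempotency_key(parameters: dict) -> str:
    """Derive a stable key from a parameter dict (hypothetical helper).

    Identical dicts give identical keys, so include a distinguishing field if two
    children could legitimately receive the same parameters.
    """
    # sort_keys makes the key independent of dict insertion order
    payload = json.dumps(parameters, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


batches = [{"batch": 1, "path": "a/"}, {"batch": 2, "path": "b/"}]
keys = [params_idempotency_key(p) for p in batches]
print(len(set(keys)) == len(keys))  # prints True
```

The keys could then be mapped alongside the parameter dicts as `idempotency_key`; this is untested against the Docker server setup described above.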
  • Matt Alhonte

    01/17/2022, 7:38 PM
    This rules so hard. Wanna find a way to include it in my Prefect pipelines. https://github.com/stepchowfun/typical
  • Samay Kapadia

    01/17/2022, 10:46 PM
    I’m running into the weirdest error while trying to make Prefect Cloud work with my Kubernetes cluster. The error says `No module named '/Users/sa/'`. Why does it want my home directory to be a module? More details inside
  • Yusuf Khan

    01/17/2022, 11:03 PM
    I have a task failing with the following error: Unexpected error: ValueError('ctypes objects containing pointers cannot be pickled'). Before I turned this into a Prefect flow script, it was executing fine. It's a small script running on a Raspberry Pi using the PiCamera module. Other, non-dependent tasks are working fine. Any thoughts from anyone? Googling this didn't yield much.
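Prefect 1.x can serialize task results with cloudpickle (for example when checkpointing is enabled or a Dask-based executor is used), so a task that returns or receives an object wrapping ctypes pointers, such as a camera handle, can fail with exactly this error even though the plain script worked. A stdlib sketch of the failure and of the usual workaround, keeping the handle inside one task and returning only plain data; `FakeCamera`, `pickle_camera_fails` and `capture_task` are hypothetical stand-ins, not PiCamera's API.

```python
import ctypes
import pickle


class FakeCamera:
    """Hypothetical stand-in for a hardware handle that holds a raw ctypes pointer."""

    def __init__(self):
        self._buf = ctypes.c_int(0)
        self._ptr = ctypes.pointer(self._buf)  # pointer types refuse to be pickled

    def capture(self) -> bytes:
        return b"fake-jpeg-bytes"  # placeholder image data


def pickle_camera_fails() -> bool:
    """Return True if pickling the handle raises the same ValueError as in the question."""
    try:
        pickle.dumps(FakeCamera())
    except ValueError as exc:
        print(exc)  # ctypes objects containing pointers cannot be pickled
        return True
    return False


def capture_task() -> bytes:
    """Create and use the handle inside one task; return only picklable bytes."""
    camera = FakeCamera()
    return camera.capture()


print(pickle_camera_fails())
image = capture_task()  # plain bytes, safe to pass to a downstream task
```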
  • Son Nguyen

    01/18/2022, 9:30 AM
    Hi, I’m launching a new Prefect server with `prefect server start`, and everything started correctly. But in the UI, when I click on a flow, I’m not redirected to the flow detail page. It looks like the following Docker image versions introduced a new bug
    prefecthq/apollo              core-0.15.12   d8519b0544d0   5 days ago     324MB
    prefecthq/server              core-0.15.12   d828f40dbf19   5 days ago     403MB
    prefecthq/ui                  core-0.15.12   5edd4fee96ed   3 weeks ago    225MB
    because it worked fine with this version
    prefecthq/ui                  core-0.15.11   6fac027b4605   4 weeks ago     225MB
    prefecthq/server              core-0.15.11   f6280189d6a5   6 weeks ago     402MB
    prefecthq/apollo              core-0.15.11   d1b07b3c9a57   6 weeks ago     324MB
  • Akharin Sukcharoen

    01/18/2022, 9:50 AM
    How can I fix flow runs being scheduled twice? It is overloading my server. Thank you in advance.
  • Emma Rizzi

    01/18/2022, 9:56 AM
    Hi! Do you have any idea what is causing this error?
    Failed to load and execute Flow's environment: FlowStorageError("An error occurred while unpickling the flow:\n  TypeError('an integer is required (got type bytes)')\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n  - prefect: (flow built with '0.15.10', currently running with '0.15.12')\n  - python: (flow built with '3.7.11', currently running with '3.9.9')")
    I searched this Slack for insights. I use Prefect Cloud with a Docker agent on a VM, and I upgraded prefect to 0.15.12 on both the agent and the development machine.
  • Malthe Karbo

    01/18/2022, 11:03 AM
    Hi everyone, I am having some trouble using the DaskExecutor with Fargate mode. I get the following error (after successfully running all flows):
    RuntimeError: IOLoop is closed
    Flow example in thread and pinned versions as well
  • Aaron Pickering

    01/18/2022, 11:45 AM
    Hi everyone, I'm trying to use "SnowflakeQueriesFromFile" in a task and I'm getting a strange error. Not sure how to start debugging this, any ideas? Could it be something to do with the file path?
    "Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n NameError("name \'err\' is not defined")')"
    The task itself is straightforward, it looks like this:
    snowsql_obj = SnowflakeQueriesFromFile(account=SNOWFLAKE_ACCOUNT, user=SNOWFLAKE_USER, password=SNOWFLAKE_PWD, file_path="../../sql/amplitude_raw.sql")
    snowsql_obj.run()
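One thing worth ruling out is the relative `file_path`: `"../../sql/amplitude_raw.sql"` is resolved against the working directory of the process that runs the flow, which on an agent or inside a container is usually not the directory the flow file lives in. A sketch of anchoring the path to the flow module instead; `resolve_sql_path` is a hypothetical helper and the `/repo/...` layout in the test is made up. With pickle-based storage the path would be computed at build time, so the SQL file would still need to exist at that same absolute location in the run environment (for example, copied into the Docker image).

```python
from pathlib import Path


def resolve_sql_path(module_file: str, relative: str) -> Path:
    """Resolve `relative` against the directory containing `module_file`, not the CWD."""
    return (Path(module_file).resolve().parent / relative).resolve()


# Inside the flow module this would typically be called with __file__:
# file_path = str(resolve_sql_path(__file__, "../../sql/amplitude_raw.sql"))
```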
  • Samay Kapadia

    01/18/2022, 3:20 PM
    Hey all. Where can I find a Prefect configuration reference page? Googling “prefect configuration reference” takes me to the core concepts page, which doesn’t list the configuration options I can actually change.

Kevin Kho

01/18/2022, 3:25 PM
The full list can be found in the config.toml. Anything specific you are looking for?

Samay Kapadia

01/18/2022, 3:26 PM
Just wanted to use Cloud secrets from my local setup
Thanks Kevin!
I’m trying to use Azure storage for storing my flows, and the connection string needs to be stored as a secret.
But when I run
PREFECT__USE_LOCAL_SECRETS=false poetry run python main.py
I’m still getting
ValueError: Local Secret "prod-prefect-flows" was not found.
🤔
Ah okay, I just changed it to
PREFECT__CLOUD__USE_LOCAL_SECRETS=false
and it worked
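That matches how Prefect 1.x maps environment variables onto `config.toml`: after the `PREFECT__` prefix, each double underscore descends one section, so `PREFECT__USE_LOCAL_SECRETS` sets a top-level `use_local_secrets` key that the secret lookup does not consult, while `PREFECT__CLOUD__USE_LOCAL_SECRETS` sets `cloud.use_local_secrets`. A toy re-implementation of that naming rule, not Prefect's code; the hypothetical `env_to_config_path` ignores value parsing.

```python
def env_to_config_path(name: str, prefix: str = "PREFECT"):
    """Map e.g. PREFECT__CLOUD__USE_LOCAL_SECRETS -> ('cloud', 'use_local_secrets')."""
    marker = prefix + "__"
    if not name.startswith(marker):
        return None  # not a Prefect config variable
    # single underscores stay inside a key; double underscores separate sections
    return tuple(part.lower() for part in name[len(marker):].split("__"))


print(env_to_config_path("PREFECT__USE_LOCAL_SECRETS"))         # ('use_local_secrets',)
print(env_to_config_path("PREFECT__CLOUD__USE_LOCAL_SECRETS"))  # ('cloud', 'use_local_secrets')
```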

Kevin Kho

01/18/2022, 3:39 PM
Ah ok you got it yep