Matthew Blau
04/30/2021, 7:22 PM
Enda Peng
04/30/2021, 7:45 PM
@task
def process_file(x):
    ...  # per-file processing, not shown in the original message
with Flow(xxx):
    [process_file(x) for x in file_names]
It works fine with LocalRun
◦ pro: Easy to set up; Dask has a friendly API for controlling resources.
◦ con: I have to set up the same runtime environment on every worker added to the Dask cluster.
• Option 2: Multiple flows + multiple agents: 1-on-1 mapping between flow and file
E.g. I could create 10 Docker agents running on 10 hosts. Then, in a script, I create and run 100 flows, each of which processes one file, and let Prefect distribute the flows for me.
◦ pro: The computation module ships with the Docker image, so no per-host setup is needed.
◦ con: Not sure whether Prefect is meant to handle workload distribution. Even if it is, it is hard to control resource consumption.
• Option 3: Single flow + K8s:
Build an image for my computation module and register it with K8s first. Within one flow, create K8s tasks that request 100 pods to process the files.
◦ pro:
▪︎ k8s could deal with the workload distribution. Adding nodes could be easy.
▪︎ Any agent would be fine as long as it can talk to the K8s API.
◦ con: complexity of setting up k8s?
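All three options above share one underlying pattern: fan one function out over a list of files while capping how much runs at once. A minimal sketch of that pattern follows, using the standard library's thread pool purely as a stand-in for Dask workers, Docker agents, or K8s pods. `process_all`, `max_workers`, and the body of `process_file` are illustrative assumptions, not Prefect APIs.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable, List


def process_file(name: str) -> str:
    """Hypothetical stand-in for the real per-file computation."""
    return name.upper()


def process_all(file_names: Iterable[str], max_workers: int = 4) -> List[str]:
    # max_workers caps how many files are processed concurrently, which is
    # the resource-control knob the options above are weighing.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in the same order as the inputs.
        return list(pool.map(process_file, file_names))


if __name__ == "__main__":
    print(process_all([f"file_{i}.csv" for i in range(5)]))
```

The choice between the options then comes down to who owns this pool: a Dask cluster, a set of agents, or a K8s scheduler.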
Appreciate any thoughts and input here!
Carter Kwon
04/30/2021, 8:08 PM
20,000 * 0.0025 = $50
or 20,000 * 0.005 = $100
depending on the plan you're on?
Belal Aboabdo
04/30/2021, 9:59 PM
prefect build -p my_flow.py
which throws this usage error
Usage: prefect [OPTIONS] COMMAND [ARGS]...
Try 'prefect -h' for help.
Error: No such command 'build'.
Exited with code exit status 2
Sean Perry
04/30/2021, 10:33 PM
Jeff Payne
05/01/2021, 5:40 AM
Adam Roderick
05/01/2021, 2:53 PM
Trevor Kramer
05/01/2021, 6:47 PM
from prefect import Flow, task
@task
def add_ten(x, y):
    return x + y
@task()
def log_result(x):
    print(x)
with Flow('simple map') as flow:
    mapped_result = add_ten.map([1, 2, 3], [10, 11, 12])
    log_result(mapped_result)
if __name__ == '__main__':
    from prefect.run_configs import LocalRun
    flow.run_config = LocalRun()
    flow.run()
I was expecting this code to return 9 results instead of the 3 actually returned. Is there a way to have map do the pairwise enumeration? I assumed that because neither argument was marked as unmapped, both would be looped over.
Robert Bastian
05/01/2021, 11:23 PM
with Flow("testing") as flow:
    a = poll.map(poll_interval=[5,10])
    b = poll.map(poll_interval=[4,9])
flow.run(executor=LocalDaskExecutor())
[2021-05-01 18:20:08-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'testing'
[2021-05-01 18:20:08-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState': Starting task run...
[2021-05-01 18:20:08-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState': Finished task run for task with final state: 'Mapped'
[2021-05-01 18:20:08-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState': Starting task run...
[2021-05-01 18:20:08-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState': Finished task run for task with final state: 'Mapped'
[2021-05-01 18:20:09-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[0]': Starting task run...
[2021-05-01 18:20:09-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[1]': Starting task run...
[2021-05-01 18:20:14-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[0]': Finished task run for task with final state: 'Success'
[2021-05-01 18:20:19-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[1]': Finished task run for task with final state: 'Success'
[2021-05-01 18:20:19-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[0]': Starting task run...
[2021-05-01 18:20:19-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[1]': Starting task run...
[2021-05-01 18:20:23-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[0]': Finished task run for task with final state: 'Success'
[2021-05-01 18:20:28-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[1]': Finished task run for task with final state: 'Success'
[2021-05-01 18:20:28-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Thx!
Jason Prado
05/02/2021, 2:17 AM
python myflow.py
and read Secrets within my flow. I’ve added the Secrets in the Cloud UI and authenticated with the prefect CLI. Is the right model that “running a flow locally without an agent never hits the server” or am I mistaken?
Newskooler
05/02/2021, 11:55 AM
Enda Peng
05/02/2021, 4:15 PM
heart beat check.
Rehan Razzaque Rajput
05/03/2021, 7:33 AM
Yohann
05/03/2021, 7:40 AM
Peter Roelants
05/03/2021, 8:24 AM
import os
from datetime import timedelta
import prefect
# KafkaConfig is defined elsewhere (not shown in the original message)
class GetKafkaConfig(prefect.Task):
    """
    Get Kafka configuration
    """
    def run(
        self,
        timeout: timedelta
    ) -> KafkaConfig:
        return KafkaConfig(
            broker_address=os.environ['KAFKA_BROKER_ADDRESS'],
            topic_name=os.environ['KAFKA_TOPIC_NAME'],
            timeout=timeout
        )
I would like to visualize these parameters in my Prefect UI (and ideally make them editable), similar to how Parameter tasks can be visualized (e.g. as demonstrated in https://docs.prefect.io/orchestration/tutorial/hello-flow-run-parameter-config.png).
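One possible direction, sketched under assumptions: if the config builder takes the broker address and topic name as explicit inputs and only falls back to the environment, those inputs could in principle be supplied by Parameter tasks instead of being read inside the task. The sketch below covers only the plain-Python part. The `KafkaConfig` fields are inferred from the keyword arguments in the snippet above, and `get_kafka_config` is a hypothetical helper, not part of Prefect.

```python
import os
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


@dataclass
class KafkaConfig:
    # Hypothetical shape, inferred from the keyword arguments used above.
    broker_address: str
    topic_name: str
    timeout: timedelta


def get_kafka_config(
    timeout: timedelta,
    broker_address: Optional[str] = None,
    topic_name: Optional[str] = None,
) -> KafkaConfig:
    # Explicit arguments win; the environment is only a fallback.
    if broker_address is None:
        broker_address = os.environ["KAFKA_BROKER_ADDRESS"]
    if topic_name is None:
        topic_name = os.environ["KAFKA_TOPIC_NAME"]
    return KafkaConfig(broker_address, topic_name, timeout)
```

Keeping the environment lookup as a fallback means existing deployments would behave as before when no explicit value is passed.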
merlin
05/03/2021, 9:22 AM
Domantas
05/03/2021, 10:15 AM
g.suijker
05/03/2021, 12:04 PM
Ranu Goldan
05/03/2021, 12:52 PM
Gage Toschlog
05/03/2021, 3:40 PM
Belal Aboabdo
05/03/2021, 5:59 PM
docker.errors.NotFound: 404 Client Error for <http+docker://localhost/v1.41/images/create?tag=0.14.0-python3.9&fromImage=prefecthq%2Fprefect>: Not Found ("manifest for prefecthq/prefect:0.14.0-python3.9 not found: manifest unknown: manifest unknown")
Nathan Atkins
05/03/2021, 6:04 PM
@task(task_run_name=name_fn)
where name_fn() dynamically generates the task name from the kwargs that are passed to it.
This all works great when I'm running with the UI. When I run directly by calling flow.run(), the set_task_run_name() in engine/task_runner.py is stubbed out and doesn't call my name_fn().
I can see that, in TaskRunner.run(), getting the call to set_task_run_name() isn't totally straightforward.
What would it take to get set_task_run_name() to work when running directly without the UI?
Sean Perry
05/03/2021, 6:09 PM
Joseph Loss
05/03/2021, 7:14 PM
Joseph Loss
05/03/2021, 7:17 PM
D:\venv\poetry\.venv\lib\site-packages\prefect\utilities\logging.py:123:
UserWarning: Failed to write logs with error:
ClientError('400 Client Error: Bad Request for url: <https://api.prefect.io/graphql>\n\n
The following error messages were provided by the GraphQL server:\n\n
INTERNAL_SERVER_ERROR: Variable "$input" got invalid value null at\n
"input.logs[0].flow_run_id"; Expected non-nullable type UUID! not to be null.\n
INTERNAL_SERVER_ERROR: Variable "$input" got invalid value null at\n "input.logs[2].flow_run_id"; Expected non-nullable type UUID! not to be null.\n
INTERNAL_SERVER_ERROR: Variable "$input" got invalid value null at\n "input.logs[4].flow_run_id"; Expected non-nullable type UUID! not to be null.\n
Braun Reyes
05/03/2021, 7:22 PM
Enda Peng
05/03/2021, 8:15 PM
prefect register
So I am trying to replicate the storage build by writing my own Dockerfile. My question is: how do the files healthcheck.py and flow1.flow come into scope? Below is the output after I call the register command with Docker storage:
Step 7/12 : COPY flow1.flow /opt/prefect/flows/flow1.prefect
---> ea7572a6c0e7
Step 8/12 : COPY healthcheck.py /opt/prefect/healthcheck.py
Trevor Kramer
05/04/2021, 1:27 AM
jaehoon
05/04/2021, 2:30 AM
Unexpected error: ReferenceError('weakly-referenced object no longer exists')
appears in the Prefect run logs.
The API response count is almost 2000; the error does not occur in my local environment.
Could someone please help?
Stéphan Taljaard
05/04/2021, 8:18 AM
prefect server create-tenant
when using prefect server?
Mariia Kerimova
05/04/2021, 1:13 PM
Stéphan Taljaard
05/04/2021, 1:22 PM
Mariia Kerimova
05/04/2021, 1:26 PM