David Ojeda
04/03/2020, 4:28 PM
Alex Cano
04/03/2020, 4:36 PM
FlowRunner to actually run your flows (either that or calling flow.run()).
Dylan
David Ojeda
04/03/2020, 5:00 PM
Dylan
David Ojeda
04/03/2020, 6:16 PM
etl.py as follows:

import time

import prefect
from prefect import Flow, task
from prefect.environments import RemoteEnvironment


@task
def extract():
    logger = prefect.context['logger']
    logger.info('extract')
    return [1, 2, 3]


@task
def transform(data):
    # When mapped, each call receives a single element of the extracted list.
    logger = prefect.context['logger']
    logger.info('transform on %s', data)
    time.sleep(10)
    return data * 10


@task
def load(data):
    logger = prefect.context['logger']
    logger.info('load version on %s', data)


def main():
    # Run the flow on an already-running Dask cluster (local scheduler here).
    environment = RemoteEnvironment(
        executor='prefect.engine.executors.DaskExecutor',
        executor_kwargs={
            'address': 'tcp://localhost:8786',
        }
    )
    with Flow('etl-example', environment=environment) as flow:
        e = extract()
        t = transform.map(e)
        l = load.map(t)
    # Register the flow so it can be triggered from the UI.
    flow.register()


if __name__ == '__main__':
    main()
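A note on the mapped tasks in the script above: since transform is called through .map, each call gets one element of the list returned by extract, so data * 10 multiplies a number rather than repeating a list. The following is a plain-Python stand-in for that behaviour, not Prefect code; the list comprehension is only an illustration of the per-element calls.

```python
def transform(data):
    # Same body as the task in etl.py, minus logging and the sleep.
    return data * 10


extracted = [1, 2, 3]

# Mapped call (what transform.map(e) amounts to): one call per element.
mapped = [transform(item) for item in extracted]

# Unmapped call: the whole list is passed in, so * 10 repeats the list.
unmapped = transform(extracted)

print(mapped)         # [10, 20, 30]
print(len(unmapped))  # 30
```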
So I need to register the flow by executing this script, and then I can manually trigger it from the UI. When I ran an agent with prefect agent start, I saw the Dask scheduler and Dask worker (which I had started beforehand) receive and process the units of work.
All of this was done locally, so now that I understand the structure a bit better, I will try to replicate this on my cluster. I think I will start simple: instead of using the KubernetesAgent (which requires an RBAC installation), I will just create a pod that runs the agent.
Dylan
localhost sources by default, so you will have to dig into the configuration a bit to make sure they send state information to the Prefect Scheduler.
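The point about localhost defaults also applies to the executor address in etl.py, which is hard-coded to tcp://localhost:8786. A minimal sketch, assuming the address is supplied through an environment variable; the variable name DASK_SCHEDULER_ADDRESS and the in-cluster hostname below are illustrative choices, not something stated in this thread.

```python
import os

# Same local default as the literal used in etl.py.
DEFAULT_ADDRESS = "tcp://localhost:8786"


def dask_executor_kwargs(environ=None):
    """Return executor kwargs, preferring an address from the environment."""
    environ = os.environ if environ is None else environ
    # DASK_SCHEDULER_ADDRESS is an assumed variable name for this sketch.
    return {"address": environ.get("DASK_SCHEDULER_ADDRESS", DEFAULT_ADDRESS)}


# No override set: falls back to the local scheduler.
local_kwargs = dask_executor_kwargs({})

# Hypothetical in-cluster service address.
cluster_kwargs = dask_executor_kwargs(
    {"DASK_SCHEDULER_ADDRESS": "tcp://dask-scheduler.example:8786"}
)
```

The returned dict could be passed as executor_kwargs to RemoteEnvironment in place of the literal in etl.py.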
David Ojeda
04/03/2020, 6:42 PM
Dylan
prefect server start is configured to run locally at the moment; you’ll need to update the configuration with the address of the Prefect Scheduler in your k8s cluster so that the flow run knows where to send the state updates.
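One way to carry such a configuration change into a pod is through environment variables, since Prefect reads overrides named PREFECT__<SECTION>__<KEY> (double underscores). The sketch below only builds that environment. Which setting actually holds the server address depends on the Prefect version; the cloud.api key and the URL used here are assumptions for illustration and should be checked against the installed version's config.toml.

```python
import os


def prefect_env_name(*path):
    """Build an override name such as PREFECT__CLOUD__API from ('cloud', 'api')."""
    return "PREFECT__" + "__".join(part.upper() for part in path)


def env_for_agent(api_url, base_env=None):
    """Return a copy of the environment with the API address override added."""
    env = dict(os.environ if base_env is None else base_env)
    # ('cloud', 'api') is an assumed config key, not confirmed in this thread.
    env[prefect_env_name("cloud", "api")] = api_url
    return env


# Hypothetical in-cluster address of the Prefect server.
agent_env = env_for_agent("http://prefect-server.example:4200", base_env={})
```

The resulting mapping could be used as the env section of the agent pod, or passed as env= to subprocess.run when launching prefect agent start by hand.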
David Ojeda
04/03/2020, 9:38 PM
Dylan