Viv Ian
04/20/2020, 6:20 PM
Manuel Mourato
04/20/2020, 9:09 PM
Maxime Lavoie
04/20/2020, 9:43 PM
David Ojeda
04/21/2020, 12:20 PM
```python
from prefect import Flow, Task


class Generate(Task):
    def run(self):
        return list(range(10))


class GetConfig(Task):
    def run(self):
        return {'offset': 1000}


class Process(Task):
    def run(self, *, data):
        result = data + 1000
        self.logger.info('Process of %d = %d', data, result)
        return result


generate = Generate()
config = GetConfig()
process = Process()

with Flow('flow1') as flow1:
    dataset = generate()
    config_result = config()
    clean = process.map(data=dataset)
    clean.set_upstream(config_result)

print('Flow1 edges:', flow1.edges)
flow1.run()

with Flow('flow2') as flow2:
    dataset = generate()
    config_result = config()
    clean = process.map(data=dataset, upstream_tasks=[config_result])

print('Flow2 edges:', flow2.edges)
flow2.run()
```
The traceback is:
```
[2020-04-21 12:16:45,325] INFO - prefect.TaskRunner | Task 'Process': Starting task run...
[2020-04-21 12:16:45,325] ERROR - prefect.TaskRunner | Task 'Process': unexpected error while running task: KeyError(0)
Traceback (most recent call last):
  File "/Users/david/.virtualenvs/iguazu-venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 256, in run
    state = self.run_mapped_task(
  File "/Users/david/.virtualenvs/iguazu-venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 721, in run_mapped_task
    upstream_state.result[i],
KeyError: 0
```
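The traceback points at `upstream_state.result[i]`: the runner indexes an upstream result by the map index `i`. `GetConfig.run` returns a dict, and `{'offset': 1000}[0]` raises exactly `KeyError: 0`. A plausible reading (an assumption, not confirmed in this thread) is that `config_result` is being treated as a mapped upstream; as far as I recall, in Prefect 0.x tasks passed through `.map(..., upstream_tasks=[...])` are mapped over unless wrapped in `unmapped(...)`, whereas `set_upstream` creates a plain edge. The pure-Python sketch below models only that indexing behaviour; `naive_child_inputs` and `unmapped_child_inputs` are hypothetical helpers, not Prefect internals.

```python
from typing import Any, List, Tuple


def naive_child_inputs(i: int, mapped: List[Any], upstream_results: List[Any]) -> Tuple[Any, List[Any]]:
    # Treats every upstream result as mapped and indexes it by the map index i,
    # like the `upstream_state.result[i]` line in the traceback.
    return mapped[i], [result[i] for result in upstream_results]


def unmapped_child_inputs(i: int, mapped: List[Any], upstream_results: List[Any]) -> Tuple[Any, List[Any]]:
    # Only the mapped input is indexed; non-mapped upstream results are passed whole.
    return mapped[i], list(upstream_results)


dataset = list(range(10))  # what Generate.run returns
config = {"offset": 1000}  # what GetConfig.run returns

try:
    naive_child_inputs(0, dataset, [config])
except KeyError as exc:
    print("naive indexing fails with", repr(exc))  # KeyError(0), as in the log

print(unmapped_child_inputs(0, dataset, [config]))  # (0, [{'offset': 1000}])
```

If that reading is right, `upstream_tasks=[unmapped(config_result)]` in `flow2` would be the first thing to try.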
An Hoang
04/21/2020, 2:37 PM
Manuel Mourato
04/21/2020, 3:41 PM
```python
# CREATE STORAGE
storage = storage.build()
# ADD FLOWS TO STORAGE
storage.add_flow(test_flow)
storage.add_flow(test_flow2)
storage = storage.build()
test_flow.storage = storage
test_flow2.storage = storage
# REGISTER FLOWS IN UI/DB
test_flow.register(build=False)
test_flow2.register(build=False)
```
And they do appear in the UI (image below).
Then I start the agent in my CLI (image below).
But when I try to run one of these flows, it stays in the Scheduled state forever (image below).
What am I doing wrong?
Ty
Jacques
04/21/2020, 4:21 PM
Jacques
04/21/2020, 4:22 PM
Bertrand GERVAIS
04/21/2020, 4:41 PM
Viv Ian
04/21/2020, 5:15 PM
`docker system prune -a` for a fresh start 🙂
• created Prefect services/deployments for k8s (using the latest images)
• ran `docker build` for my app image
• created services/deployments for my app on k8s
Any ideas? THANKS!
David Ojeda
04/21/2020, 5:26 PM
`get_terminal_tasks()` to determine how to connect these graphs.
Something has changed in the way switch/merge is managed, and this created several terminal tasks in one of my flows.
The solution for me was to set the reference task of each flow and use `get_reference_task` instead of `get_terminal_tasks`.
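A small model of why this bit randomly: a terminal task is one with no downstream edge, and if a switch/merge leaves an extra dangling task, there are several of them. Picking one with `next(iter(some_set))` depends on set iteration order, which can differ between runs (string hashes are randomized per process, and default object hashes derive from identity). The sketch below uses hypothetical helpers `terminal_tasks` and `connection_point` over plain string task names; it refuses to guess and prefers an explicit reference task, the same idea as the fix described above. It is not Prefect's implementation; as far as I know, Prefect's `Flow` has its own `terminal_tasks()` and `reference_tasks()` methods.

```python
from typing import Iterable, Optional, Set, Tuple

Edge = Tuple[str, str]  # (upstream task, downstream task)


def terminal_tasks(tasks: Iterable[str], edges: Iterable[Edge]) -> Set[str]:
    # A terminal task is one that no edge leads out of.
    has_downstream = {upstream for upstream, _ in edges}
    return {task for task in tasks if task not in has_downstream}


def connection_point(tasks: Iterable[str], edges: Iterable[Edge], reference: Optional[str] = None) -> str:
    # An explicitly chosen reference task wins; otherwise insist on exactly
    # one terminal task instead of picking an arbitrary set element.
    if reference is not None:
        return reference
    terminals = terminal_tasks(tasks, edges)
    if len(terminals) != 1:
        raise ValueError(f"ambiguous connection point, terminal tasks: {sorted(terminals)}")
    return next(iter(terminals))


# A switch/merge flow with one extra dangling task ("log") after the switch.
tasks = ["load", "switch", "a", "b", "merge", "log"]
edges = [("load", "switch"), ("switch", "a"), ("switch", "b"),
         ("a", "merge"), ("b", "merge"), ("switch", "log")]
print(sorted(terminal_tasks(tasks, edges)))  # ['log', 'merge']
print(connection_point(tasks, edges, reference="merge"))  # merge
```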
Anyway, I just needed to share this small win after a frustrating bug-hunting session (the bug showed up randomly because a set has no order).
Christopher Harris
04/21/2020, 10:15 PM
```python
from typing import List


class BasePipeline:
    """
    A series of services that mutate a document object.
    The flow consists of a single 'source',
    followed by any number of 'processors', followed by one or more 'sinks'.
    The document object is passed along these services in a linear fashion.
    A pipeline configuration object is used to define the type of services
    and the order of the processors.
    """

    def __init__(self, blueprints: "PipelineConfiguration"):
        """
        Initializes and configures services from PipelineConfiguration
        blueprints.
        :param blueprints: a pipeline configuration object
        """
        self.project = blueprints.project
        self.source: Source = start_source(blueprints.source)
        self.sinks: List[Sink] = [start_sink(sink_blueprint) for sink_blueprint in blueprints.sinks]
        self.processors: List[Processor] = [start_processor(processor_blueprint) for processor_blueprint in blueprints.processors]

    def __call__(self, metrics: bool = False, failfast: bool = False) -> None:
        """
        Passes each document from the source through the processors, then to every sink.
        """
        for doc in self.source.pull():
            for processor in self.processors:
                processor(doc)
            for sink in self.sinks:
                sink.push(doc)
```
And here is my translation attempt:
```python
from prefect import Flow, task


@task
def init_source(project: str, source_config: Blueprint):
    return start_source(project, source_config)


@task
def init_sink(project: str, sink_config: Blueprint):
    return start_sink(project, sink_config)


@task
def init_processor(project: str, processor_config: Blueprint):
    return start_processor(project, processor_config)


@task
def run_source(source: Source):
    return source.pull()


@task
def run_sink(sink: Sink, data: Document):
    return sink.push(data)


@task
def run_processor(processor: Processor, data: Document):
    return processor(data)


def execute(pipeline_config: PipelineConfiguration):
    project = pipeline_config.project
    with Flow("test-flow") as flow:
        # Service initialization
        source = init_source(project, pipeline_config.source)
        processors = [init_processor(project, processor_config) for processor_config in pipeline_config.processors]
        sinks = [init_sink(project, sink_config) for sink_config in pipeline_config.sinks]
        # Perform ETL
        datastream = run_source(source)
        for doc in datastream:
            for processor in processors:
                doc = run_processor(processor, doc)
            for sink in sinks:
                run_sink(sink, doc)
    flow.run()
```
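A likely cause for the `for doc in datastream` loop in the translation attempt failing (hedged): inside `with Flow(...)`, calling `run_source(source)` does not run the task; in Prefect 0.x it returns a task object standing for a future result, so there is nothing to iterate at flow-build time. One common workaround is to materialize the generator inside the source task (return `list(source.pull())`) and run the per-document work with `.map`, passing the processors with `unmapped(...)`. The plain-Python sketch below shows only the two functions such tasks could wrap; `materialize`, `process_document` and `pull` are hypothetical names, not part of Prefect or the original code.

```python
from functools import reduce
from typing import Callable, Iterable, Iterator, List, TypeVar

Doc = TypeVar("Doc")


def materialize(stream: Iterable[Doc]) -> List[Doc]:
    # A generator is lazy and single-use; a concrete list can be handed to
    # downstream (e.g. mapped) steps and iterated more than once.
    return list(stream)


def process_document(doc: Doc, processors: List[Callable[[Doc], Doc]]) -> Doc:
    # Apply processors in order, like the inner loop of BasePipeline.__call__,
    # but chaining each processor's return value (as run_processor does).
    return reduce(lambda d, processor: processor(d), processors, doc)


def pull() -> Iterator[int]:
    # Hypothetical stand-in for Source.pull(): yields documents lazily.
    yield from range(3)


docs = materialize(pull())
processors = [lambda d: d + 1, lambda d: d * 2]
print([process_document(d, processors) for d in docs])  # [2, 4, 6]
```

The trade-off is memory: materializing gives up the generator's laziness, so a very large source may need to be pulled in batches instead.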
The issue I'm running into is that it says `datastream` is not iterable. While this may be a syntax issue on my end, the output of `source.pull()` is a generator. Are generators supported in Prefect? If not, does anyone have an optimal workaround?
Matias Godoy
04/22/2020, 9:16 AM
Klemen Strojan
04/22/2020, 10:10 AM
Adisun Wheelock
04/22/2020, 4:33 PM
Luke Orland
04/22/2020, 5:25 PM
```
April 22nd 2020 at 12:02:34pm EDT | CloudHandler
CRITICAL
Failed to write log with error: Object of type ndarray is not JSON serializable
```
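The message itself comes from Python's `json` module, which cannot encode an ndarray. Assuming the array reached the handler as part of a log call's payload (for example as a logging argument), the defensive options are to convert before logging, e.g. `self.logger.info('scores: %s', arr.tolist())`, or to format the message into a string yourself. Where you control a `json.dumps` call, its `default=` hook can do the conversion. In the sketch below, `to_jsonable` and `FakeArray` are hypothetical (the latter stands in for `numpy.ndarray` so it runs without NumPy), and nothing here changes how Prefect's `CloudHandler` serializes records.

```python
import json
from typing import Any


def to_jsonable(obj: Any) -> Any:
    # Called by json.dumps only for objects it cannot encode natively.
    # Array-likes (numpy arrays expose .tolist()) become plain lists;
    # anything else falls back to its str() form instead of failing.
    if hasattr(obj, "tolist"):
        return obj.tolist()
    return str(obj)


class FakeArray:
    # Hypothetical stand-in for numpy.ndarray.
    def __init__(self, values):
        self.values = list(values)

    def tolist(self):
        return list(self.values)


payload = {"message": "fit done", "scores": FakeArray([0.5, 0.75])}
print(json.dumps(payload, default=to_jsonable))
```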
Jacques
04/22/2020, 7:59 PM
`shared_context = prefect.utilities.context.Context()` and then passing that to run with `my_flow.run(context=shared_context)`, but this doesn't seem to solve my problem. I'd appreciate any pointer in the right direction!
Crawford Collins
04/22/2020, 8:55 PM
`MetaModel` class in the flow, it will be successful. If I try to load it in the flow, it will crash at `fit_model`.
```python
import joblib
from prefect import task, unmapped

# meta = init_meta_model(problem, tinydb, use_default_models)
# ^^^ this does not work
meta = MetaModel(problem, tinydb, use_default_models=True)
model_path = get_models(meta)
fit_models = fit_model.map(
    model_path=model_path,
    train_data=unmapped(train_data),
    target=unmapped(train_target),
    problem=unmapped(problem),
)

###

@task
def init_meta_model(problem, db, use_default_models=True):
    meta = MetaModel(problem, db, use_default_models=use_default_models)
    return meta


@task
def fit_model(model_path, train_data, target, problem):
    model = joblib.load(model_path)
    model.fit(X=train_data, y=target)
    joblib.dump(model, model_path)

MetaModel.models = ["PATH_TO_MODEL"]
```
Jacques
04/22/2020, 9:05 PM
Ben Fogelson
04/22/2020, 9:58 PM
`prefect` is use control flow to have an optional task inline in a sequence of tasks. Something like:
```python
from prefect import Flow, Parameter
from prefect.tasks.control_flow import ifelse

with Flow('flow') as flow:
    do_optional_step = Parameter('do_optional_step')
    x = Parameter('x')
    y = x + 1
    y = ifelse(do_optional_step, 2*y, y)
```
This doesn’t work as intended, but it is what I’d like to be able to do. If I were doing this as a pure Python function, it would be:
```python
def run_flow_steps(x, do_optional_step):
    y = x + 1
    if do_optional_step:
        y = 2*y
    return y
```
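One workaround that avoids control-flow tasks altogether (a swapped-in technique, not a fix for `ifelse`, which as far as I recall only wires up dependencies and returns `None`): make the optional step an ordinary step that receives the flag and passes its input through unchanged when disabled. In Prefect you would decorate `add_one` and `maybe_double` with `@task` and call them inside `with Flow('flow')` with the two `Parameter`s; here they are plain functions so the sketch runs without Prefect, and all names are hypothetical.

```python
def add_one(x: int) -> int:
    return x + 1


def maybe_double(y: int, do_optional_step: bool) -> int:
    # The optional step always runs but is a pass-through when disabled,
    # so the graph stays a straight line with no branching.
    return 2 * y if do_optional_step else y


def run_flow_steps(x: int, do_optional_step: bool) -> int:
    # Same wiring a flow would use: one call (task) per step.
    y = add_one(x)
    return maybe_double(y, do_optional_step)


print(run_flow_steps(1, True), run_flow_steps(1, False))  # 4 2
```

The cost of this design is visibility: the optional step always reports success instead of showing up as skipped, so the flag's effect is only visible in the data.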
Ben McNeill
04/23/2020, 2:32 PM
Adisun Wheelock
04/23/2020, 3:22 PM
`DaskExecutor`, it will run `first_condition` and `second_condition` asynchronously, correct? And if I use `LocalExecutor`, it will run `first_condition` and then `second_condition` synchronously?
Vitor Avancini
04/23/2020, 4:19 PM
Alexander Hirner
04/23/2020, 6:24 PM
`store_safe_value`? At first sight, it seems redundant with `Result`'s formatted location.
David Ojeda
04/23/2020, 7:30 PM
`ui:8080` is obvious, but I could not make it work without adding `graphql:4201` too, and I am not sure if that’s safe…
Matthew Perry
04/23/2020, 8:40 PM
`server1`. When I access the page at http://server1:8080, it starts making network calls to the GraphQL backend at http://localhost:4200. Surely this is configurable, but I can't seem to find out where. Here's what I've tried so far:
• editing `service.ui.graphql_url` in my `~/.prefect/config.toml` on server1
• creating a new env and setting `environments.server1.services_host` to server1
• exporting `PREFECT_SERVER__GRAPHQL_URL="http://server1:4200"`
I must be missing something obvious here. Any ideas?
Brad
04/23/2020, 11:04 PM
Brad
04/23/2020, 11:05 PM
Brad
04/23/2020, 11:12 PM
Adisun Wheelock
04/24/2020, 5:58 PM
The `docker-compose.yml` (in prefect/cli) that is run by `prefect server start` brings up a postgres instance. If I already have a postgres instance running, is it possible through the CLI to skip bringing up the postgres instance defined in that `docker-compose.yml`? Similar to `docker-compose up`.
`postgres` service in that `docker-compose.yml`, but I'm wondering if there is a better way.
docker-compose.yml
josh
04/24/2020, 6:03 PM
`--postgres-port`
Adisun Wheelock
04/24/2020, 6:04 PM
josh
04/24/2020, 6:04 PM
Adisun Wheelock
04/24/2020, 6:05 PM